generated from CivicDataLab/open-publising-data-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker_demon.py
67 lines (55 loc) · 1.83 KB
/
worker_demon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import json
import os
import sys
import django
import pika
# from pipeline.model_to_pipeline import *
import log_utils
print('inside ----')
try:
from pipeline.model_to_pipeline import *
pass
except Exception as e:
# raise e
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)
print('exception ----', e)
# pipeline_object = Pipeline.objects.get(pk=196)
# print(pipeline_object.pipeline_name)
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', heartbeat=65535))
channel = connection.channel()
channel.queue_declare(queue='pipeline_ui_queue')
def callback(ch, method, properties, body):
body_json = json.loads(body.decode())
print("Recieved..", body_json)
p_id = body_json['p_id']
logger = log_utils.get_logger_for_existing_file(p_id)
data_url = body_json['data_path']
project = body_json['project']
try:
task_executor(p_id, data_url, project)
except Exception as e:
logger.error(
f"""ERROR: Worker demon failed with an error {str(e)}""")
print(e)
# print("got temp_file name as ", body_json['temp_file_name'])
# os.remove('./'+temp_file_name)
print(" [x] Done")
logger.info(f"""INFO: Worker demon finished successfully """)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='pipeline_ui_queue',
on_message_callback=callback)
channel.start_consuming()
print("inside demon main...")
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)