diff --git a/scripts/_LDRD_Kafka.py b/scripts/_LDRD_Kafka.py index 217c26d..1d9b8c2 100644 --- a/scripts/_LDRD_Kafka.py +++ b/scripts/_LDRD_Kafka.py @@ -12,7 +12,7 @@ sq = importlib.import_module("_synthesis_queue_RM") de = importlib.import_module("_data_export") da = importlib.import_module("_data_analysis") -pc = importlib.import_module("_pdf_calculator") +# pc = importlib.import_module("_pdf_calculator") # from diffpy.pdfgetx import PDFConfig # gp = importlib.import_module("_get_pdf") @@ -33,7 +33,8 @@ def _qserver_inputs(): 'name_by_prefix', 'prefix', 'pump_list', 'precursor_list', 'syringe_mater_list', 'syringe_list', 'target_vol_list', 'sample', - 'wait_dilute', 'mixer', 'wash_tube', 'resident_t_ratio', + 'wait_dilute', 'if_wash', 'wash_loop', 'wash_sapphire', + 'mixer', 'wash_tube', 'resident_t_ratio', 'rate_unit', 'uvvis_config', 'perkin_config', 'auto_set_target_list', 'set_target_list', 'infuse_rates', ] @@ -736,7 +737,7 @@ def macro_10_good_bad(self, stream_name): distance=self.inputs.distance[0], height=self.inputs.height[0], dummy_test=self.inputs.dummy_kafka[0], - percent_range=[30, 100]) + percent_range=[40, 100]) self.PL_goodbad = {} PL_goodbad = { 'wavelength':np.asarray(x0), 'percentile_mean':np.asarray(y0), @@ -765,7 +766,7 @@ def macro_11_absorbance(self, stream_name): ## Apply percnetile filtering for absorption spectra, defaut percent_range = [15, 85] abs_per = da.percentile_abs(self.qepro_dic['QEPro_x_axis'], self.qepro_dic['QEPro_output'], - percent_range=[15, 85]) + percent_range=[5, 65]) print(f'\n*** start to check absorbance at 365b nm in stream: {stream_name} is positive or not***\n') # abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0) diff --git a/scripts/_synthesis_queue_RM.py b/scripts/_synthesis_queue_RM.py index 7422a60..1cd9b1c 100644 --- a/scripts/_synthesis_queue_RM.py +++ b/scripts/_synthesis_queue_RM.py @@ -33,7 +33,10 @@ def synthesis_queue_xlsx(parameter_obj): resident_t_ratio = qsp.resident_t_ratio prefix = qsp.prefix sample = qsp.sample - wash_tube = qsp.wash_tube + wait_dilute = qsp.wait_dilute + if_wash = qsp.if_wash + wash_loop = qsp.wash_loop + wash_sapphire = qsp.wash_sapphire rate_unit = qsp.rate_unit[0] name_by_prefix = qsp.name_by_prefix[0] det2 = qsp.uvvis_config[0] @@ -72,19 +75,20 @@ def synthesis_queue_xlsx(parameter_obj): else: set_target_list = set_target_list.tolist() - + num_pumps = int(len(wash_loop)/3) # 0. stop infuese for all pumps - flowplan = BPlan('stop_group', pump_list + [wash_tube[2], wash_tube[5]]) + flowplan = BPlan('stop_group', pump_list + [wash_loop[1+i*3] for i in range(num_pumps)]) RM.item_add(flowplan, pos=pos) - + for i in range(len(rate_list)): # for i in range(2): ## 1. Set i infuese rates + rate_list2 = rate_list[i][:-1] + [60] for sl, pl, ir, tvl, stl, sml in zip( syringe_list, pump_list, - rate_list[i], + rate_list2, target_vol_list, set_target_list[i], syringe_mater_list @@ -92,7 +96,6 @@ def synthesis_queue_xlsx(parameter_obj): # ir = float(ir) # stl = int(stl) - flowplan = BPlan('set_group_infuse2', [sl], [pl], rate_list = [ir], target_vol_list = [tvl], @@ -102,13 +105,14 @@ def synthesis_queue_xlsx(parameter_obj): RM.item_add(flowplan, pos=pos) - ## 2. Start infuese - if precursor_list[-1] == 'Toluene': - flowplan = BPlan('start_group_infuse', pump_list[:-1], rate_list[i][:-1]) + # ## 2. Start infuese + # if precursor_list[-1] == 'Toluene': + # flowplan = BPlan('start_group_infuse', pump_list[:-1], rate_list[i][:-1]) - else: - flowplan = BPlan('start_group_infuse', pump_list, rate_list[i]) + # else: + # flowplan = BPlan('start_group_infuse', pump_list, rate_list[i]) + flowplan = BPlan('start_group_infuse', pump_list, rate_list[i]) RM.item_add(flowplan, pos=pos) @@ -145,7 +149,13 @@ def synthesis_queue_xlsx(parameter_obj): ## 3.1 Wait for 30 secpnds for post dilute if precursor_list[-1] == 'Toluene': - flowplan = BPlan('start_group_infuse', [pump_list[-1]], [rate_list[i][-1]]) + # flowplan = BPlan('start_group_infuse', [pump_list[-1]], [rate_list[i][-1]]) + flowplan = BPlan('set_group_infuse2', [syringe_list[-1]], [pump_list[-1]], + rate_list = [rate_list[i][-1]], + target_vol_list = [target_vol_list[-1]], + set_target_list = [set_target_list[i][-1]], + syringe_mater_list = [syringe_mater_list[-1]], + rate_unit = rate_unit) RM.item_add(flowplan, pos=pos) restplan = BPlan('sleep_sec_q', qsp.wait_dilute[0]) @@ -222,12 +232,12 @@ def synthesis_queue_xlsx(parameter_obj): ###### Kafka analyze data here. ####### ## 7. Wash the loop and mixer - if wash_tube[0] == 0: - wash_tube_queue2(pump_list, wash_tube, rate_unit, + if if_wash[0] == 1: + wash_tube_queue2(pump_list, if_wash, wash_loop, rate_unit, pos=[pos,pos,pos,pos,pos], zmq_control_addr=zmq_control_addr, zmq_info_addr=zmq_info_addr) - elif wash_tube[0] == 1: + elif wash_tube[0] == 0: inst1 = BInst("queue_stop") RM.item_add(inst1, pos='front') @@ -716,7 +726,7 @@ def wash_tube_queue(pump_list, wash_tube, rate_unit, ## wash loop with two solvents -def wash_tube_queue2(pump_list, wash_tube, rate_unit, +def wash_tube_queue2(pump_list, if_wash, wash_loop, rate_unit, pos=[0,1,2,3,4], zmq_control_addr='tcp://localhost:60615', zmq_info_addr='tcp://localhost:60625'): @@ -727,10 +737,10 @@ def wash_tube_queue2(pump_list, wash_tube, rate_unit, flowplan = BPlan('stop_group', pump_list) RM.item_add(flowplan, pos=pos[0]) - + num_pumps = int(len(wash_loop)/3) ### Set up washing tube/loop - flowplan = BPlan('set_group_infuse2', [wash_tube[1], wash_tube[4]], [wash_tube[2], wash_tube[5]], - rate_list=[wash_tube[3], wash_tube[6]], + flowplan = BPlan('set_group_infuse2', [wash_loop[0+i*3] for i in range(num_pumps)], [wash_loop[1+i*3] for i in range(num_pumps)], + rate_list=[wash_loop[2+i*3] for i in range(num_pumps)], target_vol_list=['30 ml', '15 ml'], set_target_list=[False, False], syringe_mater_list = ['steel', 'plastic_BD'], @@ -739,18 +749,18 @@ def wash_tube_queue2(pump_list, wash_tube, rate_unit, ### Start washing tube/loop - flowplan = BPlan('start_group_infuse', [wash_tube[2], wash_tube[5]], [wash_tube[3], wash_tube[6]]) + flowplan = BPlan('start_group_infuse', [wash_loop[1+i*3] for i in range(num_pumps)], [wash_loop[2+i*3] for i in range(num_pumps)]) RM.item_add(flowplan, pos=pos[2]) ### Wash loop/tube for xxx seconds - restplan = BPlan('sleep_sec_q', wash_tube[7]) + restplan = BPlan('sleep_sec_q', if_wash[1]) RM.item_add(restplan, pos=pos[3]) ### Stop washing - flowplan = BPlan('stop_group', [wash_tube[2], wash_tube[5]]) + flowplan = BPlan('stop_group', [wash_loop[1+i*3] for i in range(num_pumps)]) RM.item_add(flowplan, pos=pos[4]) diff --git a/scripts/inputs_qserver_kafka_v2.xlsx b/scripts/inputs_qserver_kafka_v2.xlsx index 36cf5fb..a9f7ac3 100644 Binary files a/scripts/inputs_qserver_kafka_v2.xlsx and b/scripts/inputs_qserver_kafka_v2.xlsx differ diff --git a/scripts/kafka_consumer_iterate_1LL09_v2.py b/scripts/kafka_consumer_iterate_1LL09_v2.py index 99b487f..eb974c1 100644 --- a/scripts/kafka_consumer_iterate_1LL09_v2.py +++ b/scripts/kafka_consumer_iterate_1LL09_v2.py @@ -144,39 +144,10 @@ def print_message(consumer, doctype, doc): ################################################### if name == 'start': print( - f"{datetime.datetime.now().isoformat()} documents {name}\n" + f"\n\n{datetime.datetime.now().isoformat()} documents {name}\n" f"document keys: {list(message.keys())}") - - if 'uid' in message.keys(): - print(f"uid: {message['uid']}") - if 'plan_name' in message.keys(): - print(f"plan name: {message['plan_name']}") - if 'detectors' in message.keys(): - print(f"detectors: {message['detectors']}") - if 'pumps' in message.keys(): - print(f"pumps: {message['pumps']}") - if 'detectors' in message.keys(): - print(f"detectors: {message['detectors']}") - if 'uvvis' in message.keys() and message['plan_name']!='count': - print(f"uvvis mode:\n" - f" integration time: {message['uvvis'][0]} ms\n" - f" num spectra averaged: {message['uvvis'][1]}\n" - f" buffer capacity: {message['uvvis'][2]}" - ) - elif 'uvvis' in message.keys() and message['plan_name']=='count': - print(f"uvvis mode:\n" - f" spectrum type: {message['uvvis'][0]}\n" - f" integration time: {message['uvvis'][2]} ms\n" - f" num spectra averaged: {message['uvvis'][3]}\n" - f" buffer capacity: {message['uvvis'][4]}" - ) - if 'mixer' in message.keys(): - print(f"mixer: {message['mixer']}") - if 'sample_type' in message.keys(): - print(f"sample type: {message['sample_type']}") - - ## Reset kafka_process.uid to an empty list - kafka_process.uid = [] + + kafka_process.macro_00_print_start(message) ######### While document name == 'stop' ########### @@ -195,7 +166,6 @@ def print_message(consumer, doctype, doc): print(f"{datetime.datetime.now().isoformat()} documents {name}\n" f"contents: {pprint.pformat(message)}" ) - # num_events = len(message['num_events']) ## wait 2 seconds for databroker to save data time.sleep(2)