diff --git a/scripts/_LDRD_Kafka.py b/scripts/_LDRD_Kafka.py index 1c8cdd8..c263157 100644 --- a/scripts/_LDRD_Kafka.py +++ b/scripts/_LDRD_Kafka.py @@ -12,17 +12,20 @@ # import _get_pdf as gp # import torch -from prepare_agent_pdf import build_agen +# from prepare_agent_pdf import build_agen # from diffpy.pdfgetx import PDFConfig -from tiled.client import from_uri +# from tiled.client import from_uri def _qserver_inputs(): qserver_list=[ - 'dummy_qserver', 'name_by_prefix', 'prefix', - 'pump_list', 'precursor_list', 'syringe_mater_list', 'syringe_list', - 'target_vol_list', 'sample', 'mixer', 'wash_tube', 'resident_t_ratio', + 'zmq_control_addr', 'zmq_info_addr', + 'dummy_qserver', 'is_iteration', 'pos', + 'name_by_prefix', 'prefix', 'pump_list', 'precursor_list', + 'syringe_mater_list', 'syringe_list', 'target_vol_list', + 'sample', + 'wait_dilute', 'mixer', 'wash_tube', 'resident_t_ratio', 'rate_unit', 'uvvis_config', 'perkin_config', 'set_target_list', 'infuse_rates', ] @@ -71,34 +74,38 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'): ## Every attribute in self.inputs is a list!!! self.inputs = dic_to_inputs(self.print_dic, self.parameters_list) - ## Append agent in the list of self.inputs.agent - if self.inputs.agent==[]: - self.inputs.agent.append( - build_agen( - peak_target=self.inputs.peak_target[0], - agent_data_path=self.inputs.agent_data_path[0]) - ) - - ## self.inputs.sandbox_tiled_client[0] is just the uri of sandbox - ## so, turn uri into client and append it in self.inputs.sandbox_tiled_client - if type(self.inputs.sandbox_tiled_client[0]) is str: - self.inputs.sandbox_tiled_client.append(from_uri(self.inputs.sandbox_tiled_client[0])) - - - ## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn - # fn_TBD = ['cfg_fn', 'bkg_fn', 'iq_fn', 'cif_fn', 'gr_fn'] - for fn in self.inputs.fn_TBD: - - path = getattr(self.inputs, fn)[0] - if path in self.parameters_list: - path = getattr(self.inputs, path)[0] - - ff = getattr(self.inputs, fn)[1] - - fn_glob = glob.glob(os.path.join(path, ff)) - - for i in fn_glob: - getattr(self.inputs, fn).append(i) + try: + # ## Append agent in the list of self.inputs.agent + # if self.inputs.agent==[]: + # self.inputs.agent.append( + # build_agen( + # peak_target=self.inputs.peak_target[0], + # agent_data_path=self.inputs.agent_data_path[0]) + # ) + + # ## self.inputs.sandbox_tiled_client[0] is just the uri of sandbox + # ## so, turn uri into client and append it in self.inputs.sandbox_tiled_client + # if type(self.inputs.sandbox_tiled_client[0]) is str: + # self.inputs.sandbox_tiled_client.append(from_uri(self.inputs.sandbox_tiled_client[0])) + + + ## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn + # fn_TBD = ['cfg_fn', 'bkg_fn', 'iq_fn', 'cif_fn', 'gr_fn'] + for fn in self.inputs.fn_TBD: + + path = getattr(self.inputs, fn)[0] + if path in self.parameters_list: + path = getattr(self.inputs, path)[0] + + ff = getattr(self.inputs, fn)[1] + + fn_glob = glob.glob(os.path.join(path, ff)) + + for i in fn_glob: + getattr(self.inputs, fn).append(i) + + except AttributeError: + pass diff --git a/scripts/_synthesis_queue_RM.py b/scripts/_synthesis_queue_RM.py index e44fe22..819707e 100644 --- a/scripts/_synthesis_queue_RM.py +++ b/scripts/_synthesis_queue_RM.py @@ -5,6 +5,235 @@ from bluesky_queueserver_api.zmq import REManagerAPI from bluesky_queueserver_api import BPlan, BInst from ophyd.sim import det, noisy_det +from _LDRD_Kafka import xlsx_to_inputs + +## Pass qsever parameters by xlsx_to_inputs +## Arrange tasks of for PQDs synthesis +def synthesis_queue_xlsx(parameter_obj): + """ + Pass qsever parameters by xlsx_to_inputs + Arrange tasks of for PQDs synthesis + + Args: + parameter_obj (xlsx_to_inputs): parameters passing to qserver + (example: pump_list = parameter_obj.inputs.pump_list) + """ + + qsp = parameter_obj.inputs + + syringe_list = qsp.syringe_list + pump_list = qsp.pump_list + set_target_list = qsp.set_target_list + target_vol_list = qsp.target_vol_list + rate_list = qsp.infuse_rates + syringe_mater_list = qsp.syringe_mater_list + precursor_list = qsp.precursor_list + mixer = qsp.mixer + resident_t_ratio = qsp.resident_t_ratio + prefix = qsp.prefix + sample = qsp.sample + wash_tube = qsp.wash_tube + rate_unit = qsp.rate_unit[0] + name_by_prefix = qsp.name_by_prefix[0] + det2 = qsp.uvvis_config[0] + num_abs = qsp.uvvis_config[1] + num_flu = qsp.uvvis_config[2] + det1 = qsp.perkin_config[0] + det1_frame_rate = qsp.perkin_config[1] + det1_time = qsp.perkin_config[2] + pos = qsp.pos[0] + dummy_qserver = qsp.dummy_qserver[0] + is_iteration = qsp.is_iteration[0] + zmq_control_addr = qsp.zmq_control_addr[0] + zmq_info_addr = qsp.zmq_info_addr[0] + + + RM = REManagerAPI(zmq_control_addr=zmq_control_addr, zmq_info_addr=zmq_info_addr) + + if name_by_prefix: + sample = de._auto_name_sample(rate_list, prefix=prefix) + + rate_list = np.asarray(rate_list, dtype=np.float32) + if len(rate_list.shape) == 1: + rate_list = rate_list.reshape(1, rate_list.shape[0]) + rate_list = rate_list.tolist() + else: + rate_list = rate_list.tolist() + + set_target_list = np.asarray(set_target_list, dtype=np.int8) + if len(set_target_list.shape) == 1: + set_target_list = set_target_list.reshape(1, set_target_list.shape[0]) + set_target_list = set_target_list.tolist() + else: + set_target_list = set_target_list.tolist() + + + # 0. stop infuese for all pumps + flowplan = BPlan('stop_group', pump_list + [wash_tube[2], wash_tube[5]]) + RM.item_add(flowplan, pos=pos) + + + for i in range(len(rate_list)): + # for i in range(2): + ## 1. Set i infuese rates + for sl, pl, ir, tvl, stl, sml in zip( + syringe_list, + pump_list, + rate_list[i], + target_vol_list, + set_target_list[i], + syringe_mater_list + ): + + # ir = float(ir) + # stl = int(stl) + + flowplan = BPlan('set_group_infuse2', [sl], [pl], + rate_list = [ir], + target_vol_list = [tvl], + set_target_list = [stl], + syringe_mater_list = [sml], + rate_unit = rate_unit) + RM.item_add(flowplan, pos=pos) + + + ## 2. Start infuese + if precursor_list[-1] == 'Toluene': + flowplan = BPlan('start_group_infuse', pump_list[:-1], rate_list[i][:-1]) + + else: + flowplan = BPlan('start_group_infuse', pump_list, rate_list[i]) + + RM.item_add(flowplan, pos=pos) + + + ## 3. Wait for equilibrium + if len(mixer) == 1: + if precursor_list[-1] == 'Toluene': + mixer_pump_list = [[mixer[0], *pump_list[:2]]] + else: + mixer_pump_list = [[mixer[0], *pump_list]] + elif len(mixer) == 2: + if precursor_list[-1] == 'Toluene': + mixer_pump_list = [[mixer[0], *pump_list[:2]], [mixer[1], *pump_list[:-1]]] + else: + mixer_pump_list = [[mixer[0], *pump_list[:2]], [mixer[1], *pump_list]] + + if dummy_qserver: + restplan = BPlan('sleep_sec_q', qsp.dummy_qserver[1]) + RM.item_add(restplan, pos=pos) + + else: + if is_iteration: + rest_time = resident_t_ratio[-1] + + elif len(resident_t_ratio) == 1: + rest_time = resident_t_ratio[0] + elif len(resident_t_ratio) > 1 and i==0: + rest_time = resident_t_ratio[0] + elif len(resident_t_ratio) > 1 and i>0: + rest_time = resident_t_ratio[-1] + + restplan = BPlan('wait_equilibrium2', mixer_pump_list, ratio=rest_time) + RM.item_add(restplan, pos=pos) + + + ## 3.1 Wait for 30 secpnds for post dilute + if precursor_list[-1] == 'Toluene': + flowplan = BPlan('start_group_infuse', [pump_list[-1]], [rate_list[i][-1]]) + RM.item_add(flowplan, pos=pos) + + restplan = BPlan('sleep_sec_q', qsp.wait_dilute[0]) + RM.item_add(restplan, pos=pos) + + + + ## 4.0 Configure area detector in Qserver + if det1 == 'pe1c': + scanplan = BPlan('configure_area_det', + det='pe1c', + exposure=1, + acq_time=det1_frame_rate) + RM.item_add(scanplan, pos=pos) + + + ## 4-1. Take a fluorescence peak to check reaction + scanplan = BPlan('take_a_uvvis_csv_q', sample_type=sample[i], + spectrum_type='Corrected Sample', + correction_type='Dark', + pump_list=pump_list, + precursor_list=precursor_list, + mixer=mixer) + RM.item_add(scanplan, pos=pos) + + + # ## 4-2. Take a Absorption spectra to check reaction + # scanplan = BPlan('take_a_uvvis_csv_q', sample_type=sample[i], + # spectrum_type='Absorbtion', + # correction_type='Reference', + # pump_list=pump_list, + # precursor_list=precursor_list, + # mixer=mixer) + # RM.item_add(scanplan, pos=pos) + + + #### Kafka check data here. + + ## 5. Sleep for 5 seconds for Kafak to check good/bad data + restplan = BPlan('sleep_sec_q', 5) + RM.item_add(restplan, pos=pos) + + + # ## 6.0 Print global parameters in Qserver + # scanplan = BPlan('print_glbl_qserver') + # RM.item_add(scanplan, pos=pos) + + if det1 == 'pe1c': + ## 6.1 Configure area detector in Qserver + scanplan = BPlan('configure_area_det', + det=det1, + exposure=det1_time, + acq_time=det1_frame_rate) + RM.item_add(scanplan, pos=pos) + + + ## 6. Start xray_uvvis bundle plan to take real data ('pe1c' or 'det') + scanplan = BPlan('xray_uvvis_plan', det1, det2, + num_abs=num_abs, + num_flu=num_flu, + sample_type=sample[i], + spectrum_type='Absorbtion', + correction_type='Reference', + pump_list=pump_list, + precursor_list=precursor_list, + mixer=mixer, + dilute_pump=pump_list[-1]) + RM.item_add(scanplan, pos=pos) + + ## 6.1 sleep 20 seconds for stopping + restplan = BPlan('sleep_sec_q', qsp.wait_dilute[1]) + RM.item_add(restplan, pos=pos) + + + ###### Kafka analyze data here. ####### + + ## 7. Wash the loop and mixer + if wash_tube[0] == 0: + wash_tube_queue2(pump_list, wash_tube, rate_unit, + pos=[pos,pos,pos,pos,pos], + zmq_control_addr=zmq_control_addr, + zmq_info_addr=zmq_info_addr) + elif wash_tube[0] == 1: + inst1 = BInst("queue_stop") + RM.item_add(inst1, pos='front') + + + # 8. stop infuese for all pumps + flowplan = BPlan('stop_group', pump_list) + RM.item_add(flowplan, pos=pos) + + + ## Arrange tasks of for PQDs synthesis def synthesis_queue(