From ff2461df4bb1c2edd00ffc7b9d017084148c8c4e Mon Sep 17 00:00:00 2001 From: cheng-hung Date: Thu, 8 Aug 2024 19:09:44 -0400 Subject: [PATCH] Organize macros into kafka v2 --- scripts/_LDRD_Kafka.py | 795 ++++++++++++++++++--- scripts/inputs_qserver_kafka_v2.xlsx | Bin 24144 -> 24205 bytes scripts/kafka_consumer_iterate_1LL09_v2.py | 29 +- scripts/kafka_consumer_iterate_XPD_v2.py | 507 ++++--------- 4 files changed, 839 insertions(+), 492 deletions(-) diff --git a/scripts/_LDRD_Kafka.py b/scripts/_LDRD_Kafka.py index 1cdbe95..9f5fba6 100644 --- a/scripts/_LDRD_Kafka.py +++ b/scripts/_LDRD_Kafka.py @@ -10,14 +10,15 @@ # import _synthesis_queue_RM as sq import _pdf_calculator as pc import _get_pdf as gp +from scipy import integrate -# import torch +import torch # from prepare_agent_pdf import build_agen from diffpy.pdfgetx import PDFConfig # from tiled.client import from_uri # from bluesky_queueserver_api.zmq import REManagerAPI -# from bluesky_queueserver_api import BPlan, BInst +from bluesky_queueserver_api import BPlan, BInst @@ -36,24 +37,38 @@ def _qserver_inputs(): return qserver_list -def _kafka_process(): - kafka_list=[ +def _kafka_inputs(): + inputs_list=[ 'dummy_kafka', 'csv_path', 'key_height', 'height', 'distance', 'PLQY', 'rate_label_dic_key', 'rate_label_dic_value', 'new_points_label', - 'use_good_bad', 'post_dilute', 'write_agent_data', 'agent_data_path', - 'USE_AGENT_iterate', 'peak_target', 'agent', + 'use_good_bad', 'post_dilute', 'fix_Br_ratio', 'write_agent_data', 'agent_data_path', + 'USE_AGENT_iterate', 'peak_target', 'iq_to_gr', 'iq_to_gr_path', 'cfg_fn', 'bkg_fn', 'iq_fn', 'search_and_match', 'mystery_path', 'results_path', 'fitting_pdf', 'fitting_pdf_path', 'cif_fn', 'gr_fn', - 'dummy_pdf', 'write_to_sandbox', 'sandbox_uri', - 'sandbox_tiled_client', 'tiled_client', + 'dummy_pdf', 'write_to_sandbox', 'sandbox_uri', 'beamline_acronym', 'fn_TBD', - 'entry', 'iq_data', 'stream_list', 'uid', 'uid_catalog', 'uid_pdfstream', + ] + + return inputs_list + + + +def _kafka_process(): + process_list=[ + 'agent', 'sandbox_tiled_client', 'tiled_client', + 'entry', 'iq_data', 'stream_list', + 'uid', 'uid_catalog', 'uid_pdfstream', 'uid_sandbox', 'gr_data', 'pdf_property', 'gr_fitting', + 'qepro_dic', 'metadata_dic', 'sample_type', + 'PL_goodbad', 'PL_fitting', 'abs_data', 'abs_fitting', + 'plqy_dic', 'optical_property', 'agent_data', 'rate_label_dic', + 'good_data', 'bad_data', 'agent_iteration', 'finished', ] - return kafka_list + return process_list + @@ -76,24 +91,26 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'): self.parameters_list = parameters_list self.from_xlsx = xlsx_fn self.sheet_name = sheet_name + + ## set attributes of keys in _kafka_process() for processing data + for key in _kafka_process(): + setattr(self, key, []) + + ## set attributes of keys in self.parameters_list for input parameters self.print_dic = de._read_input_xlsx(self.from_xlsx, sheet_name=self.sheet_name) - - ## Every attribute in self.inputs is a list!!! + ## Every attribute in self.inputs is default to a list!!! 
self.inputs = dic_to_inputs(self.print_dic, self.parameters_list) try: - ## Append agent in the list of self.inputs.agent - if self.inputs.agent==[]: - self.inputs.agent.append( - build_agen( - peak_target=self.inputs.peak_target[0], - agent_data_path=self.inputs.agent_data_path[0]) - ) + ## Assign Agent to self.agent + self.agent = build_agen( + peak_target=self.inputs.peak_target[0], + agent_data_path=self.inputs.agent_data_path[0]) ## self.inputs.sandbox_uri[0] is just the uri of sandbox - ## so, turn uri into client and assign it to self.inputs.sandbox_tiled_client + ## so, turn uri into client and assign it to self.sandbox_tiled_client if type(self.inputs.sandbox_uri[0]) is str: - self.inputs.sandbox_tiled_client = from_uri(self.inputs.sandbox_uri[0]) + self.sandbox_tiled_client = from_uri(self.inputs.sandbox_uri[0]) ## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn @@ -110,47 +127,170 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'): for i in fn_glob: getattr(self.inputs, fn).append(i) + + ## Making rate_label_dic by rate_label_dic_key and rate_label_dic_value + self.rate_label_dic = {} + for key, value in zip(self.inputs.rate_label_dic_key, rate_label_dic_value): + self.rate_label_dic.update({key: value}) except AttributeError: pass + def auto_rate_list(self, pump_list, new_points, fix_Br_ratio): + """Auto transfer the predicted rates in new_points to a rate_list for qserver + + Args: + pump_list (list): pump list for qserver + new_points (dict): new_points predicted by self.agent + fix_Br_ratio (list): if fix ratio of CsPb/Br. If fixed, the ratio is fix_Br_ratio[1] + + Returns: + list: rate list for importing to qserver + """ + set_target_list = [0 for i in range(len(pump_list))] + rate_list = [] + for i in self.inputs.new_points_label: + if i in new_points['points'].keys(): + rate_list.append(new_points['points'][i][0]) + else: + pass + + if fix_Br_ratio[0]: + rate_list.insert(1, rate_list[0]*fix_Br_ratio[1]) + + return rate_list + + + + def macro_agent(self, qserver_process, RM, check_target=False): + """macro to build agent, make optimization, and update agent_data + + This macro will + 1. Build agent from agent_data_path = self.inputs.agent_data_path[0] + 2. Make optimization + 2. Update self.agent_data with target, predicted mean & standard deviation + self.agent_data['agent_target']: agent_target + self.agent_data['posterior_mean']: post_mean + self.agent_data['posterior_stddev']: post_stddev + 3. Check if meet target. If meet, wash loop; if not, keep iteration. + 4. Update self.agent_iteration + + Args: + qserver_process (_LDRD_Kafka.xlsx_to_inputs, optional): qserver parameters read from xlsx. + RM (REManagerAPI): Run Engine Manager API. + check_target (bool, optional): Check if peak emission reaches peak target. Defaults to False. 
+
+        Returns:
+            dict: new_points predicted by self.agent
+        """
+
+        qin = qserver_process.inputs
+        peak_target = self.inputs.peak_target[0]
+        peak_tolerance = self.inputs.peak_target[1]
+
+        self.agent = build_agen(
+                        peak_target = peak_target,
+                        agent_data_path = self.inputs.agent_data_path[0],
+                        use_OAm = True)
+
+        if len(self.agent.table) < 2:
+            acq_func = "qr"
+        else:
+            acq_func = "qei"
+
+        new_points = self.agent.ask(acq_func, n=1)
+
+        ## Get target of agent.objectives
+        agent_target = self.agent.objectives.summary['target'].tolist()
+
+        ## Get mean and standard deviation of agent.ask()
+        res_values = []
+        for i in self.inputs.new_points_label:
+            if i in new_points['points'].keys():
+                res_values.append(new_points['points'][i][0])
+        x_tensor = torch.tensor(res_values)
+        posterior = self.agent.posterior(x_tensor)
+        post_mean = posterior.mean.tolist()[0]
+        post_stddev = posterior.stddev.tolist()[0]
+
+        ## Apply np.exp to the posterior of log-transformed objectives
+        if_log = self.agent.objectives.summary['transform']
+        for j in range(if_log.shape[0]):
+            if if_log[j] == 'log':
+                post_mean[j] = np.exp(post_mean[j])
+                post_stddev[j] = np.exp(post_stddev[j])
+
+        ## Update target, mean, and standard deviation in agent_data
+        self.agent_data = {}
+        self.agent_data.update({'agent_target': agent_target})
+        self.agent_data.update({'posterior_mean': post_mean})
+        self.agent_data.update({'posterior_stddev': post_stddev})
+
+        peak_diff = abs(self.PL_fitting['peak_emission'] - peak_target)
+        meet_target = (peak_diff <= peak_tolerance)
+
+        if check_target and meet_target:
+            print(f'\nTarget peak: {self.inputs.peak_target[0]} nm vs. Current peak: {self.PL_fitting["peak_emission"]} nm\n')
+            print(f'\nReached the target: stop iteration, stop all pumps, and wash the loop.\n')
+
+            ### Stop all infusing pumps and wash loop
+            sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
+                                zmq_control_addr=qin.zmq_control_addr[0],
+                                zmq_info_addr=qin.zmq_info_addr[0])
+
+            inst1 = BInst("queue_stop")
+            RM.item_add(inst1, pos='front')
+            self.agent_iteration.append(False)
+
+        else:
+            self.agent_iteration.append(True)
+
+        return new_points
+
+
     def macro_01_get_iq(self, iq_I_uid):
         """macro to get iq data, used in kafka consumer whiel taking xray_uvvis_plan and
         analysis of pdfstream finished

         This macro will
-        0. Reset self.inputs.iq_data as an empty list
-        1. Assgin sandbox entry to self.inputs.entry
-        2. Append 4 elements into self.inputs.iq_data
-            self.inputs.iq_data[0]: chi_Q
-            self.inputs.iq_data[1]: chi_I
-            self.inputs.iq_data[2]: np.array([chi_Q, chi_I])
-            self.inputs.iq_data[3]: pd.DataFrame([chi_Q, chi_I])
-        3. Reset self.inputs.uid to an empty list
+        0. Reset self.iq_data as an empty {}
+        1. Assign sandbox entry to self.entry
+        2. Update iq_data as a dict into self.iq_data
+            self.iq_data['Q']: chi_Q
+            self.iq_data['I']: chi_I
+            self.iq_data['array']: np.array([chi_Q, chi_I])
+            self.iq_data['df']: pd.DataFrame([chi_Q, chi_I])
+        3. 
Reset self.uid to an empty list Args: iq_I_uid (str): uid of analysis data, read from doc[1]['data']['chi_I'] """ - self.inputs.uid_pdfstream.append(iq_I_uid) - self.inputs.entry = self.inputs.sandbox_tiled_client[iq_I_uid] - df = self.inputs.entry.read() - # Before appending I(Q) data, reset self.inputs.iq_data as an empty list - self.inputs.iq_data = [] - self.inputs.iq_data.append(df['chi_Q'].to_numpy()) - self.inputs.iq_data.append(df['chi_I'].to_numpy()) + self.uid_pdfstream.append(iq_I_uid) + self.entry = self.sandbox_tiled_client[iq_I_uid] + df = self.entry.read() + # Before updating I(Q) data, reset self.iq_data as an empty dict + self.iq_data = {} + # self.iq_data.append(df['chi_Q'].to_numpy()) + # self.iq_data.append(df['chi_I'].to_numpy()) iq_array = np.asarray([df['chi_Q'].to_numpy(), df['chi_I'].to_numpy()]) - self.inputs.iq_data.append(iq_array) + # self.iq_data.append(iq_array) iq_df = pd.DataFrame() iq_df['q'] = df['chi_Q'].to_numpy() iq_df['I(q)'] = df['chi_I'].to_numpy() - self.inputs.iq_data.append(iq_df) + # self.iq_data.append(iq_df) - ## Reset self.inputs.uid to an empty list - self.inputs.uid = [] + iq_data = { 'Q':df['chi_Q'].to_numpy(), + 'I':df['chi_I'].to_numpy(), + 'array':iq_array, + 'df':iq_df} + self.iq_data.update(iq_data) + + ## Reset self.uid to an empty list + self.uid = [] @@ -158,19 +298,19 @@ def macro_02_get_uid(self): """macro to get raw data uid, used in kafka consumer This macro will - 1. Assign raw data uid to self.inputs.uid - 2. Append raw data uid to self.inputs.uid_catalog - 3. Update self.inputs.stream_list + 1. Assign raw data uid to self.uid + 2. Append raw data uid to self.uid_catalog + 3. Update self.stream_list """ ## wait 1 second for databroker to save data time.sleep(1) - self.inputs.uid = self.inputs.entry.metadata['run_start'] - self.inputs.uid_catalog.append(self.inputs.uid) - stream_list = self.inputs.tiled_client[self.inputs.uid].metadata['summary']['stream_names'] - ## Reset self.inputs.stream_list to an empty list - self.inputs.stream_list = [] + self.uid = self.entry.metadata['run_start'] + self.uid_catalog.append(self.uid) + stream_list = self.tiled_client[self.uid].metadata['summary']['stream_names'] + ## Reset self.stream_list to an empty list + self.stream_list = [] for stream_name in syringe_list: - self.inputs.stream_list.append(stream_name) + self.stream_list.append(stream_name) @@ -180,24 +320,24 @@ def macro_03_stop_queue_uid(sefl, RM): This macro will 1. Stop queue - 2. Assign raw data uid to self.inputs.uid - 3. Append raw data uid to self.inputs.uid_catalog - 4. Update self.inputs.stream_list + 2. Assign raw data uid to self.uid + 3. Append raw data uid to self.uid_catalog + 4. Update self.stream_list Args: - RM (REManagerAPI): Run Engine Manager API. Defaults to RM. + RM (REManagerAPI): Run Engine Manager API. """ inst1 = BInst("queue_stop") RM.item_add(inst1, pos='front') ## wait 1 second for databroker to save data time.sleep(1) - self.inputs.uid = message['run_start'] - self.inputs.uid_catalog.append(self.inputs.uid) + self.uid = message['run_start'] + self.uid_catalog.append(self.uid) stream_list = list(message['num_events'].keys()) - ## Reset self.inputs.stream_list to an empty list - self.inputs.stream_list = [] + ## Reset self.stream_list to an empty list + self.stream_list = [] for stream_name in syringe_list: - self.inputs.stream_list.append(stream_name) + self.stream_list.append(stream_name) @@ -206,21 +346,27 @@ def macro_04_dummy_pdf(sefl): while self.inputs.dummy_pdf[0] is True This macro will - 0. 
Reset self.inputs.iq_data as an empty list - 1. Read pdf data from self.inputs.iq_fn[-1] - 2. Append 4 elements into self.inputs.iq_data - self.inputs.iq_data[0]: chi_Q - self.inputs.iq_data[1]: chi_I - self.inputs.iq_data[2]: np.array([chi_Q, chi_I]) - self.inputs.iq_data[3]: pd.DataFrame([chi_Q, chi_I]) + 0. Reset self.iq_data as an empty dict + 1. Read pdf data from self.iq_fn[-1] + 2. Update iq_data as a dict into self.iq_data + self.iq_data['Q']: iq_array[0] + self.iq_data['I']: iq_array[1] + self.iq_data['array']: iq_array + self.iq_data['df']: iq_df """ - self.inputs.iq_data = [] - iq_array = pd.read_csv(self.inputs.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T - self.inputs.iq_data.append(iq_array[0]) - self.inputs.iq_data.append(iq_array[1]) - self.inputs.iq_data.append(iq_array) - iq_df = pd.read_csv(self.inputs.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ') - self.inputs.iq_data.append(iq_df) + self.iq_data = {} + iq_array = pd.read_csv(self.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T + # self.iq_data.append(iq_array[0]) + # self.iq_data.append(iq_array[1]) + # self.iq_data.append(iq_array) + iq_df = pd.read_csv(self.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ') + # self.iq_data.append(iq_df) + + iq_data = { 'Q':iq_array[0], + 'I':iq_array[1], + 'array':iq_array, + 'df':iq_df} + self.iq_data.update(iq_data) @@ -233,15 +379,15 @@ def macro_05_iq_to_gr(self, beamline_acronym): 3. Read pdf background file from self.inputs.bkg_fn[-1] 4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0] 5. Read saved g(r) into pd.DataFrame and save again to remove the headers - 6. Update g(r) data path and data frame to self.inputs.gr_data - self.inputs.gr_data[0]: gr_data (path) - self.inputs.gr_data[1]: gr_df + 6. 
Update g(r) data path and data frame to self.gr_data + self.gr_data[0]: gr_data (path) + self.gr_data[1]: gr_df Args: beamline_acronym (str): catalog name for tiled to access data """ # Grab metadat from stream_name = fluorescence for naming gr file - fn_uid = de._fn_generator(self.inputs.uid, beamline_acronym=beamline_acronym) + fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym) gr_fn = f'{fn_uid}_scattering.gr' ### dummy test, e.g., CsPbBr2 @@ -252,7 +398,7 @@ def macro_05_iq_to_gr(self, beamline_acronym): pdfconfig = PDFConfig() pdfconfig.readConfig(self.inputs.cfg_fn[-1]) pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1] - sqfqgr_path = gp.transform_bkg(pdfconfig, self.inputs.iq_array[2], output_dir=self.inputs.iq_to_gr_path[0], + sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0], plot_setting={'marker':'.','color':'green'}, test=True, gr_fn=gr_fn) gr_data = sqfqgr_path['gr'] @@ -261,9 +407,9 @@ def macro_05_iq_to_gr(self, beamline_acronym): gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ') gr_df.to_csv(gr_data, index=False, header=False, sep =' ') - self.inputs.gr_data = [] - self.inputs.gr_data.append(gr_data) - self.inputs.gr_data.append(gr_df) + self.gr_data = [] + self.gr_data.append(gr_data) + self.gr_data.append(gr_df) @@ -272,8 +418,8 @@ def macro_06_search_and_match(self, gr_fn): using package Refinery from updated_pipeline_pdffit2.py Args: - gr_fn (str): g(r) data path for searching and matching, ex: self.inputs.gr_data[0] or self.inputs.gr_fn[0] - if using self.inputs.gr_data[0], g(r) is generated in workflow + gr_fn (str): g(r) data path for searching and matching, ex: self.gr_data[0] or self.inputs.gr_fn[0] + if using self.gr_data[0], g(r) is generated in workflow if using self.inputs.gr_fn[0], g(r) is directly read from a file Returns: @@ -316,16 +462,16 @@ def macro_07_fitting_pdf(self, gr_fn, beamline_acronym, 1. Do pdf refinement of gr_fn 2. Generate a filename for fitting data by using metadata of stream_name == fluorescence 3. Save fitting data - 4. Update self.inputs.pdf_property - 5. Update fitting data at self.inputs.gr_fitting - self.inputs.gr_fitting[0]: pf.getR() - self.inputs.gr_fitting[1]: pf.getpdf_fit() - self.inputs.gr_fitting[2]: np.array([pf.getR(), pf.getpdf_fit()]) - self.inputs.gr_fitting[3]: pd.DataFrame([pf.getR(), pf.getpdf_fit()]) + 4. Update self.pdf_property + 5. 
Update fitting data at self.gr_fitting + self.gr_fitting['R']: pf.getR() + self.gr_fitting['pdf_fit']: pf.getpdf_fit() + self.gr_fitting['array']: np.array([pf.getR(), pf.getpdf_fit()]) + self.gr_fitting['df']: pd.DataFrame([pf.getR(), pf.getpdf_fit()]) Args: - gr_fn (str): g(r) data path for pdf fitting, ex: self.inputs.gr_data[0] or self.inputs.gr_fn[0] - if using self.inputs.gr_data[0], g(r) is generated in workflow + gr_fn (str): g(r) data path for pdf fitting, ex: self.gr_data[0] or self.inputs.gr_fn[0] + if using self.gr_data[0], g(r) is generated in workflow if using self.inputs.gr_fn[0], g(r) is directly read from a file beamline_acronym (str): catalog name for tiled to access data @@ -346,43 +492,482 @@ def macro_07_fitting_pdf(self, gr_fn, beamline_acronym, pf.setphase(i+1) particel_size.append(pf.getvar(pf.spdiameter)) # Grab metadat from stream_name = fluorescence for naming gr file - fn_uid = de._fn_generator(self.inputs.uid, beamline_acronym=beamline_acronym) + fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym) fgr_fn = os.path.join(self.inputs.fitting_pdf_path[0], f'{fn_uid}_scattering.fgr') pf.save_pdf(1, f'{fgr_fn}') - self.inputs.pdf_property = {} - self.inputs.pdf_property.update({'Br_ratio': phase_fraction[0], 'Br_size':particel_size[0]}) + self.pdf_property = {} + self.pdf_property.update({'Br_ratio': phase_fraction[0], 'Br_size':particel_size[0]}) gr_fit_arrary = np.asarray([pf.getR(), pf.getpdf_fit()]) gr_fit_df = pd.DataFrame() gr_fit_df['fit_r'] = pf.getR() gr_fit_df['fit_g(r)'] = pf.getpdf_fit() - self.inputs.gr_fitting = [] - self.inputs.gr_fitting.append(pf.getR()) - self.inputs.gr_fitting.append(pf.getpdf_fit()) - self.inputs.gr_fitting.append(gr_fit_arrary) - self.inputs.gr_fitting.append(gr_fit_df) + self.gr_fitting = {} + gr_fitting = { 'R':pf.getR(), + 'pdf_fit':pf.getpdf_fit(), + 'array': gr_fit_arrary, + 'df': gr_fit_df} + self.gr_fitting.update(gr_fitting) + # self.gr_fitting.append(pf.getR()) + # self.gr_fitting.append(pf.getpdf_fit()) + # self.gr_fitting.append(gr_fit_arrary) + # self.gr_fitting.append(gr_fit_df) def macro_08_no_fitting_pdf(self): - """macro to update self.inputs.gr_fitting while no pdf fitting, used in kafka consumer + """macro to update self.gr_fitting while no pdf fitting, used in kafka consumer + + This macro will + 1. Update fitting data at self.gr_fitting + self.gr_fitting['R']: [] + self.gr_fitting['pdf_fit']: [] + self.gr_fitting['array']: None + self.gr_fitting['df']: pd.DataFrame([np.nan, np.nan]) """ - self.inputs.gr_fitting = [] + self.gr_fitting = {} gr_fit_arrary = None gr_fit_df = pd.DataFrame() gr_fit_df['fit_r'] = np.nan gr_fit_df['fit_g(r)'] = np.nan + + gr_fitting = { 'R':[], + 'pdf_fit':[], + 'array': gr_fit_arrary, + 'df': gr_fit_df} + self.gr_fitting.update(gr_fitting) - self.inputs.gr_fitting.append([]) - self.inputs.gr_fitting.append([]) - self.inputs.gr_fitting.append(gr_fit_arrary) - self.inputs.gr_fitting.append(gr_fit_df) + # self.gr_fitting.append([]) + # self.gr_fitting.append([]) + # self.gr_fitting.append(gr_fit_arrary) + # self.gr_fitting.append(gr_fit_df) pdf_property={'Br_ratio': np.nan, 'Br_size': np.nan} - self.inputs.pdf_property.update(pdf_property) + self.pdf_property.update(pdf_property) + + + + def macro_09_qepro_dic(self, stream_name, beamline_acronym): + """macro to read uv-vis data into dic and save data into .csv file + + This macro will + 1. Read Uv-Vis data according to stream name + 3. Save Uv-Vis data + 4. Update + self.qepro_dic + self.metadata_dic + 5. 
Reset self.PL_goodbad to an empty {}
+
+        Args:
+            stream_name (str): the stream name in scan doc to identify scan plan
+            beamline_acronym (str): catalog name for tiled to access data
+        """
+        ## Read data from databroker and turn into dic
+        self.qepro_dic, self.metadata_dic = de.read_qepro_by_stream(
+                                                self.uid,
+                                                stream_name=stream_name,
+                                                data_agent='tiled',
+                                                beamline_acronym=beamline_acronym)
+        self.sample_type = self.metadata_dic['sample_type']
+
+        ## Save data in dic into .csv file
+        if stream_name == 'take_a_uvvis':
+            saving_path = self.inputs.csv_path[1]
+        else:
+            saving_path = self.inputs.csv_path[0]
+        de.dic_to_csv_for_stream(saving_path, self.qepro_dic, self.metadata_dic, stream_name=stream_name)
+        print(f'\n** export {stream_name} in uid: {self.uid[0:8]} to ../{os.path.basename(saving_path)} **\n')
+
+        self.PL_goodbad = {}
+
+
+
+    def macro_10_good_bad(self, stream_name):
+        """macro to identify a good/bad fluorescence (PL) peak
+
+        This macro will
+        1. Identify a good or bad PL peak in 'take_a_uvvis' and 'fluorescence'
+        2. Update self.PL_goodbad
+            self.PL_goodbad['wavelength']: wavelength (nm) of PL
+            self.PL_goodbad['intensity']: intensity of PL
+            self.PL_goodbad['data_id']: f'{t0[0]}_{t0[1]}_{metadata_dic["uid"][:8]}'
+            self.PL_goodbad['peak']: peaks from scipy.find_peaks
+            self.PL_goodbad['prop']: properties from scipy.find_peaks
+
+        Args:
+            stream_name (str): the stream name in scan doc to identify scan plan
+        """
+        if self.qepro_dic['QEPro_spectrum_type'][0]==2 and stream_name=='take_a_uvvis':
+            print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n')
+            x0, y0, data_id, peak, prop = da._identify_one_in_kafka(
+                                            self.qepro_dic,
+                                            self.metadata_dic,
+                                            key_height=self.inputs.key_height[0],
+                                            distance=self.inputs.distance[0],
+                                            height=self.inputs.height[0],
+                                            dummy_test=self.inputs.dummy_kafka[0])
+
+        elif stream_name == 'fluorescence':
+            print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n')
+            ## Apply percentile filtering to PL spectra, default percent_range = [30, 100]
+            x0, y0, data_id, peak, prop = da._identify_multi_in_kafka(
+                                            self.qepro_dic,
+                                            self.metadata_dic,
+                                            key_height=self.inputs.key_height[0],
+                                            distance=self.inputs.distance[0],
+                                            height=self.inputs.height[0],
+                                            dummy_test=self.inputs.dummy_kafka[0],
+                                            percent_range=[30, 100])
+
+        self.PL_goodbad = {}
+        PL_goodbad = {  'wavelength':np.asarray(x0), 'intensity':np.asarray(y0),
+                        'data_id':data_id, 'peak':peak, 'prop':prop}
+        self.PL_goodbad.update(PL_goodbad)
+
+
+
+    def macro_11_absorbance(self, stream_name):
+        """macro to apply an offset to zero the baseline of absorption spectra
+
+        This macro will
+        1. Apply a 2D line to fit the baseline of absorption spectra
+        2. Update self.abs_data
+            self.abs_data['wavelength']: wavelength (nm) of absorbance
+            self.abs_data['absorbance']: absorbance array (percentile_mean)
+            self.abs_data['offset']: absorbance array offset
+        3. 
Update self.abs_fitting + self.abs_fitting['fit_function']: da.line_2D + self.abs_fitting['curve_fit']: popt_abs (popt from scipy.curve_fit) + + Args: + stream_name (str): the stream name in scan doc to identify scan plan + """ + + ## Apply percnetile filtering for absorption spectra, defaut percent_range = [15, 85] + abs_per = da.percentile_abs(self.qepro_dic['QEPro_x_axis'], + self.qepro_dic['QEPro_output'], + percent_range=[15, 85]) + + print(f'\n*** start to check absorbance at 365b nm in stream: {stream_name} is positive or not***\n') + # abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0) + abs_array = abs_per.mean(axis=0) + wavelength = qepro_dic['QEPro_x_axis'][0] + + popt_abs01, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[205, 240], plot=False) + popt_abs02, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[750, 950], plot=False) + if abs(popt_abs01[0]) >= abs(popt_abs02[0]): + popt_abs = popt_abs02 + elif abs(popt_abs01[0]) <= abs(popt_abs02[0]): + popt_abs = popt_abs01 + + abs_array_offset = abs_array - da.line_2D(wavelength, *popt_abs) + print(f'\nFitting function for baseline offset: {da.line_2D}\n') + ff_abs={'fit_function': da.line_2D, 'curve_fit': popt_abs} + + self.abs_data = {} + self.abs_data.update({'wavelength':wavelength, 'absorbance':abs_array, 'offset':abs_array_offset}) + + self.abs_fitting = {} + self.abs_fitting.update(ff_abs) + + de.dic_to_csv_for_stream(self.inputs.csv_path[0], + self.qepro_dic, self.metadata_dic, + stream_name=stream_name, + fitting=ff_abs) + + + def macro_12_PL_fitting(self): + """macro to do peak fitting with gaussian distribution of PL spectra + + This macro will + 1. Apply a 1-peak gaussian fitting to fit the peak of PL spectra + 2. Update self.PL_fitting + self.PL_fitting['fit_function']: da._1gauss + self.PL_fitting['curve_fit']: popt from scipy.curve_fit + self.PL_fitting['fwhm']: full width at half maximum of highest peak + self.PL_fitting['peak_emission']: highest peak position + self.PL_fitting['wavelength']: wavelenght (nm) of PL between 400 ~ 800 nm + self.PL_fitting['intensity']: intensity of PL (nm) between 400 ~ 800 nm + self.PL_fitting['shifted_peak_idx']: peak index between 400 ~ 800 nm + + """ + + x, y, p, f_fit, popt = da._fitting_in_kafka( + self.PL_goodbad['wavelength'], + self.PL_goodbad['intensity'], + self.PL_goodbad['data_id'], + self.PL_goodbad['peak'], + self.PL_goodbad['prop'], + dummy_test=self.inputs.dummy_kafka[0]) + + fitted_y = f_fit(x, *popt) + r2_idx1, _ = da.find_nearest(x, popt[1] - 3*popt[2]) + r2_idx2, _ = da.find_nearest(x, popt[1] + 3*popt[2]) + r_2 = da.r_square(x[r2_idx1:r2_idx2], y[r2_idx1:r2_idx2], fitted_y[r2_idx1:r2_idx2], y_low_limit=0) + + metadata_dic["r_2"] = r_2 + + if 'gauss' in f_fit.__name__: + constant = 2.355 + else: + constant = 1 + + intensity_list = [] + peak_list = [] + fwhm_list = [] + for i in range(int(len(popt)/3)): + intensity_list.append(popt[i*3+0]) + peak_list.append(popt[i*3+1]) + fwhm_list.append(popt[i*3+2]*constant) + + peak_emission_id = np.argmax(np.asarray(intensity_list)) + peak_emission = peak_list[peak_emission_id] + fwhm = fwhm_list[peak_emission_id] + + ff={'fit_function': f_fit, 'curve_fit': popt, + 'fwhm': fwhm, 'peak_emission': peak_emission, + 'wavelength': x, 'intensity': y, + 'shifted_peak_idx': p} + + self.PL_fitting = {} + self.PL_fitting.update(ff) + + + def macro_13_PLQY(self): + """macro to calculate integral of PL peak, PLQY and update optical_property + + This macro will + 1. 
Integrate PL peak by scipy.integrate.simpson + 2. Calculate PLQY based on the parameters in self.inputs.PLQY + 3. Update self.optical_property + self.optical_property['PL_integral']: PL peak integral + self.optical_property['Absorbance_365']: absorbance at 365 nm + self.optical_property['Peak']: highest peak position + self.optical_property['FWHM']: full width at half maximum of highest peak + self.optical_property['PLQY']: Photoluminescence Quantum Yield + + """ + + ## Integrate PL peak + x = self.PL_fitting['wavelength'] + y = self.PL_fitting['intensity'] + peak_emission = self.PL_fitting['peak_emission'] + PL_integral_s = integrate.simpson(y) + + ## Find absorbance at 365 nm from absorbance stream + idx1, _ = da.find_nearest(self.abs_data['wavelength'], self.inputs.PLQY[2]) + absorbance_s = self.abs_data['offset'][idx1] + + if self.inputs.PLQY[1] == 'fluorescein': + plqy = da.plqy_fluorescein(absorbance_s, PL_integral_s, 1.506, *self.inputs.PLQY[3:]) + else: + plqy = da.plqy_quinine(absorbance_s, PL_integral_s, 1.506, *self.inputs.PLQY[3:]) + + + plqy_dic = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s, 'plqy': plqy} + self.plqy_dic = {} + self.plqy_dic.update(plqy_dic) + + optical_property = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s, + 'Peak':peak_emission, 'FWHM':fwhm, 'PLQY':plqy} + self.optical_property = {} + self.optical_property.update(optical_property) + + + + def macro_14_upate_agent(self): + """macro to update agent_data in type of dict for exporting as json and wirte to sandbox + + This macro will + 1. Update self.agent_data with + self.optical_property + self.pdf_property + self.metadata_dic + self.abs_fitting + self.PL_fitting + + """ + + ## Creat agent_data in type of dict for exporting as json and wirte to sandbox + if 'agent_target' in self.agent_data.keys(): + pass + else: + self.agent_data = {} + self.agent_data.update(self.optical_property) + self.agent_data.update(self.pdf_property) + self.agent_data.update({k:v for k, v in self.metadata_dic.items() if len(np.atleast_1d(v)) == 1}) + self.agent_data.update(de._exprot_rate_agent(self.metadata_dic, self.rate_label_dic, self.agent_data)) + + ## Update absorbance offset and fluorescence fitting results inot agent_data + ff_abs = self.abs_fitting + ff = self.PL_fitting + self.agent_data.update({'abs_offset':{'fit_function':ff_abs['fit_function'].__name__, 'popt':ff_abs['curve_fit'].tolist()}}) + self.agent_data.update({'PL_fitting':{'fit_function':ff['fit_function'].__name__, 'popt':ff['curve_fit'].tolist()}}) + + + + def macro_15_save_data(self, stream_name): + """macro to save processed data and agent data + + This macro will + 1. Save fitting data locally (self.inputs.csv_path[0]) + 2. Save processed data in df and agent_data as metadta in sandbox + 3. 
Save agent_data locally (self.inputs.agent_data_path[0])
+
+        Args:
+            stream_name (str): the stream name in scan doc to identify scan plan
+        """
+
+        ## Save fitting data
+        print(f'\nFitting function: {self.PL_fitting["fit_function"]}\n')
+        de.dic_to_csv_for_stream(self.inputs.csv_path[0],
+                                 self.qepro_dic,
+                                 self.metadata_dic,
+                                 stream_name = stream_name,
+                                 fitting = self.PL_fitting,
+                                 plqy_dic = self.plqy_dic)
+        print(f'\n** export fitting results complete **\n')
+
+        ## Save processed data in df and agent_data as metadata in sandbox
+        if self.inputs.write_to_sandbox[0] and (stream_name == 'fluorescence'):
+            df = pd.DataFrame()
+            x0 = self.PL_goodbad['wavelength']
+            df['wavelength_nm'] = x0
+            df['absorbance_mean'] = self.abs_data['absorbance']
+            df['absorbance_offset'] = self.abs_data['offset']
+            df['fluorescence_mean'] = self.PL_goodbad['intensity']
+            f_fit = self.PL_fitting['fit_function']
+            popt = self.PL_fitting['curve_fit']
+            df['fluorescence_fitting'] = f_fit(x0, *popt)
+
+            ## use pd.concat to add data of various lengths together
+            df_new = pd.concat([df, self.iq_data['df'], self.gr_data[1], self.gr_fitting['df']], ignore_index=False, axis=1)
+
+            # entry = sandbox_tiled_client.write_dataframe(df, metadata=agent_data)
+            entry = self.sandbox_tiled_client.write_dataframe(df_new, metadata=self.agent_data)
+            # uri = sandbox_tiled_client.values()[-1].uri
+            uri = entry.uri
+            sandbox_uid = uri.split('/')[-1]
+            self.uid_sandbox.append(sandbox_uid)
+            self.agent_data.update({'sandbox_uid': sandbox_uid})
+            print(f"\nwrote to Tiled sandbox uid: {sandbox_uid}")
+
+        ## Save agent_data locally
+        if self.inputs.write_agent_data[0] and (stream_name == 'fluorescence'):
+            # agent_data.update({'sandbox_uid': sandbox_uid})
+            with open(f"{self.inputs.agent_data_path[0]}/{self.PL_goodbad['data_id']}.json", "w") as f:
+                json.dump(self.agent_data, f, indent=2)
+
+            print(f"\nwrote to {self.inputs.agent_data_path[0]}\n")
+
+
+    def macro_16_num_good(self, stream_name):
+        """macro to add self.PL_goodbad['data_id'] into self.good_data or self.bad_data
+
+        Args:
+            stream_name (str): the stream name in scan doc to identify scan plan
+        """
+
+        type_peak = type(self.PL_goodbad['peak'])
+        type_prop = type(self.PL_goodbad['prop'])
+
+        ## Append good/bad identified results
+        if stream_name == 'take_a_uvvis':
+
+            if (type_peak is np.ndarray) and (type_prop is dict):
+                self.good_data.append(self.PL_goodbad['data_id'])
+
+            elif (type(self.PL_goodbad['peak']) == list) and (self.PL_goodbad['prop'] == []):
+                self.bad_data.append(self.PL_goodbad['data_id'])
+                print(f"\n*** No need to carry out fitting for {stream_name} in uid: {self.uid[:8]} ***\n")
+                print(f"\n*** since {stream_name} in uid: {self.uid[:8]} is bad data.***\n")
+
+        print('\n*** export, identify good/bad, fitting complete ***\n')
+
+        print(f"\n*** {self.sample_type} of uid: {self.uid[:8]} has: ***\n"
+              f"{self.optical_property = }***\n"
+              f"{self.pdf_property = }***\n")
+
+
+    def macro_17_add_queue(self, stream_name, qserver_process, RM):
+        """macro to add queue tasks to qserver
+
+        This macro will
+        1. Add another 'take_a_uvvis' depending on # of good_data
+        2. Add new_points predicted by self.agent
+
+        Args:
+            stream_name (str): the stream name in scan doc to identify scan plan
+            qserver_process (_LDRD_Kafka.xlsx_to_inputs, optional): qserver parameters read from xlsx.
+            RM (REManagerAPI): Run Engine Manager API. 
+ """ + + qin = qserver_process.inputs + ## Depend on # of good/bad data, add items into queue item or stop + if (stream_name == 'take_a_uvvis') and (self.inputs.use_good_bad[0]): + if len(self.bad_data) > 3: + print('*** qsever aborted due to too many bad scans, please check setup ***\n') + + ### Stop all infusing pumps and wash loop + sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min', + zmq_control_addr=qin.zmq_control_addr[0], + zmq_info_addr=qin.zmq_info_addr[0]) + + RM.queue_stop() + + elif (len(self.good_data) <= 2) and (self.inputs.use_good_bad[0]): + print('*** Add another fluorescence and absorption scan to the fron of qsever ***\n') + + restplan = BPlan('sleep_sec_q', 5) + RM.item_add(restplan, pos=0) + + scanplan = BPlan('take_a_uvvis_csv_q', + sample_type=self.metadata_dic['sample_type'], + spectrum_type='Corrected Sample', + correction_type='Dark', + pump_list=qin.pump_list, + precursor_list=qin.precursor_list, + mixer=qin.mixer) + RM.item_add(scanplan, pos=1) + RM.queue_start() + + elif (len(self.good_data) > 2) and (self.inputs.use_good_bad[0]): + print('*** # of good data is enough so go to the next: bundle plan ***\n') + self.bad_data.clear() + self.good_data.clear() + self.finished.append(self.metadata_dic['sample_type']) + print(f'After event: good_data = {self.good_data}\n') + print(f'After event: finished sample = {self.finished}\n') + + RM.queue_start() + + ## Add predicted new points from ML agent into qserver + elif (stream_name == 'fluorescence') and (self.inputs.USE_AGENT_iterate[0]) and (self.inputs.agent_iteration[-1]): + print('*** Add new points from agent to the fron of qsever ***\n') + + new_points = self.macro_agent(qserver_process, RM, check_target=True) + + print(f'*** New points from agent: {new_points} ***\n') + + rate_list = self.auto_rate_list(qin.pump_list, new_points, self.inputs.fix_Br_ratio) + + if self.inputs.post_dilute[0]: + rate_list.append(sum(rate_list)*self.inputs.post_dilute[1]) + + qin.sample = de._auto_name_sample(rate_list, prefix=qin.prefix[1:]) + qin.infuse_rates = rate_list + sq.synthesis_queue_xlsx(qserver_process) + + else: + print('*** Move to next reaction in Queue ***\n') + time.sleep(2) + # RM.queue_start() + + + diff --git a/scripts/inputs_qserver_kafka_v2.xlsx b/scripts/inputs_qserver_kafka_v2.xlsx index 02800d65ce4a02c1fdeee95e4f43a4b5c6b16378..1998e987c4064262272dad4ec62835610a68135c 100644 GIT binary patch delta 10611 zcmZ8{1yCGKw>EAeK*Hkg9&B-fySs0I;7)+xFhFpJ#oZwUmjFv}5AN;|AUFgFe)Hzu z|9k)XrfRynr_br`se0y_V-w%tW7^aF&FhDfjF`K_UtH zLi7A>b7>!k>y_Y!i;O9j%U zUq~QeV}E#jvqyY%kb>CLNO!BFDV|pwV2r~O&5wQ#gY~(5nGKSL-4+LJC|ftbudj@H zSs1eTQO4p)a@W=csOM1Z($3LAw~D4MSJh#~`|Lo3737H5kd(Z*%_ZWH_1tBdRR&6r zFXWBAzQ6Ys=chg*^kE4>S+BNyQMN?pA4-g%IVBlo{Fz$d3-v2113!<41$V3SPD0e{ zUP8Sp3na9@{*`o;o*^({Z|pN-&@2BmXQSccKynPhDab}QFf_-r<{>nQVVBVkJ6{#_ z#k}(6bMq0^>Qq3ZU|iI$meSC4c$SMwI;BJ+S;U!DKRzKH6C=yD%=4pLxWKTl>?~OZ;7gz9c~u_Os~xG8C(11z6@vFGT8t=8=Gv!_A?xiXUOX2SUij5HeW}&X z!<5|p@KdYR{xxQ%%1NtSR9(uW-M2NH(?=ergfy<0KS0ITLI>7CGiQV;&ud9XAKq~V zdB?&p#_pji>$-WVawv*f%@oSJ1hA64M)Rf<)3gi*Fu3ITE7` zYtHWl0v(z;)z+m0EtKfn?L6xj&QYv+vwdo95w2QEPiEN`QFq;~dFPOZiHqbut8rt` z5>n{IMd`OXZ%mVnqNQ>t2YkA!##-z0L8Y-GkEIwB=5K3h?-02>dhg%i9X0L~*&w1tm2>x>DA*UG- zvpylh9k`12Bp;9jDOpX!?Q_EH)g;AozPeQu1?;fPyWeQY(M8(&t+@Fe#uJk;XRNla z?~6hcKzQ|rFB}mtxI3`pYEj_eVo0E`*ogqnH=YjmS(=7U%akDh+=ge9b8@qz8HY-K z?b&jJcO^EfyXhvRg#qUQE4;@YH)8e-8b!?I7Ou^|TS@sx$gdS-3>_;b93teh^1_>x zbPFKK`+ISzLu8Q^g$~lij$RR1D+OVymrgZC)6g^4iUtAFa)z<;;OQkj{|_A z-seX3nXZY7nq$mTZ8)ZgY1*OMf&yvT&~Bri{6hndr;0_H^Eonk%+#4cU^#pY%RUk! 
zA~)N^+}RvA?enMWi8=)ScgE$Be1oqr44veci}*R88U#dsSReh+ zGp0wxno=HgRKQ57WBTNkUsn-;K-jN}(9@R1#nRaMlbNckvz5Jt%RfsM^7o}j=< zAd;T?0Anbvoa=IO2-~_@XW8S?TUOe9fA8XBm`kv#p4iR>MYW#pm|=-e+WR`?=zPZUHaCcN&z!h7^~F_aRKaeIXHrH;Z&{!;(ajJvvQ!Gw7u_qdzP5~UoV0KFAdPkwPlwg5q?Fw6WEjr7*FOo%1s z?NI7^MWO3kO(5SYBdgbn{c`MX|DY_lX2E`Vh?B&=ibQt*NcpD~I|@%6oB*84fQ{gC z88_1YAl$gYa2om>6m&&geweOhUWqaj~2vsc?=Q%SPt0q9-LRz8~iTj4SO_8cm_;a@J6 zDQVEk+L>PKf9^KFz3i;m&{(fSM??}0;p%mlM|B61=Z$ia&I!VZo=HzcfIC8b^t9Fh zeU|_c(ZFw~XWz1S4@q0U5?Y@5m+YX`+QHa|@acx)E*9x?$t&P7-X?(7Jwciz*uQOm zCUJ?|1)*qv(|G>2##=xI=>!*2c%y&Mx9Mv|e%NL?-J2o({j^`tLfL9#$=hv@yLk(h|RUH;jYgJr!0#Xu*R;xN4vgi04AO=sA0%U12)s&3!}S{|6n z?SLd?)|Rnb+qkw(y88Ue=`qR@`5|JwkC)Vbdj8ajE>6bkZ853%^J_HJ8M7D{Fcf65 zt3qs^^@q*g#c;D7FItns;gT@b$*7HHO2I3@f}bLKtQ%qIQo5`_fhJN%A!(G}q%2HB zI!Gf#dpc~^j-)NSD?BF^+dpl+&DW}|X#aRAos8M5X$CQspOt@6G~({}E*k-8(f91> zT8X-RBOU7#DKC07Jex6HdSCA|z-N8cYn}dxECo!LkU%sa_MZ>x95#m^W6v;+Lanex z!tlC96=hLp_{NWxH$%%pO&-+;>vCvzKP^as4RvzTeukXK+iZ7`J4$8+7*5r^J08T7 zVSBKCBU*f7RK#Q!ip(uVlWrSSp*v(fC+(B2ho(@ve2f4l^>-Jukg44PQ1)w|Zq8Rw zs*DxoMD?DSB2cQfMS?E;wJJKu&wZb>q(=uRQ^y?=4X*?aZ=_XeFElx66&N}si^GF4 z_1)}YF_?HAz=H&B9v5MUpKj_=#t(|!ED&LS`pz#G4I^I@bX--yz!HS;ozp_?f7l_@ z?M}p$)6waoC+F5Ru}0JehOlL>v&K~@XnyK-i+kR|ybI^=E!T5a56|qLD2vzdsAz}2 z^?2Xa{9E%()}z=oVb}f2R_0f|Wb~C~WYM^g!O+WlT13J0QRX=|}pn_m1-D^_VgqPTCNb*>eVqe)xSk zwSA?E6*!5|%-~w z+x6Qof3^mzJ$&wJe8^gyNlAj`Ro?enTDY_7FUTf#iSkn>09gQ?l-Vx+?iQ&}XvS-T zj!()gIqG_~(>NXd^LP%5Jvvyx#n}TIDd^CQmjFwdET(}28A?xYd@_Lpi){%fhIQ-mf3W-Ch8|A{Hq{_gSRNTJ1{5Pp7w!71g7cABc2&7P-U%Rlq zbU}0Ielpn&*crZmlC7>6gx<0*eD|3)`U!6*`Z}FCU1295hO&`RSjy>w--oo=Yl17N zoiiwW;12;GHNmEZ)MIg1KWA1SU7w8xvY%z89YY$J^^F^>kxAqBr-@4M^6jtv=i0m1 zFR_tBBZhK{XrrtC^zhU!7VvvRN1ffAA1Yb-c{v$fk*A6*K5GR5O?uC0kN^?vniB<5 zD41&$oyQssWg0LrDbvG>{Tc5uG=T=TCjXc#-xWu#Hx#j(3wzbX`8e0^ZGx=};E-m& z5<;;>ewzj9Y1*akxGOKQMCt!2A+IEmO$*gWNUqK+5_t#RE)nl0hp6!$iP{nwcLjUb zf_IL<;f+AL#1|(_+pWZ}qm5bF1ulH=dM~J^M^FVsz0UFgw)ZaWoK%xhKozFc$NiM) zwG^Y6cn$PJ1B_!z*i)+OpK2n)j!-z$*%?&lWYqBG(5g#9q%d1y+OiEZe%~`!Ozpb( zD=h%s1uX9LQ6mT(;cr0B;M?~aR{Esk(FkaI=PnK!eOPe?R863!zXJEaL3hlr zptlfeJ#@l<1=N3o`XS1C;$Rm@vYs$l4>GSuM)==M_TSI`0>RQ3fyz^WJStE@eDvv| xh7@0?AO+z+CJuRxC|*E;Kp6kRUu)fvEqy_76y%-32XH>b*nk>NLi-=U{{ZkyIOYHV diff --git a/scripts/kafka_consumer_iterate_1LL09_v2.py b/scripts/kafka_consumer_iterate_1LL09_v2.py index e5b5d1c..446df37 100644 --- a/scripts/kafka_consumer_iterate_1LL09_v2.py +++ b/scripts/kafka_consumer_iterate_1LL09_v2.py @@ -62,7 +62,10 @@ print(f'Sample: {sample}') -def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ): +def print_kafka_messages(beamline_acronym, + kafka_process=kafka_process, + qserver_process=qserver_process, + RM=RM, ): """Print kafka message from beamline_acronym @@ -73,6 +76,9 @@ def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ): RM (REManagerAPI, optional): Run Engine Manager API. Defaults to RM. 
""" + kin = kafka_process.inputs + qin = qserver_process.inputs + print(f"Listening for Kafka messages for {beamline_acronym}") print(f'Defaul parameters:\n' f' csv path: {kin.csv_path[0]}\n' @@ -187,21 +193,28 @@ def print_message(consumer, doctype, doc, kin.stream_list.append(stream_name) -////////////////////////////////////////////////////////////////////////////////////////////////// ## Set good/bad data condictions to the corresponding sample kh = kin.key_height[0] hei = kin.height[0] dis = kin.distance[0] ## obtain phase fraction & particle size from g(r) - if 'scattering' in stream_list: - if fitting_pdf: - phase_fraction, particel_size = pc._pdffit2_CsPbX3(gr_data, cif_list, qmax=20, qdamp=0.031, qbroad=0.032, fix_APD=False, toler=0.001) - pdf_property={'Br_ratio': phase_fraction[0], 'Br_size':particel_size[0]} + if 'scattering' in kin.stream_list: + + ## macro_07: do pdf fitting and update kin.gr_fitting + if kin.fitting_pdf[0]: + kafka_process.macro_07_fitting_pdf( + kin.gr_fn[0], beamline_acronym, + rmax=100.0, qmax=12.0, qdamp=0.031, qbroad=0.032, + fix_APD=True, toler=0.01 + ) + + ## macro_08: not do pdf fitting but also update kin.gr_fitting else: - pdf_property={'Br_ratio': np.nan, 'Br_size': np.nan} + kafka_process.macro_08_no_fitting_pdf() + ## remove 'scattering' from stream_list to avoid redundant work in next for loop - stream_list.remove('scattering') + kin.stream_list.remove('scattering') ## Export, plotting, fitting, calculate # of good/bad data, add queue item for stream_name in stream_list: diff --git a/scripts/kafka_consumer_iterate_XPD_v2.py b/scripts/kafka_consumer_iterate_XPD_v2.py index ab201e8..9d0b6d9 100644 --- a/scripts/kafka_consumer_iterate_XPD_v2.py +++ b/scripts/kafka_consumer_iterate_XPD_v2.py @@ -47,15 +47,24 @@ qin = qserver_process.inputs ## Input varaibales for Kafka, reading from xlsx_fn by given sheet name -kafka_process = LK.xlsx_to_inputs(LK._kafka_process(), xlsx_fn=xlsx_fn, sheet_name='kafka_process') +kafka_process = LK.xlsx_to_inputs(LK._kafka_inputs(), xlsx_fn=xlsx_fn, sheet_name='kafka_process') kin = kafka_process.inputs ## Define RE Manager API as RM RM = REManagerAPI(zmq_control_addr=qin.zmq_control_addr[0], zmq_info_addr=qin.zmq_info_addr[0]) +## Make the first prediction from kafka_process.agent +first_points = kafka_process.macro_agent(qserver_process, RM, check_target=True) +rate_list = kafka_process.auto_rate_list(qin.pump_list, first_points, kin.fix_Br_ratio) +if kin.post_dilute[0]: + rate_list.append(sum(rate_list)*kin.post_dilute[1]) + +qin.infuse_rates = rate_list + + ## Import Qserver parameters to RE Manager import _synthesis_queue_RM as sq -sq.synthesis_queue_xlsx(qin) +sq.synthesis_queue_xlsx(qserver_process) ## Auto name samples by prefix if qin.name_by_prefix[0]: @@ -69,10 +78,12 @@ def print_kafka_messages(beamline_acronym_01, qserver_process=qserver_process, RM=RM, ): + """Print kafka message from beamline_acronym Args: - beamline_acronym (str): subscribed topics for data publishing (ex: xpd, xpd-analysis, xpd-ldrd20-31) + beamline_acronym_01 (str): subscribed topics for raw data publishing (ex: xpd, xpd-ldrd20-31) + beamline_acronym_02 (str): subscribed topics for analysis data publishing (ex: xpd-analysis) kafka_process (_LDRD_Kafka.xlsx_to_inputs, optional): kafka parameters read from xlsx. Defaults to kafka_process. qserver_process (_LDRD_Kafka.xlsx_to_inputs, optional): qserver parameters read from xlsx. Defaults to qserver_process. RM (REManagerAPI, optional): Run Engine Manager API. Defaults to RM. 
@@ -110,7 +121,8 @@ def print_kafka_messages(beamline_acronym_01, f'{qin.zmq_control_addr[0] = }') ## Assignt raw data tiled clients - kin.tiled_client.append = from_profile(beamline_acronym_01) + kin.beamline_acronym.append(beamline_acronym_01) + kafka_process.tiled_client = from_profile(beamline_acronym_01) ## 'xpd-analysis' is not a catalog name so can't be accessed in databroker ## Append good/bad data folder to csv_path @@ -123,9 +135,7 @@ def print_kafka_messages(beamline_acronym_01, pass - def print_message(consumer, doctype, doc, - bad_data = [], good_data = [], check_abs365 = False, finished = [], - agent_iteration = []): + def print_message(consumer, doctype, doc, check_abs365 = False, agent_iteration = []): name, message = doc # print(f"contents: {pprint.pformat(message)}\n") @@ -167,10 +177,10 @@ def print_message(consumer, doctype, doc, if 'sample_type' in message.keys(): print(f"sample type: {message['sample_type']}") - ## Reset kin.uid to an empty list - kin.uid = [] + ## Reset kafka_process.uid to an empty list + kafka_process.uid = [] - ## macro_01 + ## macro_01: get iq from sandbox ######### While document (name == 'event') and ('topic' in doc[1]) ########## ## key 'topic' is added into the doc of xpd-analysis in pdfstream ## ## Read uid of analysis data from doc[1]['data']['chi_I'] ## @@ -197,14 +207,14 @@ def print_message(consumer, doctype, doc, ) ## wait 1 second for databroker to save data time.sleep(1) - ## Reset kin.uid to an empty list - kin.uid = [] + ## Reset kafka_process.uid to an empty list + kafka_process.uid = [] - ## macro_02 + ## macro_02: get raw data uid from metadata of sandbox doc #### (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0) #### ## With taking xray_uvvis_plan and analysis of pdfstream finished ## - ## Sleep 1 second and assign uid, stream_list from kin.entry[-1] ## + ## Sleep 1 second and assign uid, stream_list from kafka_process.entry[-1] ## ## No need to stop queue since the net queue task is wahsing loop ## ##################################################################################### elif (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0): @@ -214,7 +224,7 @@ def print_message(consumer, doctype, doc, kafka_process.macro_02_get_uid() - ## macro_03 + ## macro_03: stop queue and get uid for take_a_uvvis scan ######### (name == 'stop') and ('take_a_uvvis' in message['num_events']) ########## ## Only take a Uv-Vis, no X-ray data but still do analysis of pdfstream ## ## Stop queue first for identify good/bad data ## @@ -229,436 +239,175 @@ def print_message(consumer, doctype, doc, kafka_process.macro_03_stop_queue_uid(RM) - ################## (name == 'stop') and (type(kin.uid) is str) #################### + ############## (name == 'stop') and (type(kafka_process.uid) is str) ############## ## ## ## When uid is assigned and type is a string, move to data fitting, calculation ## ## ## ##################################################################################### - if (name == 'stop') and (type(kin.uid) is str): - print(f'\n**** start to export uid: {kin.uid} ****\n') - print(f'\n**** with stream name in {kin.stream_list} ****\n') - - ## Set good/bad data condictions to the corresponding sample - kh = kin.key_height[0] - hei = kin.height[0] - dis = kin.distance[0] - - ## obtain phase fraction & particle size from g(r) - if 'scattering' in kin.stream_list: + if (name == 'stop') and (type(kafka_process.uid) is str): + print(f'\n**** start to export uid: {kafka_process.uid} ****\n') 
+            print(f'\n**** with stream name in {kafka_process.stream_list} ****\n')
+
+
+            ## macro_04 ~ macro_07 or 08
+            #################### 'scattering' in kafka_process.stream_list ###################
+            ##                                                                              ##
+            ##  Process X-ray scattering data (PDF): iq to gr, search & match, pdf fitting ##
+            ##  obtain phase fraction & particle size from g(r)                            ##
+            #####################################################################################
+            if 'scattering' in kafka_process.stream_list:
                # Get metadata from stream_name fluorescence for plotting
-                qepro_dic, metadata_dic = de.read_qepro_by_stream(
-                    kin.uid, stream_name='fluorescence', data_agent='tiled',
+                kafka_process.qepro_dic, kafka_process.metadata_dic = de.read_qepro_by_stream(
+                    kafka_process.uid, stream_name='fluorescence', data_agent='tiled',
                    beamline_acronym=beamline_acronym_01)
-                u = plot_uvvis(qepro_dic, metadata_dic)
+                u = plot_uvvis(kafka_process.qepro_dic, kafka_process.metadata_dic)
 
-                ## macro_04 (dummy test, e.g., CsPbBr2)
+                ## macro_04: set dummy pdf data for a test (e.g., CsPbBr2)
                if kin.dummy_pdf[0]:
                    kafka_process.macro_04_dummy_pdf()
 
-                ## macro_05
+                ## macro_05: do i(q) to g(r) through pdfstream
                if kin.iq_to_gr[0]:
                    kafka_process.macro_05_iq_to_gr(beamline_acronym_01)
 
-                ## macro_06
+                ## macro_06: do search and match
                if kin.search_and_match[0]:
                    # cif_fn = kafka_process.macro_06_search_and_match(kin.gr_fn[0])
-                    cif_fn = kafka_process.macro_06_search_and_match(kin.gr_data[0])
+                    cif_fn = kafka_process.macro_06_search_and_match(kafka_process.gr_data[0])
                    print(f'\n\n*** After matching, the most correlated strucuture is\n'
                          f'*** {cif_fn} ***\n\n')
 
-                ## macro_07
+                ## macro_07: do pdf fitting and update kafka_process.gr_fitting
                if kin.fitting_pdf[0]:
                    kafka_process.macro_07_fitting_pdf(
-                        kin.gr_data[0], beamline_acronym_01,
+                        kafka_process.gr_data[0], beamline_acronym_01,
                        rmax=100.0, qmax=12.0, qdamp=0.031, qbroad=0.032,
                        fix_APD=True, toler=0.01
                        )
+                ## macro_08: skip pdf fitting but still update kafka_process.gr_fitting
                else:
                    kafka_process.macro_08_no_fitting_pdf()
 
                if kin.iq_to_gr[0]:
-                    u.plot_iq_to_gr(kin.iq_data[2], kin.gr_data[1].to_numpy().T, gr_fit=kin.gr_fitting[2])
+                    u.plot_iq_to_gr(kafka_process.iq_data['array'], kafka_process.gr_data[1].to_numpy().T, gr_fit=kafka_process.gr_fitting['array'])
 
                ## remove 'scattering' from stream_list to avoid redundant work in next for loop
-                kin.stream_list.remove('scattering')
+                kafka_process.stream_list.remove('scattering')
 
-//////////////////////////////////////////////////////////////////////////////////////////
-            ## Export, plotting, fitting, calculate # of good/bad data, add queue item
-            for stream_name in stream_list:
-                ## Read data from databroker and turn into dic
-                qepro_dic, metadata_dic = de.read_qepro_by_stream(
-                    uid, stream_name=stream_name, data_agent='tiled', beamline_acronym=beamline_acronym_01)
-                sample_type = metadata_dic['sample_type']
-                ## Save data in dic into .csv file
-
-                if stream_name == 'take_a_uvvis':
-                    saving_path = path_1
-                else:
-                    saving_path = path_0
-                de.dic_to_csv_for_stream(saving_path, qepro_dic, metadata_dic, stream_name=stream_name)
-                print(f'\n** export {stream_name} in uid: {uid[0:8]} to ../{os.path.basename(saving_path)} **\n')
+            ######## Other stream names except 'scattering' in kafka_process.stream_list #####
+            ##  Process Uv-Vis data: Export, plotting, fitting, calculate # of good/bad data ##
+            ##  add queue item, PLQY                                                         ##
+            ##  Agent: save agent data locally & to sandbox, make optimization               ##
+            #####################################################################################
+            for stream_name in kafka_process.stream_list:
+
+                ############################## macro_09 ~ macro_11 ###############################
+                ##  read uv-vis data into dic and save data into .csv file                     ##
+                ##  Identify good/bad data if it is a fluorescence scan in 'take_a_uvvis'      ##
+                ##  Average fluorescence scans in 'fluorescence' and identify good/bad         ##
+                ###################################################################################
+
+                ## macro_09: read uv-vis data into dic and save data into .csv file
+                kafka_process.macro_09_qepro_dic(stream_name, beamline_acronym_01)
+
                ## Plot data in dic
-                u = plot_uvvis(qepro_dic, metadata_dic)
+                u = plot_uvvis(kafka_process.qepro_dic, kafka_process.metadata_dic)
                if len(good_data)==0 and len(bad_data)==0:
                    clear_fig=True
                else:
                    clear_fig=False
                u.plot_data(clear_fig=clear_fig)
-                print(f'\n** Plot {stream_name} in uid: {uid[0:8]} complete **\n')
+                print(f'\n** Plot {stream_name} in uid: {kafka_process.uid[0:8]} complete **\n')
 
-                global abs_array, abs_array_offset, x0, y0
-                ## Idenfify good/bad data if it is a fluorescence scan in 'take_a_uvvis'
+
+                ## macro_10_good_bad: Identify good/bad data if it is a fluorescence scan in 'take_a_uvvis'
                if qepro_dic['QEPro_spectrum_type'][0]==2 and stream_name=='take_a_uvvis':
                    print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n')
-                    x0, y0, data_id, peak, prop = da._identify_one_in_kafka(qepro_dic, metadata_dic, key_height=kh, distance=dis, height=hei, dummy_test=dummy_test)
+                    kafka_process.macro_10_good_bad(stream_name)
 
-                ## Apply an offset to zero baseline of absorption spectra
+                ## macro_11_absorbance: Apply an offset to zero baseline of absorption spectra
                elif stream_name == 'absorbance':
                    print(f'\n*** start to filter absorbance within 15%-85% due to PF oil phase***\n')
-                    ## Apply percnetile filtering for absorption spectra, defaut percent_range = [15, 85]
-                    abs_per = da.percentile_abs(qepro_dic['QEPro_x_axis'], qepro_dic['QEPro_output'], percent_range=[15, 85])
-
-                    print(f'\n*** start to check absorbance at 365b nm in stream: {stream_name} is positive or not***\n')
-                    # abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0)
-                    abs_array = abs_per.mean(axis=0)
-                    wavelength = qepro_dic['QEPro_x_axis'][0]
-
-                    popt_abs01, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[205, 240], plot=False)
-                    popt_abs02, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[750, 950], plot=False)
-                    if abs(popt_abs01[0]) >= abs(popt_abs02[0]):
-                        popt_abs = popt_abs02
-                    elif abs(popt_abs01[0]) <= abs(popt_abs02[0]):
-                        popt_abs = popt_abs01
-
-                    abs_array_offset = abs_array - da.line_2D(wavelength, *popt_abs)
+                    kafka_process.macro_11_absorbance(stream_name)
 
-                    print(f'\nFitting function for baseline offset: {da.line_2D}\n')
-                    ff_abs={'fit_function': da.line_2D, 'curve_fit': popt_abs, 'percentile_mean': abs_array}
-                    de.dic_to_csv_for_stream(saving_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff_abs)
-                    u.plot_offfset(wavelength, da.line_2D, popt_abs)
+                    u.plot_offfset(kafka_process.abs_data['wavelength'], kafka_process.abs_fitting['fit_function'], kafka_process.abs_fitting['curve_fit'])
                    print(f'\n** export offset results of absorption spectra complete**\n')
 
-                ## Avergae scans in 'fluorescence' and idenfify good/bad
+                ## macro_10_good_bad: Average scans in 'fluorescence' and identify good/bad
                elif stream_name == 'fluorescence':
                    print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n')
-                    ## Apply percnetile filtering for PL spectra, defaut percent_range = [30, 100]
-                    x0, y0, data_id, peak, prop = da._identify_multi_in_kafka(qepro_dic, metadata_dic,
-                                                        key_height=kh, distance=dis, height=hei,
-                                                        dummy_test=dummy_test, percent_range=[30, 100])
-                    label_uid = f'{uid[0:8]}_{metadata_dic["sample_type"]}'
-                    # u.plot_average_good(x0, y0, color=cmap(color_idx[sub_idx]), label=label_uid)
-                    # sub_idx = sample.index(metadata_dic['sample_type'])
-                    u.plot_average_good(x0, y0, label=label_uid, clf_limit=9)
-
-                global f_fit
+                    kafka_process.macro_10_good_bad(stream_name)
+
+                    label_uid = f'{kafka_process.uid[0:8]}_{kafka_process.metadata_dic["sample_type"]}'
+                    u.plot_average_good(kafka_process.PL_goodbad['x0'], kafka_process.PL_goodbad['y0'], label=label_uid, clf_limit=9)
+
+                ## Skip peak fitting if qepro type is absorbance
                if qepro_dic['QEPro_spectrum_type'][0] == 3:
-                    print(f"\n*** No need to carry out fitting for {stream_name} in uid: {uid[:8]} ***\n")
+                    print(f"\n*** No need to carry out fitting for {stream_name} in uid: {kafka_process.uid[:8]} ***\n")
 
+                ## macro_12 ~ macro_16
                else:
-                    ## for a good data, type(peak) will be a np.array and type(prop) will be a dic
-                    ## fit the good data, export/plotting fitting results
-                    ## append data_id into good_data or bad_data for calculate numbers
-                    if (type(peak) is np.ndarray) and (type(prop) is dict):
-                        x, y, p, f_fit, popt = da._fitting_in_kafka(x0, y0, data_id, peak, prop, dummy_test=dummy_test)
-
-                        fitted_y = f_fit(x, *popt)
-                        r2_idx1, _ = da.find_nearest(x, popt[1] - 3*popt[2])
-                        r2_idx2, _ = da.find_nearest(x, popt[1] + 3*popt[2])
-                        r_2 = da.r_square(x[r2_idx1:r2_idx2], y[r2_idx1:r2_idx2], fitted_y[r2_idx1:r2_idx2], y_low_limit=0)
-
-                        metadata_dic["r_2"] = r_2
+                    ############## (type(peak) is np.ndarray) and (type(prop) is dict) ################
+                    ##  for good data, type(peak) will be np.ndarray and type(prop) will be dict     ##
+                    ##  fit the good data, export/plotting fitting results                           ##
+                    ##  append data_id into good_data or bad_data for counting                       ##
+                    ####################################################################################
+                    type_peak = type(kafka_process.PL_goodbad['peak'])
+                    type_prop = type(kafka_process.PL_goodbad['prop'])
+
+                    ## When PL data is good
+                    if (type_peak is np.ndarray) and (type_prop is dict):
 
-                        if 'gauss' in f_fit.__name__:
-                            constant = 2.355
-                        else:
-                            constant = 1
-
-                        intensity_list = []
-                        peak_list = []
-                        fwhm_list = []
-                        for i in range(int(len(popt)/3)):
-                            intensity_list.append(popt[i*3+0])
-                            peak_list.append(popt[i*3+1])
-                            fwhm_list.append(popt[i*3+2]*constant)
-
-                        peak_emission_id = np.argmax(np.asarray(intensity_list))
-                        peak_emission = peak_list[peak_emission_id]
-                        fwhm = fwhm_list[peak_emission_id]
-                        ff={'fit_function': f_fit, 'curve_fit': popt, 'percentile_mean': y0}
+                        ## macro_12_PL_fitting: do peak fitting with a Gaussian distribution of PL spectra
+                        kafka_process.macro_12_PL_fitting()
 
                        ## Calculate PLQY for fluorescence stream
                        if (stream_name == 'fluorescence') and (PLQY[0]==1):
-                            PL_integral_s = integrate.simpson(y)
-                            label_uid = f'{uid[0:8]}_{metadata_dic["sample_type"]}'
-                            u.plot_CsPbX3(x, y, peak_emission, label=label_uid, clf_limit=9)
-
-                            ## Find absorbance at 365 nm from absorbance stream
-                            # q_dic, m_dic = de.read_qepro_by_stream(uid, stream_name='absorbance', data_agent='tiled')
-                            # abs_array = q_dic['QEPro_output'][1:].mean(axis=0)
-                            # wavelength = q_dic['QEPro_x_axis'][0]
-                            idx1, _ = da.find_nearest(wavelength, PLQY[2])
-                            absorbance_s = abs_array_offset[idx1]
-
-                            if PLQY[1] == 'fluorescein':
-                                plqy = da.plqy_fluorescein(absorbance_s, PL_integral_s, 1.506, *PLQY[3:])
-                            else:
-                                plqy = da.plqy_quinine(absorbance_s, PL_integral_s, 1.506, *PLQY[3:])
-
-
-                            plqy_dic = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s, 'plqy': plqy}
-
-                            optical_property = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s,
-                                                'Peak': peak_emission, 'FWHM':fwhm, 'PLQY':plqy}
-
-
-                            ## Creat agent_data in type of dict for exporting as json and wirte to sandbox
-                            agent_data = {}
-                            agent_data.update(optical_property)
-                            agent_data.update(pdf_property)
-                            agent_data.update({k:v for k, v in metadata_dic.items() if len(np.atleast_1d(v)) == 1})
-                            agent_data = de._exprot_rate_agent(metadata_dic, rate_label_dic, agent_data)
-
-                            ## Update absorbance offset and fluorescence fitting results inot agent_data
-                            agent_data.update({'abs_offset':{'fit_function':ff_abs['fit_function'].__name__, 'popt':ff_abs['curve_fit'].tolist()}})
-                            agent_data.update({'PL_fitting':{'fit_function':ff['fit_function'].__name__, 'popt':ff['curve_fit'].tolist()}})
-
-
-                            if USE_AGENT_iterate:
-
-                                # print(f"\ntelling agent {agent_data}")
-                                agent = build_agen(peak_target=peak_target, agent_data_path=agent_data_path, use_OAm=True)
-
-                                if len(agent.table) < 2:
-                                    acq_func = "qr"
-                                else:
-                                    acq_func = "qei"
-
-                                new_points = agent.ask(acq_func, n=1)
-
-                                ## Get target of agent.ask()
-                                agent_target = agent.objectives.summary['target'].tolist()
-
-                                ## Get mean and standard deviation of agent.ask()
-                                res_values = []
-                                for i in new_points_label:
-                                    if i in new_points['points'].keys():
-                                        res_values.append(new_points['points'][i][0])
-                                x_tensor = torch.tensor(res_values)
-                                post = agent.posterior(x_tensor)
-                                post_mean = post.mean.tolist()[0]
-                                post_stddev = post.stddev.tolist()[0]
-
-                                ## apply np.exp for log-transform objectives
-                                if_log = agent.objectives.summary['transform']
-                                for j in range(if_log.shape[0]):
-                                    if if_log[j] == 'log':
-                                        post_mean[j] = np.exp(post_mean[j])
-                                        post_stddev[j] = np.exp(post_stddev[j])
-
-                                ## Update target, mean, and standard deviation in agent_data
-                                agent_data.update({'agent_target': agent_target})
-                                agent_data.update({'posterior_mean': post_mean})
-                                agent_data.update({'posterior_stddev': post_stddev})
-
-
-                                # peak_diff = peak_emission - peak_target
-                                peak_diff = False
-
-                                # if (peak_diff <= 3) and (peak_diff >=-3):
-                                if peak_diff:
-                                    print(f'\nTarget peak: {peak_target} nm vs. Current peak: {peak_emission} nm\n')
-                                    print(f'\nReach the target, stop iteration, stop all pumps, and wash the loop.\n')
-
-                                    ### Stop all infusing pumps and wash loop
-                                    sq.wash_tube_queue2(pump_list, wash_tube, 'ul/min',
-                                                        zmq_control_addr=zmq_control_addr,
-                                                        zmq_info_addr=zmq_info_addr)
-
-                                    inst1 = BInst("queue_stop")
-                                    RM.item_add(inst1, pos='front')
-                                    agent_iteration.append(False)
-
-                                else:
-                                    agent_iteration.append(True)
-
+                            ## macro_13_PLQY: calculate integral of PL peak, PLQY and update optical_property
+                            kafka_process.macro_13_PLQY()
+
+                            label_uid = f'{kafka_process.uid[0:8]}_{kafka_process.metadata_dic["sample_type"]}'
+                            u.plot_CsPbX3(kafka_process.PL_fitting['wavelength'], kafka_process.PL_fitting['intensity'],
+                                          kafka_process.PL_fitting['peak_emission'], label=label_uid, clf_limit=9)
                        else:
-                            plqy_dic = None
-                            optical_property = None
-
-                        ## Save fitting data
-                        print(f'\nFitting function: {f_fit}\n')
-                        de.dic_to_csv_for_stream(saving_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff, plqy_dic=plqy_dic)
-                        print(f'\n** export fitting results complete**\n')
+                            kafka_process.plqy_dic = {}
+                            kafka_process.optical_property = {}
 
                        ## Plot fitting data
-                        u.plot_peak_fit(x, y, f_fit, popt, peak=p, fill_between=True)
+                        u.plot_peak_fit(kafka_process.PL_fitting['wavelength'],
+                                        kafka_process.PL_fitting['intensity'],
+                                        kafka_process.PL_fitting['fit_function'],
+                                        kafka_process.PL_fitting['curve_fit'],
+                                        peak=kafka_process.PL_fitting['shifted_peak_idx'],
+                                        fill_between=True)
                        print(f'\n** plot fitting results complete**\n')
-                        print(f'{peak = }')
-                        print(f'{prop = }')
-
-                    ## Append good/bad idetified results
-                    if stream_name == 'take_a_uvvis':
-                        good_data.append(data_id)
-
-                    elif (type(peak) == list) and (prop == []):
-                        bad_data.append(data_id)
-                        print(f"\n*** No need to carry out fitting for {stream_name} in uid: {uid[:8]} ***\n")
-                        print(f"\n*** since {stream_name} in uid: {uid[:8]} is a bad data.***\n")
-
-                    print('\n*** export, identify good/bad, fitting complete ***\n')
+
+                        ## macro_14: create agent_data as a dict for exporting as json and writing to sandbox
+                        kafka_process.macro_14_upate_agent()
+
+                        ## macro_15_save_data: Save processed data and agent data
+                        kafka_process.macro_15_save_data(stream_name)
 
-                    try :
-                        print(f"\n*** {sample_type} of uid: {uid[:8]} has: ***\n"
-                              f"{optical_property = }***\n"
-                              f"{pdf_property = }***\n")
-                    except (UnboundLocalError):
-                        pass
-
-                    global sandbox_uid
-                    ## Save processed data in df and agent_data as metadta in sandbox
-                    if write_to_sandbox and (stream_name == 'fluorescence'):
-                        df = pd.DataFrame()
-                        df['wavelength_nm'] = x0
-                        df['absorbance_mean'] = abs_array
-                        df['absorbance_offset'] = abs_array_offset
-                        df['fluorescence_mean'] = y0
-                        df['fluorescence_fitting'] = f_fit(x0, *popt)
-
-                        ## use pd.concat to add various length data together
-                        df_new = pd.concat([df, iq_df, gr_df, gr_fit_df], ignore_index=False, axis=1)
-
-                        # entry = sandbox_tiled_client.write_dataframe(df, metadata=agent_data)
-                        entry = sandbox_tiled_client.write_dataframe(df_new, metadata=agent_data)
-                        # uri = sandbox_tiled_client.values()[-1].uri
-                        uri = entry.uri
-                        sandbox_uid = uri.split('/')[-1]
-                        agent_data.update({'sandbox_uid': sandbox_uid})
-                        print(f"\nwrote to Tiled sandbox uid: {sandbox_uid}")
-
-                    ## Save agent_data locally
-                    if write_agent_data and (stream_name == 'fluorescence'):
-                        # agent_data.update({'sandbox_uid': sandbox_uid})
-                        with open(f"{agent_data_path}/{data_id}.json", "w") as f:
-                            json.dump(agent_data, f, indent=2)
-
-                        print(f"\nwrote to {agent_data_path}\n")
-
-                print(f'*** Accumulated num of good data: {len(good_data)} ***\n')
-                print(f'good_data = {good_data}\n')
-                print(f'*** Accumulated num of bad data: {len(bad_data)} ***\n')
+                ## macro_16_num_good: Add kafka_process.PL_goodbad['data_id'] into kafka_process.good_data or kafka_process.bad_data
+                kafka_process.macro_16_num_good(stream_name)
+
+
+                print(f'*** Accumulated num of good data: {len(kafka_process.good_data)} ***\n')
+                print(f'{kafka_process.good_data = }\n')
+                print(f'*** Accumulated num of bad data: {len(kafka_process.bad_data)} ***\n')
                print('########### Events printing division ############\n')
 
-                ## Depend on # of good/bad data, add items into queue item or stop
-                if stream_name == 'take_a_uvvis' and use_good_bad:
-                    if len(bad_data) > 3:
-                        print('*** qsever aborted due to too many bad scans, please check setup ***\n')
-
-                        ### Stop all infusing pumps and wash loop
-                        sq.wash_tube_queue2(pump_list, wash_tube, 'ul/min',
-                                            zmq_control_addr=zmq_control_addr,
-                                            zmq_info_addr=zmq_info_addr)
-
-                        RM.queue_stop()
-
-                    elif len(good_data) <= 2 and use_good_bad:
-                        print('*** Add another fluorescence and absorption scan to the fron of qsever ***\n')
-
-                        restplan = BPlan('sleep_sec_q', 5)
-                        RM.item_add(restplan, pos=0)
-
-                        scanplan = BPlan('take_a_uvvis_csv_q',
-                                         sample_type=metadata_dic['sample_type'],
-                                         spectrum_type='Corrected Sample',
-                                         correction_type='Dark',
-                                         pump_list=pump_list,
-                                         precursor_list=precursor_list,
-                                         mixer=mixer)
-                        RM.item_add(scanplan, pos=1)
-                        RM.queue_start()
-
-                    elif len(good_data) > 2 and use_good_bad:
-                        print('*** # of good data is enough so go to the next: bundle plan ***\n')
-                        bad_data.clear()
-                        good_data.clear()
-                        finished.append(metadata_dic['sample_type'])
-                        print(f'After event: good_data = {good_data}\n')
-                        print(f'After event: finished sample = {finished}\n')
-
-                        RM.queue_start()
-
-                ## Add predicted new points from ML agent into qserver
-                elif stream_name == 'fluorescence' and USE_AGENT_iterate and agent_iteration[-1]:
-                    print('*** Add new points from agent to the fron of qsever ***\n')
-                    print(f'*** New points from agent: {new_points} ***\n')
-
-                    if (post_dilute is True) and (fix_Br_ratio is False):
-                        set_target_list = [0 for i in range(len(pump_list))]
-                        rate_list = []
-                        for i in new_points_label:
-                            if i in new_points['points'].keys():
-                                rate_list.append(new_points['points'][i][0])
-                            else:
-                                pass
-                                # rate_list.append(0)
-                        # rate_list.insert(1, rate_list[0]*5)
-                        rate_list.append(sum(rate_list)*5)
-
-                    elif (post_dilute is True) and (fix_Br_ratio is True):
-                        set_target_list = [0 for i in range(len(pump_list))]
-                        rate_list = []
-                        for i in new_points_label:
-                            if i in new_points['points'].keys():
-                                rate_list.append(new_points['points'][i][0])
-                            else:
-                                pass
-                                # rate_list.append(0)
-                        rate_list.insert(1, rate_list[0]*5)
-                        rate_list.append(sum(rate_list)*5)
-
-                    else:
-                        # set_target_list = [0 for i in range(new_points['points'].shape[1])]
-                        set_target_list = [0 for i in range(len(pump_list))]
-                        rate_list = new_points['points']
-
-                    sample = de._auto_name_sample(rate_list, prefix=prefix[1:])
-
-                    sq.synthesis_queue(
-                        syringe_list=syringe_list,
-                        pump_list=pump_list,
-                        set_target_list=set_target_list,
-                        target_vol_list=target_vol_list,
-                        rate_list = rate_list,
-                        syringe_mater_list=syringe_mater_list,
-                        precursor_list=precursor_list,
-                        mixer=mixer,
-                        resident_t_ratio=resident_t_ratio,
-                        prefix=prefix[1:],
-                        sample=sample,
-                        wash_tube=wash_tube,
-                        name_by_prefix=bool(prefix[0]),
-                        num_abs=num_uvvis[0],
-                        num_flu=num_uvvis[1],
-                        det1=num_uvvis[2],
-                        det1_time=num_uvvis[3],
-                        det1_frame_rate=num_uvvis[4],
-                        is_iteration=True,
-                        zmq_control_addr=zmq_control_addr,
-                        zmq_info_addr=zmq_info_addr,
-                        )
-
-                    # RM.queue_start()
-
-                # elif use_good_bad:
-                else:
-                    print('*** Move to next reaction in Queue ***\n')
-                    time.sleep(2)
-                    # RM.queue_start()
+
+                ################# macro_17_add_queue: Add queue task to qserver ##################
+                ##                                                                              ##
+                ##  Depending on # of good/bad data, add items into the queue or stop          ##
+                ##  'take_a_uvvis' or new_points of kafka_process.agent                         ##
+                ###################################################################################
+                kafka_process.macro_17_add_queue(stream_name, qserver_process, RM)
 
    kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")
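For reference, the per-stream logic that this patch moves into numbered macros reduces to a small dispatch skeleton. The sketch below is illustrative only and not part of the patch: it assumes a kafka_process object exposing the macro_09 ... macro_17 methods and the PL_goodbad attribute introduced in _LDRD_Kafka.py, it takes PLQY as the same list-style input used above, and it collapses the plotting, spectrum-type checks, and logging that print_message also performs.

# Illustrative sketch (assumptions noted above); not the patch's exact control flow.
import numpy as np

def process_uvvis_streams(kafka_process, qserver_process, RM, beamline_acronym, PLQY):
    for stream_name in list(kafka_process.stream_list):
        # macro_09: read qepro data into dictionaries and export a .csv copy
        kafka_process.macro_09_qepro_dic(stream_name, beamline_acronym)

        # macro_10 / macro_11: identify good/bad spectra or zero the absorption baseline
        if stream_name in ('take_a_uvvis', 'fluorescence'):
            kafka_process.macro_10_good_bad(stream_name)
        elif stream_name == 'absorbance':
            kafka_process.macro_11_absorbance(stream_name)

        # macro_12 ~ macro_15: only fit when the PL data was identified as good
        goodbad = kafka_process.PL_goodbad
        peak = goodbad.get('peak') if isinstance(goodbad, dict) else None
        prop = goodbad.get('prop') if isinstance(goodbad, dict) else None
        if isinstance(peak, np.ndarray) and isinstance(prop, dict):
            kafka_process.macro_12_PL_fitting()
            if stream_name == 'fluorescence' and PLQY[0] == 1:
                kafka_process.macro_13_PLQY()
            kafka_process.macro_14_upate_agent()
            kafka_process.macro_15_save_data(stream_name)

        # macro_16 / macro_17: good/bad bookkeeping and queue management
        kafka_process.macro_16_num_good(stream_name)
        kafka_process.macro_17_add_queue(stream_name, qserver_process, RM)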