diff --git a/scripts/_LDRD_Kafka.py b/scripts/_LDRD_Kafka.py index 1cdbe95..9f5fba6 100644 --- a/scripts/_LDRD_Kafka.py +++ b/scripts/_LDRD_Kafka.py @@ -10,14 +10,15 @@ # import _synthesis_queue_RM as sq import _pdf_calculator as pc import _get_pdf as gp +from scipy import integrate -# import torch +import torch # from prepare_agent_pdf import build_agen from diffpy.pdfgetx import PDFConfig # from tiled.client import from_uri # from bluesky_queueserver_api.zmq import REManagerAPI -# from bluesky_queueserver_api import BPlan, BInst +from bluesky_queueserver_api import BPlan, BInst @@ -36,24 +37,38 @@ def _qserver_inputs(): return qserver_list -def _kafka_process(): - kafka_list=[ +def _kafka_inputs(): + inputs_list=[ 'dummy_kafka', 'csv_path', 'key_height', 'height', 'distance', 'PLQY', 'rate_label_dic_key', 'rate_label_dic_value', 'new_points_label', - 'use_good_bad', 'post_dilute', 'write_agent_data', 'agent_data_path', - 'USE_AGENT_iterate', 'peak_target', 'agent', + 'use_good_bad', 'post_dilute', 'fix_Br_ratio', 'write_agent_data', 'agent_data_path', + 'USE_AGENT_iterate', 'peak_target', 'iq_to_gr', 'iq_to_gr_path', 'cfg_fn', 'bkg_fn', 'iq_fn', 'search_and_match', 'mystery_path', 'results_path', 'fitting_pdf', 'fitting_pdf_path', 'cif_fn', 'gr_fn', - 'dummy_pdf', 'write_to_sandbox', 'sandbox_uri', - 'sandbox_tiled_client', 'tiled_client', + 'dummy_pdf', 'write_to_sandbox', 'sandbox_uri', 'beamline_acronym', 'fn_TBD', - 'entry', 'iq_data', 'stream_list', 'uid', 'uid_catalog', 'uid_pdfstream', + ] + + return inputs_list + + + +def _kafka_process(): + process_list=[ + 'agent', 'sandbox_tiled_client', 'tiled_client', + 'entry', 'iq_data', 'stream_list', + 'uid', 'uid_catalog', 'uid_pdfstream', 'uid_sandbox', 'gr_data', 'pdf_property', 'gr_fitting', + 'qepro_dic', 'metadata_dic', 'sample_type', + 'PL_goodbad', 'PL_fitting', 'abs_data', 'abs_fitting', + 'plqy_dic', 'optical_property', 'agent_data', 'rate_label_dic', + 'good_data', 'bad_data', 'agent_iteration', 'finished', ] - return kafka_list + return process_list + @@ -76,24 +91,26 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'): self.parameters_list = parameters_list self.from_xlsx = xlsx_fn self.sheet_name = sheet_name + + ## set attributes of keys in _kafka_process() for processing data + for key in _kafka_process(): + setattr(self, key, []) + + ## set attributes of keys in self.parameters_list for input parameters self.print_dic = de._read_input_xlsx(self.from_xlsx, sheet_name=self.sheet_name) - - ## Every attribute in self.inputs is a list!!! + ## Every attribute in self.inputs is default to a list!!! 
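+        ## Illustration (hypothetical values) of the split above: a key from _kafka_inputs()
+        ## is read from the xlsx sheet and lands on self.inputs as a list, e.g.
+        ##     self.inputs.peak_target  -> [515, 3]    # target wavelength and tolerance in nm
+        ## whereas a key from _kafka_process() starts as an empty list directly on self and is
+        ## overwritten by the processing macros below, e.g.
+        ##     self.iq_data             -> {'Q': ..., 'I': ..., 'array': ..., 'df': ...}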
self.inputs = dic_to_inputs(self.print_dic, self.parameters_list) try: - ## Append agent in the list of self.inputs.agent - if self.inputs.agent==[]: - self.inputs.agent.append( - build_agen( - peak_target=self.inputs.peak_target[0], - agent_data_path=self.inputs.agent_data_path[0]) - ) + ## Assign Agent to self.agent + self.agent = build_agen( + peak_target=self.inputs.peak_target[0], + agent_data_path=self.inputs.agent_data_path[0]) ## self.inputs.sandbox_uri[0] is just the uri of sandbox - ## so, turn uri into client and assign it to self.inputs.sandbox_tiled_client + ## so, turn uri into client and assign it to self.sandbox_tiled_client if type(self.inputs.sandbox_uri[0]) is str: - self.inputs.sandbox_tiled_client = from_uri(self.inputs.sandbox_uri[0]) + self.sandbox_tiled_client = from_uri(self.inputs.sandbox_uri[0]) ## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn @@ -110,47 +127,170 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'): for i in fn_glob: getattr(self.inputs, fn).append(i) + + ## Making rate_label_dic by rate_label_dic_key and rate_label_dic_value + self.rate_label_dic = {} + for key, value in zip(self.inputs.rate_label_dic_key, rate_label_dic_value): + self.rate_label_dic.update({key: value}) except AttributeError: pass + def auto_rate_list(self, pump_list, new_points, fix_Br_ratio): + """Auto transfer the predicted rates in new_points to a rate_list for qserver + + Args: + pump_list (list): pump list for qserver + new_points (dict): new_points predicted by self.agent + fix_Br_ratio (list): if fix ratio of CsPb/Br. If fixed, the ratio is fix_Br_ratio[1] + + Returns: + list: rate list for importing to qserver + """ + set_target_list = [0 for i in range(len(pump_list))] + rate_list = [] + for i in self.inputs.new_points_label: + if i in new_points['points'].keys(): + rate_list.append(new_points['points'][i][0]) + else: + pass + + if fix_Br_ratio[0]: + rate_list.insert(1, rate_list[0]*fix_Br_ratio[1]) + + return rate_list + + + + def macro_agent(self, qserver_process, RM, check_target=False): + """macro to build agent, make optimization, and update agent_data + + This macro will + 1. Build agent from agent_data_path = self.inputs.agent_data_path[0] + 2. Make optimization + 2. Update self.agent_data with target, predicted mean & standard deviation + self.agent_data['agent_target']: agent_target + self.agent_data['posterior_mean']: post_mean + self.agent_data['posterior_stddev']: post_stddev + 3. Check if meet target. If meet, wash loop; if not, keep iteration. + 4. Update self.agent_iteration + + Args: + qserver_process (_LDRD_Kafka.xlsx_to_inputs, optional): qserver parameters read from xlsx. + RM (REManagerAPI): Run Engine Manager API. + check_target (bool, optional): Check if peak emission reaches peak target. Defaults to False. 
+ + Returns: + dict: new_points predicted by self.agent + """ + + qin = qserver_process.inputs + peak_target = self.inputs.peak_target[0] + peak_tolerance = self.inputs.peak_target[1] + + self.agent = build_agen( + peak_target = peak_target, + agent_data_path = self.inputs.agent_data_path[0], + use_OAm = True) + + if len(self.agent.table) < 2: + acq_func = "qr" + else: + acq_func = "qei" + + new_points = self.agent.ask(acq_func, n=1) + + ## Get target of agent.ask() + agent_target = self.agent.objectives.summary['target'].tolist() + + ## Get mean and standard deviation of agent.ask() + res_values = [] + for i in self.inputs.new_points_label: + if i in new_points['points'].keys(): + res_values.append(new_points['points'][i][0]) + x_tensor = torch.tensor(res_values) + posterior = agent.posterior(x_tensor) + post_mean = posterior.mean.tolist()[0] + post_stddev = posterior.stddev.tolist()[0] + + ## apply np.exp for log-transform objectives + if_log = self.agent.objectives.summary['transform'] + for j in range(if_log.shape[0]): + if if_log[j] == 'log': + post_mean[j] = np.exp(post_mean[j]) + post_stddev[j] = np.exp(post_stddev[j]) + + ## Update target, mean, and standard deviation in agent_data + self.agent_data = {} + self.agent_data.update({'agent_target': agent_target}) + self.agent_data.update({'posterior_mean': post_mean}) + self.agent_data.update({'posterior_stddev': post_stddev}) + + peak_diff = abs(self.PL_fitting['peak_emission'] - peak_target) + meet_target = (peak_diff <= peak_tolerance) + + if check_target and meet_target: + print(f'\nTarget peak: {self.inputs.peak_target[0]} nm vs. Current peak: {self.PL_fitting['peak_emission']} nm\n') + print(f'\nReach the target, stop iteration, stop all pumps, and wash the loop.\n') + + ### Stop all infusing pumps and wash loop + sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min', + zmq_control_addr=qin.zmq_control_addr[0], + zmq_info_addr=qin.zmq_info_addr[0]) + + inst1 = BInst("queue_stop") + RM.item_add(inst1, pos='front') + self.agent_iteration.append(False) + + else: + self.agent_iteration.append(True) + + return new_points + + def macro_01_get_iq(self, iq_I_uid): """macro to get iq data, used in kafka consumer whiel taking xray_uvvis_plan and analysis of pdfstream finished This macro will - 0. Reset self.inputs.iq_data as an empty list - 1. Assgin sandbox entry to self.inputs.entry - 2. Append 4 elements into self.inputs.iq_data - self.inputs.iq_data[0]: chi_Q - self.inputs.iq_data[1]: chi_I - self.inputs.iq_data[2]: np.array([chi_Q, chi_I]) - self.inputs.iq_data[3]: pd.DataFrame([chi_Q, chi_I]) - 3. Reset self.inputs.uid to an empty list + 0. Reset self.iq_data as an empty {} + 1. Assgin sandbox entry to self.entry + 2. Update iq_data as a dict into self.iq_data + self.iq_data['Q']: chi_Q + self.iq_data['I']: chi_I + self.iq_data['array']: np.array([chi_Q, chi_I]) + self.iq_data['df']: pd.DataFrame([chi_Q, chi_I]) + 3. 
Reset self.uid to an empty list Args: iq_I_uid (str): uid of analysis data, read from doc[1]['data']['chi_I'] """ - self.inputs.uid_pdfstream.append(iq_I_uid) - self.inputs.entry = self.inputs.sandbox_tiled_client[iq_I_uid] - df = self.inputs.entry.read() - # Before appending I(Q) data, reset self.inputs.iq_data as an empty list - self.inputs.iq_data = [] - self.inputs.iq_data.append(df['chi_Q'].to_numpy()) - self.inputs.iq_data.append(df['chi_I'].to_numpy()) + self.uid_pdfstream.append(iq_I_uid) + self.entry = self.sandbox_tiled_client[iq_I_uid] + df = self.entry.read() + # Before updating I(Q) data, reset self.iq_data as an empty dict + self.iq_data = {} + # self.iq_data.append(df['chi_Q'].to_numpy()) + # self.iq_data.append(df['chi_I'].to_numpy()) iq_array = np.asarray([df['chi_Q'].to_numpy(), df['chi_I'].to_numpy()]) - self.inputs.iq_data.append(iq_array) + # self.iq_data.append(iq_array) iq_df = pd.DataFrame() iq_df['q'] = df['chi_Q'].to_numpy() iq_df['I(q)'] = df['chi_I'].to_numpy() - self.inputs.iq_data.append(iq_df) + # self.iq_data.append(iq_df) - ## Reset self.inputs.uid to an empty list - self.inputs.uid = [] + iq_data = { 'Q':df['chi_Q'].to_numpy(), + 'I':df['chi_I'].to_numpy(), + 'array':iq_array, + 'df':iq_df} + self.iq_data.update(iq_data) + + ## Reset self.uid to an empty list + self.uid = [] @@ -158,19 +298,19 @@ def macro_02_get_uid(self): """macro to get raw data uid, used in kafka consumer This macro will - 1. Assign raw data uid to self.inputs.uid - 2. Append raw data uid to self.inputs.uid_catalog - 3. Update self.inputs.stream_list + 1. Assign raw data uid to self.uid + 2. Append raw data uid to self.uid_catalog + 3. Update self.stream_list """ ## wait 1 second for databroker to save data time.sleep(1) - self.inputs.uid = self.inputs.entry.metadata['run_start'] - self.inputs.uid_catalog.append(self.inputs.uid) - stream_list = self.inputs.tiled_client[self.inputs.uid].metadata['summary']['stream_names'] - ## Reset self.inputs.stream_list to an empty list - self.inputs.stream_list = [] + self.uid = self.entry.metadata['run_start'] + self.uid_catalog.append(self.uid) + stream_list = self.tiled_client[self.uid].metadata['summary']['stream_names'] + ## Reset self.stream_list to an empty list + self.stream_list = [] for stream_name in syringe_list: - self.inputs.stream_list.append(stream_name) + self.stream_list.append(stream_name) @@ -180,24 +320,24 @@ def macro_03_stop_queue_uid(sefl, RM): This macro will 1. Stop queue - 2. Assign raw data uid to self.inputs.uid - 3. Append raw data uid to self.inputs.uid_catalog - 4. Update self.inputs.stream_list + 2. Assign raw data uid to self.uid + 3. Append raw data uid to self.uid_catalog + 4. Update self.stream_list Args: - RM (REManagerAPI): Run Engine Manager API. Defaults to RM. + RM (REManagerAPI): Run Engine Manager API. """ inst1 = BInst("queue_stop") RM.item_add(inst1, pos='front') ## wait 1 second for databroker to save data time.sleep(1) - self.inputs.uid = message['run_start'] - self.inputs.uid_catalog.append(self.inputs.uid) + self.uid = message['run_start'] + self.uid_catalog.append(self.uid) stream_list = list(message['num_events'].keys()) - ## Reset self.inputs.stream_list to an empty list - self.inputs.stream_list = [] + ## Reset self.stream_list to an empty list + self.stream_list = [] for stream_name in syringe_list: - self.inputs.stream_list.append(stream_name) + self.stream_list.append(stream_name) @@ -206,21 +346,27 @@ def macro_04_dummy_pdf(sefl): while self.inputs.dummy_pdf[0] is True This macro will - 0. 
Reset self.inputs.iq_data as an empty list - 1. Read pdf data from self.inputs.iq_fn[-1] - 2. Append 4 elements into self.inputs.iq_data - self.inputs.iq_data[0]: chi_Q - self.inputs.iq_data[1]: chi_I - self.inputs.iq_data[2]: np.array([chi_Q, chi_I]) - self.inputs.iq_data[3]: pd.DataFrame([chi_Q, chi_I]) + 0. Reset self.iq_data as an empty dict + 1. Read pdf data from self.iq_fn[-1] + 2. Update iq_data as a dict into self.iq_data + self.iq_data['Q']: iq_array[0] + self.iq_data['I']: iq_array[1] + self.iq_data['array']: iq_array + self.iq_data['df']: iq_df """ - self.inputs.iq_data = [] - iq_array = pd.read_csv(self.inputs.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T - self.inputs.iq_data.append(iq_array[0]) - self.inputs.iq_data.append(iq_array[1]) - self.inputs.iq_data.append(iq_array) - iq_df = pd.read_csv(self.inputs.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ') - self.inputs.iq_data.append(iq_df) + self.iq_data = {} + iq_array = pd.read_csv(self.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T + # self.iq_data.append(iq_array[0]) + # self.iq_data.append(iq_array[1]) + # self.iq_data.append(iq_array) + iq_df = pd.read_csv(self.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ') + # self.iq_data.append(iq_df) + + iq_data = { 'Q':iq_array[0], + 'I':iq_array[1], + 'array':iq_array, + 'df':iq_df} + self.iq_data.update(iq_data) @@ -233,15 +379,15 @@ def macro_05_iq_to_gr(self, beamline_acronym): 3. Read pdf background file from self.inputs.bkg_fn[-1] 4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0] 5. Read saved g(r) into pd.DataFrame and save again to remove the headers - 6. Update g(r) data path and data frame to self.inputs.gr_data - self.inputs.gr_data[0]: gr_data (path) - self.inputs.gr_data[1]: gr_df + 6. 
Update g(r) data path and data frame to self.gr_data + self.gr_data[0]: gr_data (path) + self.gr_data[1]: gr_df Args: beamline_acronym (str): catalog name for tiled to access data """ # Grab metadat from stream_name = fluorescence for naming gr file - fn_uid = de._fn_generator(self.inputs.uid, beamline_acronym=beamline_acronym) + fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym) gr_fn = f'{fn_uid}_scattering.gr' ### dummy test, e.g., CsPbBr2 @@ -252,7 +398,7 @@ def macro_05_iq_to_gr(self, beamline_acronym): pdfconfig = PDFConfig() pdfconfig.readConfig(self.inputs.cfg_fn[-1]) pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1] - sqfqgr_path = gp.transform_bkg(pdfconfig, self.inputs.iq_array[2], output_dir=self.inputs.iq_to_gr_path[0], + sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0], plot_setting={'marker':'.','color':'green'}, test=True, gr_fn=gr_fn) gr_data = sqfqgr_path['gr'] @@ -261,9 +407,9 @@ def macro_05_iq_to_gr(self, beamline_acronym): gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ') gr_df.to_csv(gr_data, index=False, header=False, sep =' ') - self.inputs.gr_data = [] - self.inputs.gr_data.append(gr_data) - self.inputs.gr_data.append(gr_df) + self.gr_data = [] + self.gr_data.append(gr_data) + self.gr_data.append(gr_df) @@ -272,8 +418,8 @@ def macro_06_search_and_match(self, gr_fn): using package Refinery from updated_pipeline_pdffit2.py Args: - gr_fn (str): g(r) data path for searching and matching, ex: self.inputs.gr_data[0] or self.inputs.gr_fn[0] - if using self.inputs.gr_data[0], g(r) is generated in workflow + gr_fn (str): g(r) data path for searching and matching, ex: self.gr_data[0] or self.inputs.gr_fn[0] + if using self.gr_data[0], g(r) is generated in workflow if using self.inputs.gr_fn[0], g(r) is directly read from a file Returns: @@ -316,16 +462,16 @@ def macro_07_fitting_pdf(self, gr_fn, beamline_acronym, 1. Do pdf refinement of gr_fn 2. Generate a filename for fitting data by using metadata of stream_name == fluorescence 3. Save fitting data - 4. Update self.inputs.pdf_property - 5. Update fitting data at self.inputs.gr_fitting - self.inputs.gr_fitting[0]: pf.getR() - self.inputs.gr_fitting[1]: pf.getpdf_fit() - self.inputs.gr_fitting[2]: np.array([pf.getR(), pf.getpdf_fit()]) - self.inputs.gr_fitting[3]: pd.DataFrame([pf.getR(), pf.getpdf_fit()]) + 4. Update self.pdf_property + 5. 
Update fitting data at self.gr_fitting + self.gr_fitting['R']: pf.getR() + self.gr_fitting['pdf_fit']: pf.getpdf_fit() + self.gr_fitting['array']: np.array([pf.getR(), pf.getpdf_fit()]) + self.gr_fitting['df']: pd.DataFrame([pf.getR(), pf.getpdf_fit()]) Args: - gr_fn (str): g(r) data path for pdf fitting, ex: self.inputs.gr_data[0] or self.inputs.gr_fn[0] - if using self.inputs.gr_data[0], g(r) is generated in workflow + gr_fn (str): g(r) data path for pdf fitting, ex: self.gr_data[0] or self.inputs.gr_fn[0] + if using self.gr_data[0], g(r) is generated in workflow if using self.inputs.gr_fn[0], g(r) is directly read from a file beamline_acronym (str): catalog name for tiled to access data @@ -346,43 +492,482 @@ def macro_07_fitting_pdf(self, gr_fn, beamline_acronym, pf.setphase(i+1) particel_size.append(pf.getvar(pf.spdiameter)) # Grab metadat from stream_name = fluorescence for naming gr file - fn_uid = de._fn_generator(self.inputs.uid, beamline_acronym=beamline_acronym) + fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym) fgr_fn = os.path.join(self.inputs.fitting_pdf_path[0], f'{fn_uid}_scattering.fgr') pf.save_pdf(1, f'{fgr_fn}') - self.inputs.pdf_property = {} - self.inputs.pdf_property.update({'Br_ratio': phase_fraction[0], 'Br_size':particel_size[0]}) + self.pdf_property = {} + self.pdf_property.update({'Br_ratio': phase_fraction[0], 'Br_size':particel_size[0]}) gr_fit_arrary = np.asarray([pf.getR(), pf.getpdf_fit()]) gr_fit_df = pd.DataFrame() gr_fit_df['fit_r'] = pf.getR() gr_fit_df['fit_g(r)'] = pf.getpdf_fit() - self.inputs.gr_fitting = [] - self.inputs.gr_fitting.append(pf.getR()) - self.inputs.gr_fitting.append(pf.getpdf_fit()) - self.inputs.gr_fitting.append(gr_fit_arrary) - self.inputs.gr_fitting.append(gr_fit_df) + self.gr_fitting = {} + gr_fitting = { 'R':pf.getR(), + 'pdf_fit':pf.getpdf_fit(), + 'array': gr_fit_arrary, + 'df': gr_fit_df} + self.gr_fitting.update(gr_fitting) + # self.gr_fitting.append(pf.getR()) + # self.gr_fitting.append(pf.getpdf_fit()) + # self.gr_fitting.append(gr_fit_arrary) + # self.gr_fitting.append(gr_fit_df) def macro_08_no_fitting_pdf(self): - """macro to update self.inputs.gr_fitting while no pdf fitting, used in kafka consumer + """macro to update self.gr_fitting while no pdf fitting, used in kafka consumer + + This macro will + 1. Update fitting data at self.gr_fitting + self.gr_fitting['R']: [] + self.gr_fitting['pdf_fit']: [] + self.gr_fitting['array']: None + self.gr_fitting['df']: pd.DataFrame([np.nan, np.nan]) """ - self.inputs.gr_fitting = [] + self.gr_fitting = {} gr_fit_arrary = None gr_fit_df = pd.DataFrame() gr_fit_df['fit_r'] = np.nan gr_fit_df['fit_g(r)'] = np.nan + + gr_fitting = { 'R':[], + 'pdf_fit':[], + 'array': gr_fit_arrary, + 'df': gr_fit_df} + self.gr_fitting.update(gr_fitting) - self.inputs.gr_fitting.append([]) - self.inputs.gr_fitting.append([]) - self.inputs.gr_fitting.append(gr_fit_arrary) - self.inputs.gr_fitting.append(gr_fit_df) + # self.gr_fitting.append([]) + # self.gr_fitting.append([]) + # self.gr_fitting.append(gr_fit_arrary) + # self.gr_fitting.append(gr_fit_df) pdf_property={'Br_ratio': np.nan, 'Br_size': np.nan} - self.inputs.pdf_property.update(pdf_property) + self.pdf_property.update(pdf_property) + + + + def macro_09_qepro_dic(self, stream_name, beamline_acronym): + """macro to read uv-vis data into dic and save data into .csv file + + This macro will + 1. Read Uv-Vis data according to stream name + 3. Save Uv-Vis data + 4. Update + self.qepro_dic + self.metadata_dic + 5. 
Reset self.PL_goodbad to an empty {} + + Args: + stream_name (str): the stream name in scan doc to identify scan plan + beamline_acronym (str): catalog name for tiled to access data + """ + ## Read data from databroker and turn into dic + self.qepro_dic, self.metadata_dic = de.read_qepro_by_stream( + self.uid, + stream_name=stream_name, + data_agent='tiled', + beamline_acronym=beamline_acronym) + self.sample_type = self.metadata_dic['sample_type'] + + ## Save data in dic into .csv file + if stream_name == 'take_a_uvvis': + saving_path = self.inputs.csv_path[1] + else: + saving_path = self.inputs.csv_path[0] + de.dic_to_csv_for_stream(saving_path, self.qepro_dic, self.metadata_dic, stream_name=stream_name) + print(f'\n** export {stream_name} in uid: {self.uid[0:8]} to ../{os.path.basename(saving_path)} **\n') + + self.PL_goodbad = {} + + + + def macro_10_good_bad(self, stream_name): + """macro to identify good/bad data of fluorescence (PL) peak + + This macro will + 1. Identify a good or bad PL peak in 'take_a_uvvis' and 'fluorescence' + 2. Update self.PL_goodbad + self.PL_goodbad['wavelength']: wavelenght (nm) of PL + self.PL_goodbad['intensity']: intensity of PL + self.PL_goodbad['data_id']: f'{t0[0]}_{t0[1]}_{metadata_dic["uid"][:8]}' + self.PL_goodbad['peak']: peaks from scipy.find_peaks + self.PL_goodbad['prop']: properties from scipy.find_peaks + + Args: + stream_name (str): the stream name in scan doc to identify scan plan + """ + if self.qepro_dic['QEPro_spectrum_type'][0]==2 and stream_name=='take_a_uvvis': + print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n') + x0, y0, data_id, peak, prop = da._identify_one_in_kafka( + self.qepro_dic, + self.metadata_dic, + key_height=self.inputs.key_height[0], + distance=self.inputs.distance[0], + height=self.inputs.height[0], + dummy_test=self.inputs.dummy_kafka[0]) + + elif stream_name == 'fluorescence': + print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n') + ## Apply percnetile filtering for PL spectra, defaut percent_range = [30, 100] + x0, y0, data_id, peak, prop = da._identify_multi_in_kafka( + self.qepro_dic, + self.metadata_dic, + key_height=self.inputs.key_height[0], + distance=self.inputs..distance[0], + height=self.inputs.height[0], + dummy_test=self.inputs.dummy_kafka[0], + percent_range=[30, 100]) + + self.PL_goodbad = {} + PL_goodbad = { 'wavelength':np.asarray(x0), 'intensity':np.asarray(y0), + 'data_id':data_id, 'peak':peak, 'prop':prop} + self.PL_goodbad.update(PL_goodbad) + + + + def macro_11_absorbance(self, stream_name): + """macro to apply an offset to zero baseline of absorption spectra + + This macro will + 1. Apply a 2D line to fit the baseline of absorption spectra + 2. Update self.abs_data + self.abs_data['wavelength']: wavelenght of absorbance nm + self.abs_data['absorbance']: absorbance array (percentile_mean) + self.abs_data['offset']: absorbance array offset + 3. 
Update self.abs_fitting + self.abs_fitting['fit_function']: da.line_2D + self.abs_fitting['curve_fit']: popt_abs (popt from scipy.curve_fit) + + Args: + stream_name (str): the stream name in scan doc to identify scan plan + """ + + ## Apply percnetile filtering for absorption spectra, defaut percent_range = [15, 85] + abs_per = da.percentile_abs(self.qepro_dic['QEPro_x_axis'], + self.qepro_dic['QEPro_output'], + percent_range=[15, 85]) + + print(f'\n*** start to check absorbance at 365b nm in stream: {stream_name} is positive or not***\n') + # abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0) + abs_array = abs_per.mean(axis=0) + wavelength = qepro_dic['QEPro_x_axis'][0] + + popt_abs01, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[205, 240], plot=False) + popt_abs02, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[750, 950], plot=False) + if abs(popt_abs01[0]) >= abs(popt_abs02[0]): + popt_abs = popt_abs02 + elif abs(popt_abs01[0]) <= abs(popt_abs02[0]): + popt_abs = popt_abs01 + + abs_array_offset = abs_array - da.line_2D(wavelength, *popt_abs) + print(f'\nFitting function for baseline offset: {da.line_2D}\n') + ff_abs={'fit_function': da.line_2D, 'curve_fit': popt_abs} + + self.abs_data = {} + self.abs_data.update({'wavelength':wavelength, 'absorbance':abs_array, 'offset':abs_array_offset}) + + self.abs_fitting = {} + self.abs_fitting.update(ff_abs) + + de.dic_to_csv_for_stream(self.inputs.csv_path[0], + self.qepro_dic, self.metadata_dic, + stream_name=stream_name, + fitting=ff_abs) + + + def macro_12_PL_fitting(self): + """macro to do peak fitting with gaussian distribution of PL spectra + + This macro will + 1. Apply a 1-peak gaussian fitting to fit the peak of PL spectra + 2. Update self.PL_fitting + self.PL_fitting['fit_function']: da._1gauss + self.PL_fitting['curve_fit']: popt from scipy.curve_fit + self.PL_fitting['fwhm']: full width at half maximum of highest peak + self.PL_fitting['peak_emission']: highest peak position + self.PL_fitting['wavelength']: wavelenght (nm) of PL between 400 ~ 800 nm + self.PL_fitting['intensity']: intensity of PL (nm) between 400 ~ 800 nm + self.PL_fitting['shifted_peak_idx']: peak index between 400 ~ 800 nm + + """ + + x, y, p, f_fit, popt = da._fitting_in_kafka( + self.PL_goodbad['wavelength'], + self.PL_goodbad['intensity'], + self.PL_goodbad['data_id'], + self.PL_goodbad['peak'], + self.PL_goodbad['prop'], + dummy_test=self.inputs.dummy_kafka[0]) + + fitted_y = f_fit(x, *popt) + r2_idx1, _ = da.find_nearest(x, popt[1] - 3*popt[2]) + r2_idx2, _ = da.find_nearest(x, popt[1] + 3*popt[2]) + r_2 = da.r_square(x[r2_idx1:r2_idx2], y[r2_idx1:r2_idx2], fitted_y[r2_idx1:r2_idx2], y_low_limit=0) + + metadata_dic["r_2"] = r_2 + + if 'gauss' in f_fit.__name__: + constant = 2.355 + else: + constant = 1 + + intensity_list = [] + peak_list = [] + fwhm_list = [] + for i in range(int(len(popt)/3)): + intensity_list.append(popt[i*3+0]) + peak_list.append(popt[i*3+1]) + fwhm_list.append(popt[i*3+2]*constant) + + peak_emission_id = np.argmax(np.asarray(intensity_list)) + peak_emission = peak_list[peak_emission_id] + fwhm = fwhm_list[peak_emission_id] + + ff={'fit_function': f_fit, 'curve_fit': popt, + 'fwhm': fwhm, 'peak_emission': peak_emission, + 'wavelength': x, 'intensity': y, + 'shifted_peak_idx': p} + + self.PL_fitting = {} + self.PL_fitting.update(ff) + + + def macro_13_PLQY(self): + """macro to calculate integral of PL peak, PLQY and update optical_property + + This macro will + 1. 
Integrate PL peak by scipy.integrate.simpson + 2. Calculate PLQY based on the parameters in self.inputs.PLQY + 3. Update self.optical_property + self.optical_property['PL_integral']: PL peak integral + self.optical_property['Absorbance_365']: absorbance at 365 nm + self.optical_property['Peak']: highest peak position + self.optical_property['FWHM']: full width at half maximum of highest peak + self.optical_property['PLQY']: Photoluminescence Quantum Yield + + """ + + ## Integrate PL peak + x = self.PL_fitting['wavelength'] + y = self.PL_fitting['intensity'] + peak_emission = self.PL_fitting['peak_emission'] + PL_integral_s = integrate.simpson(y) + + ## Find absorbance at 365 nm from absorbance stream + idx1, _ = da.find_nearest(self.abs_data['wavelength'], self.inputs.PLQY[2]) + absorbance_s = self.abs_data['offset'][idx1] + + if self.inputs.PLQY[1] == 'fluorescein': + plqy = da.plqy_fluorescein(absorbance_s, PL_integral_s, 1.506, *self.inputs.PLQY[3:]) + else: + plqy = da.plqy_quinine(absorbance_s, PL_integral_s, 1.506, *self.inputs.PLQY[3:]) + + + plqy_dic = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s, 'plqy': plqy} + self.plqy_dic = {} + self.plqy_dic.update(plqy_dic) + + optical_property = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s, + 'Peak':peak_emission, 'FWHM':fwhm, 'PLQY':plqy} + self.optical_property = {} + self.optical_property.update(optical_property) + + + + def macro_14_upate_agent(self): + """macro to update agent_data in type of dict for exporting as json and wirte to sandbox + + This macro will + 1. Update self.agent_data with + self.optical_property + self.pdf_property + self.metadata_dic + self.abs_fitting + self.PL_fitting + + """ + + ## Creat agent_data in type of dict for exporting as json and wirte to sandbox + if 'agent_target' in self.agent_data.keys(): + pass + else: + self.agent_data = {} + self.agent_data.update(self.optical_property) + self.agent_data.update(self.pdf_property) + self.agent_data.update({k:v for k, v in self.metadata_dic.items() if len(np.atleast_1d(v)) == 1}) + self.agent_data.update(de._exprot_rate_agent(self.metadata_dic, self.rate_label_dic, self.agent_data)) + + ## Update absorbance offset and fluorescence fitting results inot agent_data + ff_abs = self.abs_fitting + ff = self.PL_fitting + self.agent_data.update({'abs_offset':{'fit_function':ff_abs['fit_function'].__name__, 'popt':ff_abs['curve_fit'].tolist()}}) + self.agent_data.update({'PL_fitting':{'fit_function':ff['fit_function'].__name__, 'popt':ff['curve_fit'].tolist()}}) + + + + def macro_15_save_data(self, stream_name): + """macro to save processed data and agent data + + This macro will + 1. Save fitting data locally (self.inputs.csv_path[0]) + 2. Save processed data in df and agent_data as metadta in sandbox + 3. 
Save agent_data locally (self.inputs.agent_data_path[0]) + + Args: + stream_name (str): the stream name in scan doc to identify scan plan + """ + + ## Save fitting data + print(f'\nFitting function: {self.PL_fitting['fit_function']}\n') + de.dic_to_csv_for_stream(self.inputs.csv_path[0], + self.qepro_dic, + self.metadata_dic, + stream_name = stream_name, + fitting = self.PL_fitting, + plqy_dic = self.plqy_dic) + print(f'\n** export fitting results complete**\n') + + ## Save processed data in df and agent_data as metadta in sandbox + if self.inputs.write_to_sandbox[0] and (stream_name == 'fluorescence'): + df = pd.DataFrame() + x0 = self.PL_goodbad['wavelength'] + df['wavelength_nm'] = x0 + df['absorbance_mean'] = self.abs_data['absorbance'] + df['absorbance_offset'] = self.abs_data['offset'] + df['fluorescence_mean'] = self.PL_goodbad['intensity'] + f_fit = self.PL_fitting['fit_function'] + popt = self.PL_fitting['curve_fit'] + df['fluorescence_fitting'] = f_fit(x0, *popt) + + ## use pd.concat to add various length data together + df_new = pd.concat([df, self.iq_data['df'], self.gr_data[1], self.gr_fitting['df']], ignore_index=False, axis=1) + + # entry = sandbox_tiled_client.write_dataframe(df, metadata=agent_data) + entry = self.sandbox_tiled_client.write_dataframe(df_new, metadata=self.agent_data) + # uri = sandbox_tiled_client.values()[-1].uri + uri = entry.uri + sandbox_uid = uri.split('/')[-1] + self.uid_sandbox.append(sandbox_uid) + self.agent_data.update({'sandbox_uid': sandbox_uid}) + print(f"\nwrote to Tiled sandbox uid: {sandbox_uid}") + + ## Save agent_data locally + if self.inputs.write_agent_data[0] and (stream_name == 'fluorescence'): + # agent_data.update({'sandbox_uid': sandbox_uid}) + with open(f"{self.inputs.agent_data_path[0]}/{self.PL_goodbad['data_id']}.json", "w") as f: + json.dump(agent_data, f, indent=2) + + print(f"\nwrote to {self.inputs.agent_data_path[0]}\n") + + + def macro_16_num_good(self, stream_name): + """macro to add self.PL_goodbad['data_id'] into self.good_data or self.bad_data + + Args: + stream_name (str): the stream name in scan doc to identify scan plan + """ + + type_peak = type(kafka_process.PL_goodbad['peak']) + type_prop = type(kafka_process.PL_goodbad['prop']) + + ## Append good/bad idetified results + if stream_name == 'take_a_uvvis': + + if (type_peak is np.ndarray) and (type_prop is dict): + self.good_data.append(self.PL_goodbad['data_id']) + + elif (type(self.PL_goodbad['peak']) == list) and (self.PL_goodbad['prop'] == []): + self.bad_data.append(self.PL_goodbad['data_id']) + print(f"\n*** No need to carry out fitting for {stream_name} in uid: {self.uid[:8]} ***\n") + print(f"\n*** since {stream_name} in uid: {self.uid[:8]} is a bad data.***\n") + + print('\n*** export, identify good/bad, fitting complete ***\n') + + print(f"\n*** {self.sample_type} of uid: {self.uid[:8]} has: ***\n" + f"{self.optical_property = }***\n" + f"{self.pdf_property = }***\n") + + + def macro_17_add_queue(self, stream_name, qserver_process, RM): + """macro to add queus task to qserver + + This macro will + 1. Add another 'take_a_uvvis' depending on # of good_data + 2. Add new_points preditcted by self.agent + + Args: + stream_name (str): the stream name in scan doc to identify scan plan + qserver_process (_LDRD_Kafka.xlsx_to_inputs, optional): qserver parameters read from xlsx. + RM (REManagerAPI): Run Engine Manager API. 
+ """ + + qin = qserver_process.inputs + ## Depend on # of good/bad data, add items into queue item or stop + if (stream_name == 'take_a_uvvis') and (self.inputs.use_good_bad[0]): + if len(self.bad_data) > 3: + print('*** qsever aborted due to too many bad scans, please check setup ***\n') + + ### Stop all infusing pumps and wash loop + sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min', + zmq_control_addr=qin.zmq_control_addr[0], + zmq_info_addr=qin.zmq_info_addr[0]) + + RM.queue_stop() + + elif (len(self.good_data) <= 2) and (self.inputs.use_good_bad[0]): + print('*** Add another fluorescence and absorption scan to the fron of qsever ***\n') + + restplan = BPlan('sleep_sec_q', 5) + RM.item_add(restplan, pos=0) + + scanplan = BPlan('take_a_uvvis_csv_q', + sample_type=self.metadata_dic['sample_type'], + spectrum_type='Corrected Sample', + correction_type='Dark', + pump_list=qin.pump_list, + precursor_list=qin.precursor_list, + mixer=qin.mixer) + RM.item_add(scanplan, pos=1) + RM.queue_start() + + elif (len(self.good_data) > 2) and (self.inputs.use_good_bad[0]): + print('*** # of good data is enough so go to the next: bundle plan ***\n') + self.bad_data.clear() + self.good_data.clear() + self.finished.append(self.metadata_dic['sample_type']) + print(f'After event: good_data = {self.good_data}\n') + print(f'After event: finished sample = {self.finished}\n') + + RM.queue_start() + + ## Add predicted new points from ML agent into qserver + elif (stream_name == 'fluorescence') and (self.inputs.USE_AGENT_iterate[0]) and (self.inputs.agent_iteration[-1]): + print('*** Add new points from agent to the fron of qsever ***\n') + + new_points = self.macro_agent(qserver_process, RM, check_target=True) + + print(f'*** New points from agent: {new_points} ***\n') + + rate_list = self.auto_rate_list(qin.pump_list, new_points, self.inputs.fix_Br_ratio) + + if self.inputs.post_dilute[0]: + rate_list.append(sum(rate_list)*self.inputs.post_dilute[1]) + + qin.sample = de._auto_name_sample(rate_list, prefix=qin.prefix[1:]) + qin.infuse_rates = rate_list + sq.synthesis_queue_xlsx(qserver_process) + + else: + print('*** Move to next reaction in Queue ***\n') + time.sleep(2) + # RM.queue_start() + + + diff --git a/scripts/inputs_qserver_kafka_v2.xlsx b/scripts/inputs_qserver_kafka_v2.xlsx index 02800d6..1998e98 100644 Binary files a/scripts/inputs_qserver_kafka_v2.xlsx and b/scripts/inputs_qserver_kafka_v2.xlsx differ diff --git a/scripts/kafka_consumer_iterate_1LL09_v2.py b/scripts/kafka_consumer_iterate_1LL09_v2.py index e5b5d1c..446df37 100644 --- a/scripts/kafka_consumer_iterate_1LL09_v2.py +++ b/scripts/kafka_consumer_iterate_1LL09_v2.py @@ -62,7 +62,10 @@ print(f'Sample: {sample}') -def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ): +def print_kafka_messages(beamline_acronym, + kafka_process=kafka_process, + qserver_process=qserver_process, + RM=RM, ): """Print kafka message from beamline_acronym @@ -73,6 +76,9 @@ def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ): RM (REManagerAPI, optional): Run Engine Manager API. Defaults to RM. 
""" + kin = kafka_process.inputs + qin = qserver_process.inputs + print(f"Listening for Kafka messages for {beamline_acronym}") print(f'Defaul parameters:\n' f' csv path: {kin.csv_path[0]}\n' @@ -187,21 +193,28 @@ def print_message(consumer, doctype, doc, kin.stream_list.append(stream_name) -////////////////////////////////////////////////////////////////////////////////////////////////// ## Set good/bad data condictions to the corresponding sample kh = kin.key_height[0] hei = kin.height[0] dis = kin.distance[0] ## obtain phase fraction & particle size from g(r) - if 'scattering' in stream_list: - if fitting_pdf: - phase_fraction, particel_size = pc._pdffit2_CsPbX3(gr_data, cif_list, qmax=20, qdamp=0.031, qbroad=0.032, fix_APD=False, toler=0.001) - pdf_property={'Br_ratio': phase_fraction[0], 'Br_size':particel_size[0]} + if 'scattering' in kin.stream_list: + + ## macro_07: do pdf fitting and update kin.gr_fitting + if kin.fitting_pdf[0]: + kafka_process.macro_07_fitting_pdf( + kin.gr_fn[0], beamline_acronym, + rmax=100.0, qmax=12.0, qdamp=0.031, qbroad=0.032, + fix_APD=True, toler=0.01 + ) + + ## macro_08: not do pdf fitting but also update kin.gr_fitting else: - pdf_property={'Br_ratio': np.nan, 'Br_size': np.nan} + kafka_process.macro_08_no_fitting_pdf() + ## remove 'scattering' from stream_list to avoid redundant work in next for loop - stream_list.remove('scattering') + kin.stream_list.remove('scattering') ## Export, plotting, fitting, calculate # of good/bad data, add queue item for stream_name in stream_list: diff --git a/scripts/kafka_consumer_iterate_XPD_v2.py b/scripts/kafka_consumer_iterate_XPD_v2.py index ab201e8..9d0b6d9 100644 --- a/scripts/kafka_consumer_iterate_XPD_v2.py +++ b/scripts/kafka_consumer_iterate_XPD_v2.py @@ -47,15 +47,24 @@ qin = qserver_process.inputs ## Input varaibales for Kafka, reading from xlsx_fn by given sheet name -kafka_process = LK.xlsx_to_inputs(LK._kafka_process(), xlsx_fn=xlsx_fn, sheet_name='kafka_process') +kafka_process = LK.xlsx_to_inputs(LK._kafka_inputs(), xlsx_fn=xlsx_fn, sheet_name='kafka_process') kin = kafka_process.inputs ## Define RE Manager API as RM RM = REManagerAPI(zmq_control_addr=qin.zmq_control_addr[0], zmq_info_addr=qin.zmq_info_addr[0]) +## Make the first prediction from kafka_process.agent +first_points = kafka_process.macro_agent(qserver_process, RM, check_target=True) +rate_list = kafka_process.auto_rate_list(qin.pump_list, first_points, kin.fix_Br_ratio) +if kin.post_dilute[0]: + rate_list.append(sum(rate_list)*kin.post_dilute[1]) + +qin.infuse_rates = rate_list + + ## Import Qserver parameters to RE Manager import _synthesis_queue_RM as sq -sq.synthesis_queue_xlsx(qin) +sq.synthesis_queue_xlsx(qserver_process) ## Auto name samples by prefix if qin.name_by_prefix[0]: @@ -69,10 +78,12 @@ def print_kafka_messages(beamline_acronym_01, qserver_process=qserver_process, RM=RM, ): + """Print kafka message from beamline_acronym Args: - beamline_acronym (str): subscribed topics for data publishing (ex: xpd, xpd-analysis, xpd-ldrd20-31) + beamline_acronym_01 (str): subscribed topics for raw data publishing (ex: xpd, xpd-ldrd20-31) + beamline_acronym_02 (str): subscribed topics for analysis data publishing (ex: xpd-analysis) kafka_process (_LDRD_Kafka.xlsx_to_inputs, optional): kafka parameters read from xlsx. Defaults to kafka_process. qserver_process (_LDRD_Kafka.xlsx_to_inputs, optional): qserver parameters read from xlsx. Defaults to qserver_process. RM (REManagerAPI, optional): Run Engine Manager API. Defaults to RM. 
@@ -110,7 +121,8 @@ def print_kafka_messages(beamline_acronym_01, f'{qin.zmq_control_addr[0] = }') ## Assignt raw data tiled clients - kin.tiled_client.append = from_profile(beamline_acronym_01) + kin.beamline_acronym.append(beamline_acronym_01) + kafka_process.tiled_client = from_profile(beamline_acronym_01) ## 'xpd-analysis' is not a catalog name so can't be accessed in databroker ## Append good/bad data folder to csv_path @@ -123,9 +135,7 @@ def print_kafka_messages(beamline_acronym_01, pass - def print_message(consumer, doctype, doc, - bad_data = [], good_data = [], check_abs365 = False, finished = [], - agent_iteration = []): + def print_message(consumer, doctype, doc, check_abs365 = False, agent_iteration = []): name, message = doc # print(f"contents: {pprint.pformat(message)}\n") @@ -167,10 +177,10 @@ def print_message(consumer, doctype, doc, if 'sample_type' in message.keys(): print(f"sample type: {message['sample_type']}") - ## Reset kin.uid to an empty list - kin.uid = [] + ## Reset kafka_process.uid to an empty list + kafka_process.uid = [] - ## macro_01 + ## macro_01: get iq from sandbox ######### While document (name == 'event') and ('topic' in doc[1]) ########## ## key 'topic' is added into the doc of xpd-analysis in pdfstream ## ## Read uid of analysis data from doc[1]['data']['chi_I'] ## @@ -197,14 +207,14 @@ def print_message(consumer, doctype, doc, ) ## wait 1 second for databroker to save data time.sleep(1) - ## Reset kin.uid to an empty list - kin.uid = [] + ## Reset kafka_process.uid to an empty list + kafka_process.uid = [] - ## macro_02 + ## macro_02: get raw data uid from metadata of sandbox doc #### (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0) #### ## With taking xray_uvvis_plan and analysis of pdfstream finished ## - ## Sleep 1 second and assign uid, stream_list from kin.entry[-1] ## + ## Sleep 1 second and assign uid, stream_list from kafka_process.entry[-1] ## ## No need to stop queue since the net queue task is wahsing loop ## ##################################################################################### elif (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0): @@ -214,7 +224,7 @@ def print_message(consumer, doctype, doc, kafka_process.macro_02_get_uid() - ## macro_03 + ## macro_03: stop queue and get uid for take_a_uvvis scan ######### (name == 'stop') and ('take_a_uvvis' in message['num_events']) ########## ## Only take a Uv-Vis, no X-ray data but still do analysis of pdfstream ## ## Stop queue first for identify good/bad data ## @@ -229,436 +239,175 @@ def print_message(consumer, doctype, doc, kafka_process.macro_03_stop_queue_uid(RM) - ################## (name == 'stop') and (type(kin.uid) is str) #################### + ############## (name == 'stop') and (type(kafka_process.uid) is str) ############## ## ## ## When uid is assigned and type is a string, move to data fitting, calculation ## ## ## ##################################################################################### - if (name == 'stop') and (type(kin.uid) is str): - print(f'\n**** start to export uid: {kin.uid} ****\n') - print(f'\n**** with stream name in {kin.stream_list} ****\n') - - ## Set good/bad data condictions to the corresponding sample - kh = kin.key_height[0] - hei = kin.height[0] - dis = kin.distance[0] - - ## obtain phase fraction & particle size from g(r) - if 'scattering' in kin.stream_list: + if (name == 'stop') and (type(kafka_process.uid) is str): + print(f'\n**** start to export uid: {kafka_process.uid} ****\n') 
+ print(f'\n**** with stream name in {kafka_process.stream_list} ****\n') + + + ## macro_04 ~ macro_07 or 08 + #################### 'scattering' in kafka_process.stream_list ################### + ## ## + ## Process X-ray scattering data (PDF): iq to gr, search & match, pdf fitting ## + ## obtain phase fraction & particle size from g(r) ## + ##################################################################################### + if 'scattering' in kafka_process.stream_list: # Get metadata from stream_name fluorescence for plotting - qepro_dic, metadata_dic = de.read_qepro_by_stream( - kin.uid, stream_name='fluorescence', data_agent='tiled', + kafka_process.qepro_dic, kafka_process.metadata_dic = de.read_qepro_by_stream( + kafka_process.uid, stream_name='fluorescence', data_agent='tiled', beamline_acronym=beamline_acronym_01) - u = plot_uvvis(qepro_dic, metadata_dic) + u = plot_uvvis(kafka_process.qepro_dic, kafka_process.metadata_dic) - ## macro_04 (dummy test, e.g., CsPbBr2) + ## macro_04: setting dummy pdf data for test, e.g., CsPbBr2) if kin.dummy_pdf[0]: kafka_process.macro_04_dummy_pdf() - ## macro_05 + ## macro_05: do i(q) to g(r) through pdfstream if kin.iq_to_gr[0]: kafka_process.macro_05_iq_to_gr(beamline_acronym_01) - ## macro_06 + ## macro_06: do search and match if kin.search_and_match[0]: # cif_fn = kafka_process.macro_06_search_and_match(kin.gr_fn[0]) - cif_fn = kafka_process.macro_06_search_and_match(kin.gr_data[0]) + cif_fn = kafka_process.macro_06_search_and_match(kafka_process.gr_data[0]) print(f'\n\n*** After matching, the most correlated strucuture is\n' f'*** {cif_fn} ***\n\n') - ## macro_07 + ## macro_07: do pdf fitting and update kafka_process.gr_fitting if kin.fitting_pdf[0]: kafka_process.macro_07_fitting_pdf( - kin.gr_data[0], beamline_acronym_01, + kafka_process.gr_data[0], beamline_acronym_01, rmax=100.0, qmax=12.0, qdamp=0.031, qbroad=0.032, fix_APD=True, toler=0.01 ) + ## macro_08: not do pdf fitting but also update kafka_process.gr_fitting else: kafka_process.macro_08_no_fitting_pdf() if kin.iq_to_gr[0]: - u.plot_iq_to_gr(kin.iq_data[2], kin.gr_data[1].to_numpy().T, gr_fit=kin.gr_fitting[2]) + u.plot_iq_to_gr(kafka_process.iq_data['array'], kafka_process.gr_data[1].to_numpy().T, gr_fit=kafka_process.gr_fitting['array']) ## remove 'scattering' from stream_list to avoid redundant work in next for loop - kin.stream_list.remove('scattering') + kafka_process.stream_list.remove('scattering') -////////////////////////////////////////////////////////////////////////////////////////// - ## Export, plotting, fitting, calculate # of good/bad data, add queue item - for stream_name in stream_list: - ## Read data from databroker and turn into dic - qepro_dic, metadata_dic = de.read_qepro_by_stream( - uid, stream_name=stream_name, data_agent='tiled', beamline_acronym=beamline_acronym_01) - sample_type = metadata_dic['sample_type'] - ## Save data in dic into .csv file - - if stream_name == 'take_a_uvvis': - saving_path = path_1 - else: - saving_path = path_0 - de.dic_to_csv_for_stream(saving_path, qepro_dic, metadata_dic, stream_name=stream_name) - print(f'\n** export {stream_name} in uid: {uid[0:8]} to ../{os.path.basename(saving_path)} **\n') + ######## Other stream names except 'scattering' in kafka_process.stream_list ##### + ## Process Uv-Vis data: Export, plotting, fitting, calculate # of good/bad data ## + ## add queue item, PLQY ## + ## Agent: save agent data locally & to sandbox, make optimization ## + 
##################################################################################### + for stream_name in kafka_process.stream_list: + + ############################## macro_09 ~ macro_11 ############################### + ## read uv-vis data into dic and save data into .csv file ## + ## Idenfify good/bad data if it is a fluorescence scan in 'take_a_uvvis' ## + ## Avergae fluorescence scans in 'fluorescence' and idenfify good/bad ## + ##################################################################################### + + ## macro_09: read uv-vis data into dic and save data into .csv file + kafka_process.macro_09_qepro_dic(stream_name, beamline_acronym_01) + ## Plot data in dic - u = plot_uvvis(qepro_dic, metadata_dic) + u = plot_uvvis(kafka_process.qepro_dic, kafka_process.metadata_dic) if len(good_data)==0 and len(bad_data)==0: clear_fig=True else: clear_fig=False u.plot_data(clear_fig=clear_fig) - print(f'\n** Plot {stream_name} in uid: {uid[0:8]} complete **\n') + print(f'\n** Plot {stream_name} in uid: {kafka_process.uid[0:8]} complete **\n') - global abs_array, abs_array_offset, x0, y0 - ## Idenfify good/bad data if it is a fluorescence scan in 'take_a_uvvis' + + ## macro_10_good_bad: Idenfify good/bad data if it is a fluorescence scan in 'take_a_uvvis' if qepro_dic['QEPro_spectrum_type'][0]==2 and stream_name=='take_a_uvvis': print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n') - x0, y0, data_id, peak, prop = da._identify_one_in_kafka(qepro_dic, metadata_dic, key_height=kh, distance=dis, height=hei, dummy_test=dummy_test) + kafka_process.macro_10_good_bad(stream_name) - ## Apply an offset to zero baseline of absorption spectra + ## macro_11_absorbance: Apply an offset to zero baseline of absorption spectra elif stream_name == 'absorbance': print(f'\n*** start to filter absorbance within 15%-85% due to PF oil phase***\n') - ## Apply percnetile filtering for absorption spectra, defaut percent_range = [15, 85] - abs_per = da.percentile_abs(qepro_dic['QEPro_x_axis'], qepro_dic['QEPro_output'], percent_range=[15, 85]) - - print(f'\n*** start to check absorbance at 365b nm in stream: {stream_name} is positive or not***\n') - # abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0) - abs_array = abs_per.mean(axis=0) - wavelength = qepro_dic['QEPro_x_axis'][0] - - popt_abs01, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[205, 240], plot=False) - popt_abs02, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[750, 950], plot=False) - if abs(popt_abs01[0]) >= abs(popt_abs02[0]): - popt_abs = popt_abs02 - elif abs(popt_abs01[0]) <= abs(popt_abs02[0]): - popt_abs = popt_abs01 - - abs_array_offset = abs_array - da.line_2D(wavelength, *popt_abs) + kafka_process.macro_11_absorbance(stream_name) - print(f'\nFitting function for baseline offset: {da.line_2D}\n') - ff_abs={'fit_function': da.line_2D, 'curve_fit': popt_abs, 'percentile_mean': abs_array} - de.dic_to_csv_for_stream(saving_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff_abs) - u.plot_offfset(wavelength, da.line_2D, popt_abs) + u.plot_offfset(kafka_process.abs_data['wavelength'], kafka_process.abs_fitting['fit_function'], kafka_process.abs_fitting['curve_fit']) print(f'\n** export offset results of absorption spectra complete**\n') - ## Avergae scans in 'fluorescence' and idenfify good/bad + ## macro_10_good_bad: Avergae scans in 'fluorescence' and idenfify good/bad elif stream_name == 'fluorescence': print(f'\n*** start to identify good/bad data in stream: 
{stream_name} ***\n') - ## Apply percnetile filtering for PL spectra, defaut percent_range = [30, 100] - x0, y0, data_id, peak, prop = da._identify_multi_in_kafka(qepro_dic, metadata_dic, - key_height=kh, distance=dis, height=hei, - dummy_test=dummy_test, percent_range=[30, 100]) - label_uid = f'{uid[0:8]}_{metadata_dic["sample_type"]}' - # u.plot_average_good(x0, y0, color=cmap(color_idx[sub_idx]), label=label_uid) - # sub_idx = sample.index(metadata_dic['sample_type']) - u.plot_average_good(x0, y0, label=label_uid, clf_limit=9) - - global f_fit + kafka_process.macro_10_good_bad(stream_name) + + label_uid = f'{kafka_process.uid[0:8]}_{kafka_process.metadata_dic["sample_type"]}' + u.plot_average_good(kafka_process.PL_goodbad['x0'], kafka_process.PL_goodbad['y0'], label=label_uid, clf_limit=9) + + ## Skip peak fitting if qepro type is absorbance if qepro_dic['QEPro_spectrum_type'][0] == 3: - print(f"\n*** No need to carry out fitting for {stream_name} in uid: {uid[:8]} ***\n") + print(f"\n*** No need to carry out fitting for {stream_name} in uid: {kafka_process.uid[:8]} ***\n") + ## macro_12 ~ macro_16 else: - ## for a good data, type(peak) will be a np.array and type(prop) will be a dic - ## fit the good data, export/plotting fitting results - ## append data_id into good_data or bad_data for calculate numbers - if (type(peak) is np.ndarray) and (type(prop) is dict): - x, y, p, f_fit, popt = da._fitting_in_kafka(x0, y0, data_id, peak, prop, dummy_test=dummy_test) - - fitted_y = f_fit(x, *popt) - r2_idx1, _ = da.find_nearest(x, popt[1] - 3*popt[2]) - r2_idx2, _ = da.find_nearest(x, popt[1] + 3*popt[2]) - r_2 = da.r_square(x[r2_idx1:r2_idx2], y[r2_idx1:r2_idx2], fitted_y[r2_idx1:r2_idx2], y_low_limit=0) - - metadata_dic["r_2"] = r_2 + ############## (type(peak) is np.ndarray) and (type(prop) is dict) ################ + ## for a good data, type(peak) will be a np.array and type(prop) will be a dict ## + ## fit the good data, export/plotting fitting results ## + ## append data_id into good_data or bad_data for calculate numbers ## + ##################################################################################### + type_peak = type(kafka_process.PL_goodbad['peak']) + type_prop = type(kafka_process.PL_goodbad['prop']) + + ## When PL data is good + if (type_peak is np.ndarray) and (type_prop is dict): - if 'gauss' in f_fit.__name__: - constant = 2.355 - else: - constant = 1 - - intensity_list = [] - peak_list = [] - fwhm_list = [] - for i in range(int(len(popt)/3)): - intensity_list.append(popt[i*3+0]) - peak_list.append(popt[i*3+1]) - fwhm_list.append(popt[i*3+2]*constant) - - peak_emission_id = np.argmax(np.asarray(intensity_list)) - peak_emission = peak_list[peak_emission_id] - fwhm = fwhm_list[peak_emission_id] - ff={'fit_function': f_fit, 'curve_fit': popt, 'percentile_mean': y0} + ## macro_12_PL_fitting: do peak fitting with gaussian distribution of PL spectra + kafka_process.macro_12_PL_fitting() ## Calculate PLQY for fluorescence stream if (stream_name == 'fluorescence') and (PLQY[0]==1): - PL_integral_s = integrate.simpson(y) - label_uid = f'{uid[0:8]}_{metadata_dic["sample_type"]}' - u.plot_CsPbX3(x, y, peak_emission, label=label_uid, clf_limit=9) - - ## Find absorbance at 365 nm from absorbance stream - # q_dic, m_dic = de.read_qepro_by_stream(uid, stream_name='absorbance', data_agent='tiled') - # abs_array = q_dic['QEPro_output'][1:].mean(axis=0) - # wavelength = q_dic['QEPro_x_axis'][0] - idx1, _ = da.find_nearest(wavelength, PLQY[2]) - absorbance_s = abs_array_offset[idx1] - - 
if PLQY[1] == 'fluorescein': - plqy = da.plqy_fluorescein(absorbance_s, PL_integral_s, 1.506, *PLQY[3:]) - else: - plqy = da.plqy_quinine(absorbance_s, PL_integral_s, 1.506, *PLQY[3:]) - - - plqy_dic = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s, 'plqy': plqy} - - optical_property = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s, - 'Peak': peak_emission, 'FWHM':fwhm, 'PLQY':plqy} - - - ## Creat agent_data in type of dict for exporting as json and wirte to sandbox - agent_data = {} - agent_data.update(optical_property) - agent_data.update(pdf_property) - agent_data.update({k:v for k, v in metadata_dic.items() if len(np.atleast_1d(v)) == 1}) - agent_data = de._exprot_rate_agent(metadata_dic, rate_label_dic, agent_data) - - ## Update absorbance offset and fluorescence fitting results inot agent_data - agent_data.update({'abs_offset':{'fit_function':ff_abs['fit_function'].__name__, 'popt':ff_abs['curve_fit'].tolist()}}) - agent_data.update({'PL_fitting':{'fit_function':ff['fit_function'].__name__, 'popt':ff['curve_fit'].tolist()}}) - - - if USE_AGENT_iterate: - - # print(f"\ntelling agent {agent_data}") - agent = build_agen(peak_target=peak_target, agent_data_path=agent_data_path, use_OAm=True) - - if len(agent.table) < 2: - acq_func = "qr" - else: - acq_func = "qei" - - new_points = agent.ask(acq_func, n=1) - - ## Get target of agent.ask() - agent_target = agent.objectives.summary['target'].tolist() - - ## Get mean and standard deviation of agent.ask() - res_values = [] - for i in new_points_label: - if i in new_points['points'].keys(): - res_values.append(new_points['points'][i][0]) - x_tensor = torch.tensor(res_values) - post = agent.posterior(x_tensor) - post_mean = post.mean.tolist()[0] - post_stddev = post.stddev.tolist()[0] - - ## apply np.exp for log-transform objectives - if_log = agent.objectives.summary['transform'] - for j in range(if_log.shape[0]): - if if_log[j] == 'log': - post_mean[j] = np.exp(post_mean[j]) - post_stddev[j] = np.exp(post_stddev[j]) - - ## Update target, mean, and standard deviation in agent_data - agent_data.update({'agent_target': agent_target}) - agent_data.update({'posterior_mean': post_mean}) - agent_data.update({'posterior_stddev': post_stddev}) - - - # peak_diff = peak_emission - peak_target - peak_diff = False - - # if (peak_diff <= 3) and (peak_diff >=-3): - if peak_diff: - print(f'\nTarget peak: {peak_target} nm vs. 
Current peak: {peak_emission} nm\n') - print(f'\nReach the target, stop iteration, stop all pumps, and wash the loop.\n') - - ### Stop all infusing pumps and wash loop - sq.wash_tube_queue2(pump_list, wash_tube, 'ul/min', - zmq_control_addr=zmq_control_addr, - zmq_info_addr=zmq_info_addr) - - inst1 = BInst("queue_stop") - RM.item_add(inst1, pos='front') - agent_iteration.append(False) - - else: - agent_iteration.append(True) - + ## macro_13_PLQY: calculate integral of PL peak, PLQY and update optical_property + kafka_process.macro_13_PLQY() + label_uid = f'{self.uid[0:8]}_{self.metadata_dic["sample_type"]}' + u.plot_CsPbX3(self.PL_fitting['wavelength'], self.PL_fitting['intensity'], + self.PL_fitting['peak_emission'], label=label_uid, clf_limit=9) else: - plqy_dic = None - optical_property = None - - ## Save fitting data - print(f'\nFitting function: {f_fit}\n') - de.dic_to_csv_for_stream(saving_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff, plqy_dic=plqy_dic) - print(f'\n** export fitting results complete**\n') + self.plqy_dic ={} + self.optical_property = {} ## Plot fitting data - u.plot_peak_fit(x, y, f_fit, popt, peak=p, fill_between=True) + u.plot_peak_fit(self.PL_fitting['wavelength'], + self.PL_fitting['intensity'], + self.PL_fitting['fit_function'], + self.PL_fitting['curve_fit'], + peak=self.PL_fitting['shifted_peak_idx'], + fill_between=True) print(f'\n** plot fitting results complete**\n') - print(f'{peak = }') - print(f'{prop = }') - - ## Append good/bad idetified results - if stream_name == 'take_a_uvvis': - good_data.append(data_id) - - elif (type(peak) == list) and (prop == []): - bad_data.append(data_id) - print(f"\n*** No need to carry out fitting for {stream_name} in uid: {uid[:8]} ***\n") - print(f"\n*** since {stream_name} in uid: {uid[:8]} is a bad data.***\n") - - print('\n*** export, identify good/bad, fitting complete ***\n') + + ## macro_14_agent_data: Creat agent_data in type of dict for exporting as json and wirte to sandbox + kafka_process.macro_14_upate_agent() + + ## macro_15_save_data: Save processed data and agent data + kafka_process.macro_15_save_data(stream_name) - try : - print(f"\n*** {sample_type} of uid: {uid[:8]} has: ***\n" - f"{optical_property = }***\n" - f"{pdf_property = }***\n") - except (UnboundLocalError): - pass - - global sandbox_uid - ## Save processed data in df and agent_data as metadta in sandbox - if write_to_sandbox and (stream_name == 'fluorescence'): - df = pd.DataFrame() - df['wavelength_nm'] = x0 - df['absorbance_mean'] = abs_array - df['absorbance_offset'] = abs_array_offset - df['fluorescence_mean'] = y0 - df['fluorescence_fitting'] = f_fit(x0, *popt) - - ## use pd.concat to add various length data together - df_new = pd.concat([df, iq_df, gr_df, gr_fit_df], ignore_index=False, axis=1) - - # entry = sandbox_tiled_client.write_dataframe(df, metadata=agent_data) - entry = sandbox_tiled_client.write_dataframe(df_new, metadata=agent_data) - # uri = sandbox_tiled_client.values()[-1].uri - uri = entry.uri - sandbox_uid = uri.split('/')[-1] - agent_data.update({'sandbox_uid': sandbox_uid}) - print(f"\nwrote to Tiled sandbox uid: {sandbox_uid}") - - ## Save agent_data locally - if write_agent_data and (stream_name == 'fluorescence'): - # agent_data.update({'sandbox_uid': sandbox_uid}) - with open(f"{agent_data_path}/{data_id}.json", "w") as f: - json.dump(agent_data, f, indent=2) - - print(f"\nwrote to {agent_data_path}\n") - - print(f'*** Accumulated num of good data: {len(good_data)} ***\n') - print(f'good_data = 
{good_data}\n') - print(f'*** Accumulated num of bad data: {len(bad_data)} ***\n') + ## macro_16_num_good: Add self.PL_goodbad['data_id'] into self.good_data or self.bad_data + kafka_process.macro_16_num_good(stream_name) + + + print(f'*** Accumulated num of good data: {len(self.good_data)} ***\n') + print(f'{self.good_data = }\n') + print(f'*** Accumulated num of bad data: {len(self.bad_data)} ***\n') print('########### Events printing division ############\n') - - ## Depend on # of good/bad data, add items into queue item or stop - if stream_name == 'take_a_uvvis' and use_good_bad: - if len(bad_data) > 3: - print('*** qsever aborted due to too many bad scans, please check setup ***\n') - - ### Stop all infusing pumps and wash loop - sq.wash_tube_queue2(pump_list, wash_tube, 'ul/min', - zmq_control_addr=zmq_control_addr, - zmq_info_addr=zmq_info_addr) - - RM.queue_stop() - - elif len(good_data) <= 2 and use_good_bad: - print('*** Add another fluorescence and absorption scan to the fron of qsever ***\n') - - restplan = BPlan('sleep_sec_q', 5) - RM.item_add(restplan, pos=0) - - scanplan = BPlan('take_a_uvvis_csv_q', - sample_type=metadata_dic['sample_type'], - spectrum_type='Corrected Sample', - correction_type='Dark', - pump_list=pump_list, - precursor_list=precursor_list, - mixer=mixer) - RM.item_add(scanplan, pos=1) - RM.queue_start() - - elif len(good_data) > 2 and use_good_bad: - print('*** # of good data is enough so go to the next: bundle plan ***\n') - bad_data.clear() - good_data.clear() - finished.append(metadata_dic['sample_type']) - print(f'After event: good_data = {good_data}\n') - print(f'After event: finished sample = {finished}\n') - - RM.queue_start() - - ## Add predicted new points from ML agent into qserver - elif stream_name == 'fluorescence' and USE_AGENT_iterate and agent_iteration[-1]: - print('*** Add new points from agent to the fron of qsever ***\n') - print(f'*** New points from agent: {new_points} ***\n') - - if (post_dilute is True) and (fix_Br_ratio is False): - set_target_list = [0 for i in range(len(pump_list))] - rate_list = [] - for i in new_points_label: - if i in new_points['points'].keys(): - rate_list.append(new_points['points'][i][0]) - else: - pass - # rate_list.append(0) - # rate_list.insert(1, rate_list[0]*5) - rate_list.append(sum(rate_list)*5) - - elif (post_dilute is True) and (fix_Br_ratio is True): - set_target_list = [0 for i in range(len(pump_list))] - rate_list = [] - for i in new_points_label: - if i in new_points['points'].keys(): - rate_list.append(new_points['points'][i][0]) - else: - pass - # rate_list.append(0) - rate_list.insert(1, rate_list[0]*5) - rate_list.append(sum(rate_list)*5) - - else: - # set_target_list = [0 for i in range(new_points['points'].shape[1])] - set_target_list = [0 for i in range(len(pump_list))] - rate_list = new_points['points'] - - sample = de._auto_name_sample(rate_list, prefix=prefix[1:]) - - sq.synthesis_queue( - syringe_list=syringe_list, - pump_list=pump_list, - set_target_list=set_target_list, - target_vol_list=target_vol_list, - rate_list = rate_list, - syringe_mater_list=syringe_mater_list, - precursor_list=precursor_list, - mixer=mixer, - resident_t_ratio=resident_t_ratio, - prefix=prefix[1:], - sample=sample, - wash_tube=wash_tube, - name_by_prefix=bool(prefix[0]), - num_abs=num_uvvis[0], - num_flu=num_uvvis[1], - det1=num_uvvis[2], - det1_time=num_uvvis[3], - det1_frame_rate=num_uvvis[4], - is_iteration=True, - zmq_control_addr=zmq_control_addr, - zmq_info_addr=zmq_info_addr, - ) - - # 
RM.queue_start() - - # elif use_good_bad: - else: - print('*** Move to next reaction in Queue ***\n') - time.sleep(2) - # RM.queue_start() + + ################# macro_17_add_queue: Add queus task to qserver ################### + ## ## + ## Depend on # of good/bad data, add items into queue item or stop ## + ## 'take_a_uvvis' or new_points of self.agent ## + ##################################################################################### + kafka_process.macro_17_add_queue(stream_name, qserver_process, RM) kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")
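To make the rate bookkeeping concrete, here is a small worked example of auto_rate_list() and the post-dilution step used by macro_17_add_queue(). The label names, rates, and ratio values are hypothetical; it assumes kin.new_points_label lists exactly these three labels in this order, and that de and sq are the helper modules already imported by the consumer.

## Hypothetical agent prediction; only keys matching kin.new_points_label are picked up.
new_points = {'points': {'CsPb_rate': [8.0], 'OAm_rate': [24.0], 'OA_rate': [12.0]}}

## auto_rate_list keeps the labelled rates in order, then (with fix_Br_ratio enabled)
## inserts the Br precursor rate at index 1 as rate_list[0] * fix_Br_ratio[1].
rate_list = kafka_process.auto_rate_list(qin.pump_list, new_points, [True, 2.0])
## rate_list is now [8.0, 16.0, 24.0, 12.0]

if kin.post_dilute[0]:                       ## e.g. post_dilute = [True, 5]
    rate_list.append(sum(rate_list) * kin.post_dilute[1])
## rate_list is now [8.0, 16.0, 24.0, 12.0, 300.0]; the appended rate drives the dilution pump

qin.sample = de._auto_name_sample(rate_list, prefix=qin.prefix[1:])
qin.infuse_rates = rate_list
sq.synthesis_queue_xlsx(qserver_process)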