diff --git a/scripts/_LDRD_Kafka.py b/scripts/_LDRD_Kafka.py index 94c2392..d0c6295 100644 --- a/scripts/_LDRD_Kafka.py +++ b/scripts/_LDRD_Kafka.py @@ -42,9 +42,9 @@ def _kafka_process(): 'iq_to_gr', 'iq_to_gr_path', 'cfg_fn', 'bkg_fn', 'iq_fn', 'search_and_match', 'mystery_path', 'results_path', 'fitting_pdf', 'fitting_pdf_path', 'cif_fn', 'gr_fn', - 'use_sandbox', 'write_to_sandbox', 'sandbox_tiled_client', 'tiled_client', + 'use_sandbox', 'write_to_sandbox', 'sandbox_uri', 'sandbox_tiled_client', 'tiled_client', 'fn_TBD', - 'entry', 'iq_q', 'iq_I', 'stream_list', 'uid', 'uid_catalog', 'uid_pdfstream', + 'entry', 'iq_data', 'stream_list', 'uid', 'uid_catalog', 'uid_pdfstream', '' ] @@ -85,10 +85,10 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'): agent_data_path=self.inputs.agent_data_path[0]) ) - ## self.inputs.sandbox_tiled_client[0] is just the uri of sandbox - ## so, turn uri into client and append it in self.inputs.sandbox_tiled_client - if type(self.inputs.sandbox_tiled_client[0]) is str: - self.inputs.sandbox_tiled_client.append(from_uri(self.inputs.sandbox_tiled_client[0])) + ## self.inputs.sandbox_uri[0] is just the uri of sandbox + ## so, turn uri into client and assign it to self.inputs.sandbox_tiled_client + if type(self.inputs.sandbox_uri[0]) is str: + self.inputs.sandbox_tiled_client = from_uri(self.inputs.sandbox_uri[0]) ## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn diff --git a/scripts/inputs_qserver_kafka_v2.xlsx b/scripts/inputs_qserver_kafka_v2.xlsx index 7291fbc..e770318 100644 Binary files a/scripts/inputs_qserver_kafka_v2.xlsx and b/scripts/inputs_qserver_kafka_v2.xlsx differ diff --git a/scripts/kafka_consumer_iterate_1LL09_v2.py b/scripts/kafka_consumer_iterate_1LL09_v2.py index 14af3b9..a519c6b 100644 --- a/scripts/kafka_consumer_iterate_1LL09_v2.py +++ b/scripts/kafka_consumer_iterate_1LL09_v2.py @@ -100,9 +100,8 @@ def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ): f'{bool(kin.write_to_sandbox[0]) = }\n' f'{qin.zmq_control_addr[0] = }') - ## Append raw data tiled_client - kin.tiled_client.append(beamline_acronym) - kin.tiled_client.append(from_profile(beamline_acronym)) + ## Assignt raw data tiled clients + kin.tiled_client.append = from_profile(beamline_acronym) ## Append good/bad data folder to csv_path kin.csv_path.append(os.path.join(kin.csv_path[0], 'good_bad')) @@ -177,19 +176,24 @@ def print_message(consumer, doctype, doc, time.sleep(2) ## Obtain uid from message['run_start'] - uid = message['run_start'] - print(f'\n**** start to export uid: {uid} ****\n') + kin.uid = message['run_start'] + kin.uid_catalog.append(kin.uid) + print(f'\n**** start to export uid: {kin.uid} ****\n') # print(list(message['num_events'].keys())[0]) ## Put stream name of scans into stream_list stream_list = list(message['num_events'].keys()) + kin.stream_list = [] + for stream_name in syringe_list: + kin.stream_list.append(stream_name) + +////////////////////////////////////////////////////////////////////////////////////////////////// ## Set good/bad data condictions to the corresponding sample kh = kin.key_height[0] hei = kin.height[0] dis = kin.distance[0] -////////////////////////////////////////////////////////////////////////////////////////////////// ## obtain phase fraction & particle size from g(r) if 'scattering' in stream_list: if fitting_pdf: diff --git a/scripts/kafka_consumer_iterate_XPD_v2.py b/scripts/kafka_consumer_iterate_XPD_v2.py index f9ecf4e..a600b1c 100644 --- a/scripts/kafka_consumer_iterate_XPD_v2.py +++ b/scripts/kafka_consumer_iterate_XPD_v2.py @@ -102,13 +102,10 @@ def print_kafka_messages(beamline_acronym_01, beamline_acronym_02, kin=kin, qin= f'{bool(kin.write_to_sandbox[0]) = }\n' f'{qin.zmq_control_addr[0] = }') - ## Append raw data & analysis data tiled clients - kin.tiled_client.append(beamline_acronym_01) - kin.tiled_client.append(from_profile(beamline_acronym_01)) + ## Assignt raw data tiled clients + kin.tiled_client.append = from_profile(beamline_acronym_01) ## 'xpd-analysis' is not a catalog name so can't be accessed in databroker - kin.sandbox_tiled_client.append(from_uri(kin.sandbox_tiled_client[0])) - ## Append good/bad data folder to csv_path kin.csv_path.append(os.path.join(kin.csv_path[0], 'good_bad')) @@ -163,7 +160,7 @@ def print_message(consumer, doctype, doc, if 'sample_type' in message.keys(): print(f"sample type: {message['sample_type']}") - ## Reset kin.uid to as empty list + ## Reset kin.uid to an empty list kin.uid = [] @@ -178,12 +175,14 @@ def print_message(consumer, doctype, doc, iq_I_uid = doc[1]['data']['chi_I'] kin.uid_pdfstream.append(iq_I_uid) - kin.entry.append(sandbox_tiled_client[iq_I_uid]) - df = kin.entry[-1].read() - kin.iq_Q.append(df['chi_Q'].to_numpy()) - kin.iq_I.append(df['chi_I'].to_numpy()) + kin.entry = kin.sandbox_tiled_client[iq_I_uid] + df = kin.entry.read() + # Before appending I(Q) data, reset kin.iq_data as an empty list + kin.iq_data = [] + kin.iq_data.append(df['chi_Q'].to_numpy()) + kin.iq_data.append(df['chi_I'].to_numpy()) - ## Reset kin.uid to as empty list + ## Reset kin.uid to an empty list kin.uid = [] @@ -202,6 +201,7 @@ def print_message(consumer, doctype, doc, # RM.item_add(inst1, pos='front') ## wait 1 second for databroker to save data time.sleep(1) + ## Reset kin.uid to an empty list kin.uid = [] @@ -216,9 +216,10 @@ def print_message(consumer, doctype, doc, ) ## wait 1 second for databroker to save data time.sleep(1) - kin.uid = kin.entry[-1].metadata['run_start'] + kin.uid = kin.entry.metadata['run_start'] kin.uid_catalog.append(kin.uid) - stream_list = kin.tiled_client[-1][kin.uid].metadata['summary']['stream_names'] + stream_list = kin.tiled_client[kin.uid].metadata['summary']['stream_names'] + ## Reset kin.stream_list to an empty list kin.stream_list = [] for stream_name in syringe_list: kin.stream_list.append(stream_name) @@ -243,52 +244,53 @@ def print_message(consumer, doctype, doc, kin.uid = message['run_start'] kin.uid_catalog.append(kin.uid) stream_list = list(message['num_events'].keys()) + ## Reset kin.stream_list to an empty list kin.stream_list = [] for stream_name in syringe_list: kin.stream_list.append(stream_name) - - ///////////////////////////////////////////////////////////////////////////////////////////////// - - ## When uid is assigned and type is a string, move to data fitting, calculation - if (name == 'stop') and (type(uid) is str): - print(f'\n**** start to export uid: {uid} ****\n') - print(f'\n**** with stream name in {stream_list} ****\n') + + + ################## (name == 'stop') and (type(kin.uid) is str) #################### + ## ## + ## When uid is assigned and type is a string, move to data fitting, calculation ## + ## ## + ##################################################################################### + if (name == 'stop') and (type(kin.uid) is str): + print(f'\n**** start to export uid: {kin.uid} ****\n') + print(f'\n**** with stream name in {kin.stream_list} ****\n') ## Set good/bad data condictions to the corresponding sample - try: - kh = key_height[len(finished)] - hei = height[len(finished)] - dis = distance[len(finished)] - except IndexError: - kh = key_height[0] - hei = height[0] - dis = distance[0] + kh = kin.key_height[0] + hei = kin.height[0] + dis = kin.distance[0] ## obtain phase fraction & particle size from g(r) - if 'scattering' in stream_list: - # Get metadat from stream_name fluorescence for plotting + if 'scattering' in kin.stream_list: + # Get metadata from stream_name fluorescence for plotting qepro_dic, metadata_dic = de.read_qepro_by_stream( - uid, stream_name='fluorescence', data_agent='tiled', + kin.uid, stream_name='fluorescence', data_agent='tiled', beamline_acronym=beamline_acronym_01) u = plot_uvvis(qepro_dic, metadata_dic) - if use_sandbox: - iq_df = np.asarray([iq_Q, iq_I]) - iq_df2 = pd.DataFrame() - iq_df2['q'] = iq_Q - iq_df2['I(q)'] = iq_I + if kin.use_sandbox[0]: + iq_array = np.asarray([kin.iq_data[0], kin.iq_data[1]]) + kin.iq_data.append(iq_array) + + iq_df = pd.DataFrame() + iq_df['q'] = kin.iq_data[0] + iq_df['I(q)'] = kin.iq_data[1] + kin.iq_data.append(iq_df) # ### CsPbBr2 test - # iq_df = pd.read_csv(iq_fn, skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T - # iq_df2 = pd.read_csv(iq_fn, skiprows=1, names=['q', 'I(q)'], sep=' ') - - else: - pass - # iq_df = iq_fn[0] + # iq_array = pd.read_csv(iq_fn, skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T + # kin.iq_data.append(iq_array) + # iq_df = pd.read_csv(iq_fn, skiprows=1, names=['q', 'I(q)'], sep=' ') + # kin.iq_data.append(iq_df) + - if iq_to_gr: + if kin.iq_to_gr[0]: # Grab metadat from stream_name = fluorescence for naming gr file - fn_uid = de._fn_generator(uid, beamline_acronym=beamline_acronym_01) + fn_uid = de._fn_generator(kin.uid, beamline_acronym=beamline_acronym_01) gr_fn = f'{fn_uid}_scattering.gr' # ### CsPbBr2 test @@ -296,9 +298,9 @@ def print_message(consumer, doctype, doc, # Build pdf config file from a scratch pdfconfig = PDFConfig() - pdfconfig.readConfig(cfg_fn) - pdfconfig.backgroundfiles = bkg_fn[-1] - sqfqgr_path = gp.transform_bkg(pdfconfig, iq_df, output_dir=gr_path, + pdfconfig.readConfig(kin.cfg_fn[-1]) + pdfconfig.backgroundfiles = kin.bkg_fn[-1] + sqfqgr_path = gp.transform_bkg(pdfconfig, iq_array, output_dir=gr_path, plot_setting={'marker':'.','color':'green'}, test=True, gr_fn=gr_fn) gr_data = sqfqgr_path['gr'] @@ -365,7 +367,7 @@ def print_message(consumer, doctype, doc, pdf_property={'Br_ratio': np.nan, 'Br_size': np.nan} if iq_to_gr: - u.plot_iq_to_gr(iq_df, gr_df.to_numpy().T, gr_fit=gr_fit) + u.plot_iq_to_gr(iq_array, gr_df.to_numpy().T, gr_fit=gr_fit) ## remove 'scattering' from stream_list to avoid redundant work in next for loop stream_list.remove('scattering') @@ -616,7 +618,7 @@ def print_message(consumer, doctype, doc, df['fluorescence_fitting'] = f_fit(x0, *popt) ## use pd.concat to add various length data together - df_new = pd.concat([df, iq_df2, gr_df, gr_fit_df], ignore_index=False, axis=1) + df_new = pd.concat([df, iq_df, gr_df, gr_fit_df], ignore_index=False, axis=1) # entry = sandbox_tiled_client.write_dataframe(df, metadata=agent_data) entry = sandbox_tiled_client.write_dataframe(df_new, metadata=agent_data)