From 4e18def8a177055e0e4d2d07d47ccc0fd19cc06b Mon Sep 17 00:00:00 2001
From: XPD_Operator
Date: Mon, 19 Aug 2024 18:56:36 -0400
Subject: [PATCH] debug workflow at beamline for two topics

---
 scripts/.~lock.inputs_qserver_kafka_v2.xlsx#  |   1 -
 scripts/_LDRD_Kafka.py                        | 214 +++++++++++-------
 scripts/_synthesis_queue_RM.py                |   4 +-
 scripts/dev/print_kafka_with_main_consumer.py | 101 +++------
 scripts/inputs_qserver_kafka_v2.xlsx          | Bin 22190 -> 22185 bytes
 scripts/kafka_consumer_iterate_XPD_v2.py      | 167 +++++---------
 6 files changed, 221 insertions(+), 266 deletions(-)
 delete mode 100644 scripts/.~lock.inputs_qserver_kafka_v2.xlsx#

diff --git a/scripts/.~lock.inputs_qserver_kafka_v2.xlsx# b/scripts/.~lock.inputs_qserver_kafka_v2.xlsx#
deleted file mode 100644
index 6f12a7c..0000000
--- a/scripts/.~lock.inputs_qserver_kafka_v2.xlsx#
+++ /dev/null
@@ -1 +0,0 @@
-,xf28id2,xf28id2-ws3.nsls2.bnl.local,15.08.2024 18:56,file:///home/xf28id2/.config/libreoffice/4;
\ No newline at end of file

diff --git a/scripts/_LDRD_Kafka.py b/scripts/_LDRD_Kafka.py
index 64b00e8..8b34845 100644
--- a/scripts/_LDRD_Kafka.py
+++ b/scripts/_LDRD_Kafka.py
@@ -7,18 +7,20 @@ import time
 import pprint
 import json
+from scipy import integrate

-import _data_export as de
-from _plot_helper import plot_uvvis
-import _data_analysis as da
-import _pdf_calculator as pc
+sq = importlib.import_module("_synthesis_queue_RM")
+de = importlib.import_module("_data_export")
+da = importlib.import_module("_data_analysis")
+pc = importlib.import_module("_pdf_calculator")

-from scipy import integrate
+from diffpy.pdfgetx import PDFConfig
+gp = importlib.import_module("_get_pdf")
+
+build_agent = importlib.import_module("prepare_agent_pdf").build_agent

 import torch
-# from diffpy.pdfgetx import PDFConfig
-# import _get_pdf as gp
-from tiled.client import from_uri
+from tiled.client import from_uri

 from bluesky_queueserver_api.zmq import REManagerAPI
 from bluesky_queueserver_api import BPlan, BInst
@@ -109,7 +111,6 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs', is_kafka=False
         ## Assign Agent to self.agent
         self.agent
         print('\n***** Start to initialize blop agent ***** \n')
-        build_agent = importlib.import_module("prepare_agent_pdf").build_agent
         self.agent = build_agent(
             peak_target=self.inputs.peak_target[0],
             agent_data_path=self.inputs.agent_data_path[0])
@@ -247,7 +248,6 @@ def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):
             print(f'\nReach the target, stop iteration, stop all pumps, and wash the loop.\n')

             ### Stop all infusing pumps and wash loop
-            sq = importlib.import_module("_synthesis_queue_RM")
             sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
                                 zmq_control_addr=qin.zmq_control_addr[0],
                                 zmq_info_addr=qin.zmq_info_addr[0])
@@ -262,7 +262,6 @@ def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):
         if is_1st:
             pass
         else:
-            build_agent = importlib.import_module("prepare_agent_pdf").build_agent
             self.agent = build_agent(
                 peak_target = peak_target,
                 agent_data_path = self.inputs.agent_data_path[0],
@@ -304,7 +303,76 @@ def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):

         return new_points

-    def macro_01_get_iq(self, iq_I_uid):
+
+    def macro_00_print_start(self, message):
+        """macro to print metadata when doc name is start and reset self.uid to an empty list
+
+        Args:
+            message (dict): message in RE document
+        """
+
+        if 'uid' in message.keys():
+            print(f"uid: {message['uid']}")
+        if 'plan_name' in message.keys():
+            print(f"plan name: {message['plan_name']}")
+        if 'detectors' in message.keys():
+            print(f"detectors: {message['detectors']}")
+        if 'pumps' in message.keys():
+            print(f"pumps: {message['pumps']}")
+        if 'uvvis' in message.keys() and (len(message['uvvis'])==3):
+            print(f"uvvis mode:\n"
+                  f"           integration time: {message['uvvis'][0]} ms\n"
+                  f"           num spectra averaged: {message['uvvis'][1]}\n"
+                  f"           buffer capacity: {message['uvvis'][2]}"
+                  )
+        elif 'uvvis' in message.keys() and (len(message['uvvis'])==5):
+            print(f"uvvis mode:\n"
+                  f"           spectrum type: {message['uvvis'][0]}\n"
+                  f"           integration time: {message['uvvis'][2]} ms\n"
+                  f"           num spectra averaged: {message['uvvis'][3]}\n"
+                  f"           buffer capacity: {message['uvvis'][4]}"
+                  )
+        if 'mixer' in message.keys():
+            print(f"mixer: {message['mixer']}")
+        if 'sample_type' in message.keys():
+            print(f"sample type: {message['sample_type']}")
+
+        ## Reset self.uid to an empty list
+        self.uid = []
+
+
+
+
+    def macro_01_stop_queue_uid(self, RM, message):
+        """macro to stop queue and get raw data uid, used in kafka consumer
+        while taking a UV-Vis scan (no X-ray data, though pdfstream analysis still runs)
+
+        This macro will
+        1. Stop queue
+        2. Assign raw data uid to self.uid
+        3. Append raw data uid to self.uid_catalog
+        4. Update self.stream_list
+
+        Args:
+            RM (REManagerAPI): Run Engine Manager API.
+            message (dict): message in RE document
+        """
+        inst1 = BInst("queue_stop")
+        RM.item_add(inst1, pos='front')
+        ## wait 1 second for databroker to save data
+        time.sleep(1)
+        self.uid = message['run_start']
+        self.uid_catalog.append(self.uid)
+        stream_list = list(message['num_events'].keys())
+        ## Reset self.stream_list to an empty list
+        self.stream_list = []
+        for stream_name in stream_list:
+            self.stream_list.append(stream_name)
+
+
+
+
+    def macro_02_get_iq(self, iq_I_uid):
         """macro to get iq data, used in kafka consumer
         while taking xray_uvvis_plan and analysis of pdfstream finished

@@ -349,7 +417,7 @@ def macro_01_get_iq(self, iq_I_uid):

-    def macro_02_get_uid(self):
+    def macro_03_get_uid(self):
         """macro to get raw data uid, used in kafka consumer

         This macro will
@@ -369,33 +437,6 @@ def macro_02_get_uid(self):

-    def macro_03_stop_queue_uid(self, RM, message):
-        """macro to stop queue and get raw data uid, used in kafka consumer
-        while taking a Uv-Vis, no X-ray data but still do analysis of pdfstream
-
-        This macro will
-        1. Stop queue
-        2. Assign raw data uid to self.uid
-        3. Append raw data uid to self.uid_catalog
-        4. Update self.stream_list
-
-        Args:
-            RM (REManagerAPI): Run Engine Manager API.
-            message (dict): message in RE document
-        """
-        inst1 = BInst("queue_stop")
-        RM.item_add(inst1, pos='front')
-        ## wait 1 second for databroker to save data
-        time.sleep(1)
-        self.uid = message['run_start']
-        self.uid_catalog.append(self.uid)
-        stream_list = list(message['num_events'].keys())
-        ## Reset self.stream_list to an empty list
-        self.stream_list = []
-        for stream_name in stream_list:
-            self.stream_list.append(stream_name)
-
-
     def macro_04_dummy_pdf(self):
         """macro to setup a dummy pdf data for testing, used in kafka consumer

@@ -426,46 +467,46 @@ def macro_04_dummy_pdf(self):

-    # def macro_05_iq_to_gr(self, beamline_acronym):
-    #     """macro to condcut data reduction from I(Q) to g(r), used in kafka consumer
+    def macro_05_iq_to_gr(self, beamline_acronym):
+        """macro to conduct data reduction from I(Q) to g(r), used in kafka consumer

-    #     This macro will
-    #     1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
-    #     2. Read pdf config file from self.inputs.cfg_fn[-1]
-    #     3. Read pdf background file from self.inputs.bkg_fn[-1]
-    #     4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
-    #     5. Read saved g(r) into pd.DataFrame and save again to remove the headers
-    #     6. Update g(r) data path and data frame to self.gr_data
-    #        self.gr_data[0]: gr_data (path)
-    #        self.gr_data[1]: gr_df
-
-    #     Args:
-    #         beamline_acronym (str): catalog name for tiled to access data
-    #     """
-    #     # Grab metadat from stream_name = fluorescence for naming gr file
-    #     fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
-    #     gr_fn = f'{fn_uid}_scattering.gr'
-
-    #     ### dummy test, e.g., CsPbBr2
-    #     if self.inputs.dummy_pdf[0]:
-    #         gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'
-
-    #     # Build pdf config file from a scratch
-    #     pdfconfig = PDFConfig()
-    #     pdfconfig.readConfig(self.inputs.cfg_fn[-1])
-    #     pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
-    #     sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
-    #                                    plot_setting={'marker':'.','color':'green'}, test=True,
-    #                                    gr_fn=gr_fn)
-    #     gr_data = sqfqgr_path['gr']
-
-    #     ## Remove headers by reading gr_data into pd.Dataframe and save again
-    #     gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ')
-    #     gr_df.to_csv(gr_data, index=False, header=False, sep =' ')
-
-    #     self.gr_data = []
-    #     self.gr_data.append(gr_data)
-    #     self.gr_data.append(gr_df)
+
+        This macro will
+        1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
+        2. Read pdf config file from self.inputs.cfg_fn[-1]
+        3. Read pdf background file from self.inputs.bkg_fn[-1]
+        4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
+        5. Read saved g(r) into pd.DataFrame and save again to remove the headers
+        6. Update g(r) data path and data frame to self.gr_data
+           self.gr_data[0]: gr_data (path)
+           self.gr_data[1]: gr_df
+
+        Args:
+            beamline_acronym (str): catalog name for tiled to access data
+        """
+        # Grab metadata from stream_name = fluorescence for naming gr file
+        fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
+        gr_fn = f'{fn_uid}_scattering.gr'
+
+        ### dummy test, e.g., CsPbBr2
+        if self.inputs.dummy_pdf[0]:
+            gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'
+
+        # Build pdf config file from scratch
+        pdfconfig = PDFConfig()
+        pdfconfig.readConfig(self.inputs.cfg_fn[-1])
+        pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
+        sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
+                                       plot_setting={'marker':'.','color':'green'}, test=True,
+                                       gr_fn=gr_fn)
+        gr_data = sqfqgr_path['gr']
+
+        ## Remove headers by reading gr_data into pd.DataFrame and save again
+        gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ')
+        gr_df.to_csv(gr_data, index=False, header=False, sep =' ')
+
+        self.gr_data = []
+        self.gr_data.append(gr_data)
+        self.gr_data.append(gr_df)
@@ -986,7 +1027,6 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):
             print('*** qserver aborted due to too many bad scans, please check setup ***\n')

             ### Stop all infusing pumps and wash loop
-            sq = importlib.import_module("_synthesis_queue_RM")
             sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
                                 zmq_control_addr=qin.zmq_control_addr[0],
                                 zmq_info_addr=qin.zmq_info_addr[0])
@@ -994,10 +1034,7 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):
             RM.queue_stop()

         elif (len(self.good_data) <= 2) and (self.inputs.use_good_bad[0]):
-            print('*** Add another fluorescence and absorption scan to the fron of qsever ***\n')
-
-            restplan = BPlan('sleep_sec_q', 5)
-            RM.item_add(restplan, pos=0)
+            print('*** Add another fluorescence scan to the front of qserver ***\n')

             scanplan = BPlan('take_a_uvvis_csv_q',
                              sample_type=self.metadata_dic['sample_type'],
                              spectrum_type='Corrected Sample',
                              correction_type='Dark',
                              pump_list=qin.pump_list,
                              precursor_list=qin.precursor_list,
                              mixer=qin.mixer)
             RM.item_add(scanplan, pos=1)
+
+            restplan = BPlan('sleep_sec_q', 5)
+            RM.item_add(restplan, pos=2)
+
             RM.queue_start()

         elif (len(self.good_data) > 2) and (self.inputs.use_good_bad[0]):
@@ -1021,7 +1062,7 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):

         ## Add predicted new points from ML agent into qserver
         elif (stream_name == 'fluorescence') and (self.inputs.USE_AGENT_iterate[0]) and (self.agent_iteration[-1]):
-            print('*** Add new points from agent to the fron of qsever ***\n')
+            print('*** Add new points from agent to the front of qserver ***\n')

             new_points = self.macro_agent(qserver_process, RM, check_target=True)
@@ -1034,7 +1075,6 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):
             qin.sample = de._auto_name_sample(rate_list, prefix=qin.prefix[1:])

             qin.infuse_rates = rate_list
-            sq = importlib.import_module("_synthesis_queue_RM")
             sq.synthesis_queue_xlsx(qserver_process)

         else:
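Note on the header stripping in macro_05_iq_to_gr above: the .gr file written by gp.transform_bkg() carries a 26-line pdfgetx text header, and the macro rewrites the file in place as bare two-column data. A minimal standalone sketch of that step (skiprows, column names, and separator are taken from the macro; the path here is hypothetical, since the real one comes from sqfqgr_path['gr']):

    import pandas as pd

    # hypothetical path; macro_05_iq_to_gr gets it from sqfqgr_path['gr']
    gr_data = 'iq_to_gr/sample_scattering.gr'

    # drop the 26-line pdfgetx header, then rewrite as bare r / g(r) columns
    gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep=' ')
    gr_df.to_csv(gr_data, index=False, header=False, sep=' ')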
diff --git a/scripts/_synthesis_queue_RM.py b/scripts/_synthesis_queue_RM.py
index 71e7bb8..7422a60 100644
--- a/scripts/_synthesis_queue_RM.py
+++ b/scripts/_synthesis_queue_RM.py
@@ -1,11 +1,11 @@
 import numpy as np
-import pandas as pd
+# import pandas as pd
 # from bluesky_queueserver.manager.comms import zmq_single_request
 import _data_export as de
 from bluesky_queueserver_api.zmq import REManagerAPI
 from bluesky_queueserver_api import BPlan, BInst
 from ophyd.sim import det, noisy_det
-from _LDRD_Kafka import xlsx_to_inputs
+# from _LDRD_Kafka import xlsx_to_inputs

 ## Pass qserver parameters by xlsx_to_inputs
 ## Arrange tasks for PQDs synthesis
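The commented-out import above pairs with the module-level `sq = importlib.import_module("_synthesis_queue_RM")` added to _LDRD_Kafka.py: once both modules import each other at top level, either import order fails. A minimal sketch of the cycle this avoids (module and attribute names are the ones in this patch):

    # _LDRD_Kafka.py (after this patch), at import time:
    import importlib
    sq = importlib.import_module("_synthesis_queue_RM")

    # If _synthesis_queue_RM.py still ran `from _LDRD_Kafka import xlsx_to_inputs`,
    # importing _LDRD_Kafka would re-enter it before xlsx_to_inputs is defined and
    # raise ImportError; commenting out the now-unused import breaks the cycle.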
diff --git a/scripts/dev/print_kafka_with_main_consumer.py b/scripts/dev/print_kafka_with_main_consumer.py
index 3a23558..498b3f7 100644
--- a/scripts/dev/print_kafka_with_main_consumer.py
+++ b/scripts/dev/print_kafka_with_main_consumer.py
@@ -29,96 +29,53 @@
 plt.rcParams["figure.raise_window"] = False

-def print_kafka_messages(beamline_acronym):
-    print(f"Listening for Kafka messages for {beamline_acronym}")
+def print_kafka_messages(beamline_acronym_01, beamline_acronym_02):
+    print(f"Listening for Kafka messages for {beamline_acronym_01} & {beamline_acronym_02}")

     global db, catalog
-    db = databroker.Broker.named(beamline_acronym)
-    catalog = databroker.catalog[f'{beamline_acronym}']
+    db = databroker.Broker.named(beamline_acronym_01)
+    catalog = databroker.catalog[f'{beamline_acronym_01}']

     # plt.figure()
     # def print_message(name, doc):
     def print_message(consumer, doctype, doc):
         name, message = doc
         print(
-            f"{datetime.datetime.now().isoformat()} document: {name}\n"
-            f"document keys: {list(message.keys())}\n"
-            # f"contents: {pprint.pformat(message)}\n"
+            f"\n{datetime.datetime.now().isoformat()} document: {name}\n"
+            # f"\ndocument keys: {list(message.keys())}\n"
+            # f"\ncontents: {pprint.pformat(message)}\n"
         )
-        # if name == 'start':
-        #     print(
-        #         f"{datetime.datetime.now().isoformat()} documents {name}\n"
-        #         f"document keys: {list(message.keys())}")
+        if name == 'start':
+            print(
+                # f"\n{datetime.datetime.now().isoformat()} documents {name}\n"
+                f"\ndocument keys: {list(message.keys())}\n"
+                )

-        #     if 'uid' in message.keys():
-        #         print(f"uid: {message['uid']}")
-        #     if 'plan_name' in message.keys():
-        #         print(f"plan name: {message['plan_name']}")
-        #     if 'detectors' in message.keys():
-        #         print(f"detectors: {message['detectors']}")
-        #     if 'pumps' in message.keys():
-        #         print(f"pumps: {message['pumps']}")
-        #     # if 'detectors' in message.keys():
-        #     #     print(f"detectors: {message['detectors']}")
-        #     if 'uvvis' in message.keys() and message['plan_name']!='count':
-        #         print(f"uvvis mode:\n"
-        #               f"           integration time: {message['uvvis'][0]} ms\n"
-        #               f"           num spectra averaged: {message['uvvis'][1]}\n"
-        #               f"           buffer capacity: {message['uvvis'][2]}"
-        #               )
-        #     elif 'uvvis' in message.keys() and message['plan_name']=='count':
-        #         print(f"uvvis mode:\n"
-        #               f"           spectrum type: {message['uvvis'][0]}\n"
-        #               f"           integration time: {message['uvvis'][2]} ms\n"
-        #               f"           num spectra averaged: {message['uvvis'][3]}\n"
-        #               f"           buffer capacity: {message['uvvis'][4]}"
-        #               )
-        #     if 'mixer' in message.keys():
-        #         print(f"mixer: {message['mixer']}")
-        #     if 'sample_type' in message.keys():
-        #         print(f"sample type: {message['sample_type']}")

-        # if name == 'stop':
+        elif name == 'event':
+            print(
+                # f"\n{datetime.datetime.now().isoformat()} documents {name}\n"
+                f"\ndocument keys: {list(message.keys())}\n"
+                )
+
+        elif name == 'stop':
             # # print('Kafka test good!!')
-            print(f"{datetime.datetime.now().isoformat()} documents {name}\n"
-                  f"contents: {pprint.pformat(message)}"
-                  )
-            # # num_events = len(message['num_events'])
+            print(
+                # f"\n{datetime.datetime.now().isoformat()} documents {name}\n"
+                f"\ndocument keys: {list(message.keys())}\n"
+                f"\ncontents: {pprint.pformat(message['num_events'])}\n"
+                )

-            # time.sleep(2)
-            # uid = message['run_start']
-            # print(f'\n**** start to export uid: {uid} ****\n')
-            # # for stream_name in ['primary', 'absorbance', 'fluorescence']:
-            # #     if stream_name in message['num_events'].keys():
-            # #         qepro_dic, metadata_dic = de.read_qepro_by_stream(uid, stream_name=stream_name, data_agent='catalog')
-            # #         de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name)
-            # #         print(f'\n** export {stream_name} in uid: {uid[0:8]} to ../{os.path.basename(csv_path)} **\n')
-            # #         u = plot_uvvis(qepro_dic, metadata_dic)
-            # #         u.plot_data()
-            # #         print(f'\n** Plot {stream_name} in uid: {uid[0:8]} complete **\n')
-
-            # #     if qepro_dic['QEPro_spectrum_type'][0] == 2:
-            # #         print('\n*** start to identify good/bad data ***\n')
-            # #         x, y, p, f, popt = da._fitting_in_kafka(qepro_dic, metadata_dic, key_height=200, distance=100, height=50)
-
-            # #         ff={'fit_function': f, 'curve_fit': popt}
-            # #         de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff)
-            # #         print(f'\n** export fitting results complete**\n')
-
-            # #         u.plot_peak_fit(x, y, p, f, popt, fill_between=True)
-            # #         print(f'\n** plot fitting results complete**\n')
-
-            # print('\n*** export, identify good/bad, fitting complete ***\n')
-            # print('########### Events printing division ############\n')
+            print('\n########### Events printing division ############\n')

     kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")

     # this consumer should not be in a group with other consumers
     # so generate a unique consumer group id for it
-    unique_group_id = f"echo-{beamline_acronym}-{str(uuid.uuid4())[:8]}"
+    unique_group_id = f"echo-{beamline_acronym_01}-{str(uuid.uuid4())[:8]}"

     kafka_consumer = BasicConsumer(
-        topics=[f"{beamline_acronym}.bluesky.runengine.documents"],
+        topics=[f"{beamline_acronym_01}.bluesky.runengine.documents",
+                f"{beamline_acronym_02}.bluesky.runengine.documents"],
         bootstrap_servers=kafka_config["bootstrap_servers"],
         group_id=unique_group_id,
         consumer_config=kafka_config["runengine_producer_config"],
@@ -134,4 +91,4 @@ def print_message(consumer, doctype, doc):

 if __name__ == "__main__":
     import sys
-    print_kafka_messages(sys.argv[1])
+    print_kafka_messages(sys.argv[1], sys.argv[2])

diff --git a/scripts/inputs_qserver_kafka_v2.xlsx b/scripts/inputs_qserver_kafka_v2.xlsx
index dfd300c28de7aca2b98a2840b48fe5cda2a1b227..32f207941f1e8cbd1afe84a0a040c45907553a04 100644
GIT binary patch
delta 4523
[base85-encoded binary delta omitted; updates inputs_qserver_kafka_v2.xlsx in place]

delta 4528
[base85-encoded binary delta omitted; updates inputs_qserver_kafka_v2.xlsx in place]
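For reference, the two-topic wiring introduced in scripts/dev/print_kafka_with_main_consumer.py above, condensed into a standalone sketch; the acronyms below are placeholders for sys.argv[1]/sys.argv[2], and the final start call is assumed to match the unchanged tail of the script:

    import uuid
    from bluesky_kafka.consume import BasicConsumer
    from nslsii import _read_bluesky_kafka_config_file  # nslsii <0.7.0

    def print_message(consumer, doctype, doc):
        name, message = doc
        print(name, list(message.keys()))

    kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")

    kafka_consumer = BasicConsumer(
        topics=["xpd.bluesky.runengine.documents",           # raw documents (placeholder acronym)
                "xpd-analysis.bluesky.runengine.documents"],  # pdfstream sandbox documents (placeholder)
        bootstrap_servers=kafka_config["bootstrap_servers"],
        group_id=f"echo-xpd-{str(uuid.uuid4())[:8]}",  # unique id keeps it out of shared consumer groups
        consumer_config=kafka_config["runengine_producer_config"],
        process_message=print_message,
    )

    kafka_consumer.start()  # blocks and polls; assumed to match the script's existing start call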
diff --git a/scripts/kafka_consumer_iterate_XPD_v2.py b/scripts/kafka_consumer_iterate_XPD_v2.py
index 3d06841..4dfcf4a 100644
--- a/scripts/kafka_consumer_iterate_XPD_v2.py
+++ b/scripts/kafka_consumer_iterate_XPD_v2.py
@@ -6,30 +6,20 @@
 from bluesky_kafka.consume import BasicConsumer
 import matplotlib.pyplot as plt
 import numpy as np
-import pandas as pd
-from scipy import integrate
-import time
-import databroker
-import glob
-from tqdm import tqdm
-# from diffpy.pdfgetx import PDFConfig
 from tiled.client import from_uri, from_profile

 import resource
 resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536))

-import _data_export as de
-from _plot_helper import plot_uvvis
-import _data_analysis as da
-import _pdf_calculator as pc
-# import _get_pdf as gp
-
 import importlib
 LK = importlib.import_module("_LDRD_Kafka")
 sq = importlib.import_module("_synthesis_queue_RM")
+de = importlib.import_module("_data_export")
+# da = importlib.import_module("_data_analysis")
+plot_uvvis = importlib.import_module("_plot_helper").plot_uvvis

 from bluesky_queueserver_api.zmq import REManagerAPI
-from bluesky_queueserver_api import BPlan, BInst
+# from bluesky_queueserver_api import BPlan, BInst

 try:
     from nslsii import _read_bluesky_kafka_config_file  # nslsii <0.7.0
@@ -142,92 +132,20 @@ def print_message(consumer, doctype, doc):
         name, message = doc
         # print(f"contents: {pprint.pformat(message)}\n")

-        ######### While document (name == 'start') and ('topic' in doc[1]) ##########
-        ##                                                                          ##
+        ## macro_00: print metadata when doc name is start and reset self.uid to an empty list
+        ######### While document (name == 'start') and ('topic' in message) #########
         ##     Only print metadata when the document is from pdfstream              ##
-        ##                                                                          ##
+        ##             reset self.uid to an empty list                              ##
         #############################################################################
-        if (name == 'start') and ('topic' in doc[1]):
+        if (name == 'start') and ('topic' in message):
             print(
-                f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
+                f"\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
                 f"document keys: {list(message.keys())}")
-
-            if 'uid' in message.keys():
-                print(f"uid: {message['uid']}")
-            if 'plan_name' in message.keys():
-                print(f"plan name: {message['plan_name']}")
-            if 'detectors' in message.keys():
-                print(f"detectors: {message['detectors']}")
-            if 'pumps' in message.keys():
-                print(f"pumps: {message['pumps']}")
-            if 'detectors' in message.keys():
-                print(f"detectors: {message['detectors']}")
-            if 'uvvis' in message.keys() and message['plan_name']!='count':
-                print(f"uvvis mode:\n"
-                      f"           integration time: {message['uvvis'][0]} ms\n"
-                      f"           num spectra averaged: {message['uvvis'][1]}\n"
-                      f"           buffer capacity: {message['uvvis'][2]}"
-                      )
-            elif 'uvvis' in message.keys() and message['plan_name']=='count':
-                print(f"uvvis mode:\n"
-                      f"           spectrum type: {message['uvvis'][0]}\n"
-                      f"           integration time: {message['uvvis'][2]} ms\n"
-                      f"           num spectra averaged: {message['uvvis'][3]}\n"
-                      f"           buffer capacity: {message['uvvis'][4]}"
-                      )
-            if 'mixer' in message.keys():
-                print(f"mixer: {message['mixer']}")
-            if 'sample_type' in message.keys():
-                print(f"sample type: {message['sample_type']}")
-            ## Reset kafka_process.uid to an empty list
-            kafka_process.uid = []
-
-        ## macro_01: get iq from sandbox
-        ######### While document (name == 'event') and ('topic' in doc[1]) ##########
-        ##    key 'topic' is added into the doc of xpd-analysis in pdfstream       ##
-        ##    Read uid of analysis data from doc[1]['data']['chi_I']               ##
-        ##    Get I(Q) data from the integral of 2D image by pdfstream             ##
-        #############################################################################
-        if (name == 'event') and ('topic' in doc[1]):
-            # print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
-            #       f"contents: {pprint.pformat(message)}")
-
-            iq_I_uid = doc[1]['data']['chi_I']
-            kafka_process.macro_01_get_iq(iq_I_uid)
-
-
-
-        #### While document (name == 'stop') and ('scattering' in message['num_events']) ####
-        ##   Acquisition of xray_uvvis_plan finished but analysis of pdfstream not yet     ##
-        ##      So just sleep 1 second but not assign uid, stream_list                     ##
-        ##      No need to stop queue since the net queue task is wahsing loop             ##
-        #####################################################################################
-        if (name == 'stop') and ('scattering' in message['num_events']):
-            print('\n*** qsever stop for data export, identification, and fitting ***\n')
-            print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
-                  f"contents: {pprint.pformat(message)}"
-                  )
-            ## wait 1 second for databroker to save data
-            time.sleep(1)
-            ## Reset kafka_process.uid to an empty list
-            kafka_process.uid = []
-
-
-        ## macro_02: get raw data uid from metadata of sandbox doc
-        #### (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0) ####
-        ##        With taking xray_uvvis_plan and analysis of pdfstream finished           ##
-        ##      Sleep 1 second and assign uid, stream_list from kafka_process.entry[-1]    ##
-        ##      No need to stop queue since the net queue task is wahsing loop             ##
-        #####################################################################################
-        elif (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0):
-            print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
-                  f"contents: {pprint.pformat(message)}"
-                  )
-            kafka_process.macro_02_get_uid()
+            kafka_process.macro_00_print_start(message)

-        ## macro_03: stop queue and get uid for take_a_uvvis scan
+        ## macro_01: stop queue and get uid for take_a_uvvis scan
         ######### (name == 'stop') and ('take_a_uvvis' in message['num_events']) ##########
         ##      Only take a Uv-Vis, no X-ray data but still do analysis of pdfstream     ##
         ##                 Stop queue first to identify good/bad data                    ##
         elif (name == 'stop') and ('take_a_uvvis' in message['num_events']):
             print('\n*** qserver stop for data export, identification, and fitting ***\n')

-            print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
+            print(f"\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
                   f"contents: {pprint.pformat(message)}")

-            kafka_process.macro_03_stop_queue_uid(RM, message)
+            kafka_process.macro_01_stop_queue_uid(RM, message)
+
+
+
+        ## macro_02: get iq from sandbox
+        ######### While document (name == 'event') and ('topic' in message) ##########
+        ##    key 'topic' is added into the doc of xpd-analysis in pdfstream        ##
+        ##    Read uid of analysis data from message['data']['chi_I']               ##
+        ##    Get I(Q) data from the integral of 2D image by pdfstream              ##
+        ##############################################################################
+        elif (name == 'event') and ('topic' in message):
+            # print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
+            #       f"contents: {pprint.pformat(message)}")
+
+            iq_I_uid = message['data']['chi_I']
+            kafka_process.macro_02_get_iq(iq_I_uid)
+
+
+
+        ## macro_03: get raw data uid from metadata of sandbox doc
+        #### (name == 'stop') and ('topic' in message) and ('primary' in message['num_events']) ####
+        ##        With taking xray_uvvis_plan and analysis of pdfstream finished                  ##
+        ##      Sleep 1 second and assign uid, stream_list from kafka_process.entry[-1]           ##
+        ##      No need to stop queue since the next queue task is washing the loop               ##
+        ############################################################################################
+        elif (name == 'stop') and ('topic' in message) and ('primary' in message['num_events']):
+            print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
+                  f"contents: {pprint.pformat(message)}"
+                  )
+            kafka_process.macro_03_get_uid()

-        ############## (name == 'stop') and (type(kafka_process.uid) is str) ##############
-        ##                                                                                ##
-        ## When uid is assigned and type is a string, move to data fitting, calculation  ##
-        ##                                                                                ##
+        ############## (name == 'stop') and uid_is_str and check_event_name ##############
+        ## Move to data fitting, calculation when uid is assigned, its type is a string, ##
+        ##     and stream name is 'primary': for xray_uvvis_plan2 after pdfstream        ##
+        ##     or stream name is 'take_a_uvvis': for take_a_uvvis (raw data)             ##
         #####################################################################################
-        if (name == 'stop') and (type(kafka_process.uid) is str):
+        uid_is_str = (type(kafka_process.uid) is str)
+        try:
+            check_event_name = ('primary' in message['num_events'] or 'take_a_uvvis' in message['num_events'])
+        except KeyError:
+            check_event_name = False
+
+        if (name == 'stop') and uid_is_str and check_event_name:
             print(f'\n**** start to export uid: {kafka_process.uid} ****\n')
             print(f'\n**** with stream name in {kafka_process.stream_list} ****\n')

             ## macro_04 ~ macro_07 or 08
             #################### 'scattering' in kafka_process.stream_list ###################
-            ##                                                                               ##
             ##  Process X-ray scattering data (PDF): iq to gr, search & match, pdf fitting  ##
             ##            obtain phase fraction & particle size from g(r)                   ##
             #####################################################################################
@@ -265,13 +217,13 @@ def print_message(consumer, doctype, doc):
                                                     beamline_acronym=beamline_acronym_01)
                 u = plot_uvvis(kafka_process.qepro_dic, kafka_process.metadata_dic)

-                ## macro_04: setting dummy pdf data for test, e.g., CsPbBr2)
+                ## macro_04: setting dummy pdf data for test, e.g., CsPbBr2
                 if kin.dummy_pdf[0]:
                     kafka_process.macro_04_dummy_pdf()

-                # ## macro_05: do i(q) to g(r) through pdfstream
-                # if kin.iq_to_gr[0]:
-                #     kafka_process.macro_05_iq_to_gr(beamline_acronym_01)
+                ## macro_05: do i(q) to g(r) through pdfstream
+                if kin.iq_to_gr[0]:
+                    kafka_process.macro_05_iq_to_gr(beamline_acronym_01)

                 ## macro_06: do search and match
                 if kin.search_and_match[0]:
@@ -418,6 +370,11 @@ def print_message(consumer, doctype, doc):

         if (name == 'stop') and ('fluorescence' in kafka_process.stream_list):
             kafka_process.save_kafka_dict('/home/xf28id2/Documents/ChengHung/kafka_dict_log')
+
+
+        ## Reset kafka_process.uid to an empty list for event doc identification
+        kafka_process.uid = []
+        kafka_process.stream_list = []

 kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")
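The gate added in front of the export block is the load-bearing change for running two topics; restated as a standalone predicate (names taken from the patch; document shapes assumed from bluesky RunStop conventions):

    def ready_to_export(name, message, uid):
        """True once a stop document arrives, macro_01/macro_03 has assigned a
        raw-data uid string, and the run holds a stream this workflow handles."""
        uid_is_str = isinstance(uid, str)
        try:
            # 'primary': xray_uvvis_plan2 after pdfstream; 'take_a_uvvis': UV-Vis only
            check_event_name = ('primary' in message['num_events']
                                or 'take_a_uvvis' in message['num_events'])
        except KeyError:
            # start/event documents carry no 'num_events', so they never pass
            check_event_name = False
        return (name == 'stop') and uid_is_str and check_event_name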