diff --git a/scripts/_LDRD_Kafka.py b/scripts/_LDRD_Kafka.py index 7f463fa..30d3672 100644 --- a/scripts/_LDRD_Kafka.py +++ b/scripts/_LDRD_Kafka.py @@ -6,13 +6,14 @@ import importlib import time import pprint +import json import _data_export as de from _plot_helper import plot_uvvis import _data_analysis as da import _pdf_calculator as pc -from scipy import integrate +from scipy import integrate import torch # from diffpy.pdfgetx import PDFConfig # import _get_pdf as gp @@ -59,7 +60,7 @@ def _kafka_process(): process_list=[ 'agent', 'sandbox_tiled_client', 'tiled_client', 'entry', 'iq_data', 'stream_list', - 'uid', 'uid_catalog', 'uid_pdfstream', 'uid_sandbox', + 'uid', 'uid_catalog', 'uid_bundle', 'uid_pdfstream', 'uid_sandbox', 'gr_data', 'pdf_property', 'gr_fitting', 'qepro_dic', 'metadata_dic', 'sample_type', 'PL_goodbad', 'PL_fitting', 'abs_data', 'abs_fitting', @@ -144,12 +145,39 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs', is_kafka=False pass + def save_kafka_dict(self, home_path, reset_uid_catalog=True): + + date, ttime = de._readable_time(self.metadata_dic['time']) + json_fn = f'{date}-{ttime}_{self.uid[0:8]}.json' + json_path = os.path.join(home_path, json_fn) + + key_to_save = [ + 'uid', 'uid_catalog', 'uid_bundle', 'uid_pdfstream', 'uid_sandbox', + 'metadata_dic', 'pdf_property', 'optical_property', + 'agent_data', 'agent_iteration', 'finished', ] + + kafka_process_dict = {} + for key in key_to_save: + kafka_process_dict.update({key: getattr(self, key)}) + + with open(json_path, "w") as f: + json.dump(kafka_process_dict, f, indent=2) + json.dump(self.print_dic, f, indent=2) + + print(f"\nwrote kafka info to {home_path}\n") + + if reset_uid_catalog: + self.reset_kafka_process(['uid_catalog']) + + + ## Reset attributes of keys in _kafka_process() to empty lists for next event - def reset_kafka_process(self): - for key in _kafka_process(): + def reset_kafka_process(self, keys): + for key in keys: setattr(self, key, []) + def auto_rate_list(self, pump_list, new_points, fix_Br_ratio): """Auto transfer the predicted rates in new_points to a rate_list for qserver @@ -866,6 +894,9 @@ def macro_15_save_data(self, stream_name): plqy_dic = self.plqy_dic) print(f'\n** export fitting results complete**\n') + if stream_name == 'fluorescence': + self.uid_bundle.append(self.uid) + ## Save processed data in df and agent_data as metadta in sandbox if self.inputs.write_to_sandbox[0] and (stream_name == 'fluorescence'): df = pd.DataFrame() @@ -894,7 +925,7 @@ def macro_15_save_data(self, stream_name): if self.inputs.write_agent_data[0] and (stream_name == 'fluorescence'): # agent_data.update({'sandbox_uid': sandbox_uid}) with open(f"{self.inputs.agent_data_path[0]}/{self.PL_goodbad['data_id']}.json", "w") as f: - json.dump(agent_data, f, indent=2) + json.dump(self.agent_data, f, indent=2) print(f"\nwrote to {self.inputs.agent_data_path[0]}\n") diff --git a/scripts/inputs_qserver_kafka_v2.xlsx b/scripts/inputs_qserver_kafka_v2.xlsx index 379870b..fe6a4cc 100644 Binary files a/scripts/inputs_qserver_kafka_v2.xlsx and b/scripts/inputs_qserver_kafka_v2.xlsx differ diff --git a/scripts/kafka_consumer_iterate_1LL09_v2.py b/scripts/kafka_consumer_iterate_1LL09_v2.py index 27d4607..102cc2d 100644 --- a/scripts/kafka_consumer_iterate_1LL09_v2.py +++ b/scripts/kafka_consumer_iterate_1LL09_v2.py @@ -10,7 +10,6 @@ from scipy import integrate import time import databroker -import json import glob from tqdm import tqdm # from diffpy.pdfgetx import PDFConfig @@ -383,6 +382,10 @@ def print_message(consumer, doctype, doc): ##################################################################################### kafka_process.macro_17_add_queue(stream_name, qserver_process, RM) + + if (name == 'stop') and ('fluorescence' in kafka_process.stream_list): + kafka_process.save_kafka_dict('/home/xf28id2/Documents/ChengHung/kafka_dict_log') + kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml") diff --git a/scripts/kafka_consumer_iterate_XPD_v2.py b/scripts/kafka_consumer_iterate_XPD_v2.py index 7d6531e..df26d71 100644 --- a/scripts/kafka_consumer_iterate_XPD_v2.py +++ b/scripts/kafka_consumer_iterate_XPD_v2.py @@ -10,7 +10,6 @@ from scipy import integrate import time import databroker -import json import glob from tqdm import tqdm # from diffpy.pdfgetx import PDFConfig