test export and save kafka info as log
XPD Operator committed Aug 14, 2024
1 parent dd6a507 commit fc16ef1
Showing 4 changed files with 40 additions and 7 deletions.
41 changes: 36 additions & 5 deletions scripts/_LDRD_Kafka.py
@@ -6,13 +6,14 @@
 import importlib
 import time
 import pprint
+import json

 import _data_export as de
 from _plot_helper import plot_uvvis
 import _data_analysis as da
 import _pdf_calculator as pc
-from scipy import integrate

+from scipy import integrate
 import torch
 # from diffpy.pdfgetx import PDFConfig
 # import _get_pdf as gp
@@ -59,7 +60,7 @@ def _kafka_process():
     process_list=[
         'agent', 'sandbox_tiled_client', 'tiled_client',
         'entry', 'iq_data', 'stream_list',
-        'uid', 'uid_catalog', 'uid_pdfstream', 'uid_sandbox',
+        'uid', 'uid_catalog', 'uid_bundle', 'uid_pdfstream', 'uid_sandbox',
         'gr_data', 'pdf_property', 'gr_fitting',
         'qepro_dic', 'metadata_dic', 'sample_type',
         'PL_goodbad', 'PL_fitting', 'abs_data', 'abs_fitting',
@@ -144,12 +145,39 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs', is_kafka=False
             pass


+    def save_kafka_dict(self, home_path, reset_uid_catalog=True):
+
+        date, ttime = de._readable_time(self.metadata_dic['time'])
+        json_fn = f'{date}-{ttime}_{self.uid[0:8]}.json'
+        json_path = os.path.join(home_path, json_fn)
+
+        key_to_save = [
+            'uid', 'uid_catalog', 'uid_bundle', 'uid_pdfstream', 'uid_sandbox',
+            'metadata_dic', 'pdf_property', 'optical_property',
+            'agent_data', 'agent_iteration', 'finished', ]
+
+        kafka_process_dict = {}
+        for key in key_to_save:
+            kafka_process_dict.update({key: getattr(self, key)})
+
+        with open(json_path, "w") as f:
+            json.dump(kafka_process_dict, f, indent=2)
+            json.dump(self.print_dic, f, indent=2)
+
+        print(f"\nwrote kafka info to {home_path}\n")
+
+        if reset_uid_catalog:
+            self.reset_kafka_process(['uid_catalog'])
+
+
     ## Reset attributes of keys in _kafka_process() to empty lists for next event
-    def reset_kafka_process(self):
-        for key in _kafka_process():
+    def reset_kafka_process(self, keys):
+        for key in keys:
             setattr(self, key, [])



     def auto_rate_list(self, pump_list, new_points, fix_Br_ratio):
         """Auto transfer the predicted rates in new_points to a rate_list for qserver
@@ -866,6 +894,9 @@ def macro_15_save_data(self, stream_name):
                 plqy_dic = self.plqy_dic)
             print(f'\n** export fitting results complete**\n')

+        if stream_name == 'fluorescence':
+            self.uid_bundle.append(self.uid)
+
         ## Save processed data in df and agent_data as metadta in sandbox
         if self.inputs.write_to_sandbox[0] and (stream_name == 'fluorescence'):
             df = pd.DataFrame()
@@ -894,7 +925,7 @@ def macro_15_save_data(self, stream_name):
         if self.inputs.write_agent_data[0] and (stream_name == 'fluorescence'):
             # agent_data.update({'sandbox_uid': sandbox_uid})
             with open(f"{self.inputs.agent_data_path[0]}/{self.PL_goodbad['data_id']}.json", "w") as f:
-                json.dump(agent_data, f, indent=2)
+                json.dump(self.agent_data, f, indent=2)

             print(f"\nwrote to {self.inputs.agent_data_path[0]}\n")
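For reference, a minimal sketch (not part of this commit) of how the kafka-info log written by save_kafka_dict above could be read back. Because the method dumps two JSON documents into the same file (kafka_process_dict followed by self.print_dic), a plain json.load would fail on the second document, so the sketch decodes them one at a time with json.JSONDecoder.raw_decode; the helper name load_kafka_log is made up for illustration.

import json

def load_kafka_log(json_path):
    """Read back a kafka-info log file written by save_kafka_dict.

    The file may hold more than one JSON document back to back,
    so decode them sequentially instead of calling json.load once.
    """
    decoder = json.JSONDecoder()
    with open(json_path) as f:
        text = f.read()

    docs, idx = [], 0
    while idx < len(text):
        # skip whitespace between consecutive documents
        while idx < len(text) and text[idx].isspace():
            idx += 1
        if idx >= len(text):
            break
        obj, end = decoder.raw_decode(text, idx)
        docs.append(obj)
        idx = end
    return docs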
Binary file modified scripts/inputs_qserver_kafka_v2.xlsx
Binary file not shown.
5 changes: 4 additions & 1 deletion scripts/kafka_consumer_iterate_1LL09_v2.py
@@ -10,7 +10,6 @@
 from scipy import integrate
 import time
 import databroker
-import json
 import glob
 from tqdm import tqdm
 # from diffpy.pdfgetx import PDFConfig
@@ -383,6 +382,10 @@ def print_message(consumer, doctype, doc):
             #####################################################################################
             kafka_process.macro_17_add_queue(stream_name, qserver_process, RM)

+
+        if (name == 'stop') and ('fluorescence' in kafka_process.stream_list):
+            kafka_process.save_kafka_dict('/home/xf28id2/Documents/ChengHung/kafka_dict_log')
+

 kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")

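As a hypothetical usage example (also not part of this commit), assuming the load_kafka_log helper sketched above and the log directory hard-coded in the consumer, the most recent log written at the end of a run could be inspected like this:

import glob
import os

log_dir = '/home/xf28id2/Documents/ChengHung/kafka_dict_log'
logs = sorted(glob.glob(os.path.join(log_dir, '*.json')), key=os.path.getmtime)
if logs:
    # the first document in the file is the dict of saved keys
    # ('uid', 'uid_catalog', 'uid_bundle', ...)
    kafka_info = load_kafka_log(logs[-1])[0]
    print(kafka_info['uid'], kafka_info['uid_catalog'])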
1 change: 0 additions & 1 deletion scripts/kafka_consumer_iterate_XPD_v2.py
@@ -10,7 +10,6 @@
 from scipy import integrate
 import time
 import databroker
-import json
 import glob
 from tqdm import tqdm
 # from diffpy.pdfgetx import PDFConfig
