
Commit

test macros with 100 mM TOABr
XPD Operator committed Aug 21, 2024
1 parent 4e18def commit 19608b2
Showing 3 changed files with 85 additions and 75 deletions.
128 changes: 69 additions & 59 deletions scripts/_LDRD_Kafka.py
@@ -14,8 +14,8 @@
da = importlib.import_module("_data_analysis")
pc = importlib.import_module("_pdf_calculator")

from diffpy.pdfgetx import PDFConfig
gp = importlib.import_module("_get_pdf")
# from diffpy.pdfgetx import PDFConfig
# gp = importlib.import_module("_get_pdf")

build_agent = importlib.import_module("prepare_agent_pdf").build_agent
import torch
@@ -45,7 +45,8 @@ def _kafka_inputs():
inputs_list=[
'dummy_kafka', 'csv_path', 'key_height', 'height', 'distance', 'PLQY',
'rate_label_dic_key', 'rate_label_dic_value', 'new_points_label',
'use_good_bad', 'post_dilute', 'fix_Br_ratio', 'write_agent_data', 'agent_data_path',
'use_good_bad', 'post_dilute', 'fix_Br_ratio',
'write_agent_data', 'agent_data_path', 'build_agent',
'USE_AGENT_iterate', 'peak_target',
'iq_to_gr', 'iq_to_gr_path', 'cfg_fn', 'bkg_fn', 'iq_fn',
'search_and_match', 'mystery_path', 'results_path',
@@ -67,7 +68,7 @@ def _kafka_process():
'qepro_dic', 'metadata_dic', 'sample_type',
'PL_goodbad', 'PL_fitting', 'abs_data', 'abs_fitting',
'plqy_dic', 'optical_property', 'agent_data', 'rate_label_dic',
'good_data', 'bad_data', 'agent_iteration', 'finished',
'good_data', 'bad_data', 'continue_iteration', 'finished',

]

Expand Down Expand Up @@ -108,13 +109,14 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs', is_kafka=False
setattr(self, key, [])

try:
## Assign Agent to self.agent
self.agent
print('\n***** Start to initialize blop agent ***** \n')
self.agent = build_agent(
peak_target=self.inputs.peak_target[0],
agent_data_path=self.inputs.agent_data_path[0])
print(f'\n***** Initialized blop agent at {self.agent} ***** \n')
if self.inputs.build_agent[0]:
## Assign Agent to self.agent
self.agent
print('\n***** Start to initialize blop agent ***** \n')
self.agent = build_agent(
peak_target=self.inputs.peak_target[0],
agent_data_path=self.inputs.agent_data_path[0])
print(f'\n***** Initialized blop agent at {self.agent} ***** \n')

## self.inputs.sandbox_uri[0] is just the uri of sandbox
## so, turn uri into client and assign it to self.sandbox_tiled_client
@@ -162,7 +164,7 @@ def save_kafka_dict(self, home_path, reset_uid_catalog=True):
key_to_save = [
'uid', 'uid_catalog', 'uid_bundle', 'uid_pdfstream', 'uid_sandbox',
'metadata_dic', 'pdf_property', 'optical_property',
'agent_data', 'agent_iteration', 'finished', ]
'agent_data', 'continue_iteration', 'finished', ]

kafka_process_dict = {}
for key in key_to_save:
@@ -223,7 +225,7 @@ def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):
self.agent_data['posterior_mean']: post_mean
self.agent_data['posterior_stddev']: post_stddev
3. Check if the target is met. If met, run the wash loop; if not, keep iterating.
4. Update self.agent_iteration
4. Update self.continue_iteration
Args:
qserver_process (_LDRD_Kafka.xlsx_to_inputs, optional): qserver parameters read from xlsx.
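A minimal sketch of the target check and iteration flag described in steps 3-4 above; the 2.5 nm tolerance and the example values are assumptions, not taken from this commit, and the queue-stop instruction follows the bluesky-queueserver-api usage shown later in the hunk (a running RE Manager is assumed):

from bluesky_queueserver_api import BInst
from bluesky_queueserver_api.zmq import REManagerAPI

RM = REManagerAPI()            # connects to the local RE Manager by default
peak_target = 520.0            # hypothetical target peak position (nm)
post_mean = 518.4              # hypothetical posterior mean from the agent
continue_iteration = []

def target_reached(mean, target, tolerance=2.5):
    # True once the posterior mean is within `tolerance` nm of the target.
    return abs(mean - target) <= tolerance

if target_reached(post_mean, peak_target):
    # Target met: queue a stop instruction at the front and stop iterating.
    RM.item_add(BInst("queue_stop"), pos='front')
    continue_iteration.append(False)
else:
    continue_iteration.append(True)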
@@ -254,11 +256,12 @@ def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):

inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')
self.agent_iteration.append(False)
self.continue_iteration.append(False)

else:
self.agent_iteration.append(True)
self.continue_iteration.append(True)

# if self.inputs.build_agent[0]:
if is_1st:
pass
else:
@@ -469,46 +472,46 @@ def macro_04_dummy_pdf(self):



def macro_05_iq_to_gr(self, beamline_acronym):
"""macro to condcut data reduction from I(Q) to g(r), used in kafka consumer
# def macro_05_iq_to_gr(self, beamline_acronym):
# """macro to condcut data reduction from I(Q) to g(r), used in kafka consumer

This macro will
1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
2. Read pdf config file from self.inputs.cfg_fn[-1]
3. Read pdf background file from self.inputs.bkg_fn[-1]
4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
5. Read saved g(r) into pd.DataFrame and save again to remove the headers
6. Update g(r) data path and data frame to self.gr_data
self.gr_data[0]: gr_data (path)
self.gr_data[1]: gr_df
Args:
beamline_acronym (str): catalog name for tiled to access data
"""
# Grab metadata from stream_name = fluorescence for naming gr file
fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
gr_fn = f'{fn_uid}_scattering.gr'

### dummy test, e.g., CsPbBr2
if self.inputs.dummy_pdf[0]:
gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'

# Build pdf config file from scratch
pdfconfig = PDFConfig()
pdfconfig.readConfig(self.inputs.cfg_fn[-1])
pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
plot_setting={'marker':'.','color':'green'}, test=True,
gr_fn=gr_fn)
gr_data = sqfqgr_path['gr']

## Remove headers by reading gr_data into pd.DataFrame and saving it again
gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ')
gr_df.to_csv(gr_data, index=False, header=False, sep =' ')

self.gr_data = []
self.gr_data.append(gr_data)
self.gr_data.append(gr_df)
# This macro will
# 1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
# 2. Read pdf config file from self.inputs.cfg_fn[-1]
# 3. Read pdf background file from self.inputs.bkg_fn[-1]
# 4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
# 5. Read saved g(r) into pd.DataFrame and save again to remove the headers
# 6. Update g(r) data path and data frame to self.gr_data
# self.gr_data[0]: gr_data (path)
# self.gr_data[1]: gr_df

# Args:
# beamline_acronym (str): catalog name for tiled to access data
# """
# # Grab metadata from stream_name = fluorescence for naming gr file
# fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
# gr_fn = f'{fn_uid}_scattering.gr'

# ### dummy test, e.g., CsPbBr2
# if self.inputs.dummy_pdf[0]:
# gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'

# # Build pdf config file from scratch
# pdfconfig = PDFConfig()
# pdfconfig.readConfig(self.inputs.cfg_fn[-1])
# pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
# sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
# plot_setting={'marker':'.','color':'green'}, test=True,
# gr_fn=gr_fn)
# gr_data = sqfqgr_path['gr']

# ## Remove headers by reading gr_data into pd.DataFrame and saving it again
# gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ')
# gr_df.to_csv(gr_data, index=False, header=False, sep =' ')

# self.gr_data = []
# self.gr_data.append(gr_data)
# self.gr_data.append(gr_df)
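For reference, the header-stripping round trip from the now-disabled macro as a standalone sketch; the 26-line header count is taken from the commit, while the file path is a placeholder:

import pandas as pd

gr_path = 'example_scattering.gr'  # placeholder path to a pdfgetx-style .gr file
# Skip the 26 header lines and name the two space-separated columns.
gr_df = pd.read_csv(gr_path, skiprows=26, names=['r', 'g(r)'], sep=' ')
# Rewrite the file as bare two-column data with no header.
gr_df.to_csv(gr_path, index=False, header=False, sep=' ')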



@@ -906,8 +909,12 @@ def macro_14_upate_agent(self):
"""

## Create agent_data as a dict for exporting
if 'agent_target' in self.agent_data.keys():
pass

if self.inputs.build_agent[0]:
if 'agent_target' in self.agent_data.keys():
pass
else:
self.agent_data = {}
else:
self.agent_data = {}
self.agent_data.update(self.optical_property)
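The gating above keeps an existing 'agent_target' entry only when the agent is enabled; a small sketch of that export-dict pattern with made-up values (the flag and dictionaries stand in for the attributes in the diff):

build_agent_enabled = False                                      # stands in for self.inputs.build_agent[0]
agent_data = {'agent_target': 520.0}                             # hypothetical previous agent metadata
optical_property = {'Peak': 515.2, 'FWHM': 22.1, 'PLQY': 0.41}   # hypothetical optical results

if not (build_agent_enabled and 'agent_target' in agent_data):
    agent_data = {}                                              # start fresh without agent metadata
agent_data.update(optical_property)                              # merge optical results for export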
@@ -961,7 +968,10 @@ def macro_15_save_data(self, stream_name):
df['fluorescence_fitting'] = f_fit(x0, *popt)

## use pd.concat to add various length data together
df_new = pd.concat([df, self.iq_data['df'], self.gr_data[1], self.gr_fitting['df']], ignore_index=False, axis=1)
try:
df_new = pd.concat([df, self.iq_data['df'], self.gr_data[1], self.gr_fitting['df']], ignore_index=False, axis=1)
except (TypeError, KeyError):
df_new = df

# entry = sandbox_tiled_client.write_dataframe(df, metadata=agent_data)
entry = self.sandbox_tiled_client.write_dataframe(df_new, metadata=self.agent_data)
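The try/except above falls back to the plain DataFrame when the PDF branch is disabled and self.gr_data or self.gr_fitting is missing; a brief sketch of why the axis=1 concatenation handles columns of different lengths (values are made up):

import pandas as pd

df_a = pd.DataFrame({'wavelength': [400, 401, 402], 'fluorescence': [0.10, 0.52, 0.21]})
df_b = pd.DataFrame({'r': [0.5, 1.0], 'g(r)': [0.01, 0.03]})
# axis=1 aligns on the row index; the shorter frame is padded with NaN.
df_new = pd.concat([df_a, df_b], ignore_index=False, axis=1)
print(df_new.shape)  # (3, 4)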
@@ -1049,7 +1059,7 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):

restplan = BPlan('sleep_sec_q', 5)
RM.item_add(restplan, pos=2)

RM.queue_start()

elif (len(self.good_data) > 2) and (self.inputs.use_good_bad[0]):
Expand All @@ -1063,7 +1073,7 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):
RM.queue_start()

## Add predicted new points from ML agent into qserver
elif (stream_name == 'fluorescence') and (self.inputs.USE_AGENT_iterate[0]) and (self.agent_iteration[-1]):
elif (stream_name == 'fluorescence') and (self.inputs.USE_AGENT_iterate[0]) and (self.continue_iteration[-1]):
print('*** Add new points from agent to the front of qserver ***\n')

new_points = self.macro_agent(qserver_process, RM, check_target=True)
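For context, the queue-server calls used throughout this macro, as a minimal sketch; 'sleep_sec_q' is the plan name from the diff, and the rest follows standard bluesky-queueserver-api usage (a running RE Manager is assumed):

from bluesky_queueserver_api import BPlan
from bluesky_queueserver_api.zmq import REManagerAPI

RM = REManagerAPI()                 # default ZMQ addresses of the local RE Manager
restplan = BPlan('sleep_sec_q', 5)  # plan name taken from the diff
RM.item_add(restplan, pos=2)        # insert at a fixed position in the queue
RM.item_add(restplan, pos='front')  # or push to the front, as done for agent points
RM.queue_start()                    # begin executing the queue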
Binary file modified scripts/inputs_qserver_kafka_v2.xlsx
Binary file not shown.
32 changes: 16 additions & 16 deletions scripts/kafka_consumer_iterate_1LL09_v2.py
@@ -30,6 +30,7 @@

from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api import BPlan, BInst
# from bluesky_queueserver_api.comm_base import RequestFailedError

try:
from nslsii import _read_bluesky_kafka_config_file # nslsii <0.7.0
@@ -46,7 +47,7 @@
xlsx_fn = '/home/xf28id2/.ipython/profile_collection/scripts/inputs_qserver_kafka_v2.xlsx'

## Input variables for Qserver, read from xlsx_fn by the given sheet name
qserver_process = LK.xlsx_to_inputs(LK._qserver_inputs(), xlsx_fn=xlsx_fn, sheet_name='qserver_test')
qserver_process = LK.xlsx_to_inputs(LK._qserver_inputs(), xlsx_fn=xlsx_fn, sheet_name='qserver_1LL09')
qin = qserver_process.inputs

## Input variables for Kafka, read from xlsx_fn by the given sheet name
@@ -56,13 +57,12 @@
## Define RE Manager API as RM
RM = REManagerAPI(zmq_control_addr=qin.zmq_control_addr[0], zmq_info_addr=qin.zmq_info_addr[0])

## Make the first prediction from kafka_process.agent
first_points = kafka_process.macro_agent(qserver_process, RM, check_target=False, is_1st=True)
rate_list = kafka_process.auto_rate_list(qin.pump_list, first_points, kin.fix_Br_ratio)
if kin.post_dilute[0]:
rate_list.append(sum(rate_list)*kin.post_dilute[1])

qin.infuse_rates = rate_list
# ## Make the first prediction from kafka_process.agent
# first_points = kafka_process.macro_agent(qserver_process, RM, check_target=False, is_1st=True)
# rate_list = kafka_process.auto_rate_list(qin.pump_list, first_points, kin.fix_Br_ratio)
# if kin.post_dilute[0]:
# rate_list.append(sum(rate_list)*kin.post_dilute[1])
# qin.infuse_rates = rate_list
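The disabled block above ends by appending a post-dilution rate proportional to the sum of the agent-predicted rates; a worked example of that arithmetic with made-up numbers:

rate_list = [10.0, 20.0, 5.0]        # hypothetical infusion rates from the agent
post_dilute_factor = 1.5             # stands in for kin.post_dilute[1]
rate_list.append(sum(rate_list) * post_dilute_factor)
print(rate_list)                     # [10.0, 20.0, 5.0, 52.5]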


## Import Qserver parameters to RE Manager
Expand Down Expand Up @@ -184,9 +184,14 @@ def print_message(consumer, doctype, doc):
## Plot data, Agent prediction ##
## Export data, Save data to tiled ##
###################################################
if name == 'stop':
RM.queue_stop()
print('\n*** qserver stop for data export, identification, and fitting ***\n')
if name == 'stop':

## if 'take_a_uvvis' is in stream_list, stop the queue and wait for data processing
## if not, the next task in the queue is the washing loop, so there is no need to stop the queue
if 'take_a_uvvis' in message['num_events']:
RM.queue_stop()
print('\n*** qserver stop for data export, identification, and fitting ***\n')

print(f"{datetime.datetime.now().isoformat()} documents {name}\n"
f"contents: {pprint.pformat(message)}"
)
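A sketch of the new stop-document check, with a made-up message payload; in bluesky 'stop' documents, num_events maps stream names to event counts, and in the real script RM is the REManagerAPI client created earlier:

from bluesky_queueserver_api.zmq import REManagerAPI

RM = REManagerAPI()  # the script's existing RE Manager client would be reused in practice
message = {'num_events': {'take_a_uvvis': 1, 'scattering': 1}, 'exit_status': 'success'}

if 'take_a_uvvis' in message['num_events']:
    # A UV-Vis stream was recorded: pause the queue so export, identification,
    # and fitting can finish first.
    RM.queue_stop()
else:
    # Only the washing loop follows, so the queue keeps running.
    pass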
@@ -207,11 +212,6 @@ def print_message(consumer, doctype, doc):
for stream_name in stream_list:
kafka_process.stream_list.append(stream_name)

## When 'take_a_uvvis' not in stream_list, no need to wait for data process
## since next task in queue is washing loop. So start queue
if 'take_a_uvvis' not in kafka_process.stream_list:
RM.queue_start()

## Set good/bad data conditions for the corresponding sample
kh = kin.key_height[0]
hei = kin.height[0]
