Skip to content

Commit

Permalink
test work flow at 1LL09 with macros
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed Aug 13, 2024
1 parent a59b0a9 commit f209438
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 95 deletions.
111 changes: 62 additions & 49 deletions scripts/_LDRD_Kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import numpy as np
import pandas as pd
import importlib
import time
import pprint

import _data_export as de
from _plot_helper import plot_uvvis
Expand All @@ -24,7 +26,7 @@
def _qserver_inputs():
qserver_list=[
'zmq_control_addr', 'zmq_info_addr',
'dummy_qserver', 'is_iteration', 'pos',
'dummy_qserver', 'is_iteration', 'pos', 'use_OAm',
'name_by_prefix', 'prefix', 'pump_list', 'precursor_list',
'syringe_mater_list', 'syringe_list', 'target_vol_list',
'sample',
Expand Down Expand Up @@ -142,6 +144,12 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs', is_kafka=False
pass


## Reset attributes of keys in _kafka_process() to empty lists for next event
def reset_kafka_process(self):
for key in _kafka_process():
setattr(self, key, [])


def auto_rate_list(self, pump_list, new_points, fix_Br_ratio):
"""Auto transfer the predicted rates in new_points to a rate_list for qserver
Expand All @@ -168,7 +176,7 @@ def auto_rate_list(self, pump_list, new_points, fix_Br_ratio):



def macro_agent(self, qserver_process, RM, check_target=False, use_OAm=False, is_1st=False):
def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):
"""macro to build agent, make optimization, and update agent_data
This macro will
Expand All @@ -185,6 +193,7 @@ def macro_agent(self, qserver_process, RM, check_target=False, use_OAm=False, is
qserver_process (_LDRD_Kafka.xlsx_to_inputs, optional): qserver parameters read from xlsx.
RM (REManagerAPI): Run Engine Manager API.
check_target (bool, optional): Check if peak emission reaches peak target. Defaults to False.
is_ist (bool, optional): Check if it is the first precidciton. If yes, skip build agent.
Returns:
dict: new_points predicted by self.agent
Expand All @@ -194,14 +203,35 @@ def macro_agent(self, qserver_process, RM, check_target=False, use_OAm=False, is
peak_target = self.inputs.peak_target[0]
peak_tolerance = self.inputs.peak_target[1]


if check_target:
peak_diff = abs(self.PL_fitting['peak_emission'] - peak_target)
meet_target = (peak_diff <= peak_tolerance)
if meet_target:
print(f'\nTarget peak: {self.inputs.peak_target[0]} nm vs. Current peak: {self.PL_fitting["peak_emission"]} nm\n')
print(f'\nReach the target, stop iteration, stop all pumps, and wash the loop.\n')

### Stop all infusing pumps and wash loop
sq = importlib.import_module("_synthesis_queue_RM")
sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
zmq_control_addr=qin.zmq_control_addr[0],
zmq_info_addr=qin.zmq_info_addr[0])

inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')
self.agent_iteration.append(False)

else:
self.agent_iteration.append(True)

if is_1st:
pass
else:
build_agent = importlib.import_module("prepare_agent_pdf").build_agent
self.agent = build_agent(
peak_target = peak_target,
agent_data_path = self.inputs.agent_data_path[0],
use_OAm = use_OAm)
use_OAm = qin.use_OAm[0])

if len(self.agent.table) < 2:
acq_func = "qr"
Expand Down Expand Up @@ -235,26 +265,7 @@ def macro_agent(self, qserver_process, RM, check_target=False, use_OAm=False, is
self.agent_data.update({'agent_target': agent_target})
self.agent_data.update({'posterior_mean': post_mean})
self.agent_data.update({'posterior_stddev': post_stddev})

if check_target and meet_target:
peak_diff = abs(self.PL_fitting['peak_emission'] - peak_target)
meet_target = (peak_diff <= peak_tolerance)
print(f'\nTarget peak: {self.inputs.peak_target[0]} nm vs. Current peak: {self.PL_fitting["peak_emission"]} nm\n')
print(f'\nReach the target, stop iteration, stop all pumps, and wash the loop.\n')

### Stop all infusing pumps and wash loop
sq = importlib.import_module("_synthesis_queue_RM")
sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
zmq_control_addr=qin.zmq_control_addr[0],
zmq_info_addr=qin.zmq_info_addr[0])

inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')
self.agent_iteration.append(False)

else:
self.agent_iteration.append(True)

return new_points


Expand Down Expand Up @@ -350,7 +361,7 @@ def macro_03_stop_queue_uid(sefl, RM):



def macro_04_dummy_pdf(sefl):
def macro_04_dummy_pdf(self):
"""macro to setup a dummy pdf data for testing, used in kafka consumer
while self.inputs.dummy_pdf[0] is True
Expand All @@ -364,11 +375,11 @@ def macro_04_dummy_pdf(sefl):
self.iq_data['df']: iq_df
"""
self.iq_data = {}
iq_array = pd.read_csv(self.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T
iq_array = pd.read_csv(self.inputs.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T
# self.iq_data.append(iq_array[0])
# self.iq_data.append(iq_array[1])
# self.iq_data.append(iq_array)
iq_df = pd.read_csv(self.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ')
iq_df = pd.read_csv(self.inputs.iq_fn[-1], skiprows=1, names=['q', 'I(q)'], sep=' ')
# self.iq_data.append(iq_df)

iq_data = { 'Q':iq_array[0],
Expand Down Expand Up @@ -554,6 +565,7 @@ def macro_08_no_fitting_pdf(self):
# self.gr_fitting.append(gr_fit_arrary)
# self.gr_fitting.append(gr_fit_df)

self.pdf_property = {}
pdf_property={'Br_ratio': np.nan, 'Br_size': np.nan}
self.pdf_property.update(pdf_property)

Expand Down Expand Up @@ -600,11 +612,11 @@ def macro_10_good_bad(self, stream_name):
This macro will
1. Identify a good or bad PL peak in 'take_a_uvvis' and 'fluorescence'
2. Update self.PL_goodbad
self.PL_goodbad['wavelength']: wavelenght (nm) of PL
self.PL_goodbad['intensity']: intensity of PL
self.PL_goodbad['data_id']: f'{t0[0]}_{t0[1]}_{metadata_dic["uid"][:8]}'
self.PL_goodbad['peak']: peaks from scipy.find_peaks
self.PL_goodbad['prop']: properties from scipy.find_peaks
self.PL_goodbad['wavelength']: wavelenght (nm) of PL
self.PL_goodbad['percentile_mean']: intensity of PL (percentile_mean)
self.PL_goodbad['data_id']: f'{t0[0]}_{t0[1]}_{metadata_dic["uid"][:8]}'
self.PL_goodbad['peak']: peaks from scipy.find_peaks
self.PL_goodbad['prop']: properties from scipy.find_peaks
Args:
stream_name (str): the stream name in scan doc to identify scan plan
Expand Down Expand Up @@ -632,7 +644,7 @@ def macro_10_good_bad(self, stream_name):
percent_range=[30, 100])

self.PL_goodbad = {}
PL_goodbad = { 'wavelength':np.asarray(x0), 'intensity':np.asarray(y0),
PL_goodbad = { 'wavelength':np.asarray(x0), 'percentile_mean':np.asarray(y0),
'data_id':data_id, 'peak':peak, 'prop':prop}
self.PL_goodbad.update(PL_goodbad)

Expand All @@ -645,7 +657,7 @@ def macro_11_absorbance(self, stream_name):
1. Apply a 2D line to fit the baseline of absorption spectra
2. Update self.abs_data
self.abs_data['wavelength']: wavelenght of absorbance nm
self.abs_data['absorbance']: absorbance array (percentile_mean)
self.abs_data['percentile_mean']: absorbance array (percentile_mean)
self.abs_data['offset']: absorbance array offset
3. Update self.abs_fitting
self.abs_fitting['fit_function']: da.line_2D
Expand All @@ -663,7 +675,7 @@ def macro_11_absorbance(self, stream_name):
print(f'\n*** start to check absorbance at 365b nm in stream: {stream_name} is positive or not***\n')
# abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0)
abs_array = abs_per.mean(axis=0)
wavelength = qepro_dic['QEPro_x_axis'][0]
wavelength = self.qepro_dic['QEPro_x_axis'][0]

popt_abs01, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[205, 240], plot=False)
popt_abs02, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[750, 950], plot=False)
Expand All @@ -674,10 +686,10 @@ def macro_11_absorbance(self, stream_name):

abs_array_offset = abs_array - da.line_2D(wavelength, *popt_abs)
print(f'\nFitting function for baseline offset: {da.line_2D}\n')
ff_abs={'fit_function': da.line_2D, 'curve_fit': popt_abs}
ff_abs={'fit_function': da.line_2D, 'curve_fit': popt_abs, 'percentile_mean':abs_array}

self.abs_data = {}
self.abs_data.update({'wavelength':wavelength, 'absorbance':abs_array, 'offset':abs_array_offset})
self.abs_data.update({'wavelength':wavelength, 'percentile_mean':abs_array, 'offset':abs_array_offset})

self.abs_fitting = {}
self.abs_fitting.update(ff_abs)
Expand All @@ -701,12 +713,12 @@ def macro_12_PL_fitting(self):
self.PL_fitting['wavelength']: wavelenght (nm) of PL between 400 ~ 800 nm
self.PL_fitting['intensity']: intensity of PL (nm) between 400 ~ 800 nm
self.PL_fitting['shifted_peak_idx']: peak index between 400 ~ 800 nm
self.PL_fitting['percentile_mean']: intensity of PL (percentile_mean)
"""

x, y, p, f_fit, popt = da._fitting_in_kafka(
self.PL_goodbad['wavelength'],
self.PL_goodbad['intensity'],
self.PL_goodbad['percentile_mean'],
self.PL_goodbad['data_id'],
self.PL_goodbad['peak'],
self.PL_goodbad['prop'],
Expand All @@ -717,7 +729,7 @@ def macro_12_PL_fitting(self):
r2_idx2, _ = da.find_nearest(x, popt[1] + 3*popt[2])
r_2 = da.r_square(x[r2_idx1:r2_idx2], y[r2_idx1:r2_idx2], fitted_y[r2_idx1:r2_idx2], y_low_limit=0)

metadata_dic["r_2"] = r_2
self.metadata_dic["r_2"] = r_2

if 'gauss' in f_fit.__name__:
constant = 2.355
Expand All @@ -738,8 +750,8 @@ def macro_12_PL_fitting(self):

ff={'fit_function': f_fit, 'curve_fit': popt,
'fwhm': fwhm, 'peak_emission': peak_emission,
'wavelength': x, 'intensity': y,
'shifted_peak_idx': p}
'wavelength': x, 'intensity': y, 'shifted_peak_idx': p,
'percentile_mean': self.PL_goodbad['percentile_mean']}

self.PL_fitting = {}
self.PL_fitting.update(ff)
Expand All @@ -764,6 +776,7 @@ def macro_13_PLQY(self):
x = self.PL_fitting['wavelength']
y = self.PL_fitting['intensity']
peak_emission = self.PL_fitting['peak_emission']
fwhm = self.PL_fitting['fwhm']
PL_integral_s = integrate.simpson(y)

## Find absorbance at 365 nm from absorbance stream
Expand All @@ -788,7 +801,7 @@ def macro_13_PLQY(self):


def macro_14_upate_agent(self):
"""macro to update agent_data in type of dict for exporting as json and wirte to sandbox
"""macro to update agent_data in type of dict for exporting
This macro will
1. Update self.agent_data with
Expand All @@ -800,7 +813,7 @@ def macro_14_upate_agent(self):
"""

## Creat agent_data in type of dict for exporting as json and wirte to sandbox
## Creat agent_data in type of dict for exporting
if 'agent_target' in self.agent_data.keys():
pass
else:
Expand Down Expand Up @@ -847,7 +860,7 @@ def macro_15_save_data(self, stream_name):
df['wavelength_nm'] = x0
df['absorbance_mean'] = self.abs_data['absorbance']
df['absorbance_offset'] = self.abs_data['offset']
df['fluorescence_mean'] = self.PL_goodbad['intensity']
df['fluorescence_mean'] = self.PL_goodbad['percentile_mean']
f_fit = self.PL_fitting['fit_function']
popt = self.PL_fitting['curve_fit']
df['fluorescence_fitting'] = f_fit(x0, *popt)
Expand Down Expand Up @@ -880,8 +893,8 @@ def macro_16_num_good(self, stream_name):
stream_name (str): the stream name in scan doc to identify scan plan
"""

type_peak = type(kafka_process.PL_goodbad['peak'])
type_prop = type(kafka_process.PL_goodbad['prop'])
type_peak = type(self.PL_goodbad['peak'])
type_prop = type(self.PL_goodbad['prop'])

## Append good/bad idetified results
if stream_name == 'take_a_uvvis':
Expand All @@ -897,8 +910,8 @@ def macro_16_num_good(self, stream_name):
print('\n*** export, identify good/bad, fitting complete ***\n')

print(f"\n*** {self.sample_type} of uid: {self.uid[:8]} has: ***\n"
f"{self.optical_property = }***\n"
f"{self.pdf_property = }***\n")
f"*** {self.optical_property = } ***\n"
f"*** {self.pdf_property = } ***\n")


def macro_17_add_queue(self, stream_name, qserver_process, RM):
Expand Down Expand Up @@ -955,12 +968,12 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):
RM.queue_start()

## Add predicted new points from ML agent into qserver
elif (stream_name == 'fluorescence') and (self.inputs.USE_AGENT_iterate[0]) and (self.inputs.agent_iteration[-1]):
elif (stream_name == 'fluorescence') and (self.inputs.USE_AGENT_iterate[0]) and (self.agent_iteration[-1]):
print('*** Add new points from agent to the fron of qsever ***\n')

new_points = self.macro_agent(qserver_process, RM, check_target=True)

print(f'*** New points from agent: {new_points} ***\n')
print(f'*** New points from agent:\n {pprint.pformat(new_points, indent=1)} ***\n')

rate_list = self.auto_rate_list(qin.pump_list, new_points, self.inputs.fix_Br_ratio)

Expand Down
13 changes: 5 additions & 8 deletions scripts/_synthesis_queue_RM.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,15 @@ def synthesis_queue_xlsx(parameter_obj):


## 6. Start xray_uvvis bundle plan to take real data ('pe1c' or 'det')
scanplan = BPlan('xray_uvvis_plan', det1, det2,
scanplan = BPlan('xray_uvvis_plan2', det1, det2,
num_abs=num_abs,
num_flu=num_flu,
sample_type=sample[i],
spectrum_type='Absorbtion',
correction_type='Reference',
pump_list=pump_list,
precursor_list=precursor_list,
mixer=mixer,
dilute_pump=pump_list[-1])
mixer=mixer)
RM.item_add(scanplan, pos=pos)

## 6.1 sleep 20 seconds for stopping
Expand Down Expand Up @@ -424,16 +423,15 @@ def synthesis_queue(


## 6. Start xray_uvvis bundle plan to take real data ('pe1c' or 'det')
scanplan = BPlan('xray_uvvis_plan', det1, 'qepro',
scanplan = BPlan('xray_uvvis_plan2', det1, 'qepro',
num_abs=num_abs,
num_flu=num_flu,
sample_type=sample[i],
spectrum_type='Absorbtion',
correction_type='Reference',
pump_list=pump_list,
precursor_list=precursor_list,
mixer=mixer,
dilute_pump=pump_list[-1])
mixer=mixer)
RM.item_add(scanplan, pos=pos)

## 6.1 sleep 20 seconds for stopping
Expand Down Expand Up @@ -647,8 +645,7 @@ def synthesis_queue3(
correction_type='Reference',
pump_list=pump_list,
precursor_list=precursor_list,
mixer=mixer,
dilute_pump=pump_list[-1])
mixer=mixer)
RM.item_add(scanplan, pos=pos)

## 6.1 sleep 20 seconds for stopping
Expand Down
Binary file modified scripts/inputs_qserver_kafka_v2.xlsx
Binary file not shown.
Loading

0 comments on commit f209438

Please sign in to comment.