Skip to content

Commit

Permalink
Update class in _LDRD_Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
cheng-hung committed Jul 25, 2024
1 parent 67b9444 commit 0056686
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 33 deletions.
73 changes: 40 additions & 33 deletions scripts/_LDRD_Kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@
# import _get_pdf as gp

# import torch
from prepare_agent_pdf import build_agen
# from prepare_agent_pdf import build_agen
# from diffpy.pdfgetx import PDFConfig
from tiled.client import from_uri
# from tiled.client import from_uri



def _qserver_inputs():
qserver_list=[
'dummy_qserver', 'name_by_prefix', 'prefix',
'pump_list', 'precursor_list', 'syringe_mater_list', 'syringe_list',
'target_vol_list', 'sample', 'mixer', 'wash_tube', 'resident_t_ratio',
'zmq_control_addr', 'zmq_info_addr',
'dummy_qserver', 'is_iteration', 'pos',
'name_by_prefix', 'prefix', 'pump_list', 'precursor_list',
'syringe_mater_list', 'syringe_list', 'target_vol_list',
'sample',
'wait_dilute', 'mixer', 'wash_tube', 'resident_t_ratio',
'rate_unit', 'uvvis_config', 'perkin_config',
'set_target_list', 'infuse_rates',
]
Expand Down Expand Up @@ -71,34 +74,38 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'):
## Every attribute in self.inputs is a list!!!
self.inputs = dic_to_inputs(self.print_dic, self.parameters_list)

## Append agent in the list of self.inputs.agent
if self.inputs.agent==[]:
self.inputs.agent.append(
build_agen(
peak_target=self.inputs.peak_target[0],
agent_data_path=self.inputs.agent_data_path[0])
)

## self.inputs.sandbox_tiled_client[0] is just the uri of sandbox
## so, turn uri into client and append it in self.inputs.sandbox_tiled_client
if type(self.inputs.sandbox_tiled_client[0]) is str:
self.inputs.sandbox_tiled_client.append(from_uri(self.inputs.sandbox_tiled_client[0]))


## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn
# fn_TBD = ['cfg_fn', 'bkg_fn', 'iq_fn', 'cif_fn', 'gr_fn']
for fn in self.inputs.fn_TBD:

path = getattr(self.inputs, fn)[0]
if path in self.parameters_list:
path = getattr(self.inputs, path)[0]

ff = getattr(self.inputs, fn)[1]

fn_glob = glob.glob(os.path.join(path, ff))

for i in fn_glob:
getattr(self.inputs, fn).append(i)
try:
# ## Append agent in the list of self.inputs.agent
# if self.inputs.agent==[]:
# self.inputs.agent.append(
# build_agen(
# peak_target=self.inputs.peak_target[0],
# agent_data_path=self.inputs.agent_data_path[0])
# )

# ## self.inputs.sandbox_tiled_client[0] is just the uri of sandbox
# ## so, turn uri into client and append it in self.inputs.sandbox_tiled_client
# if type(self.inputs.sandbox_tiled_client[0]) is str:
# self.inputs.sandbox_tiled_client.append(from_uri(self.inputs.sandbox_tiled_client[0]))


## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn
# fn_TBD = ['cfg_fn', 'bkg_fn', 'iq_fn', 'cif_fn', 'gr_fn']
for fn in self.inputs.fn_TBD:

path = getattr(self.inputs, fn)[0]
if path in self.parameters_list:
path = getattr(self.inputs, path)[0]

ff = getattr(self.inputs, fn)[1]

fn_glob = glob.glob(os.path.join(path, ff))

for i in fn_glob:
getattr(self.inputs, fn).append(i)

except AttributeError:
pass



Expand Down
229 changes: 229 additions & 0 deletions scripts/_synthesis_queue_RM.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,235 @@
from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api import BPlan, BInst
from ophyd.sim import det, noisy_det
from _LDRD_Kafka import xlsx_to_inputs

## Pass qsever parameters by xlsx_to_inputs
## Arrange tasks of for PQDs synthesis
def synthesis_queue_xlsx(parameter_obj):
"""
Pass qsever parameters by xlsx_to_inputs
Arrange tasks of for PQDs synthesis
Args:
parameter_obj (xlsx_to_inputs): parameters passing to qserver
(example: pump_list = parameter_obj.inputs.pump_list)
"""

qsp = parameter_obj.inputs

syringe_list = qsp.syringe_list
pump_list = qsp.pump_list
set_target_list = qsp.set_target_list
target_vol_list = qsp.target_vol_list
rate_list = qsp.infuse_rates
syringe_mater_list = qsp.syringe_mater_list
precursor_list = qsp.precursor_list
mixer = qsp.mixer
resident_t_ratio = qsp.resident_t_ratio
prefix = qsp.prefix
sample = qsp.sample
wash_tube = qsp.wash_tube
rate_unit = qsp.rate_unit[0]
name_by_prefix = qsp.name_by_prefix[0]
det2 = qsp.uvvis_config[0]
num_abs = qsp.uvvis_config[1]
num_flu = qsp.uvvis_config[2]
det1 = qsp.perkin_config[0]
det1_frame_rate = qsp.perkin_config[1]
det1_time = qsp.perkin_config[2]
pos = qsp.pos[0]
dummy_qserver = qsp.dummy_qserver[0]
is_iteration = qsp.is_iteration[0]
zmq_control_addr = qsp.zmq_control_addr[0]
zmq_info_addr = qsp.zmq_info_addr[0]


RM = REManagerAPI(zmq_control_addr=zmq_control_addr, zmq_info_addr=zmq_info_addr)

if name_by_prefix:
sample = de._auto_name_sample(rate_list, prefix=prefix)

rate_list = np.asarray(rate_list, dtype=np.float32)
if len(rate_list.shape) == 1:
rate_list = rate_list.reshape(1, rate_list.shape[0])
rate_list = rate_list.tolist()
else:
rate_list = rate_list.tolist()

set_target_list = np.asarray(set_target_list, dtype=np.int8)
if len(set_target_list.shape) == 1:
set_target_list = set_target_list.reshape(1, set_target_list.shape[0])
set_target_list = set_target_list.tolist()
else:
set_target_list = set_target_list.tolist()


# 0. stop infuese for all pumps
flowplan = BPlan('stop_group', pump_list + [wash_tube[2], wash_tube[5]])
RM.item_add(flowplan, pos=pos)


for i in range(len(rate_list)):
# for i in range(2):
## 1. Set i infuese rates
for sl, pl, ir, tvl, stl, sml in zip(
syringe_list,
pump_list,
rate_list[i],
target_vol_list,
set_target_list[i],
syringe_mater_list
):

# ir = float(ir)
# stl = int(stl)

flowplan = BPlan('set_group_infuse2', [sl], [pl],
rate_list = [ir],
target_vol_list = [tvl],
set_target_list = [stl],
syringe_mater_list = [sml],
rate_unit = rate_unit)
RM.item_add(flowplan, pos=pos)


## 2. Start infuese
if precursor_list[-1] == 'Toluene':
flowplan = BPlan('start_group_infuse', pump_list[:-1], rate_list[i][:-1])

else:
flowplan = BPlan('start_group_infuse', pump_list, rate_list[i])

RM.item_add(flowplan, pos=pos)


## 3. Wait for equilibrium
if len(mixer) == 1:
if precursor_list[-1] == 'Toluene':
mixer_pump_list = [[mixer[0], *pump_list[:2]]]
else:
mixer_pump_list = [[mixer[0], *pump_list]]
elif len(mixer) == 2:
if precursor_list[-1] == 'Toluene':
mixer_pump_list = [[mixer[0], *pump_list[:2]], [mixer[1], *pump_list[:-1]]]
else:
mixer_pump_list = [[mixer[0], *pump_list[:2]], [mixer[1], *pump_list]]

if dummy_qserver:
restplan = BPlan('sleep_sec_q', qsp.dummy_qserver[1])
RM.item_add(restplan, pos=pos)

else:
if is_iteration:
rest_time = resident_t_ratio[-1]

elif len(resident_t_ratio) == 1:
rest_time = resident_t_ratio[0]
elif len(resident_t_ratio) > 1 and i==0:
rest_time = resident_t_ratio[0]
elif len(resident_t_ratio) > 1 and i>0:
rest_time = resident_t_ratio[-1]

restplan = BPlan('wait_equilibrium2', mixer_pump_list, ratio=rest_time)
RM.item_add(restplan, pos=pos)


## 3.1 Wait for 30 secpnds for post dilute
if precursor_list[-1] == 'Toluene':
flowplan = BPlan('start_group_infuse', [pump_list[-1]], [rate_list[i][-1]])
RM.item_add(flowplan, pos=pos)

restplan = BPlan('sleep_sec_q', qsp.wait_dilute[0])
RM.item_add(restplan, pos=pos)



## 4.0 Configure area detector in Qserver
if det1 == 'pe1c':
scanplan = BPlan('configure_area_det',
det='pe1c',
exposure=1,
acq_time=det1_frame_rate)
RM.item_add(scanplan, pos=pos)


## 4-1. Take a fluorescence peak to check reaction
scanplan = BPlan('take_a_uvvis_csv_q', sample_type=sample[i],
spectrum_type='Corrected Sample',
correction_type='Dark',
pump_list=pump_list,
precursor_list=precursor_list,
mixer=mixer)
RM.item_add(scanplan, pos=pos)


# ## 4-2. Take a Absorption spectra to check reaction
# scanplan = BPlan('take_a_uvvis_csv_q', sample_type=sample[i],
# spectrum_type='Absorbtion',
# correction_type='Reference',
# pump_list=pump_list,
# precursor_list=precursor_list,
# mixer=mixer)
# RM.item_add(scanplan, pos=pos)


#### Kafka check data here.

## 5. Sleep for 5 seconds for Kafak to check good/bad data
restplan = BPlan('sleep_sec_q', 5)
RM.item_add(restplan, pos=pos)


# ## 6.0 Print global parameters in Qserver
# scanplan = BPlan('print_glbl_qserver')
# RM.item_add(scanplan, pos=pos)

if det1 == 'pe1c':
## 6.1 Configure area detector in Qserver
scanplan = BPlan('configure_area_det',
det=det1,
exposure=det1_time,
acq_time=det1_frame_rate)
RM.item_add(scanplan, pos=pos)


## 6. Start xray_uvvis bundle plan to take real data ('pe1c' or 'det')
scanplan = BPlan('xray_uvvis_plan', det1, det2,
num_abs=num_abs,
num_flu=num_flu,
sample_type=sample[i],
spectrum_type='Absorbtion',
correction_type='Reference',
pump_list=pump_list,
precursor_list=precursor_list,
mixer=mixer,
dilute_pump=pump_list[-1])
RM.item_add(scanplan, pos=pos)

## 6.1 sleep 20 seconds for stopping
restplan = BPlan('sleep_sec_q', qsp.wait_dilute[1])
RM.item_add(restplan, pos=pos)


###### Kafka analyze data here. #######

## 7. Wash the loop and mixer
if wash_tube[0] == 0:
wash_tube_queue2(pump_list, wash_tube, rate_unit,
pos=[pos,pos,pos,pos,pos],
zmq_control_addr=zmq_control_addr,
zmq_info_addr=zmq_info_addr)
elif wash_tube[0] == 1:
inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')


# 8. stop infuese for all pumps
flowplan = BPlan('stop_group', pump_list)
RM.item_add(flowplan, pos=pos)




## Arrange tasks of for PQDs synthesis
def synthesis_queue(
Expand Down

0 comments on commit 0056686

Please sign in to comment.