Skip to content

Commit

Permalink
update macros with real data at 1LL09
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed Aug 23, 2024
1 parent 19608b2 commit bf3a73d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 59 deletions.
9 changes: 5 additions & 4 deletions scripts/_LDRD_Kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
sq = importlib.import_module("_synthesis_queue_RM")
de = importlib.import_module("_data_export")
da = importlib.import_module("_data_analysis")
pc = importlib.import_module("_pdf_calculator")
# pc = importlib.import_module("_pdf_calculator")

# from diffpy.pdfgetx import PDFConfig
# gp = importlib.import_module("_get_pdf")
Expand All @@ -33,7 +33,8 @@ def _qserver_inputs():
'name_by_prefix', 'prefix', 'pump_list', 'precursor_list',
'syringe_mater_list', 'syringe_list', 'target_vol_list',
'sample',
'wait_dilute', 'mixer', 'wash_tube', 'resident_t_ratio',
'wait_dilute', 'if_wash', 'wash_loop', 'wash_sapphire',
'mixer', 'wash_tube', 'resident_t_ratio',
'rate_unit', 'uvvis_config', 'perkin_config',
'auto_set_target_list', 'set_target_list', 'infuse_rates',
]
Expand Down Expand Up @@ -736,7 +737,7 @@ def macro_10_good_bad(self, stream_name):
distance=self.inputs.distance[0],
height=self.inputs.height[0],
dummy_test=self.inputs.dummy_kafka[0],
percent_range=[30, 100])
percent_range=[40, 100])

self.PL_goodbad = {}
PL_goodbad = { 'wavelength':np.asarray(x0), 'percentile_mean':np.asarray(y0),
Expand Down Expand Up @@ -765,7 +766,7 @@ def macro_11_absorbance(self, stream_name):
## Apply percnetile filtering for absorption spectra, defaut percent_range = [15, 85]
abs_per = da.percentile_abs(self.qepro_dic['QEPro_x_axis'],
self.qepro_dic['QEPro_output'],
percent_range=[15, 85])
percent_range=[5, 65])

print(f'\n*** start to check absorbance at 365b nm in stream: {stream_name} is positive or not***\n')
# abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0)
Expand Down
54 changes: 32 additions & 22 deletions scripts/_synthesis_queue_RM.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ def synthesis_queue_xlsx(parameter_obj):
resident_t_ratio = qsp.resident_t_ratio
prefix = qsp.prefix
sample = qsp.sample
wash_tube = qsp.wash_tube
wait_dilute = qsp.wait_dilute
if_wash = qsp.if_wash
wash_loop = qsp.wash_loop
wash_sapphire = qsp.wash_sapphire
rate_unit = qsp.rate_unit[0]
name_by_prefix = qsp.name_by_prefix[0]
det2 = qsp.uvvis_config[0]
Expand Down Expand Up @@ -72,27 +75,27 @@ def synthesis_queue_xlsx(parameter_obj):
else:
set_target_list = set_target_list.tolist()


num_pumps = int(len(wash_loop)/3)
# 0. stop infuese for all pumps
flowplan = BPlan('stop_group', pump_list + [wash_tube[2], wash_tube[5]])
flowplan = BPlan('stop_group', pump_list + [wash_loop[1+i*3] for i in range(num_pumps)])
RM.item_add(flowplan, pos=pos)


for i in range(len(rate_list)):
# for i in range(2):
## 1. Set i infuese rates
rate_list2 = rate_list[i][:-1] + [60]
for sl, pl, ir, tvl, stl, sml in zip(
syringe_list,
pump_list,
rate_list[i],
rate_list2,
target_vol_list,
set_target_list[i],
syringe_mater_list
):

# ir = float(ir)
# stl = int(stl)

flowplan = BPlan('set_group_infuse2', [sl], [pl],
rate_list = [ir],
target_vol_list = [tvl],
Expand All @@ -102,13 +105,14 @@ def synthesis_queue_xlsx(parameter_obj):
RM.item_add(flowplan, pos=pos)


## 2. Start infuese
if precursor_list[-1] == 'Toluene':
flowplan = BPlan('start_group_infuse', pump_list[:-1], rate_list[i][:-1])
# ## 2. Start infuese
# if precursor_list[-1] == 'Toluene':
# flowplan = BPlan('start_group_infuse', pump_list[:-1], rate_list[i][:-1])

else:
flowplan = BPlan('start_group_infuse', pump_list, rate_list[i])
# else:
# flowplan = BPlan('start_group_infuse', pump_list, rate_list[i])

flowplan = BPlan('start_group_infuse', pump_list, rate_list[i])
RM.item_add(flowplan, pos=pos)


Expand Down Expand Up @@ -145,7 +149,13 @@ def synthesis_queue_xlsx(parameter_obj):

## 3.1 Wait for 30 secpnds for post dilute
if precursor_list[-1] == 'Toluene':
flowplan = BPlan('start_group_infuse', [pump_list[-1]], [rate_list[i][-1]])
# flowplan = BPlan('start_group_infuse', [pump_list[-1]], [rate_list[i][-1]])
flowplan = BPlan('set_group_infuse2', [syringe_list[-1]], [pump_list[-1]],
rate_list = [rate_list[i][-1]],
target_vol_list = [target_vol_list[-1]],
set_target_list = [set_target_list[i][-1]],
syringe_mater_list = [syringe_mater_list[-1]],
rate_unit = rate_unit)
RM.item_add(flowplan, pos=pos)

restplan = BPlan('sleep_sec_q', qsp.wait_dilute[0])
Expand Down Expand Up @@ -222,12 +232,12 @@ def synthesis_queue_xlsx(parameter_obj):
###### Kafka analyze data here. #######

## 7. Wash the loop and mixer
if wash_tube[0] == 0:
wash_tube_queue2(pump_list, wash_tube, rate_unit,
if if_wash[0] == 1:
wash_tube_queue2(pump_list, if_wash, wash_loop, rate_unit,
pos=[pos,pos,pos,pos,pos],
zmq_control_addr=zmq_control_addr,
zmq_info_addr=zmq_info_addr)
elif wash_tube[0] == 1:
elif wash_tube[0] == 0:
inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')

Expand Down Expand Up @@ -716,7 +726,7 @@ def wash_tube_queue(pump_list, wash_tube, rate_unit,


## wash loop with two solvents
def wash_tube_queue2(pump_list, wash_tube, rate_unit,
def wash_tube_queue2(pump_list, if_wash, wash_loop, rate_unit,
pos=[0,1,2,3,4],
zmq_control_addr='tcp://localhost:60615',
zmq_info_addr='tcp://localhost:60625'):
Expand All @@ -727,10 +737,10 @@ def wash_tube_queue2(pump_list, wash_tube, rate_unit,
flowplan = BPlan('stop_group', pump_list)
RM.item_add(flowplan, pos=pos[0])


num_pumps = int(len(wash_loop)/3)
### Set up washing tube/loop
flowplan = BPlan('set_group_infuse2', [wash_tube[1], wash_tube[4]], [wash_tube[2], wash_tube[5]],
rate_list=[wash_tube[3], wash_tube[6]],
flowplan = BPlan('set_group_infuse2', [wash_loop[0+i*3] for i in range(num_pumps)], [wash_loop[1+i*3] for i in range(num_pumps)],
rate_list=[wash_loop[2+i*3] for i in range(num_pumps)],
target_vol_list=['30 ml', '15 ml'],
set_target_list=[False, False],
syringe_mater_list = ['steel', 'plastic_BD'],
Expand All @@ -739,18 +749,18 @@ def wash_tube_queue2(pump_list, wash_tube, rate_unit,


### Start washing tube/loop
flowplan = BPlan('start_group_infuse', [wash_tube[2], wash_tube[5]], [wash_tube[3], wash_tube[6]])
flowplan = BPlan('start_group_infuse', [wash_loop[1+i*3] for i in range(num_pumps)], [wash_loop[2+i*3] for i in range(num_pumps)])
RM.item_add(flowplan, pos=pos[2])


### Wash loop/tube for xxx seconds
restplan = BPlan('sleep_sec_q', wash_tube[7])
restplan = BPlan('sleep_sec_q', if_wash[1])
RM.item_add(restplan, pos=pos[3])



### Stop washing
flowplan = BPlan('stop_group', [wash_tube[2], wash_tube[5]])
flowplan = BPlan('stop_group', [wash_loop[1+i*3] for i in range(num_pumps)])
RM.item_add(flowplan, pos=pos[4])


Expand Down
Binary file modified scripts/inputs_qserver_kafka_v2.xlsx
Binary file not shown.
36 changes: 3 additions & 33 deletions scripts/kafka_consumer_iterate_1LL09_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,39 +144,10 @@ def print_message(consumer, doctype, doc):
###################################################
if name == 'start':
print(
f"{datetime.datetime.now().isoformat()} documents {name}\n"
f"\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
f"document keys: {list(message.keys())}")

if 'uid' in message.keys():
print(f"uid: {message['uid']}")
if 'plan_name' in message.keys():
print(f"plan name: {message['plan_name']}")
if 'detectors' in message.keys():
print(f"detectors: {message['detectors']}")
if 'pumps' in message.keys():
print(f"pumps: {message['pumps']}")
if 'detectors' in message.keys():
print(f"detectors: {message['detectors']}")
if 'uvvis' in message.keys() and message['plan_name']!='count':
print(f"uvvis mode:\n"
f" integration time: {message['uvvis'][0]} ms\n"
f" num spectra averaged: {message['uvvis'][1]}\n"
f" buffer capacity: {message['uvvis'][2]}"
)
elif 'uvvis' in message.keys() and message['plan_name']=='count':
print(f"uvvis mode:\n"
f" spectrum type: {message['uvvis'][0]}\n"
f" integration time: {message['uvvis'][2]} ms\n"
f" num spectra averaged: {message['uvvis'][3]}\n"
f" buffer capacity: {message['uvvis'][4]}"
)
if 'mixer' in message.keys():
print(f"mixer: {message['mixer']}")
if 'sample_type' in message.keys():
print(f"sample type: {message['sample_type']}")

## Reset kafka_process.uid to an empty list
kafka_process.uid = []

kafka_process.macro_00_print_start(message)


######### While document name == 'stop' ###########
Expand All @@ -195,7 +166,6 @@ def print_message(consumer, doctype, doc):
print(f"{datetime.datetime.now().isoformat()} documents {name}\n"
f"contents: {pprint.pformat(message)}"
)
# num_events = len(message['num_events'])

## wait 2 seconds for databroker to save data
time.sleep(2)
Expand Down

0 comments on commit bf3a73d

Please sign in to comment.