Skip to content

Commit

Permalink
Update RM API and correct NotJSONError
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed Apr 3, 2024
1 parent e5f2918 commit 6642ee2
Show file tree
Hide file tree
Showing 5 changed files with 652 additions and 287 deletions.
335 changes: 93 additions & 242 deletions scripts/_synthesis_queue_RM.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import numpy as np
import pandas as pd
from bluesky_queueserver.manager.comms import zmq_single_request
# from bluesky_queueserver.manager.comms import zmq_single_request
import _data_export as de
from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api import BPlan, BInst
Expand All @@ -26,9 +26,8 @@ def synthesis_queue(
pos='back',
dummy_qserver=False,
is_iteration=False,
zmq_server_address='tcp://xf28id2-ca2:60615',
zmq_control_addr='tcp://xf28id2-ca2:60615',
zmq_info_addr='tcp://xf28id2-ca2:60625',
zmq_control_addr='tcp://localhost:60615',
zmq_info_addr='tcp://localhost:60625',
):

RM = REManagerAPI(zmq_control_addr=zmq_control_addr, zmq_info_addr=zmq_info_addr)
Expand Down Expand Up @@ -66,77 +65,30 @@ def synthesis_queue(
# ir = float(ir)
# stl = int(stl)

# flowplan = BPlan('set_group_infuse2', [sl], [pl],
# rate_list = [ir],
# target_vol_list = [tvl],
# set_target_list = [stl],
# syringe_mater_list = [sml],
# rate_unit = rate_unit)
# RM.item_add(flowplan, pos=pos)

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"set_group_infuse2",
"args": [[sl], [pl]],
"kwargs": {
"rate_list":[ir],
"target_vol_list":[tvl],
"set_target_list":[stl],
"syringe_mater_list":[sml],
"rate_unit":rate_unit},
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})
flowplan = BPlan('set_group_infuse2', [sl], [pl],
rate_list = [ir],
target_vol_list = [tvl],
set_target_list = [stl],
syringe_mater_list = [sml],
rate_unit = rate_unit)
RM.item_add(flowplan, pos=pos)


## 2. Start infuese

# flowplan = BPlan('start_group_infuse', pump_list, rate_list[i])
# RM.item_add(flowplan, pos=pos)

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"start_group_infuse",
"args": [pump_list, rate_list[i]],
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})
flowplan = BPlan('start_group_infuse', pump_list, rate_list[i])
RM.item_add(flowplan, pos=pos)


## 3. Wait for equilibrium

# mixer_pump_list = []
# for i in range(len(mixer)):
# mixer_pump = [mixer[i], pump]
if len(mixer) == 1:
mixer_pump_list = [[mixer[0], *pump_list]]
elif len(mixer) == 2:
mixer_pump_list = [[mixer[0], *pump_list[:3]], [mixer[1], *pump_list]]

if dummy_qserver:

# restplan = BPlan('sleep_sec_q', 5)
# RM.item_add(restplan, pos=pos)

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
"args":[5],
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})
restplan = BPlan('sleep_sec_q', 5)
RM.item_add(restplan, pos=pos)

else:
if is_iteration:
Expand All @@ -149,209 +101,108 @@ def synthesis_queue(
elif len(resident_t_ratio) > 1 and i>0:
rest_time = resident_t_ratio[-1]

# restplan = BPlan('wait_equilibrium2', mixer_pump_list, ratio=rest_time)
# RM.item_add(restplan, pos=pos)

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"wait_equilibrium2",
"args": [mixer_pump_list],
"kwargs": {"ratio":rest_time},
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})
restplan = BPlan('wait_equilibrium2', mixer_pump_list, ratio=rest_time)
RM.item_add(restplan, pos=pos)



## 4-1. Take a fluorescence peak to check reaction

# scanplan = BPlan('take_a_uvvis_csv_q', sample_type=sample[i], spectrum_type='Corrected Sample'
# correction_type='Dark', pump_list=pump_list, precursor_list=precursor_list,
scanplan = BPlan('take_a_uvvis_csv_q', sample_type=sample[i],
spectrum_type='Corrected Sample',
correction_type='Dark',
pump_list=pump_list,
precursor_list=precursor_list,
mixer=mixer)
RM.item_add(scanplan, pos=pos)


# ## 4-2. Take a Absorption spectra to check reaction
# scanplan = BPlan('take_a_uvvis_csv_q', sample_type=sample[i],
# spectrum_type='Absorbtion',
# correction_type='Reference',
# pump_list=pump_list,
# precursor_list=precursor_list,
# mixer=mixer)
# RM.item_add(scanplan, pos=pos)

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"take_a_uvvis_csv_q",
"kwargs": {
'sample_type':sample[i],
'spectrum_type':'Corrected Sample',
'correction_type':'Dark',
'pump_list':pump_list,
'precursor_list':precursor_list,
'mixer':mixer},
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})

# ## 4-2. Take a Absorption spectra to check reaction
# zmq_single_request(
# method='queue_item_add',
# params={
# 'item':{
# "name":"take_a_uvvis_csv_q",
# "kwargs": {
# 'sample_type':sample[i],
# 'spectrum_type':'Absorbtion',
# 'correction_type':'Reference',
# 'pump_list':pump_list,
# 'precursor_list':precursor_list,
# 'mixer':mixer},
# "item_type":"plan"},
# 'pos': pos,
# 'user_group':'primary',
# 'user':'chlin'})

#### Kafka check data here.

## 5. Sleep for 5 seconds for Kafak to check good/bad data

# restplan = BPlan('sleep_sec_q', 2)
# RM.item_add(restplan, pos=pos)

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
"args":[2],
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})
restplan = BPlan('sleep_sec_q', 2)
RM.item_add(restplan, pos=pos)




## 6. Start xray_uvvis bundle plan to take real data

# scanplan = BPlan('xray_uvvis_plan', pe2c, qepro, )

scanplan = BPlan('xray_uvvis_plan', pe2c, qepro,
num_abs=num_abs,
num_flu=num_flu,
sample_type=sample[i],
spectrum_type='Absorbtion',
correction_type='Reference',
pump_list=pump_list,
precursor_list=precursor_list,
mixer=mixer)
RM.item_add(scanplan, pos=pos)

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"xray_uvvis_plan",
"args":['pe2c', 'qepro'],
"kwargs":{
'num_abs':num_abs,
'num_flu':num_flu,
'sample_type':sample[i],
'spectrum_type':'Absorbtion',
'correction_type':'Reference',
'pump_list':pump_list,
'precursor_list':precursor_list,
'mixer':mixer},
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})



###### Kafka analyze data here. #######


## 7. Wash the loop and mixer
### 7-1. Stop infuese
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
"args": [pump_list],
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})
wash_tube_queue(pump_list, wash_tube, rate_unit,
pos=[pos,pos,pos,pos,pos],
zmq_control_addr=zmq_control_addr,
zmq_info_addr=zmq_info_addr)


# 8. stop infuese for all pumps
flowplan = BPlan('stop_group', pump_list)
RM.item_add(flowplan, pos=pos)






def wash_tube_queue(pump_list, wash_tube, rate_unit,
pos=[0,1,2,3,4],
zmq_control_addr='tcp://localhost:60615',
zmq_info_addr='tcp://localhost:60625'):

RM = REManagerAPI(zmq_control_addr=zmq_control_addr, zmq_info_addr=zmq_info_addr)

### Stop all infusing pumps
flowplan = BPlan('stop_group', pump_list)
RM.item_add(flowplan, pos=pos[0])


### Set up washing tube/loop
flowplan = BPlan('set_group_infuse2', [wash_tube[0]], [wash_tube[1]],
rate_list=[wash_tube[2]],
target_vol_list=['30 ml'],
set_target_list=[False],
rate_unit=rate_unit)
RM.item_add(flowplan, pos=pos[1])


### Start washing tube/loop
flowplan = BPlan('start_group_infuse', [wash_tube[1]], [wash_tube[2]])
RM.item_add(flowplan, pos=pos[2])


### Wash loop/tube for xxx seconds
restplan = BPlan('sleep_sec_q', [wash_tube[3]])
RM.item_add(restplan, pos=pos[3])



### Stop washing
flowplan = BPlan('stop_group', [wash_tube[1]])
RM.item_add(flowplan, pos=pos[4])


### 7-2. Set infuse rate for washing loop/tube
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"set_group_infuse2",
"args": [[wash_tube[0]], [wash_tube[1]]],
"kwargs": {
"rate_list":[wash_tube[2]],
"target_vol_list":['30 ml'],
"set_target_list":[False],
"rate_unit":rate_unit},
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})


### 7-3. Start to wash loop/tube
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"start_group_infuse",
"args": [[wash_tube[1]], [wash_tube[2]]],
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})


### 7-4. Wash loop/tube for xxx seconds
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
"args":[wash_tube[3]],
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})


### 7-5. stop infuese
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
"args": [[wash_tube[1]]],
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})



# 8. stop infuese for all pumps
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
"args": [pump_list],
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})



Expand Down
Loading

0 comments on commit 6642ee2

Please sign in to comment.