Skip to content

Commit

Permalink
Update for Qsever & kafka at XPD
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed Apr 2, 2024
2 parents d6335aa + fecf1c5 commit e5f2918
Show file tree
Hide file tree
Showing 9 changed files with 3,549 additions and 16 deletions.
20 changes: 17 additions & 3 deletions scripts/_synthesis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def synthesis_queue(
pos='back',
dummy_qserver=False,
is_iteration=False,
zmq_server_address=None,
):

if name_by_prefix:
Expand Down Expand Up @@ -60,7 +61,8 @@ def synthesis_queue(
# stl = int(stl)

zmq_single_request(
method='queue_item_add',
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"set_group_infuse2",
Expand All @@ -80,6 +82,7 @@ def synthesis_queue(
## 2. Start infuese
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"start_group_infuse",
Expand All @@ -98,11 +101,12 @@ def synthesis_queue(
if len(mixer) == 1:
mixer_pump_list = [[mixer[0], *pump_list]]
elif len(mixer) == 2:
mixer_pump_list = [[mixer[0], *pump_list[:2]], [mixer[1], *pump_list]]
mixer_pump_list = [[mixer[0], *pump_list[:3]], [mixer[1], *pump_list]]

if dummy_qserver:
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
Expand All @@ -125,6 +129,7 @@ def synthesis_queue(

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"wait_equilibrium2",
Expand All @@ -140,6 +145,7 @@ def synthesis_queue(
## 4-1. Take a fluorescence peak to check reaction
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"take_a_uvvis_csv_q",
Expand Down Expand Up @@ -178,6 +184,7 @@ def synthesis_queue(
## 5. Sleep for 5 seconds for Kafak to check good/bad data
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
Expand All @@ -194,10 +201,11 @@ def synthesis_queue(
## 6. Start xray_uvvis bundle plan to take real data
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"xray_uvvis_plan",
"args":['det', 'qepro'],
"args":['pe2c', 'qepro'],
"kwargs":{
'num_abs':num_abs,
'num_flu':num_flu,
Expand All @@ -221,6 +229,7 @@ def synthesis_queue(
### 7-1. Stop infuese
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand All @@ -234,6 +243,7 @@ def synthesis_queue(
### 7-2. Set infuse rate for washing loop/tube
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"set_group_infuse2",
Expand All @@ -252,6 +262,7 @@ def synthesis_queue(
### 7-3. Start to wash loop/tube
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"start_group_infuse",
Expand All @@ -265,6 +276,7 @@ def synthesis_queue(
### 7-4. Wash loop/tube for xxx seconds
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
Expand All @@ -278,6 +290,7 @@ def synthesis_queue(
### 7-5. stop infuese
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand All @@ -292,6 +305,7 @@ def synthesis_queue(
# 8. stop infuese for all pumps
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand Down
Loading

0 comments on commit e5f2918

Please sign in to comment.