Skip to content

Commit

Permalink
Update kafka for Qserver and xrun
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD_Operator committed Apr 1, 2024
1 parent 09be901 commit 36c8aaa
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 32 deletions.
8 changes: 4 additions & 4 deletions scripts/_data_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ def read_qepro_by_stream(uid, stream_name='primary', data_agent='tiled'):
catalog
except NameError:
import databroker
catalog = databroker.catalog['xpd-ldrd20-31']
# catalog = databroker.catalog['xpd']
# catalog = databroker.catalog['xpd-ldrd20-31']
catalog = databroker.catalog['xpd']
run = catalog[uid]
meta = run.metadata

Expand All @@ -173,8 +173,8 @@ def read_qepro_by_stream(uid, stream_name='primary', data_agent='tiled'):
from_profile
except NameError:
from tiled.client import from_profile
tiled_client = from_profile("xpd-ldrd20-31")
# tiled_client = from_profile("xpd")
# tiled_client = from_profile("xpd-ldrd20-31")
tiled_client = from_profile("xpd")

run = tiled_client[uid]
meta = run.metadata
Expand Down
18 changes: 16 additions & 2 deletions scripts/_synthesis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def synthesis_queue(
pos='back',
dummy_qserver=False,
is_iteration=False,
zmq_server_address=None,
):

if name_by_prefix:
Expand Down Expand Up @@ -60,7 +61,8 @@ def synthesis_queue(
# stl = int(stl)

zmq_single_request(
method='queue_item_add',
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"set_group_infuse2",
Expand All @@ -80,6 +82,7 @@ def synthesis_queue(
## 2. Start infuese
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"start_group_infuse",
Expand All @@ -103,6 +106,7 @@ def synthesis_queue(
if dummy_qserver:
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
Expand All @@ -125,6 +129,7 @@ def synthesis_queue(

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"wait_equilibrium2",
Expand All @@ -140,6 +145,7 @@ def synthesis_queue(
## 4-1. Take a fluorescence peak to check reaction
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"take_a_uvvis_csv_q",
Expand Down Expand Up @@ -178,6 +184,7 @@ def synthesis_queue(
## 5. Sleep for 5 seconds for Kafak to check good/bad data
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
Expand All @@ -194,10 +201,11 @@ def synthesis_queue(
## 6. Start xray_uvvis bundle plan to take real data
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"xray_uvvis_plan",
"args":['det', 'qepro'],
"args":['pe2c', 'qepro'],
"kwargs":{
'num_abs':num_abs,
'num_flu':num_flu,
Expand All @@ -221,6 +229,7 @@ def synthesis_queue(
### 7-1. Stop infuese
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand All @@ -234,6 +243,7 @@ def synthesis_queue(
### 7-2. Set infuse rate for washing loop/tube
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"set_group_infuse2",
Expand All @@ -252,6 +262,7 @@ def synthesis_queue(
### 7-3. Start to wash loop/tube
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"start_group_infuse",
Expand All @@ -265,6 +276,7 @@ def synthesis_queue(
### 7-4. Wash loop/tube for xxx seconds
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
Expand All @@ -278,6 +290,7 @@ def synthesis_queue(
### 7-5. stop infuese
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand All @@ -292,6 +305,7 @@ def synthesis_queue(
# 8. stop infuese for all pumps
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand Down
64 changes: 42 additions & 22 deletions scripts/kafka_consumer_iterate_qserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

## Input varaibales: read from inputs_qserver_kafka.xlsx
xlsx = '/home/xf28id2/Documents/ChengHung/inputs_qserver_kafka_ML.xlsx'
input_dic = de._read_input_xlsx(xlsx, sheet_name='inputs_OA')
input_dic = de._read_input_xlsx(xlsx, sheet_name='inputs_xrun')

##################################################################
# Define namespace for tasks in Qserver and Kafa
Expand All @@ -65,6 +65,7 @@
num_uvvis = input_dic['num_uvvis']
###################################################################
## Add tasks into Qsever
zmq_server_address = 'tcp://xf28id2-ca2:60615'
import _synthesis_queue as sq
sq.synthesis_queue(
syringe_list=syringe_list,
Expand All @@ -82,6 +83,7 @@
name_by_prefix=bool(prefix[0]),
num_abs=num_uvvis[0],
num_flu=num_uvvis[1],
zmq_server_address=zmq_server_address,
)

if bool(prefix[0]):
Expand All @@ -98,27 +100,27 @@

# agent_data_path = '/home/xf28id2/data_ZnCl2'
# agent_data_path = '/home/xf28id2/data_ZnI2_60mM'
agent_data_path = '/home/xf28id2/data_halide'
agent_data_path = '/home/xf28id2/Documents/ChengHung/data_halide'
# agent_data_path = '/home/xf28id2/data_dilute_halide'

# dofs = [
# DOF(description="CsPb(oleate)3", name="infusion_rate_CsPb", units="uL/min", search_bounds=(5, 110)),
# DOF(description="TOABr", name="infusion_rate_Br", units="uL/min", search_bounds=(70, 170)),
# DOF(description="ZnCl2", name="infusion_rate_Cl", units="uL/min", search_bounds=(0, 150)),
# DOF(description="ZnI2", name="infusion_rate_I2", units="uL/min", search_bounds=(0, 150)),
# ]
dofs = [
DOF(description="CsPb(oleate)3", name="infusion_rate_CsPb", units="uL/min", search_bounds=(5, 110)),
DOF(description="TOABr", name="infusion_rate_Br", units="uL/min", search_bounds=(70, 170)),
DOF(description="ZnCl2", name="infusion_rate_Cl", units="uL/min", search_bounds=(0, 150)),
DOF(description="ZnI2", name="infusion_rate_I2", units="uL/min", search_bounds=(0, 150)),
]

# objectives = [
# Objective(description="Peak emission", name="Peak", target=660, weight=10, max_noise=0.25),
# Objective(description="Peak width", name="FWHM", target="min", log=True, weight=2., max_noise=0.25),
# Objective(description="Quantum yield", name="PLQY", target="max", log=True, weight=1., max_noise=0.25),
# ]
objectives = [
Objective(description="Peak emission", name="Peak", target=660, weight=10, max_noise=0.25),
Objective(description="Peak width", name="FWHM", target="min", log=True, weight=2., max_noise=0.25),
Objective(description="Quantum yield", name="PLQY", target="max", log=True, weight=1., max_noise=0.25),
]

use_good_bad = True
USE_AGENT_iterate = False
USE_AGENT_iterate = True
peak_target = 590

write_agent_data = False
write_agent_data = True
# rate_label = ['infusion_rate_CsPb', 'infusion_rate_Br', 'infusion_rate_Cl', 'infusion_rate_I2']
# rate_label = ['infusion_rate_CsPb', 'infusion_rate_Br', 'infusion_rate_I2', 'infusion_rate_Cl']
rate_label_dic = {'CsPb':'infusion_rate_CsPb',
Expand Down Expand Up @@ -236,7 +238,7 @@ def print_message(consumer, doctype, doc,
print(f"sample type: {message['sample_type']}")

if name == 'stop':
zmq_single_request(method='queue_stop')
zmq_single_request(method='queue_stop', zmq_server_address=zmq_server_address)
print('\n*** qsever stop for data export, identification, and fitting ***\n')

print(f"{datetime.datetime.now().isoformat()} documents {name}\n"
Expand Down Expand Up @@ -426,6 +428,7 @@ def print_message(consumer, doctype, doc,
### Stop all infusing pumps
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand All @@ -438,6 +441,7 @@ def print_message(consumer, doctype, doc,
### Set up washing tube/loop
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"start_group_infuse",
Expand All @@ -450,6 +454,7 @@ def print_message(consumer, doctype, doc,
### Wash loop/tube for xxx seconds
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
Expand All @@ -463,6 +468,7 @@ def print_message(consumer, doctype, doc,
### Stop washing
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand All @@ -472,7 +478,7 @@ def print_message(consumer, doctype, doc,
'user_group':'primary',
'user':'chlin'})

zmq_single_request(method='queue_stop')
zmq_single_request(method='queue_stop', zmq_server_address=zmq_server_address)
# zmq_single_request(method='re_abort')
agent_iteration.append(False)

Expand Down Expand Up @@ -519,6 +525,7 @@ def print_message(consumer, doctype, doc,
### Stop all infusing pumps
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand All @@ -531,6 +538,7 @@ def print_message(consumer, doctype, doc,
### Set up washing tube/loop
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"start_group_infuse",
Expand All @@ -540,9 +548,17 @@ def print_message(consumer, doctype, doc,
'user_group':'primary',
'user':'chlin'})

# from bluesky_queueserver_api.zmq import REManagerAPI
# from bluesky_queueserver_api import BPlan
# RM = REManagerAPI( < address etc. >)
# plan = BPlan("start_group_infuse", [wash_tube[1]], [wash_tube[2]])
# plan.meta = ...
# RM.item_add(plan)

### Wash loop/tube for xxx seconds
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"sleep_sec_q",
Expand All @@ -556,6 +572,7 @@ def print_message(consumer, doctype, doc,
### Stop washing
zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"stop_group",
Expand All @@ -565,14 +582,15 @@ def print_message(consumer, doctype, doc,
'user_group':'primary',
'user':'chlin'})

zmq_single_request(method='queue_stop')
zmq_single_request(method='queue_stop', zmq_server_address=zmq_server_address)
# zmq_single_request(method='re_abort')

elif len(good_data) <= 2 and use_good_bad:
print('*** Add another fluorescence and absorption scan to the fron of qsever ***\n')

zmq_single_request(
method='queue_item_add',
zmq_server_address=zmq_server_address,
params={
'item':{
"name":"take_a_uvvis_csv_q",
Expand All @@ -588,7 +606,7 @@ def print_message(consumer, doctype, doc,
'user_group':'primary',
'user':'chlin'})

zmq_single_request(method='queue_start')
zmq_single_request(method='queue_start', zmq_server_address=zmq_server_address)

elif len(good_data) > 2 and use_good_bad:
print('*** # of good data is enough so go to the next: bundle plan ***\n')
Expand All @@ -597,7 +615,8 @@ def print_message(consumer, doctype, doc,
finished.append(metadata_dic['sample_type'])
print(f'After event: good_data = {good_data}\n')
print(f'After event: finished sample = {finished}\n')
zmq_single_request(method='queue_start')
zmq_single_request(method='queue_start', zmq_server_address=zmq_server_address)
# RM.queue_start()

elif stream_name == 'fluorescence' and USE_AGENT_iterate and agent_iteration[-1]:
print('*** Add new points from agent to the fron of qsever ***\n')
Expand All @@ -623,13 +642,14 @@ def print_message(consumer, doctype, doc,
num_abs=num_uvvis[0],
num_flu=num_uvvis[1],
is_iteration=True,
zmq_server_address=zmq_server_address,
)

zmq_single_request(method='queue_start')
zmq_single_request(method='queue_start', zmq_server_address=zmq_server_address)

# elif use_good_bad:
else:
zmq_single_request(method='queue_start')
zmq_single_request(method='queue_start', zmq_server_address=zmq_server_address)


kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")
Expand Down
2 changes: 1 addition & 1 deletion scripts/kafka_consumer_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
# num_uvvis = input_dic['num_uvvis']
###################################################################

from blop import Agent, DOF, Objective
# from blop import Agent, DOF, Objective
# agent_data_path = '/home/xf28id2/data_ZnCl2'
# agent_data_path = '/home/xf28id2/data_ZnI2_60mM'
# agent_data_path = '/home/xf28id2/data_halide'
Expand Down
2 changes: 2 additions & 0 deletions scripts/tests/print_kafka_with_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
except (ImportError, AttributeError):
from nslsii.kafka_utils import _read_bluesky_kafka_config_file # nslsii >=0.7.0

import nslsii
print(f'nslsii version: {nslsii.__version__}')

def print_kafka_messages(beamline_acronym):
print(f"Listening for Kafka messages for {beamline_acronym}")
Expand Down
7 changes: 4 additions & 3 deletions startup/00-startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@

RE = RunEngine({})

db = Broker.named("xpd-ldrd20-31")
# db = Broker.named("xpd")
# db = Broker.named("xpd-ldrd20-31")
db = Broker.named("xpd")
bec = BestEffortCallback()

RE.subscribe(db.insert)
RE.subscribe(bec)
res = nslsii.configure_kafka_publisher(RE, beamline_name="xpd-ldrd20-31")
res = nslsii.configure_kafka_publisher(RE, beamline_name="xpd")
# res = nslsii.configure_kafka_publisher(RE, beamline_name="xpd-ldrd20-31")



Expand Down

0 comments on commit 36c8aaa

Please sign in to comment.