debug workflow at beamline for two topics
XPD_Operator committed Aug 19, 2024
1 parent 588fee4 commit 4e18def
Showing 6 changed files with 221 additions and 266 deletions.
1 change: 0 additions & 1 deletion scripts/.~lock.inputs_qserver_kafka_v2.xlsx#

This file was deleted.

214 changes: 128 additions & 86 deletions scripts/_LDRD_Kafka.py
@@ -7,18 +7,20 @@
import time
import pprint
import json
from scipy import integrate

import _data_export as de
from _plot_helper import plot_uvvis
import _data_analysis as da
import _pdf_calculator as pc
sq = importlib.import_module("_synthesis_queue_RM")
de = importlib.import_module("_data_export")
da = importlib.import_module("_data_analysis")
pc = importlib.import_module("_pdf_calculator")

from scipy import integrate
from diffpy.pdfgetx import PDFConfig
gp = importlib.import_module("_get_pdf")

build_agent = importlib.import_module("prepare_agent_pdf").build_agent
import torch
# from diffpy.pdfgetx import PDFConfig
# import _get_pdf as gp
from tiled.client import from_uri

from tiled.client import from_uri
from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api import BPlan, BInst
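
The import block above replaces the plain `import ... as ...` statements with `importlib.import_module(...)` bindings for the beamline helper modules. One plausible motivation (an assumption, not stated in the commit) is that modules bound this way are easy to hot-reload while debugging live at the beamline; a minimal sketch:

```python
# Minimal sketch (not part of this commit): reloading an edited helper module
# without restarting the Kafka consumer. Assumes _data_export.py is importable
# from the working directory, as in the diff above.
import importlib

de = importlib.import_module("_data_export")  # initial import, as in the new code
# ... edit _data_export.py on disk while the consumer keeps running ...
de = importlib.reload(de)                     # rebind to the freshly reloaded module
```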

@@ -109,7 +111,6 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs', is_kafka=False
## Assign Agent to self.agent
self.agent
print('\n***** Start to initialize blop agent ***** \n')
build_agent = importlib.import_module("prepare_agent_pdf").build_agent
self.agent = build_agent(
peak_target=self.inputs.peak_target[0],
agent_data_path=self.inputs.agent_data_path[0])
@@ -247,7 +248,6 @@ def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):
print(f'\nReached the target, stop iteration, stop all pumps, and wash the loop.\n')

### Stop all infusing pumps and wash loop
sq = importlib.import_module("_synthesis_queue_RM")
sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
zmq_control_addr=qin.zmq_control_addr[0],
zmq_info_addr=qin.zmq_info_addr[0])
@@ -262,7 +262,6 @@ def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):
if is_1st:
pass
else:
build_agent = importlib.import_module("prepare_agent_pdf").build_agent
self.agent = build_agent(
peak_target = peak_target,
agent_data_path = self.inputs.agent_data_path[0],
@@ -304,7 +303,78 @@ def macro_agent(self, qserver_process, RM, check_target=False, is_1st=False):
return new_points


def macro_01_get_iq(self, iq_I_uid):

def macro_00_print_start(self, message):
"""macro to print metadata when doc name is start and reset self.uid to an empty list
Args:
message (dict): message in RE document
"""

if 'uid' in message.keys():
print(f"uid: {message['uid']}")
if 'plan_name' in message.keys():
print(f"plan name: {message['plan_name']}")
if 'detectors' in message.keys():
print(f"detectors: {message['detectors']}")
if 'pumps' in message.keys():
print(f"pumps: {message['pumps']}")
if 'uvvis' in message.keys() and (len(message['uvvis'])==3):
print(f"uvvis mode:\n"
f" integration time: {message['uvvis'][0]} ms\n"
f" num spectra averaged: {message['uvvis'][1]}\n"
f" buffer capacity: {message['uvvis'][2]}"
)
elif 'uvvis' in message.keys() and (len(message['uvvis'])==5):
print(f"uvvis mode:\n"
f" spectrum type: {message['uvvis'][0]}\n"
f" integration time: {message['uvvis'][2]} ms\n"
f" num spectra averaged: {message['uvvis'][3]}\n"
f" buffer capacity: {message['uvvis'][4]}"
)
if 'mixer' in message.keys():
print(f"mixer: {message['mixer']}")
if 'sample_type' in message.keys():
print(f"sample type: {message['sample_type']}")

## Reset self.uid to an empty list
self.uid = []
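
For orientation, a hypothetical call to macro_00_print_start is sketched below; the key names follow the checks above, while the values and the `kafka_process` instance name are made-up placeholders.

```python
# Illustrative only: a 'start' document shaped like the checks above expect.
start_doc = {
    'uid': 'abc123',
    'plan_name': 'take_a_uvvis_csv_q',
    'detectors': ['qepro'],
    'uvvis': [100, 10, 3],   # 3-element form: integration time (ms), num averaged, buffer capacity
    'sample_type': 'CsPbBr3',
}
kafka_process.macro_00_print_start(start_doc)  # prints the metadata and resets kafka_process.uid to []
```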




def macro_01_stop_queue_uid(self, RM, message):
"""macro to stop queue and get raw data uid, used in kafka consumer
used while taking a UV-Vis measurement (no X-ray data) but pdfstream analysis is still performed
This macro will
1. Stop queue
2. Assign raw data uid to self.uid
3. Append raw data uid to self.uid_catalog
4. Update self.stream_list
Args:
RM (REManagerAPI): Run Engine Manager API.
message (dict): message in RE document
"""
inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')
## wait 1 second for databroker to save data
time.sleep(1)
self.uid = message['run_start']
self.uid_catalog.append(self.uid)
stream_list = list(message['num_events'].keys())
## Reset self.stream_list to an empty list
self.stream_list = []
for stream_name in stream_list:
self.stream_list.append(stream_name)
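
A hedged usage sketch for macro_01_stop_queue_uid: the message shape follows the keys read above (`run_start`, `num_events`); `RM` is the REManagerAPI instance, and `kafka_process` is again a placeholder name.

```python
# Illustrative only: handling the document of a UV-Vis-only run.
message = {'run_start': 'abc123', 'num_events': {'primary': 1, 'absorbance': 1}}
kafka_process.macro_01_stop_queue_uid(RM, message)

print(kafka_process.uid)          # 'abc123'
print(kafka_process.stream_list)  # ['primary', 'absorbance']
```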




def macro_02_get_iq(self, iq_I_uid):
"""macro to get iq data, used in kafka consumer
while taking xray_uvvis_plan and the pdfstream analysis has finished
@@ -349,7 +419,7 @@ def macro_01_get_iq(self, iq_I_uid):



def macro_02_get_uid(self):
def macro_03_get_uid(self):
"""macro to get raw data uid, used in kafka consumer
This macro will
@@ -369,33 +439,6 @@ def macro_02_get_uid(self):



def macro_03_stop_queue_uid(self, RM, message):
"""macro to stop queue and get raw data uid, used in kafka consumer
while taking a Uv-Vis, no X-ray data but still do analysis of pdfstream
This macro will
1. Stop queue
2. Assign raw data uid to self.uid
3. Append raw data uid to self.uid_catalog
4. Update self.stream_list
Args:
RM (REManagerAPI): Run Engine Manager API.
message (dict): message in RE document
"""
inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')
## wait 1 second for databroker to save data
time.sleep(1)
self.uid = message['run_start']
self.uid_catalog.append(self.uid)
stream_list = list(message['num_events'].keys())
## Reset self.stream_list to an empty list
self.stream_list = []
for stream_name in stream_list:
self.stream_list.append(stream_name)



def macro_04_dummy_pdf(self):
"""macro to setup a dummy pdf data for testing, used in kafka consumer
@@ -426,46 +469,46 @@ def macro_04_dummy_pdf(self):



# def macro_05_iq_to_gr(self, beamline_acronym):
# """macro to condcut data reduction from I(Q) to g(r), used in kafka consumer
def macro_05_iq_to_gr(self, beamline_acronym):
"""macro to condcut data reduction from I(Q) to g(r), used in kafka consumer
# This macro will
# 1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
# 2. Read pdf config file from self.inputs.cfg_fn[-1]
# 3. Read pdf background file from self.inputs.bkg_fn[-1]
# 4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
# 5. Read saved g(r) into pd.DataFrame and save again to remove the headers
# 6. Update g(r) data path and data frame to self.gr_data
# self.gr_data[0]: gr_data (path)
# self.gr_data[1]: gr_df

# Args:
# beamline_acronym (str): catalog name for tiled to access data
# """
# # Grab metadat from stream_name = fluorescence for naming gr file
# fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
# gr_fn = f'{fn_uid}_scattering.gr'

# ### dummy test, e.g., CsPbBr2
# if self.inputs.dummy_pdf[0]:
# gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'

# # Build pdf config file from a scratch
# pdfconfig = PDFConfig()
# pdfconfig.readConfig(self.inputs.cfg_fn[-1])
# pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
# sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
# plot_setting={'marker':'.','color':'green'}, test=True,
# gr_fn=gr_fn)
# gr_data = sqfqgr_path['gr']

# ## Remove headers by reading gr_data into pd.Dataframe and save again
# gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ')
# gr_df.to_csv(gr_data, index=False, header=False, sep =' ')

# self.gr_data = []
# self.gr_data.append(gr_data)
# self.gr_data.append(gr_df)
This macro will
1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
2. Read pdf config file from self.inputs.cfg_fn[-1]
3. Read pdf background file from self.inputs.bkg_fn[-1]
4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
5. Read saved g(r) into pd.DataFrame and save again to remove the headers
6. Update g(r) data path and data frame to self.gr_data
self.gr_data[0]: gr_data (path)
self.gr_data[1]: gr_df
Args:
beamline_acronym (str): catalog name for tiled to access data
"""
# Grab metadata from stream_name = fluorescence for naming gr file
fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
gr_fn = f'{fn_uid}_scattering.gr'

### dummy test, e.g., CsPbBr2
if self.inputs.dummy_pdf[0]:
gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'

# Build pdf config file from scratch
pdfconfig = PDFConfig()
pdfconfig.readConfig(self.inputs.cfg_fn[-1])
pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
plot_setting={'marker':'.','color':'green'}, test=True,
gr_fn=gr_fn)
gr_data = sqfqgr_path['gr']

## Remove headers by reading gr_data into pd.DataFrame and saving it again
gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ')
gr_df.to_csv(gr_data, index=False, header=False, sep =' ')

self.gr_data = []
self.gr_data.append(gr_data)
self.gr_data.append(gr_df)
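
After macro_05_iq_to_gr runs, self.gr_data[0] holds the path to the header-less .gr file and self.gr_data[1] the corresponding DataFrame with columns 'r' and 'g(r)'. A quick-look sketch of consuming that result (matplotlib use and the `kafka_process` name are assumptions, not part of this commit):

```python
# Illustrative only: plotting the reduced G(r) from the attributes set above.
import matplotlib.pyplot as plt

gr_path, gr_df = kafka_process.gr_data
plt.plot(gr_df['r'], gr_df['g(r)'])
plt.xlabel('r (Å)')
plt.ylabel('G(r)')
plt.title(gr_path)
plt.show()
```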



@@ -986,18 +1029,14 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):
print('*** qserver aborted due to too many bad scans, please check setup ***\n')

### Stop all infusing pumps and wash loop
sq = importlib.import_module("_synthesis_queue_RM")
sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
zmq_control_addr=qin.zmq_control_addr[0],
zmq_info_addr=qin.zmq_info_addr[0])

RM.queue_stop()

elif (len(self.good_data) <= 2) and (self.inputs.use_good_bad[0]):
print('*** Add another fluorescence and absorption scan to the fron of qsever ***\n')

restplan = BPlan('sleep_sec_q', 5)
RM.item_add(restplan, pos=0)
print('*** Add another fluorescence scan to the front of qserver ***\n')

scanplan = BPlan('take_a_uvvis_csv_q',
sample_type=self.metadata_dic['sample_type'],
Expand All @@ -1007,6 +1046,10 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):
precursor_list=qin.precursor_list,
mixer=qin.mixer)
RM.item_add(scanplan, pos=1)

restplan = BPlan('sleep_sec_q', 5)
RM.item_add(restplan, pos=2)

RM.queue_start()

elif (len(self.good_data) > 2) and (self.inputs.use_good_bad[0]):
Expand All @@ -1021,7 +1064,7 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):

## Add predicted new points from ML agent into qserver
elif (stream_name == 'fluorescence') and (self.inputs.USE_AGENT_iterate[0]) and (self.agent_iteration[-1]):
print('*** Add new points from agent to the fron of qsever ***\n')
print('*** Add new points from agent to the front of qserver ***\n')

new_points = self.macro_agent(qserver_process, RM, check_target=True)

@@ -1034,7 +1077,6 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):

qin.sample = de._auto_name_sample(rate_list, prefix=qin.prefix[1:])
qin.infuse_rates = rate_list
sq = importlib.import_module("_synthesis_queue_RM")
sq.synthesis_queue_xlsx(qserver_process)

else:
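
In macro_17_add_queue, this commit also reorders the recovery items: the old code put the 5-second rest plan at the front (pos=0) before the extra scan, while the new code adds the scan at pos=1 and the rest plan after it at pos=2. A minimal sketch of the resulting queue-front ordering (plan arguments are elided, so this is illustrative only):

```python
# Illustrative only: queue-front ordering after this change.
from bluesky_queueserver_api import BPlan
from bluesky_queueserver_api.zmq import REManagerAPI

RM = REManagerAPI()
scanplan = BPlan('take_a_uvvis_csv_q', sample_type='CsPbBr3')  # real call passes more kwargs
restplan = BPlan('sleep_sec_q', 5)

RM.item_add(scanplan, pos=1)  # extra fluorescence scan right behind the running item
RM.item_add(restplan, pos=2)  # the 5 s rest now follows the scan instead of preceding it
RM.queue_start()
```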
4 changes: 2 additions & 2 deletions scripts/_synthesis_queue_RM.py
@@ -1,11 +1,11 @@
import numpy as np
import pandas as pd
# import pandas as pd
# from bluesky_queueserver.manager.comms import zmq_single_request
import _data_export as de
from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api import BPlan, BInst
from ophyd.sim import det, noisy_det
from _LDRD_Kafka import xlsx_to_inputs
# from _LDRD_Kafka import xlsx_to_inputs

## Pass qserver parameters by xlsx_to_inputs
## Arrange tasks for PQD synthesis
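
The `from _LDRD_Kafka import xlsx_to_inputs` line is commented out here while _LDRD_Kafka.py now loads this module via importlib.import_module, presumably (an assumption) to break a circular import between the two files. If xlsx_to_inputs were ever needed again inside this module, a lazy import is one way to keep the cycle broken; a hypothetical sketch:

```python
# Hypothetical sketch, not part of this commit: defer the import so the
# top-level circular dependency with _LDRD_Kafka does not reappear.
def _get_xlsx_to_inputs():
    from _LDRD_Kafka import xlsx_to_inputs  # imported only when actually needed
    return xlsx_to_inputs
```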