Update workflow v2 for XPD
XPD_Operator committed Sep 11, 2024
1 parent bee5c37 commit 2217309
Showing 5 changed files with 57 additions and 52 deletions.
86 changes: 43 additions & 43 deletions scripts/_LDRD_Kafka.py
@@ -12,10 +12,10 @@
  sq = importlib.import_module("_synthesis_queue_RM")
  de = importlib.import_module("_data_export")
  da = importlib.import_module("_data_analysis")
- # pc = importlib.import_module("_pdf_calculator")
+ pc = importlib.import_module("_pdf_calculator")

- # from diffpy.pdfgetx import PDFConfig
- # gp = importlib.import_module("_get_pdf")
+ from diffpy.pdfgetx import PDFConfig
+ gp = importlib.import_module("_get_pdf")

  build_agent = importlib.import_module("prepare_agent_pdf").build_agent
  import torch
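
Note: diffpy.pdfgetx is an optional, separately licensed package, so uncommenting these imports makes it a hard dependency of the script. A guarded variant is sketched below purely for illustration; the fallback behavior is an assumption, not what this commit does:

import importlib

try:
    from diffpy.pdfgetx import PDFConfig
    gp = importlib.import_module("_get_pdf")
except ImportError:
    # Without diffpy.pdfgetx, PDF reduction would simply be unavailable
    PDFConfig, gp = None, None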
@@ -48,7 +48,7 @@ def _kafka_inputs():
          'rate_label_dic_key', 'rate_label_dic_value', 'new_points_label',
          'use_good_bad', 'post_dilute', 'fix_Br_ratio',
          'write_agent_data', 'agent_data_path', 'build_agent',
-         'USE_AGENT_iterate', 'peak_target',
+         'use_1st_prediction', 'USE_AGENT_iterate', 'peak_target',
          'iq_to_gr', 'iq_to_gr_path', 'cfg_fn', 'bkg_fn', 'iq_fn',
          'search_and_match', 'mystery_path', 'results_path',
          'fitting_pdf', 'fitting_pdf_path', 'cif_fn', 'gr_fn',
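
The new 'use_1st_prediction' key extends the inputs read from the xlsx sheet; xlsx_to_inputs (called in kafka_consumer_iterate_XPD_v2.py below) evidently binds each key to values from that sheet. A rough sketch of the presumed mechanism, with pandas, offered only as an assumption about how such a reader could work:

import pandas as pd
from types import SimpleNamespace

def sketch_xlsx_to_inputs(keys, xlsx_fn, sheet_name):
    # Hypothetical reader: one column per key, non-empty cells collected into a list
    df = pd.read_excel(xlsx_fn, sheet_name=sheet_name)
    return SimpleNamespace(**{k: df[k].dropna().tolist() for k in keys if k in df.columns})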
@@ -473,46 +473,46 @@ def macro_04_dummy_pdf(self):



-    # def macro_05_iq_to_gr(self, beamline_acronym):
-    #     """macro to conduct data reduction from I(Q) to g(r), used in kafka consumer
-    #     This macro will
-    #     1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
-    #     2. Read pdf config file from self.inputs.cfg_fn[-1]
-    #     3. Read pdf background file from self.inputs.bkg_fn[-1]
-    #     4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
-    #     5. Read saved g(r) into pd.DataFrame and save again to remove the headers
-    #     6. Update g(r) data path and data frame to self.gr_data
-    #        self.gr_data[0]: gr_data (path)
-    #        self.gr_data[1]: gr_df
-
-    #     Args:
-    #         beamline_acronym (str): catalog name for tiled to access data
-    #     """
-    #     # Grab metadata from stream_name = fluorescence for naming gr file
-    #     fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
-    #     gr_fn = f'{fn_uid}_scattering.gr'
-
-    #     ### dummy test, e.g., CsPbBr2
-    #     if self.inputs.dummy_pdf[0]:
-    #         gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'
-
-    #     # Build pdf config file from scratch
-    #     pdfconfig = PDFConfig()
-    #     pdfconfig.readConfig(self.inputs.cfg_fn[-1])
-    #     pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
-    #     sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
-    #                                    plot_setting={'marker':'.','color':'green'}, test=True,
-    #                                    gr_fn=gr_fn)
-    #     gr_data = sqfqgr_path['gr']
-
-    #     ## Remove headers by reading gr_data into pd.DataFrame and save again
-    #     gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep=' ')
-    #     gr_df.to_csv(gr_data, index=False, header=False, sep=' ')
-
-    #     self.gr_data = []
-    #     self.gr_data.append(gr_data)
-    #     self.gr_data.append(gr_df)
+    def macro_05_iq_to_gr(self, beamline_acronym):
+        """macro to conduct data reduction from I(Q) to g(r), used in kafka consumer
+        This macro will
+        1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
+        2. Read pdf config file from self.inputs.cfg_fn[-1]
+        3. Read pdf background file from self.inputs.bkg_fn[-1]
+        4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
+        5. Read saved g(r) into pd.DataFrame and save again to remove the headers
+        6. Update g(r) data path and data frame to self.gr_data
+           self.gr_data[0]: gr_data (path)
+           self.gr_data[1]: gr_df
+
+        Args:
+            beamline_acronym (str): catalog name for tiled to access data
+        """
+        # Grab metadata from stream_name = fluorescence for naming gr file
+        fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
+        gr_fn = f'{fn_uid}_scattering.gr'
+
+        ### dummy test, e.g., CsPbBr2
+        if self.inputs.dummy_pdf[0]:
+            gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'
+
+        # Build pdf config file from scratch
+        pdfconfig = PDFConfig()
+        pdfconfig.readConfig(self.inputs.cfg_fn[-1])
+        pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
+        sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
+                                       plot_setting={'marker':'.','color':'green'}, test=True,
+                                       gr_fn=gr_fn)
+        gr_data = sqfqgr_path['gr']
+
+        ## Remove headers by reading gr_data into pd.DataFrame and save again
+        gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep=' ')
+        gr_df.to_csv(gr_data, index=False, header=False, sep=' ')
+
+        self.gr_data = []
+        self.gr_data.append(gr_data)
+        self.gr_data.append(gr_df)
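
Steps 5-6 of the docstring amount to a small pandas round trip: pdfgetx-style .gr files begin with a text header, so the file is re-read with the header skipped and rewritten as a bare two-column table. A standalone sketch of just that step, with a hypothetical file name:

import pandas as pd

gr_data = 'sample_scattering.gr'  # hypothetical path to a .gr output file
# Skip the 26 header lines and label the two columns
gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep=' ')
# Overwrite the file as a bare two-column table, no header
gr_df.to_csv(gr_data, index=False, header=False, sep=' ')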



2 changes: 1 addition & 1 deletion scripts/_synthesis_queue_RM.py
@@ -36,7 +36,7 @@ def synthesis_queue_xlsx(parameter_obj):
      wait_dilute = qsp.wait_dilute
      if_wash = qsp.if_wash
      wash_loop = qsp.wash_loop
-     wash_sapphire = qsp.wash_sapphire
+     # wash_sapphire = qsp.wash_sapphire
      rate_unit = qsp.rate_unit[0]
      name_by_prefix = qsp.name_by_prefix[0]
      det2 = qsp.uvvis_config[0]
Binary file modified scripts/inputs_qserver_kafka_v2.xlsx
Binary file not shown.
16 changes: 9 additions & 7 deletions scripts/kafka_consumer_iterate_XPD_v2.py
@@ -8,10 +8,12 @@
  import numpy as np
  from tiled.client import from_uri, from_profile

+ # Set the limit on the number of open files
  import resource
  resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536))

  import importlib
+ ## Add comments for all the modules
  LK = importlib.import_module("_LDRD_Kafka")
  sq = importlib.import_module("_synthesis_queue_RM")
  de = importlib.import_module("_data_export")
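
resource.setrlimit replaces both the soft and hard RLIMIT_NOFILE values at once, and raising the hard limit can fail for unprivileged users. A more defensive variant, shown only as a sketch (the 65536 target mirrors the script; the min() guard is an added assumption):

import resource

# Inspect the current (soft, hard) open-file limits
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f'RLIMIT_NOFILE: soft={soft}, hard={hard}')

# Raise only the soft limit, capped at the existing hard limit
resource.setrlimit(resource.RLIMIT_NOFILE, (min(65536, hard), hard))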
@@ -41,19 +43,19 @@
  qin = qserver_process.inputs

  ## Input variables for Kafka, reading from xlsx_fn by given sheet name
- kafka_process = LK.xlsx_to_inputs(LK._kafka_inputs(), xlsx_fn=xlsx_fn, sheet_name='kafka_process', is_kafka=True)
+ kafka_process = LK.xlsx_to_inputs(LK._kafka_inputs(), xlsx_fn=xlsx_fn, sheet_name='kafka_test', is_kafka=True)
  kin = kafka_process.inputs

  ## Define RE Manager API as RM
  RM = REManagerAPI(zmq_control_addr=qin.zmq_control_addr[0], zmq_info_addr=qin.zmq_info_addr[0])

  ## Make the first prediction from kafka_process.agent
- first_points = kafka_process.macro_agent(qserver_process, RM, check_target=False, is_1st=True)
- rate_list = kafka_process.auto_rate_list(qin.pump_list, first_points, kin.fix_Br_ratio)
- if kin.post_dilute[0]:
-     rate_list.append(sum(rate_list)*kin.post_dilute[1])
-
- qin.infuse_rates = rate_list
+ if kin.use_1st_prediction[0]:
+     first_points = kafka_process.macro_agent(qserver_process, RM, check_target=False, is_1st=True)
+     rate_list = kafka_process.auto_rate_list(qin.pump_list, first_points, kin.fix_Br_ratio)
+     if kin.post_dilute[0]:
+         rate_list.append(sum(rate_list)*kin.post_dilute[1])
+     qin.infuse_rates = rate_list
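
For intuition, the post_dilute branch appends one extra pump rate equal to the sum of the predicted rates scaled by a dilute factor. A toy illustration with made-up numbers:

# Toy illustration of the post-dilute arithmetic (numbers are made up)
rate_list = [5.0, 10.0, 15.0]   # predicted infuse rates from the agent
post_dilute = [True, 1.5]       # mirrors kin.post_dilute: [enabled, dilute factor]
if post_dilute[0]:
    rate_list.append(sum(rate_list) * post_dilute[1])  # 30.0 * 1.5 = 45.0
print(rate_list)                # [5.0, 10.0, 15.0, 45.0]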


## Import Qserver parameters to RE Manager
5 changes: 4 additions & 1 deletion scripts/notes/LDRD20-31_workflow_XPD.ipynb
@@ -79,7 +79,9 @@
"id": "bb64b6aa-7054-4fe7-b626-707d84d3da2d",
"metadata": {},
"outputs": [],
"source": []
"source": [
"## Add GUI screenshots"
]
},
{
"cell_type": "markdown",
@@ -120,6 +122,7 @@
"\n",
"\n",
"## Try to write processed data into Sandbox\n",
"## Add .py path\n",
"~$ conda activate 2024-2.2-py310-tiled\n",
"\n",
"~$ PYTHONPATH=$BS_PYTHONPATH_310 python kafka_consumer_iterate_XPD_RM.py xpd xpd-analysis"
