
Commit

add more comments into kafka plan
cheng-hung committed Aug 1, 2024
1 parent d18dc12 commit 48b0e3a
Showing 3 changed files with 791 additions and 125 deletions.
161 changes: 36 additions & 125 deletions scripts/kafka_consumer_iterate_1LL09_v2.py
@@ -25,7 +25,6 @@
# import _pdf_calculator as pc
import _LDRD_Kafka as LK

# from bluesky_queueserver.manager.comms import zmq_single_request
from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api import BPlan, BInst

@@ -40,112 +39,31 @@
plt.ion()
plt.rcParams["figure.raise_window"] = False

## Input variables: read from inputs_qserver_kafka_v2.xlsx
xlsx_fn = '/home/xf28id2/Documents/ChengHung/inputs_qserver_kafka_v2.xlsx'

## Input variables for Qserver, read from xlsx_fn by the given sheet name
qserver_process = LK.xlsx_to_inputs(LK._qserver_inputs(), xlsx_fn=xlsx_fn, sheet_name='qserver_1LL09')
qin = qserver_process.inputs

## Input variables for Kafka, read from xlsx_fn by the given sheet name
kafka_process = LK.xlsx_to_inputs(LK._kafka_process(), xlsx_fn=xlsx_fn, sheet_name='kafka_process')
kin = kafka_process.inputs
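
## Illustrative sketch (not part of this commit): xlsx_to_inputs is defined in
## _LDRD_Kafka and is not shown in this diff. One plausible reading, assuming each
## sheet column becomes an attribute holding that column's non-empty cells:
# import pandas as pd
# def sketch_xlsx_to_inputs(parameters, xlsx_fn, sheet_name):
#     df = pd.read_excel(xlsx_fn, sheet_name=sheet_name)
#     for col in df.columns:
#         setattr(parameters, col, df[col].dropna().tolist())  # e.g. qin.prefix
#     return parameters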

# ##################################################################
# # Define namespace for tasks in Qserver and Kafka
# dummy_kafka = bool(input_dic['dummy_test'][0])
# dummy_qserver = bool(input_dic['dummy_test'][1])
# csv_path = input_dic['csv_path'][0]
# key_height = input_dic['key_height']
# height = input_dic['height']
# distance = input_dic['distance']
# pump_list = input_dic['pump_list']
# precursor_list = input_dic['precursor_list']
# syringe_mater_list = input_dic['syringe_mater_list']
# syringe_list = input_dic['syringe_list']
# target_vol_list = input_dic['target_vol_list']
# set_target_list = input_dic['set_target_list']
# infuse_rates = input_dic['infuse_rates']
# sample = input_dic['sample']
# mixer = input_dic['mixer']
# wash_tube = input_dic['wash_tube']
# resident_t_ratio = input_dic['resident_t_ratio']
# PLQY = input_dic['PLQY']
# prefix = input_dic['prefix']
# num_uvvis = input_dic['num_uvvis']
# ###################################################################
# ## Add tasks into Qsever
# # zmq_control_addr='tcp://xf28id2-ca2:60615',
# # zmq_info_addr='tcp://xf28id2-ca2:60625'
# # zmq_control_addr='tcp://localhost:60615'
# zmq_control_addr='tcp://localhost:60615'
# zmq_info_addr='tcp://localhost:60625'

## Define RE Manager API as RM
RM = REManagerAPI(zmq_control_addr=qin.zmq_control_addr[0], zmq_info_addr=qin.zmq_info_addr[0])
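
## Illustrative usage (not part of this commit): the RM handle drives the queue
## programmatically through bluesky-queueserver-api (BPlan is imported above);
## "det" here is a hypothetical detector name.
# status = RM.status()                           # manager/worker state as a dict
# RM.item_add(BPlan("count", ["det"], num=3))    # append a plan to the queue
# RM.queue_start()                               # start executing queued items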

## Import Qserver parameters to RE Manager
import _synthesis_queue_RM as sq
sq.synthesis_queue_xlsx(qserver_1LL09)
sq.synthesis_queue_xlsx(qin)
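
## Illustrative sketch (not part of this commit): synthesis_queue_xlsx lives in
## _synthesis_queue_RM and is not shown here. A plausible core is one queued plan
## per set of spreadsheet-derived rates, with a hypothetical plan name:
# for rate in qin.infuse_rates:
#     RM.item_add(BPlan("start_group_infuse", qin.pump_list, rate))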

## Auto name samples by prefix
if qin.name_by_prefix[0]:
sample = de._auto_name_sample(qin.infuse_rates, prefix=qin.prefix)
print(f'Sample: {sample}')
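
## Illustrative sketch (not part of this commit): _auto_name_sample comes from the
## de module and is not shown here. A plausible behavior is one name per set of
## infusion rates, composed from the prefix:
# def sketch_auto_name_sample(infuse_rates, prefix):
#     return [f"{prefix[0]}_" + "_".join(str(r) for r in rates)
#             for rates in infuse_rates]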

# rate_label_dic = {'CsPb':'infusion_rate_CsPb',
# 'Br':'infusion_rate_Br',
# 'ZnI':'infusion_rate_I2',
# 'ZnCl':'infusion_rate_Cl'}

# new_points_label = ['infusion_rate_CsPb', 'infusion_rate_Br', 'infusion_rate_I2', 'infusion_rate_Cl']

# use_good_bad = True
# post_dilute = True
# write_agent_data = True
# agent_data_path = '/home/xf28id2/data_post_dilute_66mM_PF'

# USE_AGENT_iterate = False
# peak_target = 515
# if USE_AGENT_iterate:
# import torch
# from prepare_agent_pdf import build_agen
# agent = build_agen(peak_target=peak_target, agent_data_path=agent_data_path)

# iq_to_gr = False
# if iq_to_gr:
# from diffpy.pdfgetx import PDFConfig
# global gr_path, cfg_fn, iq_fn, bkg_fn
# gr_path = '/home/xf28id2/Documents/ChengHung/pdfstream_test/'
# cfg_fn = '/home/xf28id2/Documents/ChengHung/pdfstream_test/pdfgetx3.cfg'
# iq_fn = glob.glob(os.path.join(gr_path, '**CsPb**.chi'))
# # bkg_fn = glob.glob(os.path.join(gr_path, '**bkg**.chi'))
# bkg_fn = ['/nsls2/data/xpd-new/legacy/processed/xpdUser/tiff_base/Toluene_OleAcid_mask/integration/Toluene_OleAcid_mask_20240602-122852_c49480_primary-1_mean_q.chi']

# search_and_match = False
# if search_and_match:
# from updated_pipeline_pdffit2 import Refinery
# mystery_path = "/home/xf28id2/Documents/ChengHung/pdffit2_example/CsPbBr3"
# # mystery_path = "'/home/xf28id2/Documents/ChengHung/pdfstream_test/gr"
# results_path = "/home/xf28id2/Documents/ChengHung/pdffit2_example/results_CsPbBr_chemsys_search"

# fitting_pdf = False
# if fitting_pdf:
# import _pdf_calculator as pc
# global pdf_cif_dir, cif_list, gr_data
# pdf_cif_dir = '/home/xf28id2/Documents/ChengHung/pdffit2_example/CsPbBr3/'
# cif_list = [os.path.join(pdf_cif_dir, 'CsPbBr3_Orthorhombic.cif')]
# gr_data = os.path.join(pdf_cif_dir, 'CsPbBr3.gr')

# global sandbox_tiled_client
# use_sandbox = True
# if use_sandbox:
# sandbox_tiled_client = from_uri("https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/sandbox")

# write_to_sandbox = True
# if write_to_sandbox:
# sandbox_tiled_client = from_uri("https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/sandbox")


def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ):
# csv_path=csv_path,
# key_height=key_height, height=height, distance=distance,
# pump_list=pump_list, sample=sample, precursor_list=precursor_list,
# mixer=mixer, dummy_test=dummy_kafka, plqy=PLQY, prefix=prefix,
# agent_data_path=agent_data_path, peak_target=peak_target,
# rate_label_dic=rate_label_dic):

"""Print kafka message from beamline_acronym
Args:
@@ -182,14 +100,6 @@ def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ):
f'{bool(kin.write_to_sandbox[0]) = }\n'
f'{qin.zmq_control_addr[0] = }')


# global tiled_client, path_0, path_1
# # tiled_client = from_profile("nsls2")[beamline_acronym]["raw"]
# tiled_client = from_profile(beamline_acronym)
# path_0 = csv_path
# path_1 = csv_path + '/good_bad'

## Append raw data tiled_client
kin.tiled_client.append(beamline_acronym)
kin.tiled_client.append(from_profile(beamline_acronym))
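
## Illustrative usage (not part of this commit): with the catalog from
## from_profile appended above (index 1, after the acronym string at index 0), a
## finished run can be read back by uid, assuming databroker v2-style access:
# run = kin.tiled_client[1][uid]   # catalog entry keyed by run uid
# ds = run.primary.read()          # primary stream as an xarray Dataset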
@@ -203,24 +113,19 @@ def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ):
except FileExistsError:
pass

import palettable.colorbrewer.diverging as pld
palette = pld.RdYlGn_4_r
cmap = palette.mpl_colormap
color_idx = np.linspace(0, 1, len(sample))
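
## Illustrative usage (not part of this commit): color_idx assigns each sample a
## position on the RdYlGn colormap, e.g. when plotting one spectrum per sample:
# ax.plot(x, y, color=cmap(color_idx[i]), label=sample[i])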

# plt.figure()
# def print_message(name, doc):
def print_message(consumer, doctype, doc,
bad_data = [], good_data = [], check_abs365 = False, finished = [],
agent_iteration = [], ):
# pump_list=pump_list, sample=sample, precursor_list=precursor_list,
# mixer=mixer, dummy_test=dummy_test):

name, message = doc
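
## Note (not part of this commit): doc arrives as a (name, document) pair per the
## bluesky event model, and the mutable default lists in the signature above
## (bad_data, good_data, finished, agent_iteration) are created once at function
## definition, so they persist across messages and act as accumulating state.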
# print(
# f"{datetime.datetime.now().isoformat()} document: {name}\n"
# f"document keys: {list(message.keys())}\n"
# f"contents: {pprint.pformat(message)}\n"
# )
# print(f"contents: {pprint.pformat(message)}\n")

######### While document name == 'start' ##########
##                                               ##
##                Print metadata                 ##
##                                               ##
###################################################
if name == 'start':
print(
f"{datetime.datetime.now().isoformat()} documents {name}\n"
@@ -234,8 +139,8 @@ def print_message(consumer, doctype, doc,
print(f"detectors: {message['detectors']}")
if 'pumps' in message.keys():
print(f"pumps: {message['pumps']}")
# if 'detectors' in message.keys():
# print(f"detectors: {message['detectors']}")
if 'detectors' in message.keys():
print(f"detectors: {message['detectors']}")
if 'uvvis' in message.keys() and message['plan_name']!='count':
print(f"uvvis mode:\n"
f" integration time: {message['uvvis'][0]} ms\n"
@@ -253,32 +158,38 @@ def print_message(consumer, doctype, doc,
print(f"mixer: {message['mixer']}")
if 'sample_type' in message.keys():
print(f"sample type: {message['sample_type']}")



######### While document name == 'stop' ###########
##      Read data from tiled, process data       ##
##          Plot data, Agent prediction          ##
##      Export data, Save data to tiled          ##
###################################################
if name == 'stop':
RM.queue_stop()
print('\n*** qserver stop for data export, identification, and fitting ***\n')
print(f"{datetime.datetime.now().isoformat()} documents {name}\n"
f"contents: {pprint.pformat(message)}"
)
)
# num_events = len(message['num_events'])

## wait 2 seconds for databroker to save data
time.sleep(2)

## Obtain uid from message['run_start']
uid = message['run_start']
print(f'\n**** start to export uid: {uid} ****\n')
# print(list(message['num_events'].keys())[0])

## Put stream names of scans into stream_list
stream_list = list(message['num_events'].keys())
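
## Illustrative sketch (not part of this commit): each stream named in
## stream_list can then be read from the raw catalog appended earlier, assuming
## databroker v2-style access (stream names below are hypothetical):
# run = kin.tiled_client[1][uid]
# for stream_name in stream_list:
#     ds = run[stream_name].read()   # e.g. 'primary', 'absorbance'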

## Set good/bad data conditions for the corresponding sample
try:
kh = key_height[len(finished)]
hei = height[len(finished)]
dis = distance[len(finished)]
except IndexError:
kh = key_height[0]
hei = height[0]
dis = distance[0]
kh = kin.key_height[0]
hei = kin.height[0]
dis = kin.distance[0]
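
## Illustrative sketch (not part of this commit): the names height/distance match
## scipy.signal.find_peaks arguments, so one plausible good/bad check using these
## thresholds on a measured spectrum would be:
# from scipy.signal import find_peaks
# peaks, props = find_peaks(spectrum, height=hei, distance=dis)
# is_good = peaks.size > 0 and props["peak_heights"].max() >= kh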

## obtain phase fraction & particle size from g(r)
if 'scattering' in stream_list:
if fitting_pdf: