Skip to content

Commit

Permalink
update kafka for RM and agent
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed May 8, 2024
1 parent 6642ee2 commit 5244e49
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 185 deletions.
218 changes: 119 additions & 99 deletions scripts/_data_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,14 @@ def export_qepro_by_stream(uid, csv_path, stream_name='primary', data_agent='til
print(f'Export {stream_name} in uid: {uid[0:8]} to ../{os.path.basename(csv_path)} done!')


def read_qepro_by_stream(uid, stream_name='primary', data_agent='tiled'):
def read_qepro_by_stream(uid, stream_name='primary', data_agent='tiled', beamline_acronym='xpd-ldrd20-31'):

if data_agent == 'catalog':
if data_agent == 'db':
try:
catalog
except NameError:
import databroker
# catalog = databroker.catalog['xpd-ldrd20-31']
catalog = databroker.catalog['xpd']
catalog = databroker.catalog[beamline_acronym]
run = catalog[uid]
meta = run.metadata

Expand All @@ -173,8 +172,7 @@ def read_qepro_by_stream(uid, stream_name='primary', data_agent='tiled'):
from_profile
except NameError:
from tiled.client import from_profile
# tiled_client = from_profile("xpd-ldrd20-31")
tiled_client = from_profile("xpd")
tiled_client = from_profile(beamline_acronym)

run = tiled_client[uid]
meta = run.metadata
Expand Down Expand Up @@ -203,103 +201,25 @@ def read_qepro_by_stream(uid, stream_name='primary', data_agent='tiled'):



## Convert device parameters (infuse rate) into ML parameters (non-dimensional)
def device_to_ML_parameters(metadata_dic, exp_name='ldrd-2031'):

## Get data from metadata_dic
pump_status = metadata_dic['pump_status']
precursors = metadata_dic['precursors']
infuse_rate = np.float32(metadata_dic['infuse_rate'])
infuse_rate_unit = metadata_dic['infuse_rate_unit']
## Get uids from data folders through daily experiments for export database for ML
## Read uid from **fluorescence** folder
def get_daily_uids(data_folder_list):
uid_list = []
for data_folder in data_folder_list:
subfolder = glob.glob(data_folder + '/**fluorescence')
subfolder.sort()

## Turn pump_status into boolean array
pump_status = list(map(lambda status: status.lower().capitalize() == "Infusing", pump_status))
pump_status = np.float32(pump_status)

## Generate ruc (rate unit converter) to unify rate units as infuse_rate_unit[0]
ruc = []
for i in range(len(infuse_rate_unit)):
rr = rate_unit_converter(r0=infuse_rate_unit[i], r1=infuse_rate_unit[0])
ruc.append(rr)
ruc = np.float32(ruc)

unified_rate = infuse_rate * pump_status * ruc
normalized_rate = unified_rate / unified_rate.sum()


## Arrange normalized_rate into an array as input_names
if exp_name == 'ldrd-2031':
input_names = ['CsPb', 'Br', 'ZnI', 'ZnCl', 'Toluene']
else:
input_names = exp_name

ML_parameters = np.zeros(len(input_names))
for i in range(len(input_names)):
for j in range(len(precursors)):
if input_names[i].lower() in precursors[j].lower():
ML_parameters[i] = normalized_rate[j]

return ML_parameters
for folder in subfolder:
fn = glob.glob(folder + '/**csv')[-1]
with open(fn, 'r') as f:
uid_ = f.readlines()[0]
uid = uid_.strip('\n').split(',')[-1]
uid_list.append(uid)
f.close()
return uid_list



## Convert ML parameters (non-dimensional) into device parameters (infuse rate)
## Note: Before conversion, it is recommended to take a dummy scan
## to let BlueSky know the precursor list and total flow rate
def ML_to_device_parameters(ML_dof, metadata_dic, unit='ul/min',
                            check_sum=True, check_negative=True,
                            exp_name='ldrd-2031'):
    """Convert non-dimensional ML parameters into per-pump infuse rates.

    The total flow rate is taken from the pumps currently reported as
    "Infusing" in ``metadata_dic`` and redistributed according to ``ML_dof``.

    Parameters
    ----------
    ML_dof : array-like
        Fractions proposed by the ML side, one per entry of the input-name
        list (see ``exp_name``). Cast to float32 before use.
    metadata_dic : dict
        Must provide the keys 'pump_status', 'precursors', 'infuse_rate'
        and 'infuse_rate_unit'.
    unit : str
        Rate unit passed as ``r1`` to ``rate_unit_converter``.
        NOTE(review): presumably the converter returns a multiplicative
        factor from ``r0`` to ``r1`` -- confirm against its definition.
    check_sum : bool
        If True, raise when the fractions do not sum to one (within
        floating-point tolerance).
    check_negative : bool
        If True, raise when any fraction is negative.
    exp_name : str or sequence of str
        'ldrd-2031' selects the built-in input-name list; any other value
        is used directly as the list of input names.

    Returns
    -------
    numpy.ndarray
        Array with one slot per entry of 'infuse_rate'. Slot ``j`` holds the
        predicted rate (rounded to 2 decimals) of the input name contained
        in ``precursors[j]``; unmatched slots stay 0.

    Raises
    ------
    ValueError
        If an enabled sum or negativity check fails.
    """
    ML_dof = np.float32(ML_dof)

    ## float32 sums of normalized fractions are frequently off by one ulp,
    ## so compare against 1.0 with a tolerance instead of exact equality.
    if check_sum and not np.isclose(ML_dof.sum(), 1.0):
        raise ValueError(f'Sum of {ML_dof} is not equal to one.')

    if check_negative and np.any(ML_dof < 0):
        raise ValueError(f'Found negative value in {ML_dof}.')

    ## Get data from metadata_dic
    pump_status = metadata_dic['pump_status']
    precursors = metadata_dic['precursors']
    infuse_rate = np.float32(metadata_dic['infuse_rate'])
    infuse_rate_unit = metadata_dic['infuse_rate_unit']

    ## Turn pump_status into a 0/1 mask (1 where the pump is infusing)
    pump_status = np.float32(
        [status.lower().capitalize() == "Infusing" for status in pump_status]
    )

    ## Generate ruc (rate unit converter) to unify rate units as assigned unit
    ruc = np.float32(
        [rate_unit_converter(r0=rate_unit, r1=unit) for rate_unit in infuse_rate_unit]
    )

    ## Total flow of the infusing pumps, redistributed by the ML fractions
    unified_rate = infuse_rate * pump_status * ruc
    predicted_rate = np.round(ML_dof * unified_rate.sum(), decimals=2)

    ## Arrange ML_dof into an array as infuse_rate / infuse_rate_unit
    if exp_name == 'ldrd-2031':
        input_names = ['CsPb', 'Br', 'ZnI', 'ZnCl', 'Toluene']
    else:
        input_names = exp_name

    device_parameters = np.zeros(len(infuse_rate))
    for i, input_name in enumerate(input_names):
        for j, precursor in enumerate(precursors):
            ## Case-insensitive substring match of input name in precursor name
            if input_name.lower() in precursor.lower():
                device_parameters[j] = predicted_rate[i]

    return device_parameters


def dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name='primary', fitting=None, plqy_dic=None):
# to save fitting results for good data, fitting needs be a dict with two keys:
# fitting = {'fit_function': da._1gauss, 'curve_fit': popt}
Expand Down Expand Up @@ -603,3 +523,103 @@ def dic_to_csv(csv_path, qepro_dic, metadata_dic):
else:
fp.write(f'{x_axis_data[i]},{dark_data[i]},{sample_data[i]},{output_data[i]}\n')




## Convert ML parameters (non-dimensional) into device parameters (infuse rate)
## Note: Before conversion, it is recommended to take a dummy scan
## to let BlueSky know the precursor list and total flow rate
def ML_to_device_parameters(ML_dof, metadata_dic, unit='ul/min',
                            check_sum=True, check_negative=True,
                            exp_name='ldrd-2031'):
    """Convert non-dimensional ML parameters into per-pump infuse rates.

    The total flow rate is taken from the pumps currently reported as
    "Infusing" in ``metadata_dic`` and redistributed according to ``ML_dof``.

    Parameters
    ----------
    ML_dof : array-like
        Fractions proposed by the ML side, one per entry of the input-name
        list (see ``exp_name``). Cast to float32 before use.
    metadata_dic : dict
        Must provide the keys 'pump_status', 'precursors', 'infuse_rate'
        and 'infuse_rate_unit'.
    unit : str
        Rate unit passed as ``r1`` to ``rate_unit_converter``.
        NOTE(review): presumably the converter returns a multiplicative
        factor from ``r0`` to ``r1`` -- confirm against its definition.
    check_sum : bool
        If True, raise when the fractions do not sum to one (within
        floating-point tolerance).
    check_negative : bool
        If True, raise when any fraction is negative.
    exp_name : str or sequence of str
        'ldrd-2031' selects the built-in input-name list; any other value
        is used directly as the list of input names.

    Returns
    -------
    numpy.ndarray
        Array with one slot per entry of 'infuse_rate'. Slot ``j`` holds the
        predicted rate (rounded to 2 decimals) of the input name contained
        in ``precursors[j]``; unmatched slots stay 0.

    Raises
    ------
    ValueError
        If an enabled sum or negativity check fails.
    """
    ML_dof = np.float32(ML_dof)

    ## float32 sums of normalized fractions are frequently off by one ulp,
    ## so compare against 1.0 with a tolerance instead of exact equality.
    if check_sum and not np.isclose(ML_dof.sum(), 1.0):
        raise ValueError(f'Sum of {ML_dof} is not equal to one.')

    if check_negative and np.any(ML_dof < 0):
        raise ValueError(f'Found negative value in {ML_dof}.')

    ## Get data from metadata_dic
    pump_status = metadata_dic['pump_status']
    precursors = metadata_dic['precursors']
    infuse_rate = np.float32(metadata_dic['infuse_rate'])
    infuse_rate_unit = metadata_dic['infuse_rate_unit']

    ## Turn pump_status into a 0/1 mask (1 where the pump is infusing)
    pump_status = np.float32(
        [status.lower().capitalize() == "Infusing" for status in pump_status]
    )

    ## Generate ruc (rate unit converter) to unify rate units as assigned unit
    ruc = np.float32(
        [rate_unit_converter(r0=rate_unit, r1=unit) for rate_unit in infuse_rate_unit]
    )

    ## Total flow of the infusing pumps, redistributed by the ML fractions
    unified_rate = infuse_rate * pump_status * ruc
    predicted_rate = np.round(ML_dof * unified_rate.sum(), decimals=2)

    ## Arrange ML_dof into an array as infuse_rate / infuse_rate_unit
    if exp_name == 'ldrd-2031':
        input_names = ['CsPb', 'Br', 'ZnI', 'ZnCl', 'Toluene']
    else:
        input_names = exp_name

    device_parameters = np.zeros(len(infuse_rate))
    for i, input_name in enumerate(input_names):
        for j, precursor in enumerate(precursors):
            ## Case-insensitive substring match of input name in precursor name
            if input_name.lower() in precursor.lower():
                device_parameters[j] = predicted_rate[i]

    return device_parameters





## Convert device parameters (infuse rate) into ML parameters (non-dimensional)
def device_to_ML_parameters(metadata_dic, exp_name='ldrd-2031'):
    """Convert per-pump infuse rates into non-dimensional ML parameters.

    Reads 'pump_status', 'precursors', 'infuse_rate' and 'infuse_rate_unit'
    from ``metadata_dic``, zeroes the rates of pumps whose status is not
    "Infusing", converts every rate to the unit of the first pump via
    ``rate_unit_converter`` and divides by the total.

    ``exp_name='ldrd-2031'`` selects the built-in input-name list; any other
    value is used directly as the list of input names. The returned array has
    one entry per input name, holding the normalized rate of the precursor
    whose name contains that input name (case-insensitive), or 0 if no
    precursor matches.

    NOTE(review): if no pump is infusing the total is zero and the
    normalization divides by zero -- confirm callers never hit this.
    """
    ## Pull the pump-related entries out of the metadata
    status_list = metadata_dic['pump_status']
    precursor_names = metadata_dic['precursors']
    rates = np.float32(metadata_dic['infuse_rate'])
    rate_units = metadata_dic['infuse_rate_unit']

    ## 1.0 where the pump reports "Infusing" (any letter case), else 0.0
    infusing_mask = np.float32(
        [entry.lower().capitalize() == "Infusing" for entry in status_list]
    )

    ## Conversion factors bringing every rate to the unit of the first pump
    unit_factors = np.float32(
        [rate_unit_converter(r0=current, r1=rate_units[0]) for current in rate_units]
    )

    effective_rates = rates * infusing_mask * unit_factors
    fractions = effective_rates / effective_rates.sum()

    ## Choose the ordered list of ML input names
    input_names = (
        ['CsPb', 'Br', 'ZnI', 'ZnCl', 'Toluene']
        if exp_name == 'ldrd-2031'
        else exp_name
    )

    ## Fill each ML slot from the precursor(s) whose name contains the
    ## input name; a later matching precursor overwrites an earlier one.
    ML_parameters = np.zeros(len(input_names))
    for slot, input_name in enumerate(input_names):
        for pump, precursor in enumerate(precursor_names):
            if input_name.lower() in precursor.lower():
                ML_parameters[slot] = fractions[pump]

    return ML_parameters
Loading

0 comments on commit 5244e49

Please sign in to comment.