Skip to content

Commit

Permalink
update xlsx_to_inputs() into Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed Aug 5, 2024
1 parent cbe1912 commit b1f045f
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 61 deletions.
12 changes: 6 additions & 6 deletions scripts/_LDRD_Kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ def _kafka_process():
'iq_to_gr', 'iq_to_gr_path', 'cfg_fn', 'bkg_fn', 'iq_fn',
'search_and_match', 'mystery_path', 'results_path',
'fitting_pdf', 'fitting_pdf_path', 'cif_fn', 'gr_fn',
'use_sandbox', 'write_to_sandbox', 'sandbox_tiled_client', 'tiled_client',
'use_sandbox', 'write_to_sandbox', 'sandbox_uri', 'sandbox_tiled_client', 'tiled_client',
'fn_TBD',
'entry', 'iq_q', 'iq_I', 'stream_list', 'uid', 'uid_catalog', 'uid_pdfstream',
'entry', 'iq_data', 'stream_list', 'uid', 'uid_catalog', 'uid_pdfstream',
''
]

Expand Down Expand Up @@ -85,10 +85,10 @@ def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'):
agent_data_path=self.inputs.agent_data_path[0])
)

## self.inputs.sandbox_tiled_client[0] is just the uri of sandbox
## so, turn uri into client and append it in self.inputs.sandbox_tiled_client
if type(self.inputs.sandbox_tiled_client[0]) is str:
self.inputs.sandbox_tiled_client.append(from_uri(self.inputs.sandbox_tiled_client[0]))
## self.inputs.sandbox_uri[0] is just the uri of sandbox
## so, turn uri into client and assign it to self.inputs.sandbox_tiled_client
if type(self.inputs.sandbox_uri[0]) is str:
self.inputs.sandbox_tiled_client = from_uri(self.inputs.sandbox_uri[0])


## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn
Expand Down
Binary file modified scripts/inputs_qserver_kafka_v2.xlsx
Binary file not shown.
16 changes: 10 additions & 6 deletions scripts/kafka_consumer_iterate_1LL09_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ def print_kafka_messages(beamline_acronym, kin=kin, qin=qin, RM=RM, ):
f'{bool(kin.write_to_sandbox[0]) = }\n'
f'{qin.zmq_control_addr[0] = }')

## Append raw data tiled_client
kin.tiled_client.append(beamline_acronym)
kin.tiled_client.append(from_profile(beamline_acronym))
## Assignt raw data tiled clients
kin.tiled_client.append = from_profile(beamline_acronym)

## Append good/bad data folder to csv_path
kin.csv_path.append(os.path.join(kin.csv_path[0], 'good_bad'))
Expand Down Expand Up @@ -177,19 +176,24 @@ def print_message(consumer, doctype, doc,
time.sleep(2)

## Obtain uid from message['run_start']
uid = message['run_start']
print(f'\n**** start to export uid: {uid} ****\n')
kin.uid = message['run_start']
kin.uid_catalog.append(kin.uid)
print(f'\n**** start to export uid: {kin.uid} ****\n')
# print(list(message['num_events'].keys())[0])

## Put stream name of scans into stream_list
stream_list = list(message['num_events'].keys())
kin.stream_list = []
for stream_name in syringe_list:
kin.stream_list.append(stream_name)


//////////////////////////////////////////////////////////////////////////////////////////////////
## Set good/bad data condictions to the corresponding sample
kh = kin.key_height[0]
hei = kin.height[0]
dis = kin.distance[0]

//////////////////////////////////////////////////////////////////////////////////////////////////
## obtain phase fraction & particle size from g(r)
if 'scattering' in stream_list:
if fitting_pdf:
Expand Down
100 changes: 51 additions & 49 deletions scripts/kafka_consumer_iterate_XPD_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,10 @@ def print_kafka_messages(beamline_acronym_01, beamline_acronym_02, kin=kin, qin=
f'{bool(kin.write_to_sandbox[0]) = }\n'
f'{qin.zmq_control_addr[0] = }')

## Append raw data & analysis data tiled clients
kin.tiled_client.append(beamline_acronym_01)
kin.tiled_client.append(from_profile(beamline_acronym_01))
## Assignt raw data tiled clients
kin.tiled_client.append = from_profile(beamline_acronym_01)
## 'xpd-analysis' is not a catalog name so can't be accessed in databroker

kin.sandbox_tiled_client.append(from_uri(kin.sandbox_tiled_client[0]))

## Append good/bad data folder to csv_path
kin.csv_path.append(os.path.join(kin.csv_path[0], 'good_bad'))

Expand Down Expand Up @@ -163,7 +160,7 @@ def print_message(consumer, doctype, doc,
if 'sample_type' in message.keys():
print(f"sample type: {message['sample_type']}")

## Reset kin.uid to as empty list
## Reset kin.uid to an empty list
kin.uid = []


Expand All @@ -178,12 +175,14 @@ def print_message(consumer, doctype, doc,

iq_I_uid = doc[1]['data']['chi_I']
kin.uid_pdfstream.append(iq_I_uid)
kin.entry.append(sandbox_tiled_client[iq_I_uid])
df = kin.entry[-1].read()
kin.iq_Q.append(df['chi_Q'].to_numpy())
kin.iq_I.append(df['chi_I'].to_numpy())
kin.entry = kin.sandbox_tiled_client[iq_I_uid]
df = kin.entry.read()
# Before appending I(Q) data, reset kin.iq_data as an empty list
kin.iq_data = []
kin.iq_data.append(df['chi_Q'].to_numpy())
kin.iq_data.append(df['chi_I'].to_numpy())

## Reset kin.uid to as empty list
## Reset kin.uid to an empty list
kin.uid = []


Expand All @@ -202,6 +201,7 @@ def print_message(consumer, doctype, doc,
# RM.item_add(inst1, pos='front')
## wait 1 second for databroker to save data
time.sleep(1)
## Reset kin.uid to an empty list
kin.uid = []


Expand All @@ -216,9 +216,10 @@ def print_message(consumer, doctype, doc,
)
## wait 1 second for databroker to save data
time.sleep(1)
kin.uid = kin.entry[-1].metadata['run_start']
kin.uid = kin.entry.metadata['run_start']
kin.uid_catalog.append(kin.uid)
stream_list = kin.tiled_client[-1][kin.uid].metadata['summary']['stream_names']
stream_list = kin.tiled_client[kin.uid].metadata['summary']['stream_names']
## Reset kin.stream_list to an empty list
kin.stream_list = []
for stream_name in syringe_list:
kin.stream_list.append(stream_name)
Expand All @@ -243,62 +244,63 @@ def print_message(consumer, doctype, doc,
kin.uid = message['run_start']
kin.uid_catalog.append(kin.uid)
stream_list = list(message['num_events'].keys())
## Reset kin.stream_list to an empty list
kin.stream_list = []
for stream_name in syringe_list:
kin.stream_list.append(stream_name)

/////////////////////////////////////////////////////////////////////////////////////////////////

## When uid is assigned and type is a string, move to data fitting, calculation
if (name == 'stop') and (type(uid) is str):
print(f'\n**** start to export uid: {uid} ****\n')
print(f'\n**** with stream name in {stream_list} ****\n')


################## (name == 'stop') and (type(kin.uid) is str) ####################
## ##
## When uid is assigned and type is a string, move to data fitting, calculation ##
## ##
#####################################################################################
if (name == 'stop') and (type(kin.uid) is str):
print(f'\n**** start to export uid: {kin.uid} ****\n')
print(f'\n**** with stream name in {kin.stream_list} ****\n')

## Set good/bad data condictions to the corresponding sample
try:
kh = key_height[len(finished)]
hei = height[len(finished)]
dis = distance[len(finished)]
except IndexError:
kh = key_height[0]
hei = height[0]
dis = distance[0]
kh = kin.key_height[0]
hei = kin.height[0]
dis = kin.distance[0]

## obtain phase fraction & particle size from g(r)
if 'scattering' in stream_list:
# Get metadat from stream_name fluorescence for plotting
if 'scattering' in kin.stream_list:
# Get metadata from stream_name fluorescence for plotting
qepro_dic, metadata_dic = de.read_qepro_by_stream(
uid, stream_name='fluorescence', data_agent='tiled',
kin.uid, stream_name='fluorescence', data_agent='tiled',
beamline_acronym=beamline_acronym_01)
u = plot_uvvis(qepro_dic, metadata_dic)

if use_sandbox:
iq_df = np.asarray([iq_Q, iq_I])
iq_df2 = pd.DataFrame()
iq_df2['q'] = iq_Q
iq_df2['I(q)'] = iq_I
if kin.use_sandbox[0]:
iq_array = np.asarray([kin.iq_data[0], kin.iq_data[1]])
kin.iq_data.append(iq_array)

iq_df = pd.DataFrame()
iq_df['q'] = kin.iq_data[0]
iq_df['I(q)'] = kin.iq_data[1]
kin.iq_data.append(iq_df)

# ### CsPbBr2 test
# iq_df = pd.read_csv(iq_fn, skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T
# iq_df2 = pd.read_csv(iq_fn, skiprows=1, names=['q', 'I(q)'], sep=' ')

else:
pass
# iq_df = iq_fn[0]
# iq_array = pd.read_csv(iq_fn, skiprows=1, names=['q', 'I(q)'], sep=' ').to_numpy().T
# kin.iq_data.append(iq_array)
# iq_df = pd.read_csv(iq_fn, skiprows=1, names=['q', 'I(q)'], sep=' ')
# kin.iq_data.append(iq_df)


if iq_to_gr:
if kin.iq_to_gr[0]:
# Grab metadat from stream_name = fluorescence for naming gr file
fn_uid = de._fn_generator(uid, beamline_acronym=beamline_acronym_01)
fn_uid = de._fn_generator(kin.uid, beamline_acronym=beamline_acronym_01)
gr_fn = f'{fn_uid}_scattering.gr'

# ### CsPbBr2 test
# gr_fn = f'{iq_fn[:-4]}.gr'

# Build pdf config file from a scratch
pdfconfig = PDFConfig()
pdfconfig.readConfig(cfg_fn)
pdfconfig.backgroundfiles = bkg_fn[-1]
sqfqgr_path = gp.transform_bkg(pdfconfig, iq_df, output_dir=gr_path,
pdfconfig.readConfig(kin.cfg_fn[-1])
pdfconfig.backgroundfiles = kin.bkg_fn[-1]
sqfqgr_path = gp.transform_bkg(pdfconfig, iq_array, output_dir=gr_path,
plot_setting={'marker':'.','color':'green'}, test=True,
gr_fn=gr_fn)
gr_data = sqfqgr_path['gr']
Expand Down Expand Up @@ -365,7 +367,7 @@ def print_message(consumer, doctype, doc,
pdf_property={'Br_ratio': np.nan, 'Br_size': np.nan}

if iq_to_gr:
u.plot_iq_to_gr(iq_df, gr_df.to_numpy().T, gr_fit=gr_fit)
u.plot_iq_to_gr(iq_array, gr_df.to_numpy().T, gr_fit=gr_fit)
## remove 'scattering' from stream_list to avoid redundant work in next for loop
stream_list.remove('scattering')

Expand Down Expand Up @@ -616,7 +618,7 @@ def print_message(consumer, doctype, doc,
df['fluorescence_fitting'] = f_fit(x0, *popt)

## use pd.concat to add various length data together
df_new = pd.concat([df, iq_df2, gr_df, gr_fit_df], ignore_index=False, axis=1)
df_new = pd.concat([df, iq_df, gr_df, gr_fit_df], ignore_index=False, axis=1)

# entry = sandbox_tiled_client.write_dataframe(df, metadata=agent_data)
entry = sandbox_tiled_client.write_dataframe(df_new, metadata=agent_data)
Expand Down

0 comments on commit b1f045f

Please sign in to comment.