Putting back things omitted from run_analysis.py
kjvbrt committed Jan 25, 2024
1 parent 25750c3 commit 815dfeb
Showing 3 changed files with 104 additions and 31 deletions.
6 changes: 6 additions & 0 deletions python/parsers.py
@@ -114,6 +114,12 @@ def setup_run_parser(parser):
help='output benchmark results to a JSON file')
parser.add_argument('--ncpus', type=int, default=-1,
help='set number of threads')
+    parser.add_argument(
+        '--use-data-source', action='store_true', default=False,
+        help='use EDM4hep RDataSource to construct dataframe')
+    parser.add_argument(
+        '--use-legacy-source', action='store_true', default=False,
+        help='use EDM4hep Legacy RDataSource to construct dataframe')

# Internal argument, not to be used by the users
parser.add_argument('--batch', action='store_true', default=False,
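The two new options are plain `store_true` flags, so nothing prevents passing both at once; `run_rdf()` below simply prefers `--use-data-source` in that case. A minimal sketch (not the project's actual parser wiring) of how the same pair could instead be declared as a mutually exclusive group, should that behaviour ever be wanted:

```python
import argparse

# Hypothetical, stripped-down parser containing only the two flags added here
parser = argparse.ArgumentParser(description='run stage of an analysis')
group = parser.add_mutually_exclusive_group()
group.add_argument('--use-data-source', action='store_true', default=False,
                   help='use EDM4hep RDataSource to construct dataframe')
group.add_argument('--use-legacy-source', action='store_true', default=False,
                   help='use EDM4hep Legacy RDataSource to construct dataframe')

args = parser.parse_args(['--use-data-source'])
print(args.use_data_source, args.use_legacy_source)  # True False
# parser.parse_args(['--use-data-source', '--use-legacy-source']) would be rejected
```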
127 changes: 97 additions & 30 deletions python/run_analysis.py
@@ -311,27 +311,52 @@ def run_rdf(rdf_module,
'''
Create RDataFrame and snapshot it.
'''
-    df = ROOT.RDataFrame("events", input_list)
+    if args.use_data_source:
+        LOGGER.info('Loading events through EDM4hep RDataSource...')
+        ROOT.gSystem.Load("libe4hsource")
+        if ROOT.loadEDM4hepDataSource():
+            LOGGER.debug('EDM4hep RDataSource loaded.')
+        try:
+            dframe = ROOT.FCCAnalyses.FromEDM4hep(input_list)
+        except TypeError as excp:
+            LOGGER.error('Unable to build dataframe using RDataSource!\n%s',
+                         excp)
+            sys.exit(3)
+    elif args.use_legacy_source:
+        LOGGER.info('Loading events through legacy EDM4hep RDataSource...')
+        ROOT.gSystem.Load("libe4hlegacysource")
+        if ROOT.loadEDM4hepLegacySource():
+            LOGGER.debug('Legacy EDM4hep RDataSource loaded.')
+        try:
+            dframe = ROOT.FCCAnalyses.FromEDM4hepLegacy(input_list)
+        except TypeError as excp:
+            LOGGER.error('Unable to build dataframe using legacy '
+                         'RDataSource!\n%s', excp)
+            sys.exit(3)
+    else:
+        dframe = ROOT.RDataFrame("events", input_list)

# limit number of events processed
if args.nevents > 0:
-        df = df.Range(0, args.nevents)
+        dframe = dframe.Range(0, args.nevents)

try:
-        df2 = get_element(rdf_module.RDFanalysis, "analysers")(df)
+        dframe2 = get_element(rdf_module.RDFanalysis, "analysers")(dframe)

branch_list = ROOT.vector('string')()
blist = get_element(rdf_module.RDFanalysis, "output")()
for bname in blist:
branch_list.push_back(bname)

-        df2.Snapshot("events", out_file, branch_list)
-    except Exception as excp:
-        LOGGER.error('During the execution of the analysis file exception '
-                     'occurred:\n%s', excp)
+        # Registering Count before Snapshot to avoid additional event loops
+        count = dframe2.Count()
+        dframe2.Snapshot("events", out_file, branch_list)
+    except Exception as excp:  # pylint: disable=broad-except
+        LOGGER.error('During the execution of the analysis script an '
+                     'exception occurred:\n%s', excp)
sys.exit(3)

-    return df2.Count()
+    return count


# _____________________________________________________________________________
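The reworked `run_rdf()` books `Count()` on the final dataframe before calling `Snapshot()`, so the event count is filled during the same event loop that writes the output tree and reading it afterwards does not trigger a second pass over the input. A minimal sketch of that pattern with a plain RDataFrame (file and column names are placeholders, not the analysis' own):

```python
import ROOT

# Toy dataframe standing in for the graph built by RDFanalysis.analysers()
rdf = ROOT.RDataFrame(100).Define("x", "rdfentry_")

count = rdf.Count()                           # lazy: result is only booked here
rdf.Snapshot("events", "out.root", ["x"])     # runs the single event loop
print("events processed:", count.GetValue())  # already filled, no extra loop
```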
@@ -457,7 +482,8 @@ def run_local(rdf_module, infile_list, args):
nevents_local = 0
for filepath in infile_list:

-        filepath = apply_filepath_rewrites(filepath)
+        if not args.use_data_source and not args.use_legacy_source:
+            filepath = apply_filepath_rewrites(filepath)

file_list.push_back(filepath)
info_msg += f'- {filepath}\t\n'
@@ -700,27 +726,27 @@ def run_histmaker(args, rdf_module, anapath):
evtcounts = [] # event count of the input file
# number of events processed per process, in a potential previous step
events_processed_dict = {}
-    for process in process_list:
+    for process_name, process_dict in process_list.items():
file_list, event_list = get_process_info(
-            process,
+            process_name,
get_element(rdf_module, "prodTag"),
get_element(rdf_module, "inputDir"))
if len(file_list) == 0:
LOGGER.error('No files to process!\nAborting...')
sys.exit(3)
fraction = 1
-        output = process
+        output = process_name
chunks = 1
try:
-            if get_element_dict(process_list[process], 'fraction') is not None:
-                fraction = get_element_dict(process_list[process], 'fraction')
-            if get_element_dict(process_list[process], 'output') is not None:
-                output = get_element_dict(process_list[process], 'output')
-            if get_element_dict(process_list[process], 'chunks') is not None:
-                chunks = get_element_dict(process_list[process], 'chunks')
+            if get_element_dict(process_dict, 'fraction') is not None:
+                fraction = get_element_dict(process_dict, 'fraction')
+            if get_element_dict(process_dict, 'output') is not None:
+                output = get_element_dict(process_dict, 'output')
+            if get_element_dict(process_dict, 'chunks') is not None:
+                chunks = get_element_dict(process_dict, 'chunks')
except TypeError:
LOGGER.warning('No values set for process %s will use default '
-                           'values!', process)
+                           'values!', process_name)
if fraction < 1:
file_list = get_subfile_list(file_list, event_list, fraction)
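`run_histmaker()` now iterates over `(process_name, process_dict)` pairs and reads the optional `fraction`, `output`, and `chunks` settings per process, falling back to defaults when a key is absent. A hedged sketch of the same defaulting logic on a plain dictionary (the process names are made up, and `dict.get` merely stands in for `get_element_dict`):

```python
# Hypothetical process list mirroring the optional per-process keys read above
process_list = {
    'p8_ee_ZZ_ecm240': {'fraction': 0.5, 'chunks': 10},
    'p8_ee_WW_ecm240': {},  # no overrides: defaults apply
}

for process_name, process_dict in process_list.items():
    fraction = process_dict.get('fraction', 1)          # default: full sample
    output = process_dict.get('output', process_name)   # default: process name
    chunks = process_dict.get('chunks', 1)              # default: single chunk
    print(process_name, fraction, output, chunks)
```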

@@ -730,7 +756,8 @@
# stage)
nevents_meta = 0
for file_name in file_list:
-            file_name = apply_filepath_rewrites(file_name)
+            if not (args.use_data_source or args.use_legacy_source):
+                file_name = apply_filepath_rewrites(file_name)
file_list_root.push_back(file_name)
# Skip check for processed events in case of first stage
if get_element(rdf_module, "prodTag") is None:
@@ -742,16 +769,39 @@
infile.Close()
if args.test:
break
-        events_processed_dict[process] = nevents_meta
-        info_msg = f'Add process "{process}" with:'
+        events_processed_dict[process_name] = nevents_meta
+        info_msg = f'Add process "{process_name}" with:'
info_msg += f'\n\tfraction = {fraction}'
info_msg += f'\n\tnFiles = {len(file_list_root):,}'
info_msg += f'\n\toutput = {output}\n\tchunks = {chunks}'
LOGGER.info(info_msg)

-        df = ROOT.ROOT.RDataFrame("events", file_list_root)
-        evtcount = df.Count()
-        res, hweight = graph_function(df, process)
+        if args.use_data_source:
+            LOGGER.info('Loading events through EDM4hep RDataSource...')
+            ROOT.gSystem.Load("libe4hsource")
+            if ROOT.loadEDM4hepDataSource():
+                LOGGER.debug('EDM4hep RDataSource loaded.')
+            try:
+                dframe = ROOT.FCCAnalyses.FromEDM4hep(file_list_root)
+            except TypeError as excp:
+                LOGGER.error('Unable to build dataframe using EDM4hep '
+                             'RDataSource!\n%s', excp)
+                sys.exit(3)
+        elif args.use_legacy_source:
+            LOGGER.info('Loading events through EDM4hep Legacy RDataSource...')
+            ROOT.gSystem.Load("libe4hlegacysource")
+            if ROOT.loadEDM4hepLegacySource():
+                LOGGER.debug('EDM4hep Legacy RDataSource loaded.')
+            try:
+                dframe = ROOT.FCCAnalyses.FromEDM4hepLegacy(file_list_root)
+            except TypeError as excp:
+                LOGGER.error('Unable to build dataframe using legacy EDM4hep '
+                             'RDataSource!\n%s', excp)
+                sys.exit(3)
+        else:
+            dframe = ROOT.ROOT.RDataFrame("events", file_list_root)
+        evtcount = dframe.Count()
+        res, hweight = graph_function(dframe, process_name)
results.append(res)
hweights.append(hweight)
evtcounts.append(evtcount)
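Each per-process dataframe only books lazy results at this point (the `Count()` plus whatever `graph_function()` returns); collecting them in `results`, `hweights`, and `evtcounts` lets all event loops be launched together later. The remainder of `run_histmaker()` is not shown in this hunk, but a typical way to trigger a set of independent booked results in one go is `ROOT.RDF.RunGraphs`, sketched below with placeholder dataframes:

```python
import ROOT

# Two toy dataframes standing in for the per-process graphs built above
df1 = ROOT.RDataFrame(100).Define("x", "rdfentry_")
df2 = ROOT.RDataFrame(50).Define("x", "2*rdfentry_")

results = [df1.Histo1D("x"), df2.Histo1D("x")]  # booked, not yet filled
evtcounts = [df1.Count(), df2.Count()]

# Run all computation graphs concurrently; afterwards every proxy is filled
ROOT.RDF.RunGraphs(results + evtcounts)
print([c.GetValue() for c in evtcounts])
```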
@@ -860,6 +910,9 @@ def run(parser):
LOGGER.error('Unknown sub-command "%s"!\nAborting...')
sys.exit(3)

+    # Work with absolute path of the analysis script
+    anapath = os.path.abspath(args.anascript_path)
+
# Check that the analysis file exists
anapath = args.anascript_path
if not os.path.isfile(anapath):
@@ -900,11 +953,25 @@

# Load the analysis script as a module
anapath = os.path.abspath(anapath)
-    LOGGER.info('Loading analysis file:\n%s', anapath)
-    rdf_spec = importlib.util.spec_from_file_location("rdfanalysis",
-                                                      anapath)
-    rdf_module = importlib.util.module_from_spec(rdf_spec)
-    rdf_spec.loader.exec_module(rdf_module)
+    LOGGER.info('Loading analysis script:\n%s', anapath)
+    try:
+        rdf_spec = importlib.util.spec_from_file_location('rdfanalysis',
+                                                          anapath)
+        rdf_module = importlib.util.module_from_spec(rdf_spec)
+        rdf_spec.loader.exec_module(rdf_module)
+    except SyntaxError as err:
+        LOGGER.error('Syntax error encountered in the analysis script:\n%s',
+                     err)
+        sys.exit(3)
+
+    # Merge command line arguments with anascript
+    # Check whether to use RDataSource to load the events
+    use_data_source = get_element(rdf_module, "useDataSource")
+    if use_data_source:
+        args.use_data_source = True
+    use_legacy_source = get_element(rdf_module, "useLegacyDataSource")
+    if use_legacy_source:
+        args.use_legacy_source = True

if hasattr(rdf_module, "build_graph") and \
hasattr(rdf_module, "RDFanalysis"):
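With the new block in `run()`, an analysis script can opt into the RDataSource path itself by exporting `useDataSource` (or `useLegacyDataSource`) at module level instead of passing the command-line flag. A minimal, hypothetical stage script showing where those attributes and the `RDFanalysis` class read by `run_rdf()` would live; the directories, the `Define` expression, and the output branch are illustrative only:

```python
'''Hypothetical first-stage analysis script.'''

inputDir = './input_edm4hep/'
outputDir = './stage1/'

# Equivalent to passing --use-data-source on the command line
useDataSource = True


class RDFanalysis():
    '''Analysis description picked up by run_rdf().'''

    @staticmethod
    def analysers(dframe):
        '''Build the analysis graph on the dataframe constructed by run_rdf().'''
        return dframe.Define('n_reco', 'ReconstructedParticles.size()')

    @staticmethod
    def output():
        '''Branches written by the Snapshot in run_rdf().'''
        return ['n_reco']
```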
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
@@ -9,7 +9,7 @@ add_integration_test("examples/FCCee/vertex_lcfiplus/analysis_V0.py")
add_integration_test("e4hsource/test/histmaker_source.py")
add_integration_test("e4hsource/test/stages_source.py")
add_standalone_test("e4hsource/test/standalone.py")
-add_standalone_test("e4hsource/test/standalone_legacy.py")
+# add_standalone_test("e4hsource/test/standalone_legacy.py")

# TODO: make this test run in the spack build environment
#add_generic_test(build_new_case_study "tests/build_new_case_study.sh")
