CADC-10810 - hdf5 support for TAOSII #187

Merged: 42 commits, Feb 13, 2025

Commits (42)
53e43b5
CADC-12805 - stop partial Function creation from keywords, so that Ra…
SharonGoliath Oct 30, 2023
6e3b94b
CADC-12805 - interim commit.
SharonGoliath Nov 2, 2023
e982c3e
CADC-12805 - interim commit.
SharonGoliath Nov 3, 2023
bced568
CADC-12805 - interim commit - clean up finished, regression testing t…
SharonGoliath Nov 5, 2023
e82a5e6
CADC-12805 - interim commit.
SharonGoliath Nov 5, 2023
ad79c3a
CADC-12805 - remove dependency on deprecated imp package.
SharonGoliath Nov 6, 2023
d8140fe
CADC-12805 - remove dependency that is not installed in test environm…
SharonGoliath Nov 6, 2023
cbcbb83
CADC-12805 - flake8.
SharonGoliath Nov 7, 2023
5fcbb7e
CADC-12805 - flake8.
SharonGoliath Nov 7, 2023
5ac4790
CADC-12805 - code review comments.
SharonGoliath Nov 16, 2023
7809eb1
CADC-12805 - bump versions.
SharonGoliath Nov 16, 2023
2dafefc
CADC-12805 - remove 'import imp' from setup.py
SharonGoliath Nov 17, 2023
4d6886e
CADC-12805 - update the default resource id.
SharonGoliath Nov 21, 2023
8a14c35
Merge branch 'main' into CADC-12805
SharonGoliath Nov 23, 2023
cb48cd5
CADC-12858 - comma in keywords for splitting
SharonGoliath Jan 3, 2024
bad24bf
CADC-12858 - add WcsParser handling for non-file based content.
SharonGoliath Jan 6, 2024
73624e6
CADC-12858/CADC-13010 - refactor triggered by LoTSS collection, which…
SharonGoliath Jan 8, 2024
261d3c8
CADC-12858 - addresss code review comments.
SharonGoliath Jan 16, 2024
1afe62d
CADC-12858 - flake8
SharonGoliath Jan 16, 2024
bdc2e3f
Refactor caom2blueprint.py content into separate modules.
SharonGoliath Jan 17, 2024
142cfca
refactor_caom2utils - black.
SharonGoliath Jan 17, 2024
baa71e8
CADC-12017 - set the cardinality of parser:wcs_parser as 1:n.
SharonGoliath Jan 18, 2024
2be7de6
CADC-13017 - interim commit, add handling for extensions in apply_blu…
SharonGoliath Jan 22, 2024
6f03d77
CADC-13017 - update __init__.py
SharonGoliath Jan 22, 2024
2f1a944
CADC-13017 - test cases cover DerivedObservation.members, differening…
SharonGoliath Jan 25, 2024
5494288
Refactor comment line length.
SharonGoliath Feb 1, 2024
de2b1ab
Merge branch 'refactor_caom2utils' into CADC-13017
SharonGoliath Feb 1, 2024
f92ef93
CADC-13017 - support bz2 extensions for Content-Type.
SharonGoliath Feb 6, 2024
acaceb2
CADC-10810 - interim commit.
SharonGoliath Feb 19, 2024
b7b0edc
CADC-13223 - regression with black formatting changes.
SharonGoliath Feb 28, 2024
0367a8c
CADC-13204 - get the HDF5 test cases working.
SharonGoliath Feb 28, 2024
13368cd
CADC-10810 - interim commit.
SharonGoliath Mar 5, 2024
e0c8092
Merge branch 'main' into CADC-10810
SharonGoliath Oct 9, 2024
48fe01f
CADC-10810 - handle the TAOSII case of image and lightcurve data in t…
SharonGoliath Oct 18, 2024
b5c5e78
Merge branch 'main' into CADC-10810
SharonGoliath Feb 6, 2025
cc81abb
refactor(parsers.py): remove spurious error logging
SharonGoliath Feb 6, 2025
5aed9d5
refactor(code-review): address code review comments
SharonGoliath Feb 12, 2025
2287d4b
refactor(code-review): address code review comments
SharonGoliath Feb 12, 2025
c2fe914
Merge branch 'main' into CADC-10810
SharonGoliath Feb 12, 2025
d0af8ec
build(setup.cfg): increment version number
SharonGoliath Feb 12, 2025
58fc8ab
docs(parsers.py): add comment on the behaviour of a specialized method
SharonGoliath Feb 13, 2025
e1729f6
style(parsers.py): flake8
SharonGoliath Feb 13, 2025
Files changed
6 changes: 5 additions & 1 deletion caom2utils/caom2utils/blueprints.py
@@ -1187,10 +1187,14 @@ class Hdf5ObsBlueprint(ObsBlueprint):
# lookup value starting with // means rooted at base of the hdf5 file
ob.add_attribute('Observation.target.name', '//header/object/obj_id')

# lookup value starting with / means rooted at the base of the "find_roots_here" parameter for Hdf5Parser
# lookup value starting with / means rooted at the base of one of the entries in the extension_names parameter for Hdf5Parser
# (integer) means return only the value with the index of "integer" from a list
ob.add_attribute('Chunk.position.axis.function.refCoord.coord1.pix', '/header/wcs/crpix(0)')

# lookup values starting with / and with "{}" in the path will cause the blueprint application to attempt to
# guess the extension names from the file content
ob.add_attribute('Chunk.position.axis.function.refCoord.coord1.pix', '/sitedata/site{}/header/wcs/crpix(0)')

# (integer:integer) means return only the value with the index of "integer" from a list, followed by "integer"
# from the list in the list
ob.add_attribute('Chunk.position.axis.function.cd11', '/header/wcs/cd(0:0)')
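As a quick illustration of the three lookup forms documented above (not part of the diff), a blueprint for a hypothetical TAOSII-like file with groups sitedata/site001 and sitedata/site002 could combine them as follows; the position_axes argument is an assumption about the Hdf5ObsBlueprint constructor:

ob = Hdf5ObsBlueprint(position_axes=(1, 2))
# rooted at the base of the HDF5 file
ob.add_attribute('Observation.target.name', '//header/object/obj_id')
# rooted at one of the extension_names entries; (0) picks index 0 from a list value
ob.add_attribute('Chunk.position.axis.function.refCoord.coord1.pix', '/header/wcs/crpix(0)')
# '{}' asks Hdf5Parser to guess the extension names from the file content
ob.add_attribute('Chunk.position.axis.function.refCoord.coord1.pix', '/sitedata/site{}/header/wcs/crpix(0)')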
151 changes: 104 additions & 47 deletions caom2utils/caom2utils/parsers.py
@@ -284,7 +284,7 @@
self.logger.debug(f'Could not find \'{lookup}\' in caom2blueprint ' f'configuration.')

# if there's something useful as a value in the keywords, extract it
if keywords:
if keywords is not None and any(keywords):
if ObsBlueprint.needs_lookup(keywords):
# if there's a default value use it
if keywords[1]:
@@ -351,17 +351,18 @@
self.logger.debug(tb)
self.logger.error(e)
return result
try:
result = execute(parameter)
self.logger.debug(f'Key {key} calculated value of {result} using {value} type {type(result)}')
except Exception as e:
msg = f'Failed to execute {execute.__name__} for {key} in {self.uri}'
self.logger.error(msg)
self.logger.debug(f'Input parameter was {parameter}, value was {value}')
self._errors.append(msg)
tb = traceback.format_exc()
self.logger.debug(tb)
self.logger.error(e)
if execute:
try:
result = execute(parameter)
self.logger.debug(f'Key {key} calculated value of {result} using {value} type {type(result)}')
except Exception as e:
msg = f'Failed to execute {execute.__name__} for {key} in {self.uri}'
self.logger.error(msg)
self.logger.debug(f'Input parameter was {parameter}, value was {value}')
self._errors.append(msg)
tb = traceback.format_exc()
self.logger.debug(tb)
self.logger.error(e)
return result
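
A note on the guard added here and the earlier change to 'if keywords is not None and any(keywords):': both avoid treating a falsy-but-present value as usable. A minimal illustration, independent of the project code:

keywords = ('', '')
assert bool(keywords)      # truthy, so the old 'if keywords:' would proceed
assert not any(keywords)   # the new check skips a tuple of empty strings

execute = None             # e.g. the blueprint-named function was never resolved
if execute:                # the new guard avoids a TypeError from calling None
    execute(None)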

def _execute_external_instance(self, value, key, extension):
@@ -423,6 +424,7 @@
# CFHT 2003/03/29,01:34:54
# CFHT 2003/03/29
# DDO 12/02/95
# TAOSII 2024-01-26T14:52:49Z
for dt_format in [
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%S.%f',
@@ -435,6 +437,7 @@
'%d/%m/%y',
'%d/%m/%y %H:%M:%S',
'%d-%m-%Y',
'%Y-%m-%dT%H:%M:%SZ',
]:
try:
result = datetime.strptime(from_value, dt_format)
@@ -545,13 +548,17 @@
if name:
prov = caom2.Provenance(name, p_version, project, producer, run_id, reference, last_executed)
ContentParser._add_keywords(keywords, current, prov)
if inputs:
if inputs is not None and any(inputs):
if isinstance(inputs, caom2.TypedSet):
for i in inputs:
prov.inputs.add(i)
else:
for i in inputs.split():
prov.inputs.add(caom2.PlaneURI(str(i)))
if isinstance(inputs, str):
for i in inputs.split():
prov.inputs.add(caom2.PlaneURI(str(i)))
else:
for i in inputs:
prov.inputs.add(caom2.PlaneURI(str(i)))

else:
if current is not None and len(current.inputs) > 0:
# preserve the original value
@@ -572,10 +579,13 @@


class ContentParser(BlueprintParser):
def __init__(self, obs_blueprint=None, uri=None):
def __init__(self, obs_blueprint=None, uri=None, extension_start_index=0, extension_end_index=None):
super().__init__(obs_blueprint, uri)
# for those cases where the extensions of interest are not all the extensions in the original file
self._extension_start_index = extension_start_index
self._extension_end_index = extension_end_index if extension_end_index else self._get_num_parts()
self._wcs_parsers = {}
self._wcs_parsers[0] = WcsParser(obs_blueprint, extension=0)
self._set_wcs_parsers(obs_blueprint)

def _get_chunk_naxis(self, chunk, index):
chunk.naxis = self._get_from_list('Chunk.naxis', index, self.blueprint.get_configed_axes_count())
@@ -585,30 +595,36 @@
"""
return len(self._blueprint._extensions) + 1

def _set_wcs_parsers(self, obs_blueprint):
self._wcs_parsers[0] = WcsParser(obs_blueprint, extension=self._extension_start_index)

def augment_artifact(self, artifact):
"""
Augments a given CAOM2 artifact with available content information
:param artifact: existing CAOM2 artifact to be augmented
:param index: int Part name
"""
super().augment_artifact(artifact)

self.logger.debug(f'Begin content artifact augmentation for {artifact.uri}')

if self.blueprint.get_configed_axes_count() == 0:
raise TypeError(f'No WCS Data. End content artifact augmentation for ' f'{artifact.uri}.')

for index in range(0, self._get_num_parts()):
for index in range(self._extension_start_index, self._extension_end_index):
if self.add_parts(artifact, index):
part = artifact.parts[str(index)]
part.product_type = self._get_from_list('Part.productType', index)
part.meta_producer = self._get_from_list('Part.metaProducer', index=0, current=part.meta_producer)
part.meta_producer = self._get_from_list(
'Part.metaProducer', index=self._extension_start_index, current=part.meta_producer
)

# each Part has one Chunk, if it's not an empty part as determined just previously
if not part.chunks:
part.chunks.append(caom2.Chunk())
chunk = part.chunks[0]
chunk.meta_producer = self._get_from_list('Chunk.metaProducer', index=0, current=chunk.meta_producer)
chunk.meta_producer = self._get_from_list(
'Chunk.metaProducer', index=self._extension_start_index, current=chunk.meta_producer
)

self._get_chunk_naxis(chunk, index)

@@ -847,7 +863,8 @@
return members

def _get_axis_wcs(self, label, wcs, index):
"""Helper function to construct a CoordAxis1D instance, with all its members, from the blueprint.
"""Helper function to construct a CoordAxis1D instance, with all
it's members, from the blueprint.

:param label: axis name - must be one of 'custom', 'energy', 'time', or 'polarization', as it's used for the
blueprint lookup.
@@ -1460,7 +1477,7 @@

"""

def __init__(self, src, obs_blueprint=None, uri=None):
def __init__(self, src, obs_blueprint=None, uri=None, extension_start_index=0, extension_end_index=None):
"""
Ctor
:param src: List of headers (dictionary of FITS keywords:value) with one header for each extension or a FITS
@@ -1487,6 +1504,8 @@
self._errors = []
# for command-line parameter to module execution
self.uri = uri
self._extension_start_index = extension_start_index
self._extension_end_index = extension_end_index if extension_end_index is not None else self._get_num_parts()
self.apply_blueprint()

def _get_num_parts(self):
@@ -1845,22 +1864,19 @@
- use the astropy.wcs instance and other blueprint metadata to fill the CAOM2 record.
"""

def __init__(self, obs_blueprint, uri, h5_file, find_roots_here='sitedata'):
def __init__(self, obs_blueprint, uri, h5_file, extension_names=None, extension_start_index=0,
extension_end_index=None):
"""
:param obs_blueprint: Hdf5ObsBlueprint instance
:param uri: which artifact augmentation is based on
:param h5_file: h5py file handle
:param find_roots_here: str location where Chunk metadata starts
:param extension_names: list of str indicating where Chunk metadata starts; there is one Part/Chunk per list entry
"""
self._file = h5_file
# where N Chunk metadata starts
self._find_roots_here = find_roots_here
# the length of the array is the number of Parts in an HDF5 file, and the values are HDF5 lookup path names.
self._extension_names = []
super().__init__(obs_blueprint, uri)
# used to set the astropy wcs info, resulting in a validated wcs that can be used to construct a valid CAOM2
# record
self._wcs_parsers = {}
# the length of the array is the number of Parts in an HDF5 file,
# and the values are HDF5 lookup path names.
self._extension_names = extension_names
super().__init__(obs_blueprint, uri, extension_start_index, extension_end_index)

def _get_num_parts(self):
"""return the number of Parts to create for a CAOM record
@@ -1871,6 +1887,13 @@
result = 1
return result

def _set_wcs_parsers(self, obs_blueprint):
# used to set the astropy wcs info, resulting in a validated wcs that can be used to construct a valid CAOM2
# record
# This method overrides the default behaviour in the ContentParser class, which builds a WcsParser from the
# obs_blueprint. It is called in the ContentParser constructor.
self._wcs_parsers = {}

def apply_blueprint_from_file(self):
"""
Retrieve metadata from file, cache in the blueprint.
@@ -1879,7 +1902,13 @@
# h5py is an extra in this package since most collections do not require it
import h5py

individual, multi, attributes = self._extract_path_names_from_blueprint()
individual, multi, attributes, candidate_extensions = self._extract_path_names_from_blueprint()
if self._extension_names is None and len(candidate_extensions) > 0:
self._find_extension_names(candidate_extensions)
for index, _ in enumerate(self._extension_names):
self._blueprint._extensions[index] = {}
else:
self._blueprint._extensions[0] = {}
filtered_individual = [ii for ii in individual.keys() if '(' in ii]

def _extract_from_item(name, object):
@@ -1890,16 +1919,8 @@
:param name: fully-qualified HDF5 path name
:param object: what the HDF5 path name points to
"""
if name == self._find_roots_here:
for ii, path_name in enumerate(object.keys()):
# store the names and locations of the Part/Chunk metadata
temp = f'{name}/{path_name}'
self.logger.debug(f'Adding extension {temp}')
self._extension_names.append(temp)
self._blueprint._extensions[ii] = {}

# If it's the Part/Chunk metadata, capture it to extensions. Syntax of the keys described in
# Hdf5ObsBlueprint class.
# If it's the Part/Chunk metadata, capture it to extensions.
# Syntax of the keys described in Hdf5ObsBlueprint class.
for part_index, part_name in enumerate(self._extension_names):
if name.startswith(part_name) and isinstance(object, h5py.Dataset) and object.dtype.names is not None:
for d_name in object.dtype.names:
@@ -1973,20 +1994,54 @@
are _CAOM2_ELEMENT strings.
attributes - a dictionary of lists, keys reference expected content from the h5py.File().attrs data
structure and its keys.
extensions - a list of prefixes for identifying extensions
"""
individual = defaultdict(list)
multi = defaultdict(list)
attributes = defaultdict(list)
extensions = []
for key, value in self._blueprint._plan.items():
if ObsBlueprint.needs_lookup(value):
for ii in value[0]:
if ii.startswith('//'):
individual[ii].append(key)
elif ii.startswith('/'):
multi[ii].append(key)
if '{}' in ii:
bits = ii.split('{}')
extensions.append(bits[0])
multi[bits[1]].append(key)
else:
multi[ii].append(key)

else:
attributes[ii].append(key)
return individual, multi, attributes

return individual, multi, attributes, list(set(extensions))

def _find_extension_names(self, candidates):
""" if the HDF5 file has a structure where-by more than one Chunk (the equivalent of a FITS HDU extension)
is defined, try to guess that structure
"""
candidate_extension_names = []

def _extract_extension_prefixes(name, object):
"""
Function signature dictated by h5py visititems implementation. Executed for each dataset/group in an
HDF5 file.

:param name: fully-qualified HDF5 path name
:param object: what the HDF5 path name points to
"""
for part_name in candidates:
y = part_name.replace('/', '', 1)
if name.startswith(y):
x = name.split(y)[1].split('/')
temp = f'{y}{x[0]}'
candidate_extension_names.append(temp)
self._extension_names = list(sorted(set(candidate_extension_names)))

self._file.visititems(_extract_extension_prefixes)
msg = '\n'.join(ii for ii in self._extension_names)
self.logger.info(f'Found extension_names:\n{msg}')
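
As a worked trace of the matching above, under a hypothetical layout with groups sitedata/site001 and sitedata/site002: a blueprint entry '/sitedata/site{}/header/wcs/crpix(0)' contributes the candidate prefix '/sitedata/site', and each name reported by visititems is tested against it:

candidates = ['/sitedata/site']
name = 'sitedata/site001/header/wcs'       # visititems names carry no leading '/'
y = candidates[0].replace('/', '', 1)      # 'sitedata/site'
x = name.split(y)[1].split('/')            # ['001', 'header', 'wcs']
assert f'{y}{x[0]}' == 'sitedata/site001'  # collected, de-duplicated, then sorted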

def apply_blueprint(self):
self.logger.debug('Begin apply_blueprint')
@@ -2015,6 +2070,7 @@
else:
exts[extension][key] = self._execute_external_instance(value, key, extension)

# apply overrides
# blueprint already contains all the overrides, only need to make sure the overrides get applied to all the
# extensions
for extension in exts:
@@ -2032,6 +2088,7 @@
exts[extension][key] = value
self.logger.debug(f'{key}: set to {value} in extension {extension}')

# apply defaults
# if no values have been set by file lookups, function execution, or applying overrides, apply defaults,
# including to all extensions
for key, value in plan.items():
@@ -2054,7 +2111,7 @@
return

def augment_artifact(self, artifact):
for ii in range(0, self._get_num_parts()):
for ii in range(self._extension_start_index, self._extension_end_index):
# one WCS parser per Part/Chunk
self._wcs_parsers[ii] = Hdf5WcsParser(self._blueprint, ii)
super().augment_artifact(artifact)
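Tying the constructor changes together, a hedged usage sketch: the file name, URI, artifact, and extension names are hypothetical, the import path is assumed from this module's location, and augment_artifact is the method shown in the diff above:

import h5py
from caom2utils.parsers import Hdf5Parser

f = h5py.File('taosii_sample.h5')
parser = Hdf5Parser(
    ob,                               # an Hdf5ObsBlueprint instance
    uri='cadc:TAOSII/taosii_sample.h5',
    h5_file=f,
    extension_names=['sitedata/site001', 'sitedata/site002'],  # or None to guess from '{}' paths
)
parser.augment_artifact(artifact)     # builds one Part/Chunk per extension name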
@@ -10,18 +10,18 @@ Plane.dataProductType = timeseries
Plane.calibrationLevel = 2
Chunk.position.axis.function.dimension.naxis1 = 1920
Chunk.position.axis.function.dimension.naxis2 = 4608
Chunk.position.axis.function.refCoord.coord1.pix = [/header/wcs/crpix(0)]
Chunk.position.axis.function.refCoord.coord1.val = [/header/wcs/crval(0)]
Chunk.position.axis.function.refCoord.coord2.pix = [/header/wcs/crpix(1)]
Chunk.position.axis.function.refCoord.coord2.val = [/header/wcs/crval(1)]
Chunk.position.axis.axis1.ctype = [/header/wcs/ctype(0)]
Chunk.position.axis.axis1.cunit = [/header/wcs/cunit(0)]
Chunk.position.axis.axis2.ctype = [/header/wcs/ctype(1)]
Chunk.position.axis.axis2.cunit = [/header/wcs/cunit(1)]
Chunk.position.axis.function.cd11 = [/header/wcs/cd(0:0)]
Chunk.position.axis.function.cd12 = [/header/wcs/cd(0:1)]
Chunk.position.axis.function.cd21 = [/header/wcs/cd(1:0)]
Chunk.position.axis.function.cd22 = [/header/wcs/cd(1:1)]
Chunk.position.axis.function.refCoord.coord1.pix = [/sitedata/site{}/header/wcs/crpix(0)]
Chunk.position.axis.function.refCoord.coord1.val = [/sitedata/site{}/header/wcs/crval(0)]
Chunk.position.axis.function.refCoord.coord2.pix = [/sitedata/site{}/header/wcs/crpix(1)]
Chunk.position.axis.function.refCoord.coord2.val = [/sitedata/site{}/header/wcs/crval(1)]
Chunk.position.axis.axis1.ctype = [/sitedata/site{}/header/wcs/ctype(0)]
Chunk.position.axis.axis1.cunit = [/sitedata/site{}/header/wcs/cunit(0)]
Chunk.position.axis.axis2.ctype = [/sitedata/site{}/header/wcs/ctype(1)]
Chunk.position.axis.axis2.cunit = [/sitedata/site{}/header/wcs/cunit(1)]
Chunk.position.axis.function.cd11 = [/sitedata/site{}/header/wcs/cd(0:0)]
Chunk.position.axis.function.cd12 = [/sitedata/site{}/header/wcs/cd(0:1)]
Chunk.position.axis.function.cd21 = [/sitedata/site{}/header/wcs/cd(1:0)]
Chunk.position.axis.function.cd22 = [/sitedata/site{}/header/wcs/cd(1:1)]
Chunk.position.equinox = [//header/object/epoch]
Chunk.position.axis.error1.syser = None
Chunk.position.axis.error1.rnder= None
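The override file above switches each positional WCS lookup to the '{}' form. How one such path combines with a discovered extension name, mirroring the split performed in _extract_path_names_from_blueprint (values hypothetical):

blueprint_path = '/sitedata/site{}/header/wcs/crpix(0)'
prefix, suffix = blueprint_path.split('{}')    # '/sitedata/site', '/header/wcs/crpix(0)'
extension_name = 'sitedata/site001'            # as found by _find_extension_names
lookup = f'{extension_name}{suffix}'           # resolved per Part; (0) indexes the crpix list
assert lookup == 'sitedata/site001/header/wcs/crpix(0)'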
17 changes: 17 additions & 0 deletions caom2utils/caom2utils/tests/test_fits2caom2.py
@@ -1412,6 +1412,23 @@ def test_generic_parser1():
assert test_parser._blueprint._plan[test_key] == test_value, 'original value over-ridden'


def test_generic_parser_imported_module_error_handling():
# this test exercises the error handling code for executing functions defined by blueprints
test_key = 'Plane.metaRelease'
test_key_2 = 'Plane.dataRelease'
test_value = '2013-10-10'
test_blueprint = ObsBlueprint()
test_blueprint.set(test_key, '2013-10-10')
# pick __sizeof__ as an attribute that will fail to execute for any module
test_blueprint.set(test_key_2, '__sizeof__()')
test_parser = BlueprintParser()
assert test_parser._blueprint._plan[test_key] == (['RELEASE', 'REL_DATE'], None), 'default value changed'
test_parser.blueprint = test_blueprint
assert test_parser._blueprint._plan[test_key] == test_value, 'original value over-ridden'
test_result = test_parser._execute_external('__sizeof__(uri)', test_key_2, 0)
assert test_result == '', 'wrong result'


def test_get_external_headers():
test_uri = 'http://localhost/obs23/collection/obsid-1'
with patch('requests.Session.get') as session_get_mock: