Skip to content

Commit

Permalink
reportIdentifier set to uuid4
Browse files Browse the repository at this point in the history
  • Loading branch information
david-i-berry committed Nov 21, 2024
1 parent b361379 commit dbc0293
Showing 1 changed file with 59 additions and 41 deletions.
100 changes: 59 additions & 41 deletions bufr2geojson/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import re
import tempfile
from typing import Iterator, Union
import uuid

from cfunits import Units
from eccodes import (codes_bufr_new_from_file, codes_clone,
Expand All @@ -50,39 +51,44 @@

LOGGER = logging.getLogger(__name__)

# some 'constants'
# some 'constants' / env variables
SUCCESS = True
NUMBERS = (float, int, complex)
MISSING = ("NA", "NaN", "NAN", "None")
NULLIFY_INVALID = True # TODO: move to env. variable

BUFR_TABLE_VERSION = 42 # default BUFR table version
NULLIFY_INVALID = os.environ("BUFR2GEOJSON_NULLIFY_INVALID", True)
THISDIR = os.path.dirname(os.path.realpath(__file__))
RESOURCES = f"{THISDIR}{os.sep}resources"
CODETABLES = {}

ECCODES_DEFINITION_PATH = codes_definition_path()
if not os.path.exists(ECCODES_DEFINITION_PATH):
LOGGER.debug('ecCodes definition path does not exist, trying environment')
ECCODES_DEFINITION_PATH = os.environ.get('ECCODES_DEFINITION_PATH')
LOGGER.debug(f'ECCODES_DEFINITION_PATH: {ECCODES_DEFINITION_PATH}')
if ECCODES_DEFINITION_PATH is None:
raise EnvironmentError('Cannot find ecCodes definition path')

TABLEDIR = Path(ECCODES_DEFINITION_PATH) / 'bufr' / 'tables' / '0' / 'wmo'

# ToDo - read preferred units from config file
# PREFERRED UNITS
PREFERRED_UNITS = {
"K": "Celsius",
"Pa": "hPa"
}

with open(f"{RESOURCES}{os.sep}031021.json") as fh:
ASSOCIATED_FIELDS = json.load(fh)
# The following is required as the code table from ECMWF is incomplete
# and that from github/wmo-im not very usable.
try:
with open(f"{RESOURCES}{os.sep}031021.json") as fh:
ASSOCIATED_FIELDS = json.load(fh)
except Exception as e:
LOGGER.error(f"Error loading associated field table (031021) - {e}")
raise e

# list of BUFR attributes
ATTRIBUTES = ['code', 'units', 'scale', 'reference', 'width']

# Dictionary to store attributes for each element, caching is more
# efficient
_ATTRIBUTES_ = {}

# list of ecCodes keys for BUFR headers
Expand All @@ -97,6 +103,7 @@

UNEXPANDED_DESCRIPTORS = ["unexpandedDescriptors"]

# list of headers added by ECMWF and ecCodes
ECMWF_HEADERS = ["rdb", "rdbType", "oldSubtype", "localYear", "localMonth",
"localDay", "localHour", "localMinute", "localSecond",
"rdbtimeDay", "rdbtimeHour", "rdbtimeMinute",
Expand Down Expand Up @@ -174,7 +181,8 @@ def __init__(self, raise_on_error=False):

self.raise_on_error = raise_on_error

# dict to store qualifiers in force and for accounting
# dict to store qualifiers in force and for accounting, strictly only
# those < 9 remain in force but some others in practice are assumed to
self.qualifiers = {
"01": {}, # identification
"02": {}, # instrumentation
Expand Down Expand Up @@ -208,28 +216,29 @@ def set_qualifier(self, fxxyyy: str, key: str, value: Union[NUMBERS],
:returns: None
"""

# get class of descriptor
xx = fxxyyy[1:3]
# first check whether the value is None, if so remove and exit
if [value, description] == [None, None]:
if key in self.qualifiers[xx]:
del self.qualifiers[xx][key]
else:
if key in self.qualifiers[xx] and append:
self.qualifiers[xx][key]["value"] = \
[self.qualifiers[xx][key]["value"], value]
try:
# get class of descriptor
xx = fxxyyy[1:3]
# first check whether the value is None, if so remove and exit
if [value, description] == [None, None]:
if key in self.qualifiers[xx]:
del self.qualifiers[xx][key]
else:
self.qualifiers[xx][key] = {
"code": fxxyyy,
"key": key,
"value": value,
"attributes": attributes,
"description": description
}

def set_time_displacement(self, key, value, append=False):
raise NotImplementedError()
if key in self.qualifiers[xx] and append:
self.qualifiers[xx][key]["value"] = \
[self.qualifiers[xx][key]["value"], value]
else:
self.qualifiers[xx][key] = {
"code": fxxyyy,
"key": key,
"value": value,
"attributes": attributes,
"description": description
}
except Exception as e:
LOGGER.error(f"Error in BUFRParser.set_qualifier: {e}")
if self.raise_on_error:
raise e

def get_qualifier(self, xx: str, key: str, default=None) -> Union[NUMBERS]:
"""
Expand All @@ -253,24 +262,25 @@ def get_qualifier(self, xx: str, key: str, default=None) -> Union[NUMBERS]:

return value

def get_qualifiers(self) -> list:
def get_qualifiers(self) -> dict:
"""
Function to return all qualifiers set (excluding special qualifiers
such as date and time)
:returns: List containing qualifiers, their values and units
:returns: Dictionary containing qualifiers, their values and units -
grouped by class.
"""

classes = ("01", "02", "03", "04", "05", "06",
"07", "08", "22", "25", "31", "33", "35")
classes = list(self.qualifiers.keys())

identification = {}
wigos_md = {}
qualifiers = {}
processing = {}
monitoring = {}
quality = {}
associated_field = {}

result = list()
# name, value, units
for c in classes:
for k in self.qualifiers[c]:
Expand All @@ -284,15 +294,17 @@ def get_qualifiers(self) -> list:
if c in ("04", "05", "06"): # , "07"):
LOGGER.warning(f"Unhandled location information {k}")
# now remaining qualifiers
name = k
value = self.qualifiers[c][k]["value"]
units = self.qualifiers[c][k]["attributes"]["units"]
description = self.qualifiers[c][k]["description"]
try:
description = strip2(description)
except AttributeError:
pass
except Exception as e:
LOGGER.error(f"{e}")

# set the qualifier value, result depends on type
if units in ("CODE TABLE", "FLAG TABLE"):
q = {
"value": value.copy()
Expand All @@ -306,6 +318,7 @@ def get_qualifiers(self) -> list:
"description": description
}

# now assign to type of qualifier
if c == "01":
identification[k] = q.copy()
if c in ("02", "03", "22"):
Expand All @@ -314,6 +327,10 @@ def get_qualifiers(self) -> list:
qualifiers[k] = q.copy()
if c == "25":
processing[k] = q.copy()
if c == "31":
associated_field[k] = q.copy()
if c == "33":
quality[k] = q.copy()
if c == "35":
monitoring[k] = q.copy()

Expand All @@ -322,7 +339,9 @@ def get_qualifiers(self) -> list:
"instrumentation": wigos_md,
"qualifiers": qualifiers,
"processing": processing,
"monitoring": monitoring
"monitoring": monitoring,
"quality": quality,
"associated_field": associated_field
}

return result
Expand All @@ -338,7 +357,6 @@ def get_location(self) -> Union[dict, None]:
"""

# first get latitude
#if not (("005001" in self.qualifiers["05"]) ^ ("005002" in self.qualifiers["05"])): # noqa
if "latitude" not in self.qualifiers["05"]:
LOGGER.warning("Invalid location in BUFR message, no latitude")
LOGGER.warning(self.qualifiers["05"])
Expand All @@ -352,8 +370,7 @@ def get_location(self) -> Union[dict, None]:
if "latitude_displacement" in self.qualifiers["05"]: # noqa
y_displacement = deepcopy(self.qualifiers["05"]["latitude_displacement"]) # noqa
latitude["value"] += y_displacement["value"]
latitude = round(latitude["value"],
latitude["attributes"]["scale"])
latitude = round(latitude["value"], latitude["attributes"]["scale"]) # noqa

# now get longitude
if "longitude" not in self.qualifiers["06"]:
Expand Down Expand Up @@ -1125,6 +1142,7 @@ def transform(data: bytes, guess_wsi: bool = False,
LOGGER.info(f"{nsubsets} subsets")

for idx in range(nsubsets):
reportIdentifier = uuid.uuid4()
if nsubsets > 1: # noqa this is only required if more than one subset (and will crash if only 1)
LOGGER.debug(f"Extracting subset {idx+1} of {nsubsets}")
codes_set(bufr_handle, "extractSubset", idx+1)
Expand Down Expand Up @@ -1154,7 +1172,7 @@ def transform(data: bytes, guess_wsi: bool = False,
if source_identifier in ("", None):
source_identifier = obs.get('geojson', {}).get('properties',{}).get('host', "")
obs['geojson']['id'] = f"{source_identifier}-{imsg}-{idx}-{id}"
obs['geojson']['properties']['parameter']['reportIdentifier'] = f"{source_identifier}-{imsg}-{idx}"
obs['geojson']['properties']['parameter']['reportIdentifier'] = f"{reportIdentifier}"
# now set prov data
prov = {
"prefix": {
Expand Down

0 comments on commit dbc0293

Please sign in to comment.