Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

None val end bugfix. #96

Merged
merged 5 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ingester/datalake_ingester/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import click
from datalake.common.conf import load_config
from .ingester import Ingester
from .log import configure_logging


DEFAULT_CONFIG = '/etc/datalake-ingester.env'
Expand All @@ -23,6 +24,7 @@
@click.pass_context
def cli(ctx, **kwargs):
conf = kwargs.pop('config')
configure_logging()
load_config(conf, DEFAULT_CONFIG, **kwargs)


Expand All @@ -35,3 +37,4 @@ def _subcommand_or_fail(ctx):
def listen():
i = Ingester.from_config()
i.listen()

1 change: 0 additions & 1 deletion ingester/datalake_ingester/ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
UnsupportedS3Event
]


class IngesterReport(dict):

def start(self):
Expand Down
14 changes: 11 additions & 3 deletions ingester/datalake_ingester/log.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import sentry_sdk

_log_configured = False

conf = {
'version': 1,
Expand All @@ -24,8 +25,15 @@
}
}

def configure_logging():
global _log_configured
if not _log_configured:
sentry_sdk.init()
logging.config.dictConfig(conf)
log = logging.getLogger()
level = logging.INFO
log.setLevel(level)
log.info(f"Logging initialized with provided conf {conf}.")
_log_configured = True

sentry_sdk.init()


logging.config.dictConfig(conf)
13 changes: 9 additions & 4 deletions ingester/datalake_ingester/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class DynamoDBStorage(object):
def __init__(self, table_name=None, latest_table_name=None, connection=None):
self.table_name = table_name
self.latest_table_name = latest_table_name
self._prepare_connection(connection)
self.logger = logging.getLogger('storage')
self._prepare_connection(connection)


@classmethod
def from_config(cls):
Expand All @@ -40,6 +41,7 @@ def from_config(cls):
return cls(table_name, latest_table_name)

def _prepare_connection(self, connection):
self.logger.info("Preparing connection...")
region = os.environ.get('AWS_REGION')
if connection:
self._connection = connection
Expand Down Expand Up @@ -89,6 +91,11 @@ def store_latest(self, record):
else:
work_id_value = {'S': str(record['metadata']['work_id'])}

if record['metadata']['end'] is None:
end_time_value = {'NULL': True}
else:
end_time_value = {'N': str(record['metadata']['end'])}

record = {
'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']},
'time_index_key': {"S": record['time_index_key']},
Expand All @@ -98,9 +105,7 @@ def store_latest(self, record):
'start': {
'N': str(record['metadata']['start'])
},
'end': {
'N': str(record['metadata']['end'])
},
'end': end_time_value,
'id': {
'S': str(record['metadata']['id'])
},
Expand Down
Loading