Skip to content

Commit

Permalink
Merge pull request #26 from SciCatProject/wrdn_message
Browse files Browse the repository at this point in the history
Wrdn message
  • Loading branch information
YooSunYoung authored Jun 11, 2024
2 parents 817d614 + 83c68fa commit 0cd2600
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
18 changes: 18 additions & 0 deletions src/scicat_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,21 @@ def main() -> None:
# Receive messages
for message in wrdn_messages(consumer, logger):
logger.info("Processing message: %s", message)

# check if we have received a WRDN message
# if message is not a WRDN, we get None back
if message:
# extract nexus file name from message

# extract job id from message

# saves the WRDN message in a file

# instantiate a new process and runs backeground ingestor
# on the nexus file

pass

# check if we need to commit the individual message
if config.kafka_options.individual_message_commit:
consumer.commit(message=message)
10 changes: 8 additions & 2 deletions src/scicat_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,11 @@ def wrdn_messages(
logger.error("Consumer error: %s", message.error())
yield None
else:
logger.info("Received message.")
yield _deserialise_wrdn(message.value(), logger)
# retrieve type of message
message_value = message.value()
message_type = message_value[4:8]
logger.info("Received message. Type : %s", message_type)
if message_type == b"wrdn":
yield _deserialise_wrdn(message_value, logger)
else:
yield None

0 comments on commit 0cd2600

Please sign in to comment.