Skip to content

Commit

Permalink
Merge pull request #870 from MetPX/issue860
Browse files Browse the repository at this point in the history
Issue860 fixing sourceFromExchange
  • Loading branch information
petersilva authored Dec 18, 2023
2 parents 4fb0f6f + aa4f66f commit 92a6a6d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
11 changes: 11 additions & 0 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __repr__(self) -> str:
'report': False,
'retryEmptyBeforeExit': False,
'sanity_log_dead': 9999,
'sourceFromExchange': False,
'sundew_compat_regex_first_match_is_zero': False,
'sourceFromExchange': False,
'v2compatRenameDoublePost': False,
Expand Down Expand Up @@ -1415,6 +1416,7 @@ def parse_file(self, cfg, component=None):
logger.debug( f'found {cfgfilepath}')

lineno=0
saved_lineno=0
self.files.append(cfgfilepath)

for l in open(cfgfilepath, "r").readlines():
Expand Down Expand Up @@ -1792,6 +1794,15 @@ def finalize(self, component=None, config=None):

if self.sourceFromExchange and self.exchange:
self.source = self.get_source_from_exchange(self.exchange)
if not self.source and self.post_exchange:
self.source = self.get_source_from_exchange(self.post_exchange)

if not hasattr(self,'source') and hasattr(self, 'post_broker') and \
hasattr(self.post_broker,'url') and self.post_broker.url.username:
self.source = self.post_broker.url.username
if not hasattr(self,'source') and hasattr(self, 'broker') and \
hasattr(self.broker,'url') and self.broker.url.username:
self.source = self.broker.url.username

if self.broker and self.broker.url and self.broker.url.username:
self._resolve_exchange()
Expand Down
6 changes: 6 additions & 0 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
topic = raw_msg.delivery_info['routing_key'].replace(
'%23', '#').replace('%22', '*')
msg['exchange'] = raw_msg.delivery_info['exchange']
if self.o['sourceFromExchange']:
source = self.o.get_source_from_exchange(msg['exchange'])
if source:
msg['source'] = source
msg['_deleteOnPost'] |= set(['source'])

msg['subtopic'] = topic.split('.')[len(self.o['topicPrefix']):]
msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
msg['local_offset'] = 0
Expand Down

0 comments on commit 92a6a6d

Please sign in to comment.