Skip to content

Commit

Permalink
Issue1155 AMQP ack refactor (#1174)
Browse files Browse the repository at this point in the history
* - moth's ack now returns a boolean and passes off handling of ack
  failures to the caller (message.gather's ack method)
- message.gather's ack now deletes the ack_id from messages instead of
  the moth classes
- AMQP: refuses to attempt to ack a message if it was received on a
  different channel, connection or broker (preventing unecessary
connection tear down and re-establish)
- AMQP: when ack is attempted and fails because the connection to the
  broker is broken, don't loop and retry. After re-connecting, acking
that message would always fail, prevents a potential infinite loop.

* Add Ubuntu 24

* Move deletion of ack_id back into moth classes, because how to deal with the ack_id after a failed ack is protocol specific
  • Loading branch information
reidsunderland authored Aug 19, 2024
1 parent 86a205a commit 4148111
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow_amqp_consumer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
fail-fast: false
matrix:
which_test: [ static_flow, no_mirror, flakey_broker, dynamic_flow, restart_server ]
osver: [ "ubuntu-20.04", "ubuntu-22.04" ]
osver: [ "ubuntu-20.04", "ubuntu-22.04", "ubuntu-24.04" ]

runs-on: ${{ matrix.osver }}

Expand Down
3 changes: 2 additions & 1 deletion sarracenia/moth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,12 @@ def __init__(self, props=None, is_subscriber=True) -> None:
logging.basicConfig(format=self.o['logFormat'],
level=getattr(logging, self.o['logLevel'].upper()))

def ack(self, message: sarracenia.Message ) -> None:
def ack(self, message: sarracenia.Message ) -> bool:
"""
tell broker that a given message has been received.
ack uses the 'ack_id' property to send an acknowledgement back to the broker.
If there's no 'ack_id' in the message, you should return True.
"""
logger.error("ack unimplemented")

Expand Down
75 changes: 53 additions & 22 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import amqp
import copy
import json
import uuid

import logging

Expand Down Expand Up @@ -142,12 +143,17 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
msg.deriveSource( self.o )
msg.deriveTopics( self.o, topic )

msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
# keep track of which connection and channel the msg came from
msg['ack_id'] = { 'delivery_tag': raw_msg.delivery_info['delivery_tag'],
'channel_id': raw_msg.channel.channel_id,
'connection_id': self.connection_id,
'broker': self.broker,
}
msg['local_offset'] = 0
msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic'])
if not msg.validate():
if hasattr(self,'channel'):
self.channel.basic_ack(msg['ack_id'])
self.channel.basic_ack(msg['ack_id']['delivery_tag'])
logger.error('message acknowledged and discarded: %s' % msg)
msg = None
else:
Expand Down Expand Up @@ -188,6 +194,9 @@ def __init__(self, props, is_subscriber) -> None:
logger.setLevel(self.o['logLevel'].upper())

self.connection = None
self.connection_id = None
self.broker = None

def __connect(self, broker) -> bool:
"""
connect to broker.
Expand Down Expand Up @@ -223,6 +232,8 @@ def __connect(self, broker) -> bool:
login_method=broker.login_method,
virtual_host=vhost,
ssl=(broker.url.scheme[-1] == 's'))
self.connection_id = str(uuid.uuid4()) + ("_sub" if self.is_subscriber else "_pub")
self.broker = host + '/' + vhost
if hasattr(self.connection, 'connect'):
# check for amqp 1.3.3 and 1.4.9 because connect doesn't exist in those older versions
self.connection.connect()
Expand Down Expand Up @@ -315,7 +326,6 @@ def getSetup(self) -> None:
if message_strategy is stubborn, will loop here forever.
connect, declare queue, apply bindings.
"""

ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
Expand Down Expand Up @@ -574,41 +584,60 @@ def getNewMessage(self) -> sarracenia.Message:
def ack(self, m: sarracenia.Message) -> None:
"""
do what you need to acknowledge that processing of a message is done.
NOTE: AMQP delivery tags (we call them ack_id) are scoped per channel. "Deliveries must be
acknowledged on the same channel they were received on. Acknowledging on a different channel
will result in an "unknown delivery tag" protocol exception and close the channel."
"""
if not self.is_subscriber: #build_consumer
logger.error("getting from a publisher")
return
return False


# silent success. retry messages will not have an ack_id, and so will not require acknowledgement.
if not 'ack_id' in m:
#logger.warning( f"no ackid present" )
return

return True

# when the connection/channel/broker doesn't match the current, don't attempt to ack, it will fail
if (m['ack_id']['connection_id'] != self.connection_id
or m['ack_id']['channel_id'] != self.channel.channel_id
or m['ack_id']['broker'] != self.broker):
logger.warning(f"failed for {m['ack_id']}. Does not match the current connection {self.connection_id}," +
f" channel {self.channel.channel_id} or broker {self.broker}")
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return False

#logger.info( f"acknowledging {m['ack_id']}" )
ebo = 1
while True:
try:
if hasattr(self, 'channel'):
self.channel.basic_ack(m['ack_id'])
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
# Break loop if no exceptions encountered
return

self.channel.basic_ack(m['ack_id']['delivery_tag'])
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return True
else:
logger.warning(f"Can't ack {m['ack_id']}, don't have a channel")
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return False

except Exception as err:
logger.warning("failed for tag: %s: %s" % (m['ack_id'], err))
logger.debug('Exception details: ', exc_info=True)

# Cleanly close partially broken connection and restablish
self.close()
self.getSetup()

if ebo < 60: ebo *= 2

logger.info(
"Sleeping {} seconds before re-trying ack...".format(ebo))
if type(err) == BrokenPipeError or type(err) == ConnectionResetError:
# Cleanly close partially broken connection
self.close()
# No point in trying to ack again if the connection is broken
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return False

if ebo < 60:
ebo *= 2
logger.info("Sleeping {} seconds before re-trying ack...".format(ebo))
time.sleep(ebo)
# TODO maybe implement message strategy stubborn here and give up after retrying?

def putNewMessage(self,
message: sarracenia.Message,
Expand Down Expand Up @@ -770,3 +799,5 @@ def close(self) -> None:
# FIXME toclose not useful as we don't close channels anymore
self.metricsDisconnect()
self.connection = None
self.connection_id = None
self.broker = None
1 change: 1 addition & 0 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ def ack(self, m: sarracenia.Message ) -> None:
self.client.ack( m['ack_id'], m['qos'] )
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return True

def putNewMessage(self,
message: sarracenia.Message,
Expand Down

0 comments on commit 4148111

Please sign in to comment.