Frames cease to be read while processing amqp_method. #37
Comments
There's no way to do that, so message handlers must return within the heartbeat interval.
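One common way to satisfy that constraint is to hand the slow work to a worker thread so the handler itself returns immediately. This is an illustrative sketch, not kombu's API: `slow_task`, `on_message`, and the `ack` callable are made-up names, and note that acking before the work finishes gives at-most-once delivery semantics.

```python
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

def slow_task(body):
    time.sleep(0.2)          # stands in for dense message processing
    return body.upper()

def on_message(body, ack):
    # Return quickly so drain_events() can keep reading frames.
    # Caveat: this acks before the work finishes (at-most-once).
    future = executor.submit(slow_task, body)
    ack()
    return future

done = []
fut = on_message("payload", ack=lambda: done.append(True))
print(done, fut.result())   # -> [True] PAYLOAD
```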
I have it working under Python 3 using asyncio (though it should be translatable to Py2/3 using regular threads). The snippet is below. Essentially, there are three 'co-routines' that run asynchronously in a loop: read_frames, drain_events, and heartbeat_check.
The main problem I have with it so far is that I need to use dedicated connection instances for receiving and sending (maybe not a bad idea anyway, and perhaps not necessary with some adjustments to the package internals rather than from the outside). Otherwise, it seems to generally work:

```python
def maybe_drain_with_timeout(self, conn, timeout):
    ret = None
    for channel_id, channel in conn.connection.channels.items():
        if len(channel.method_queue) > 0:
            ret = conn.drain_events(timeout=timeout)
    return ret

@asyncio.coroutine
def drain_events(self, conn, timeout, ignore_timeouts):
    while conn.connected and not self._amqp_closing:
        # Yoink: https://github.com/celery/kombu/blob/master/kombu/common.py#L146-L185
        try:
            yield from self.loop.run_in_executor(
                None, self.maybe_drain_with_timeout, conn, timeout)
        except socket.timeout:
            if timeout and not ignore_timeouts:  # pragma: no cover
                raise
        except socket.error:  # pragma: no cover
            pass

def readWithTimeout(self, conn, timeout):
    return conn.connection.read_timeout(timeout)

# Continue to read frames from the socket while processing dense
# messages so that we receive heartbeats from the peer. Requeue
# everything (including queue system messages) on the local method
# queue so it can get picked up in drain_events.
@asyncio.coroutine
def read_frames(self, conn, timeout, ignore_timeouts):
    while conn.connected and not self._amqp_closing:
        try:
            channel, method_sig, args, content = yield from \
                self.loop.run_in_executor(
                    None, self.readWithTimeout, conn, timeout)
            conn.connection.channels[channel].method_queue.append(
                (method_sig, args, content))
        except socket.timeout:
            if timeout and not ignore_timeouts:
                raise
        except socket.error:
            self.err_out("socket error")

# Keep the connection alive while processing dense messages.
@asyncio.coroutine
def heartbeat_check(self, conn):
    while conn.connected and not self._amqp_closing:
        yield from self.loop.run_in_executor(None, conn.heartbeat_check)
        # self.out("Ran heartbeat check")
        yield from asyncio.sleep(self._heartbeat_interval)

def run(self):
    try:
        self.out("Consuming at {0}, on {1}".format(
            self._amqp_url, self.AMQP_QUEUE))
        self.loop = asyncio.get_event_loop()
        with Connection(self._amqp_url,
                        heartbeat=self._heartbeat_interval) as conn:
            self.on_before_consume()
            conn.ensure_connection()
            self._amqp_connection = conn
            channel = conn.channel()
            channel.basic_qos(0, 1, False)
            queue = AmqpQueue(name=self.AMQP_QUEUE, channel=channel)
            queue.declare()
            with channel.Consumer(
                    queue, callbacks=[self.on_amqp_message]) as consumer:
                consumer.qos(0, prefetch_count=1)
                self.loop.set_default_executor(ThreadPoolExecutor(4))
                asyncio.async(self.read_frames(conn, 30, True))
                asyncio.async(self.drain_events(conn, 30, True))
                asyncio.async(self.heartbeat_check(conn))
                self.loop.run_forever()
    except KeyboardInterrupt:
        self._amqp_closing = True
        self.loop.stop()
        self.out("Shutting down")
```
Right, using async I/O would be the solution; we cannot use threads for this. I'm already working on async support, so this is forthcoming, including eventual support for tulip/asyncio.
Btw, it does not really help for heartbeat frame processing/sending, since the 'message processing' callbacks are still the problem, i.e. if you have a callback anywhere that blocks without yielding control back to the event loop. This is gradually being addressed in the celery worker; the only actions still blocking at this point (above the amqp client layer) are message acknowledgments and sending result messages when a task process crashes (WorkerLostError, or the task is revoked/terminated). I'm hoping to solve these for celery 3.2/3.3, and the improvements will be in kombu+pyamqp.
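The blocking-vs-yielding distinction above can be demonstrated with plain asyncio (using modern async/await syntax rather than the @asyncio.coroutine style in the snippet; names like `good_callback` are illustrative). A callback that sleeps synchronously starves every other coroutine, while one that awaits `run_in_executor` lets the heartbeat coroutine keep firing:

```python
import asyncio
import time

beats = []

async def heartbeat():
    # Fires three "heartbeats" 50 ms apart.
    for _ in range(3):
        beats.append(time.monotonic())
        await asyncio.sleep(0.05)

def blocking_callback():
    time.sleep(0.2)   # never yields: heartbeat() cannot run meanwhile

async def good_callback(loop):
    # Same 200 ms of work, but run in a thread so the loop stays free.
    await loop.run_in_executor(None, time.sleep, 0.2)

async def main():
    loop = asyncio.get_running_loop()
    hb = asyncio.ensure_future(heartbeat())
    await good_callback(loop)   # heartbeats keep firing during this
    await hb
    return beats

print(len(asyncio.run(main())))  # -> 3
```

Had `main` called `blocking_callback()` directly instead, the event loop would be frozen for those 200 ms and no heartbeat could be sent or processed in the meantime.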
My work is in the celery/py-amqp@callbacks branch. It's not finished and cheats in a number of places, but it's one hell of a medal-deserving refactoring job ;)
While long-running message handlers (amqp_method) run, inbound frames need to continue to be processed in the background so that heartbeat frames (frame_type == 8) can continue to increment the bytes_recd attribute of the MethodReader and update the condition checks in connection.heartbeat_tick.
http://stackoverflow.com/questions/14817181/django-celery-connectionerror-too-many-heartbeats-missed
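The failure mode described above can be modeled with a simplified, illustrative monitor (this is not py-amqp's actual implementation; `HeartbeatMonitor`, `frame_received`, and `tick` are made-up names). Every inbound frame, including heartbeat frames, bumps a byte counter; if the counter is unchanged across enough consecutive ticks, the peer is presumed dead:

```python
class HeartbeatMonitor:
    def __init__(self, rate=2):
        self.rate = rate
        self.bytes_recd = 0      # bumped for every inbound frame,
                                 # including heartbeat frames (type 8)
        self.prev_recd = None
        self.missed = 0

    def frame_received(self, nbytes=8):
        self.bytes_recd += nbytes

    def tick(self):
        # If nothing arrived since the previous tick, count a miss.
        if self.prev_recd is not None and self.prev_recd == self.bytes_recd:
            self.missed += 1
            if self.missed >= self.rate:
                raise ConnectionError("Too many heartbeats missed")
        else:
            self.missed = 0
        self.prev_recd = self.bytes_recd

mon = HeartbeatMonitor()
mon.frame_received(); mon.tick()    # bytes moved: fine
mon.tick()                          # one silent interval
try:
    mon.tick()                      # second silent interval -> error
except ConnectionError as exc:
    print(exc)                      # -> Too many heartbeats missed
```

This is exactly why frames must keep being read during a long `amqp_method` call: if reading stops, the counter never moves, and the connection is torn down even though the peer is alive.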