Frames cease to be read while processing amqp_method. #37

Open
w3iBStime opened this issue Apr 10, 2014 · 5 comments

Comments

@w3iBStime

While long-running message handlers (amqp_method) run, inbound frames need to continue to be processed in the background so that heartbeat frames (frame_type == 8) can continue to increment the bytes_recd attribute of the MethodReader and update the condition checks in connection.heartbeat_tick.
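The liveness check described above can be modeled in a few lines. This is a simplified, hypothetical re-implementation of the idea, not py-amqp's actual code: every inbound frame (heartbeat or not) bumps a counter, and a periodic tick fails if the counter has not moved for two heartbeat intervals. The class name `HeartbeatMonitor`, its methods and the two-interval tolerance are assumptions for illustration.

```python
import time


class HeartbeatMonitor:
    """Hypothetical model of a client-side heartbeat liveness check."""

    def __init__(self, interval, clock=time.monotonic):
        self.interval = interval        # negotiated heartbeat interval, seconds
        self.clock = clock              # injectable clock, handy for testing
        self.frames_recv = 0            # bumped for every inbound frame
        self._prev_recv = None
        self._last_recv_time = None

    def on_frame(self, frame_type):
        # Any inbound frame counts as proof of life; frame type 8 is the
        # AMQP 0-9-1 heartbeat frame, but method/content frames count too.
        self.frames_recv += 1

    def tick(self):
        # Meant to be called periodically, e.g. twice per interval.
        now = self.clock()
        if self._prev_recv != self.frames_recv:
            # The counter moved since the last tick, so the peer is alive.
            self._prev_recv = self.frames_recv
            self._last_recv_time = now
        elif now - self._last_recv_time > 2 * self.interval:
            # Nothing read for two intervals: give up on the connection.
            raise ConnectionError("Too many heartbeats missed")


# Usage with a fake clock: if the reader stops calling on_frame() (for
# example because a long handler is running), tick() eventually raises.
fake_now = [0.0]
monitor = HeartbeatMonitor(interval=10, clock=lambda: fake_now[0])
monitor.tick()            # t=0: baseline
fake_now[0] = 15.0
monitor.on_frame(8)       # a heartbeat frame arrives
monitor.tick()            # counter moved: no error
fake_now[0] = 40.0        # 25 s without any frame, more than 2 * 10 s
try:
    monitor.tick()
except ConnectionError as exc:
    print(exc)            # prints "Too many heartbeats missed"
```

The point of the model: as long as nothing reads the socket, the counter cannot move, so a long-running handler alone is enough to trip the check even when the broker is sending heartbeats on time.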

http://stackoverflow.com/questions/14817181/django-celery-connectionerror-too-many-heartbeats-missed

@ask
Contributor

ask commented Apr 15, 2014

There's no way to do that, so message handlers must return within the heartbeat interval.

@w3iBStime
Author

I have it working under Python 3 using asyncio (though it should be translatable to Py2/3 using regular threads). The snippet is below.

Essentially, there are three coroutines that run concurrently, each in its own loop:

  1. A loop that checks the socket for new frames and assembles them into messages as they arrive. Essentially, it calls connection.read_timeout() repeatedly. As fully formed messages arrive, they're enqueued in local memory under connection.channels[channel_id_from_message].method_queue.
  2. A loop that calls drain_events to process the messages buffered in connection.channels[channel_id_from_message].method_queue. Because it currently works from outside the library, it first has to check that at least one method_queue is populated; otherwise drain_events would read the socket itself and conflict with the first loop. This could be done more elegantly from inside the library.
  3. A loop that checks and sends heartbeats.
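The three-loop split above can be modeled without a broker. The sketch below is a self-contained, hypothetical model, not kombu or py-amqp code: an `asyncio.Queue` stands in for the socket, `ThreadLoopConsumer`-style names (`ThreeLoopConsumer`, `read_frames`, `drain_events`, `heartbeat_loop`) are illustrative, and, unlike the snippet below, the slow handler (not the socket read) is what runs in a worker thread. It shows the frame counter advancing while a 0.3 s handler is still busy.

```python
import asyncio
import collections
import time

HEARTBEAT = 8  # AMQP 0-9-1 heartbeat frame type
METHOD = 1     # AMQP 0-9-1 method frame type


class ThreeLoopConsumer:
    """Hypothetical model of the reader / drainer / heartbeat split."""

    def __init__(self, transport, handler, heartbeat_interval):
        self.transport = transport      # asyncio.Queue standing in for the socket
        self.handler = handler          # blocking, possibly slow, callable
        self.interval = heartbeat_interval
        self.frames_recv = 0
        self.method_queues = collections.defaultdict(collections.deque)
        self.handled = []
        self.closing = False

    async def read_frames(self):
        # Loop 1: keep reading frames even while a handler is busy.
        while not self.closing:
            channel_id, frame_type, payload = await self.transport.get()
            self.frames_recv += 1
            if frame_type != HEARTBEAT:
                self.method_queues[channel_id].append(payload)

    async def drain_events(self):
        # Loop 2: dispatch buffered methods; the slow handler runs in a thread.
        loop = asyncio.get_running_loop()
        while not self.closing:
            ready = [q for q in self.method_queues.values() if q]
            if not ready:
                await asyncio.sleep(0.01)   # nothing buffered yet
                continue
            payload = ready[0].popleft()
            await loop.run_in_executor(None, self.handler, payload)
            self.handled.append(payload)

    async def heartbeat_loop(self, ticks):
        # Loop 3: sample the frame counter twice per heartbeat interval.
        while not self.closing:
            ticks.append(self.frames_recv)
            await asyncio.sleep(self.interval / 2)


async def demo():
    transport = asyncio.Queue()
    consumer = ThreeLoopConsumer(transport, lambda p: time.sleep(0.3), 0.1)
    ticks = []
    tasks = [asyncio.create_task(c) for c in (
        consumer.read_frames(), consumer.drain_events(),
        consumer.heartbeat_loop(ticks))]
    await transport.put((1, METHOD, "slow message"))
    for _ in range(5):              # heartbeats arrive while the handler sleeps
        await asyncio.sleep(0.05)
        await transport.put((0, HEARTBEAT, None))
    await asyncio.sleep(0.2)
    consumer.closing = True
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return consumer, ticks


consumer, ticks = asyncio.run(demo())
print(consumer.frames_recv, consumer.handled)  # 6 ['slow message']
```

Because the reader never waits on the handler, the sampled counter in `ticks` keeps rising during the 0.3 s handler call, which is exactly what a heartbeat check needs.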

The main problem with my implementation so far is that I need dedicated connection instances for receiving and sending. That may not be a bad idea anyway, and it might not be necessary with some adjustments to the package internals rather than working from the outside. Otherwise, it seems to work.

    import asyncio
    import socket
    from concurrent.futures import ThreadPoolExecutor

    from kombu import Connection
    from kombu import Queue as AmqpQueue  # assumed; the original import isn't shown

    # The methods below belong to the consumer class (not shown), which also
    # defines out(), err_out(), on_before_consume(), on_amqp_message() and the
    # _amqp_url, AMQP_QUEUE, _heartbeat_interval and _amqp_closing attributes.

    def maybe_drain_with_timeout(self, conn, timeout):
        # Only drain when a method is already buffered; otherwise
        # drain_events() would read the socket itself and race read_frames().
        channels = list(conn.connection.channels.values())
        if any(channel.method_queue for channel in channels):
            conn.drain_events(timeout=timeout)
            return True
        return False

    async def drain_events(self, conn, timeout, ignore_timeouts):
        while conn.connected and not self._amqp_closing:
            # Yoink: https://github.com/celery/kombu/blob/master/kombu/common.py#L146-L185
            try:
                drained = await self.loop.run_in_executor(
                    None, self.maybe_drain_with_timeout, conn, timeout)
                if not drained:
                    # Nothing buffered yet; back off briefly instead of spinning.
                    await asyncio.sleep(0.05)
            except socket.timeout:
                if timeout and not ignore_timeouts:  # pragma: no cover
                    raise
            except socket.error:  # pragma: no cover
                pass

    def read_with_timeout(self, conn, timeout):
        return conn.connection.read_timeout(timeout)

    # Continue to read frames from the socket while processing dense messages so
    # that we receive heartbeats from the peer. Requeue everything (including
    # queue system messages) on the local method queue so it can get picked up
    # in drain_events.
    async def read_frames(self, conn, timeout, ignore_timeouts):
        while conn.connected and not self._amqp_closing:
            try:
                channel, method_sig, args, content = await self.loop.run_in_executor(
                    None, self.read_with_timeout, conn, timeout)
                conn.connection.channels[channel].method_queue.append(
                    (method_sig, args, content))
            except socket.timeout:
                if timeout and not ignore_timeouts:
                    raise
            except socket.error:
                self.err_out("socket error")

    # Keep the connection alive while processing dense messages.
    async def heartbeat_check(self, conn):
        while conn.connected and not self._amqp_closing:
            await self.loop.run_in_executor(None, conn.heartbeat_check)
            # kombu's heartbeat_check() defaults to rate=2, i.e. it expects to
            # be called about twice per heartbeat interval.
            await asyncio.sleep(self._heartbeat_interval / 2)

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        try:
            self.out("Consuming at {0}, on {1}".format(
                self._amqp_url, self.AMQP_QUEUE))
            with Connection(self._amqp_url,
                            heartbeat=self._heartbeat_interval) as conn:
                self.on_before_consume()
                conn.ensure_connection()
                self._amqp_connection = conn
                channel = conn.channel()
                channel.basic_qos(0, 1, False)
                queue = AmqpQueue(name=self.AMQP_QUEUE, channel=channel)
                queue.declare()
                with channel.Consumer(queue,
                                      callbacks=[self.on_amqp_message]) as consumer:
                    consumer.qos(0, prefetch_count=1)
                    self.loop.set_default_executor(ThreadPoolExecutor(4))
                    # Schedule the three loops, then run until interrupted.
                    self.loop.create_task(self.read_frames(conn, 30, True))
                    self.loop.create_task(self.drain_events(conn, 30, True))
                    self.loop.create_task(self.heartbeat_check(conn))
                    self.loop.run_forever()
        except KeyboardInterrupt:
            self._amqp_closing = True
            self.loop.stop()
            self.out("Shutting down")

@ask
Contributor

ask commented May 6, 2014

Right, using async I/O would be the solution; we cannot use threads for this.

I'm already working on async support, so this is forthcoming, including eventual support for tulip/asyncio.

@ask
Contributor

ask commented May 6, 2014

Btw, async I/O does not really help heartbeat frame processing/sending on its own, since the 'message processing' callbacks are still the problem: any callback that blocks without yielding control back to the event loop stalls everything else. Removing such blocking is gradually happening in the celery worker; the only actions that still block at this point (above the AMQP client layer) are message acknowledgments and sending result messages when a task process crashes (WorkerLostError) or a task is revoked/terminated. I'm hoping to solve these for Celery 3.2/3.3, and the improvements will be in kombu and py-amqp.
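The effect of a blocking callback on an event loop can be shown with plain asyncio. This is a stdlib-only illustration, not Celery, kombu or py-amqp code; `count_ticks_during`, `slow_callback`, `blocking` and `offloaded` are hypothetical names. The offloaded variant (a worker thread via `run_in_executor`) is there only to make the contrast visible; it mirrors the workaround in the issue author's snippet, not the approach described in this comment.

```python
import asyncio
import time


async def count_ticks_during(run_callback, period=0.05):
    """Count how often a periodic 'heartbeat' task runs while a callback executes."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(period)

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)                  # let the heartbeat task start
    await run_callback()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return ticks


def slow_callback():
    time.sleep(0.3)                         # stands in for a blocking message handler


async def blocking():
    slow_callback()                         # runs on the loop thread; nothing else runs


async def offloaded():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, slow_callback)  # loop stays free meanwhile


async def main():
    return (await count_ticks_during(blocking),
            await count_ticks_during(offloaded))


blocked_ticks, free_ticks = asyncio.run(main())
# blocked_ticks is 1: the heartbeat task never got another turn during the
# blocking call. free_ticks is several, roughly one per period.
print(blocked_ticks, free_ticks)
```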

@ask
Contributor

ask commented Jun 20, 2014

My work is in the celery/py-amqp@callbacks branch. It's not finished and is cheating in a number of places, but it's one hell of a medal-deserving refactoring job ;)
