If a malformed message gets into Rabbit, Celery worker fails to start #277

Open
djsmith42 opened this issue Oct 21, 2015 · 9 comments

@djsmith42

If a publisher manages to publish a corrupted message into Rabbit that cannot be decoded as UTF-8, the Celery worker will fail to start. I propose that it should instead leave the message unacknowledged, log the error, and continue working on other messages.

Here's an example stack trace when this happens:

[2015-10-17 23:34:57,419: ERROR/MainProcess] Unrecoverable error: UnicodeDecodeError('utf8', 'qr=-1&qf=[date:between:2015-09-01,2015-09-
Traceback (most recent call last):
  File "venv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start
    return self.obj.start()
  File "venv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "venv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "venv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 70, in asynloop
    next(loop)
  File "venv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 272, in create_loop
    item()
  File "venv/local/lib/python2.7/site-packages/amqp/utils.py", line 42, in __call__
    self.set_error_state(exc)
  File "venv/local/lib/python2.7/site-packages/amqp/utils.py", line 39, in __call__
    **dict(self.kwargs, **kwargs) if self.kwargs else kwargs
  File "venv/local/lib/python2.7/site-packages/kombu/transport/base.py", line 144, in _read
    drain_events(timeout=0)
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 302, in drain_events
    chanmap, None, timeout=timeout,
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 365, in _wait_multiple
    channel, method_sig, args, content = read_timeout(timeout)
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 336, in read_timeout
    return self.method_reader.read_method()
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
    self._next_method()
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 126, in _next_method
    self._process_content_header(channel, payload)
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 154, in _process_content_header
    partial.add_header(payload)
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 54, in add_header
    self.msg._load_properties(payload[12:])
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 476, in _load_properties
    d[key] = getattr(r, 'read_' + proptype)()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 141, in read_table
    val = table_data.read_item()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 191, in read_item
    val = self.read_table()  # recurse
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 141, in read_table
    val = table_data.read_item()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 150, in read_item
    val = self.read_longstr()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 131, in read_longstr
    return self.input.read(slen).decode('utf-8')
  File "venv/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x96 in position 80: invalid start byte

Versions:

Celery: 3.1.16
Kombu: 3.0.23

Using pyamqp://

@ask
Contributor

ask commented Dec 11, 2015

It cannot leave the message unacknowledged, because every unacknowledged message counts against the prefetch limit and so reduces how many other messages the worker can receive.

The only other way it could deal with it is to log the event and discard the message, as requeueing it will cause a loop.
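To illustrate the point about requeueing, here is a toy model in plain Python. It is not Celery or kombu code: the `drain` function, the `on_failure` policy names, and the `max_deliveries` cap are all hypothetical, and it assumes a requeued message is redelivered before the messages behind it.

```python
from collections import deque


def drain(queue, on_failure, max_deliveries=10):
    """Deliver messages until the queue is empty or max_deliveries is reached.

    on_failure is 'requeue' or 'discard' (hypothetical policy names).
    Returns (processed_messages, number_of_deliveries).
    """
    processed = []
    deliveries = 0
    while queue and deliveries < max_deliveries:
        message = queue.popleft()
        deliveries += 1
        try:
            processed.append(message.decode('utf-8'))
        except UnicodeDecodeError:
            if on_failure == 'requeue':
                # Assumption: the broker redelivers the same message first,
                # so it fails again on the next iteration.
                queue.appendleft(message)
            # 'discard': drop the message so later ones can proceed.
    return processed, deliveries


messages = [b'\x8d', b'task-1', b'task-2']
requeue_result = drain(deque(messages), 'requeue')
discard_result = drain(deque(messages), 'discard')
```

With the requeue policy the undecodable message is delivered again and again until the cap is hit and nothing behind it is processed. With the discard policy the two valid messages get through.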

@djsmith42
Author

@ask That works for me.

@djsmith42
Author

Correction! This bug is actually in amqp/serialization.py (separate project). The version I have (1.4.9) assumes all headers are UTF-8 encoded with this code:

    def read_longstr(self):
        """Read a string that's up to 2**32 bytes.

        The encoding isn't specified in the AMQP spec, so
        assume it's utf-8

        """
        self.bitcount = self.bits = 0
        slen = unpack('>I', self.input.read(4))[0]
        return self.input.read(slen).decode('utf-8')

But that is not a safe assumption, as header values are totally unconstrained; they could be binary data, for example. That is why the UnicodeDecodeError is thrown.

I see that the current amqp/serialization.py code is very different in master. Does the current code not have this limitation?
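For readers who want to see the failure in isolation, here is a small standalone sketch, assuming Python 3 and only the standard library. `read_longstr_strict` mirrors the length-prefix-then-decode logic quoted above; `encode_longstr` is a hypothetical helper added here to build the input.

```python
import io
import struct


def read_longstr_strict(stream):
    """Read a 4-byte big-endian length, then decode that many bytes as UTF-8."""
    (slen,) = struct.unpack('>I', stream.read(4))
    return stream.read(slen).decode('utf-8')


def encode_longstr(raw):
    """Hypothetical helper: frame raw bytes as a length-prefixed long string."""
    return struct.pack('>I', len(raw)) + raw


# A UTF-8 value round-trips without trouble.
ok = read_longstr_strict(io.BytesIO(encode_longstr(b'hello')))

# A value containing arbitrary binary data does not.
try:
    read_longstr_strict(io.BytesIO(encode_longstr(b'\x8d')))
    failure = None
except UnicodeDecodeError as exc:
    failure = exc
```

The second call raises because the byte 0x8d cannot start a UTF-8 sequence, which matches the "invalid start byte" reason in the log output.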

@djsmith42
Author

Here's a minimal snippet that can repro this crash and put your Celery worker into an unrecoverable loop:

my_task.apply_async(args=['test'], headers={'foo': '\x8d'})

The '\x8d' byte is not valid UTF-8, so when the worker tries to read the header table it raises UnicodeDecodeError, which takes down your Celery worker and prevents it from coming back up:

[2016-01-23 04:15:51,355: ERROR/MainProcess celery.worker] Unrecoverable error: UnicodeDecodeError('utf8', '\x8d', 0, 1, 'invalid start byte')

I would love to submit a PR to fix this, but I'm not super clear on what version of py-amqp to work on. What do you recommend?

Once I know which version to work on, I'm thinking about modifying the py-amqp serialization module to ignore header values that cannot be decoded with UTF-8.
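A rough sketch of that idea, assuming Python 3. It is not the actual py-amqp patch: `decode_headers_lenient` is a hypothetical name, and a plain dict of bytes stands in for the real binary AMQP table format.

```python
import logging

logger = logging.getLogger(__name__)


def decode_headers_lenient(raw_headers):
    """Decode a bytes-to-bytes mapping, skipping entries that are not UTF-8.

    Hypothetical helper for illustration; not part of py-amqp.
    """
    decoded = {}
    for raw_key, raw_value in raw_headers.items():
        try:
            key = raw_key.decode('utf-8')
            value = raw_value.decode('utf-8')
        except UnicodeDecodeError:
            # Log the raw bytes and keep going instead of letting the
            # exception escape and stop the consumer.
            logger.warning('Skipping non-UTF-8 header: %r=%r', raw_key, raw_value)
            continue
        decoded[key] = value
    return decoded


headers = decode_headers_lenient({b'foo': b'\x8d', b'task': b'my_task'})
```

Here the undecodable `foo` header is logged and dropped while the `task` header survives. Whether to drop such values, keep them as raw bytes, or decode with a replacement character is a design choice this sketch does not settle.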

djsmith42 referenced this issue in djsmith42/py-amqp Jan 23, 2016
Prior to this commit, a non-UTF8 encoded header string would cause an
uncaught UnicodeDecodeError, which would cause an unrecoverable Celery
error. Now we log non-UTF8 header strings (names and values), and
continue processing.

Closes https://github.com/celery/celery/issues/2873
@djsmith42
Author

Never mind, I found the 1.4 branch. Here's my PR: #78

@danhenriquesc

Hi! I'm having the same issue with Celery 4.1.0 and kombu 4.1.0. Any idea?

@auvipy
Member

auvipy commented Jan 15, 2018

Can you cleanly apply the patch on py-amqp?

@auvipy auvipy closed this as completed Aug 12, 2018
@vazir

vazir commented Oct 9, 2018

The bug is still present. It does not need more feedback, just a simple fix...

@auvipy auvipy reopened this Oct 13, 2018
@auvipy auvipy self-assigned this Oct 13, 2018
@auvipy
Member

auvipy commented Oct 13, 2018

Could you tell us which versions of Celery you are using when you hit this issue?

@auvipy auvipy closed this as completed May 30, 2019
@auvipy auvipy transferred this issue from celery/celery May 30, 2019
@auvipy auvipy reopened this Oct 12, 2023