Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#ack following #requeue causes exit #168

Closed
wants to merge 1 commit into from

Conversation

erithmetic
Copy link
Contributor

I have a hutch worker similar to the following:

class MyConsumer
  queue 'foo'

  def process(message)
    begin
      # ...
    rescue => e
      if e.is_a?(EphemeralError)
        requeue!
      else
        raise e
      end
    end
  end
end

This causes the process to shut down gracefully. Apparently, the requeue! sends off a @channel.reject(delivery_tag, true) and then exits the Consumer process method. Within the Worker#handle_message method, it then calls broker.ack(delivery_info.delivery_tag). After this point the channel is closed, which seems to indicate that rabbitmq drops the connection when an ack follows a reject.

I created a test to show that ack is called after a reject.

We need some way to prevent this. A couple of ideas:

  1. Add a property to Hutch::Message that indicates if there was already some ack/nack/reject action taken on the message within the process method. If so, do not attempt ack/nack from Worker#handle_message.

  2. Remove the nack from the rescue block of Worker#handle_message. Instead, rely on a default ErrorHandler to do the nack and allow users to specify their own error handler that could examine the error and decide wither to nack/reject/requeue. This might have to cause a breaking change for anyone who manually sets Hutch::Config.set :error_handlers, [..]

@michaelklishin
Copy link
Member

You cannot double ack/reject a delivery. Have you checked RabbitMQ logs?

@teodor-pripoae
Copy link
Contributor

@michaelklishin I think he's saying that there is no method for requeuing a message. He is calling the requeue message and then worker acks the message.

begin
  message = Message.new(delivery_info, properties, payload)
  consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
  with_tracing(consumer_instance).handle(message)
  broker.ack(delivery_info.delivery_tag)
rescue StandardError => ex
  broker.nack(delivery_info.delivery_tag)
  handle_error(properties.message_id, payload, consumer, ex)
end

@michaelklishin
Copy link
Member

This is directly relevant to #165 then.

@teodor-pripoae submitted a proposed solution in #169. If we merged it, #165 would be hairier. I am not sure this is really an improvement and how relevant it would be if we introduce user-driven acknowledgements.

I consider this a duplicate of #165. What the user really wants here is the ability to ack/nack deliveries explicitly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants