Getting OSError: Socket closed against new RabbitMQ server 3.6.2 (on 2.0.3) #101
Comments
I'll be looking into this, but it's now better to use …
Btw, I'm not seeing this here, even with …
Thanks. Sure, here's a snippet showing how to reproduce the error: https://gist.github.com/jakeczyz/78ba1d5021b688a544b7cc8a22fefb65. It happens with both method calls. Note that it's not happening with an older RabbitMQ server. One other minor note: the new server is actually a cluster, but regular gets, publishing, and other cluster behavior are fine. Any advice or pointers would be highly appreciated.
After more investigation, I believe this problem was due to a client TCP timeout in the load balancer used for my RabbitMQ cluster. I'll post more details once I can confirm my fix, but wanted to let you know right away so as not to waste your time.
Okay, I have more information on this in case it helps someone in the future. This is knowledge gained (and likely bugs discovered) after about 15 man-hours of beating my head against this.

The problem seems to be (partly) caused by the Amazon EC2 ELB idle timeout. This is the timeout on the load balancer that drops any connection (including resetting TCP ones) after too long without traffic. On AWS/EC2 ELBs the value can be set between 1 second and 1 hour. That is a low limit if the code using RabbitMQ registers a callback and then wants to wait()/drain_events() in perpetuity (my use case). From digging around, it seems that at least two things should each solve this: (1) TCP keepalives, or (2) the RabbitMQ heartbeat feature. It seems to me there are two bugs conspiring against these solutions.

First, unfortunately, even after enabling TCP keepalives on both sides (the client and each instance behind the LB) and observing them in action via netstat and tcpdump, the Amazon ELB always drops the connection once the timeout passes (yielding the `OSError: Socket closed` on the client, as in the original post). I should probably reach out to AWS about this.

Second, although it's annoying, from my understanding the only way to use the heartbeat feature with the basic_consume --> drain_events (blocking indefinitely) approach is to have a second thread/process send the heartbeats (if there's a better approach, please let me know). I tried this, and hit what seems like another bug. When calling the connection.heartbeat_tick() method and printing the connection.last_heartbeat_received and last_heartbeat_sent times, the server only sends one heartbeat, and the connection eventually gets closed due to missed heartbeats.
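For reference, enabling TCP keepalives on the client socket looks roughly like the sketch below (the helper name and timing values are mine, not from any library; the `TCP_KEEPIDLE`/`TCP_KEEPINTVL`/`TCP_KEEPCNT` constants are Linux-specific, hence the guards). As noted above, in my case the ELB dropped the connection anyway, so this alone may not save you:

```python
import socket


def enable_tcp_keepalive(sock, idle=30, interval=10, count=3):
    """Turn on TCP keepalives with tighter timings than the OS defaults.

    idle:     seconds of inactivity before the first keepalive probe
    interval: seconds between probes
    count:    probes without a reply before the connection is considered dead
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # These three options are Linux-specific; guard them for portability.
    if hasattr(socket, "TCP_KEEPIDLE"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)
```

Keepalive probes only keep the TCP path alive; an L7 load balancer may still count the connection as idle, which appears to be what the ELB does here.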
Here's an example with heartbeat=5, where I call the heartbeat_tick method every 2 seconds and print conn.last_heartbeat_{received,sent}:

```
[2016-07-15T01:58:00.097830] DEBUG: negotiated conn.heartbeat: 5
```

As you can see, the server only sends one heartbeat, at time 41866, and never again. I tried several different timings with higher heartbeat values, with similar results. The final thing that worked for me was to use connection.send_heartbeat() instead of heartbeat_tick(). If I call that method (in a subprocess) every couple of seconds (with or without heartbeats actually turned on for the connection!), the ticks from the client are enough to keep my connection alive from the AMQP heartbeat perspective, and they also keep the ELB client idle timeout at bay. Thanks for reading this far. :)
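A minimal sketch of that workaround, using a daemon thread instead of a subprocess (the helper name is mine; `send_heartbeat()` is the py-amqp Connection method mentioned above, but here `conn` can be anything exposing it):

```python
import threading


def start_heartbeat_pinger(conn, interval=2.0):
    """Call conn.send_heartbeat() periodically from a daemon thread.

    Returns a threading.Event; call .set() on it to stop the pinger.
    """
    stop = threading.Event()

    def loop():
        # Event.wait() doubles as a sleep that can be interrupted by stop.set().
        while not stop.wait(interval):
            try:
                conn.send_heartbeat()
            except OSError:
                break  # socket already gone; let the consumer loop deal with it

    threading.Thread(target=loop, daemon=True).start()
    return stop
```

The interval should be comfortably below both the negotiated heartbeat and the load balancer's idle timeout.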
I've encountered the same problem when the Celery (3.1.25) broker has heartbeats enabled and the worker is idle (empty queue). RabbitMQ eventually closes the connection because the connection is idle and heartbeats aren't sent.
Python 3.7, facing the same error. Also, the queue is not available after this error message. Could anyone suggest a fix or workaround to get rid of this error?
I had these warnings when using Celery 4.3.0 with --pool=eventlet, RabbitMQ 3.6.5.
Hello, I don't know if we should continue to write in this issue, but our team is in almost the same position as described above.

```python
import logging
from socket import timeout
from time import sleep

from kombu import Consumer, Queue, Connection


def callback(body, message):
    logging.debug("before")
    sleep(300)
    logging.debug("after")
    message.ack()


if __name__ == "__main__":
    is_running = True
    queue = Queue("my_queue", routing_key="my_routing_key", no_declare=True)
    with Connection("amqps://blabla", heartbeat=10) as conn:
        with conn.channel() as channel:
            consumer = Consumer(channel, queue)
            consumer.register_callback(callback)
            with consumer:
                while is_running:
                    try:
                        conn.drain_events(timeout=1)
                    except timeout:
                        conn.heartbeat_check()
```

I use this code to reproduce the error in an AWS environment (behind an AWS Classic Load Balancer). In this case I'm unable to ack the message in the callback.

I tried to use another process to manage heartbeats, like this:

```python
import logging
from multiprocessing import Process
from time import sleep

from kombu import Consumer, Queue, Connection

logging.basicConfig(level=logging.DEBUG)


def callback(body, message):
    logging.debug("before")
    sleep(300)
    logging.debug("after")
    message.ack()


def manage_heartbeat(conn: Connection):
    while True:
        sleep(1)
        conn.heartbeat_check()


if __name__ == "__main__":
    is_running = True
    queue = Queue("my_queue_name", routing_key="my_routing_key", no_declare=True)
    with Connection("blabla", heartbeat=10) as conn:
        try:
            heartbeat_process = Process(target=manage_heartbeat, args=(conn,))
            heartbeat_process.start()
            with conn.channel() as channel:
                consumer = Consumer(channel, queue)
                consumer.register_callback(callback)
                with consumer:
                    while is_running:
                        conn.drain_events()
        finally:
            heartbeat_process.close()
```

This way, an exception is raised. Is there a way to send heartbeats in a different process with kombu/py-amqp?
As Celery uses billiard, you should not use multiprocessing, I think. Also, what is your full setup, and are you using the newest versions?
I'm not using Celery, only Kombu.

```python
import logging
import threading
from socket import timeout
from time import sleep

from kombu import Consumer, Queue, Connection

logging.basicConfig(level=logging.DEBUG)


def callback(body, message):
    logging.debug("before")
    sleep(60)
    logging.debug("after")
    message.ack()


def handle_heartbeat(conn, event):
    while not event.is_set():
        print("ping")
        sleep(1)
        if conn.connection:
            conn.connection.send_heartbeat()


if __name__ == "__main__":
    event = threading.Event()
    queue = Queue("my_queue_name", routing_key="my_routing_key", no_declare=True)
    with Connection("amqps://blabla", heartbeat=10) as conn:
        try:
            t = threading.Thread(target=handle_heartbeat, args=(conn, event))
            t.start()
            with conn.channel() as channel:
                consumer = Consumer(channel, queue)
                consumer.register_callback(callback)
                with consumer:
                    while not event.is_set():
                        try:
                            conn.drain_events(timeout=1)
                        except timeout:
                            conn.heartbeat_check()
        except KeyboardInterrupt:
            event.set()
        finally:
            t.join()
```
Nope, it does not work. I have the same stacktrace as this issue in Celery (celery/celery#3773).
Recent related discussion: celery/celery#6528 (comment)
About 60 seconds after starting a channel.basic_consume followed by a channel.wait(), the connection closes with a Socket Closed error, as here:
```
Traceback (most recent call last):
  File "./rmqtools.py", line 437, in run_self_test_loop
    qh.channel[handle].wait(None)
  File "/usr/local/lib/python3.4/site-packages/amqp/abstract_channel.py", line 91, in wait
    self.connection.drain_events(timeout=timeout)
  File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 436, in drain_events
    return self.blocking_read(timeout)
  File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 440, in blocking_read
    frame = self.transport.read_frame()
  File "/usr/local/lib/python3.4/site-packages/amqp/transport.py", line 221, in read_frame
    frame_header = read(7, True)
  File "/usr/local/lib/python3.4/site-packages/amqp/transport.py", line 369, in _read
    raise IOError('Socket closed')
OSError: Socket closed
```
On the server, the message logged is:
```
=INFO REPORT==== 12-Jul-2016::19:09:02 ===
accepting AMQP connection <0.9396.19> (172....1:19638 -> 172....5:5672)

=WARNING REPORT==== 12-Jul-2016::19:10:01 ===
closing AMQP connection <0.9396.19> (172...1:19638 -> 172....5:5672):
client unexpectedly closed TCP connection
```
Here's something that may be relevant: https://www.rabbitmq.com/heartbeats.html. However, the connection shows a negotiated heartbeat (Connection.heartbeat) of 0, and when I change it to, say, 5 at connection creation, the same error happens after 3x the chosen heartbeat (e.g. 15 seconds).
Relatedly, the channel.wait() method is no longer documented in the web docs. Is there a different method that should be called after registering a callback with basic_consume?
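One pattern that can stand in for an indefinitely blocking channel.wait() is a short-timeout drain loop that services heartbeats whenever the connection is idle. A sketch (the helper name and `should_stop` hook are mine; `conn` is assumed to be a py-amqp-style connection exposing `drain_events()` and `heartbeat_tick()`):

```python
import socket


def drain_with_heartbeats(conn, interval=1.0, should_stop=lambda: False):
    """Drain AMQP events with a short timeout, sending/checking heartbeats
    whenever the connection is otherwise idle."""
    while not should_stop():
        try:
            conn.drain_events(timeout=interval)
        except socket.timeout:
            # No traffic this tick: exchange heartbeats so neither the
            # broker nor an idle-timeout load balancer drops the connection.
            conn.heartbeat_tick()
```

The interval should be well below the negotiated heartbeat (a common rule of thumb is half of it), since the broker closes connections after missing roughly two heartbeat intervals.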
A colleague reports this on the 1.4.6 client too.
Thanks!