Tracing a leaked socket in our unit tests, I have observed that in a task of the form
if we cancel the task during the second line, a socket will be leaked unclosed (port 34496 is the AMQP socket we're connecting to).
[2022-04-12T09:49:48.925Z] > warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
[2022-04-12T09:49:48.925Z] E pytest.PytestUnraisableExceptionWarning: Exception ignored in: <socket.socket fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>
[2022-04-12T09:49:48.925Z] E
[2022-04-12T09:49:48.925Z] E Traceback (most recent call last):
[2022-04-12T09:49:48.925Z] E File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiohttp/test_utils.py", line 560, in teardown_test_loop
[2022-04-12T09:49:48.925Z] E gc.collect()
[2022-04-12T09:49:48.925Z] E ResourceWarning: unclosed <socket.socket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 60574), raddr=('127.0.0.1', 34496)>
[2022-04-12T09:49:48.925Z]
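For context, the ResourceWarning above is what CPython emits when a socket object is finalized without having been closed. Here is a minimal sketch, independent of aiormq and pytest, that triggers the same kind of warning with a plain socket. The variable names are illustrative only.

```python
import gc
import socket
import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure ResourceWarning is not filtered out
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    del sock      # drop the last reference without calling close()
    gc.collect()  # mirrors the gc.collect() call in the traceback above

# Keep only the warnings about unclosed resources.
leaked = [w for w in caught if issubclass(w.category, ResourceWarning)]
print(len(leaked) > 0)
```

When warnings are treated as errors, the exception is raised inside a finalizer and cannot propagate, which is probably why pytest reports it wrapped in PytestUnraisableExceptionWarning.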
Tracing through, the problem appears to be that in aiormq.Connection.channel, the call to await channel.open can block and later be cancelled. This only seems to reproduce on our test suite when the test is very quick, and we can see from the debug logs:
[2022-04-12T09:49:43.109Z] DEBUG aio_pika.connection:connection.py:201 Creating AMQP channel for connection: <RobustConnection: "amqp://guest:******@127.0.0.1:34496" 0 channels>
[2022-04-12T09:49:43.109Z] DEBUG aio_pika.connection:connection.py:210 Channel created: <RobustChannel #Not initialized channel "None">
[2022-04-12T09:49:43.109Z] -------------------------------- live log call ---------------------------------
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Open object at 0x7f938da8dfc0>], drain_future=None)
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Channel.Open object at 0x7f938da8dfc0> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
<some test output>
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:484 Received frame <Channel.OpenOk object at 0x7f938da92400> in channel #1 weight=16 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Confirm.Select object at 0x7f938da8d540>], drain_future=None)
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Confirm.Select object at 0x7f938da8d540> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
<more test output>
[2022-04-12T09:49:43.109Z] PASSED [ 16%]
[2022-04-12T09:49:43.109Z] ------------------------------ live log teardown -------------------------------
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:484 Received frame <Confirm.SelectOk object at 0x7f938dac6e80> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.109Z] <cancellation of the above task here>
[2022-04-12T09:49:43.109Z] <await the task>
[2022-04-12T09:49:43.109Z] WARNING aiormq.channel:channel.py:179 Closing channel <Channel: "1" at 0x7f938d9672c0> because RPC call <Confirm.Select object at 0x7f938da8d540> cancelled
<we close the whole connection>
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:625 Closing connection <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0> cause: <class 'asyncio.exceptions.CancelledError'>
[2022-04-12T09:49:43.109Z] DEBUG aio_pika.robust_connection:robust_connection.py:68 Closing AMQP connection <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Close object at 0x7f938d947680>], drain_future=None)
[2022-04-12T09:49:43.109Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Channel.Close object at 0x7f938d947680> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.110Z] DEBUG aiormq.connection:connection.py:594 Writer exited for <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.110Z] DEBUG aiormq.connection:connection.py:463 Reader exited for <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
In contrast, the following flow for basically the same test doesn't leak a socket:
[2022-04-12T09:49:44.053Z] DEBUG aio_pika.connection:connection.py:201 Creating AMQP channel for connection: <RobustConnection: "amqp://guest:******@127.0.0.1:34496" 0 channels>
[2022-04-12T09:49:44.053Z] DEBUG aio_pika.connection:connection.py:210 Channel created: <RobustChannel #Not initialized channel "None">
[2022-04-12T09:49:44.053Z] -------------------------------- live log call ---------------------------------
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Open object at 0x7f938dc0b2c0>], drain_future=None)
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Channel.Open object at 0x7f938dc0b2c0> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
<some test output>
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:484 Received frame <Channel.OpenOk object at 0x7f938d855780> in channel #1 weight=16 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Confirm.Select object at 0x7f938dc1a8c0>], drain_future=None)
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Confirm.Select object at 0x7f938dc1a8c0> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:484 Received frame <Confirm.SelectOk object at 0x7f938d8e5280> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Qos object at 0x7f938d8a4db0>], drain_future=None)
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Basic.Qos object at 0x7f938d8a4db0> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:484 Received frame <Basic.QosOk object at 0x7f938d85c580> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG aio_pika.queue:queue.py:91 Declaring queue: <RobustQueue(PSResponses): auto_delete=False, durable=True, exclusive=False, arguments=None
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Queue.Declare object at 0x7f938d8a8c10>], drain_future=None)
[2022-04-12T09:49:44.053Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Queue.Declare object at 0x7f938d8a8c10> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
<more test output>
[2022-04-12T09:49:44.054Z] PASSED [ 33%]
[2022-04-12T09:49:44.054Z] ------------------------------ live log teardown -------------------------------
[2022-04-12T09:49:44.054Z] <cancellation of the above task here>
[2022-04-12T09:49:44.054Z] <await the task>
[2022-04-12T09:49:44.054Z] WARNING aiormq.channel:channel.py:179 Closing channel <Channel: "1" at 0x7f938dc01590> because RPC call <Queue.Declare object at 0x7f938d8a8c10> cancelled
[2022-04-12T09:49:44.054Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Close object at 0x7f938dc0a950>], drain_future=None)
[2022-04-12T09:49:44.054Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Channel.Close object at 0x7f938dc0a950> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.315Z] DEBUG aiormq.connection:connection.py:484 Received frame <Queue.DeclareOk object at 0x7f938dc01860> in channel #1 weight=32 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.315Z] DEBUG aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Close object at 0x7f938dbff400>], drain_future=None)
[2022-04-12T09:49:44.315Z] DEBUG aiormq.connection:connection.py:547 Sending frame <Channel.Close object at 0x7f938dbff400> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.315Z] DEBUG aio_pika.robust_channel:robust_channel.py:107 Robust channel <RobustChannel #Not initialized channel "None"> has been closed.
[2022-04-12T09:49:44.315Z] DEBUG aiormq.connection:connection.py:484 Received frame <Channel.CloseOk object at 0x7f938d85d5e0> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
In the failing case, async with self.connection.channel() as channel: never completes, but in the success case, you can see we got through channel.set_qos and channel.declare_queue as well.
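One general asyncio point may be relevant to the failing case: if a task is cancelled while it is still inside `__aenter__`, the matching `__aexit__` never runs, so anything acquired earlier in `__aenter__` has to be released by `__aenter__` itself. The sketch below uses a hypothetical `SlowOpen` class, not aio_pika's channel, to show that timing.

```python
import asyncio


class SlowOpen:
    """Hypothetical context manager whose entry needs two awaited steps."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("step 1 done")
        # Stand-in for waiting on a broker reply; the cancellation lands here.
        await asyncio.sleep(3600)
        self.events.append("step 2 done")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.events.append("exit ran")


async def demo():
    resource = SlowOpen()

    async def worker():
        async with resource:
            pass

    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker reach the await inside __aenter__
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return resource.events


events = asyncio.run(demo())
print(events)
```

Only the first step is recorded: neither the second step nor `__aexit__` runs, which matches a cancellation arriving while `Confirm.Select` is still waiting for its reply.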
The call-stack for the failure case:
[2022-04-12T09:03:08.983Z] Traceback (most recent call last):
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/channel.py", line 190, in rpc
[2022-04-12T09:03:08.983Z] raise e
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/channel.py", line 169, in rpc
[2022-04-12T09:03:08.983Z] result = await countdown(self.rpc_frames.get())
[2022-04-12T09:03:08.983Z] File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
[2022-04-12T09:03:08.983Z] await getter
[2022-04-12T09:03:08.983Z] asyncio.exceptions.CancelledError
[2022-04-12T09:03:08.983Z]
[2022-04-12T09:03:08.983Z] During handling of the above exception, another exception occurred:
[2022-04-12T09:03:08.983Z]
[2022-04-12T09:03:08.983Z] Traceback (most recent call last):
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/abc.py", line 40, in __inner
[2022-04-12T09:03:08.983Z] return await self.task
[2022-04-12T09:03:08.983Z] asyncio.exceptions.CancelledError
[2022-04-12T09:03:08.983Z]
[2022-04-12T09:03:08.983Z] The above exception was the direct cause of the following exception:
[2022-04-12T09:03:08.983Z]
[2022-04-12T09:03:08.983Z] Traceback (most recent call last):
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/src/webservice/status_update_api.py", line 33, in message_dispatcher
[2022-04-12T09:03:08.983Z] async for message in app["rmq"].message_queue():
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/lib/shared/src/ps/shared/rabbitmq.py", line 68, in message_queue
[2022-04-12T09:03:08.983Z] async with self.connection.channel() as channel:
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aio_pika/channel.py", line 142, in __aenter__
[2022-04-12T09:03:08.983Z] await self.initialize()
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aio_pika/channel.py", line 213, in initialize
[2022-04-12T09:03:08.983Z] channel: aiormq.abc.AbstractChannel = await self._create_channel(
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aio_pika/channel.py", line 200, in _create_channel
[2022-04-12T09:03:08.983Z] return await self.connection.connection.channel(
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/connection.py", line 704, in channel
[2022-04-12T09:03:08.983Z] await channel.open(timeout=timeout)
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/channel.py", line 198, in open
[2022-04-12T09:03:08.983Z] await self.rpc(spec.Confirm.Select())
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/base.py", line 162, in wrap
[2022-04-12T09:03:08.983Z] return await self.create_task(func(self, *args, **kwargs))
[2022-04-12T09:03:08.983Z] File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/abc.py", line 42, in __inner
[2022-04-12T09:03:08.983Z] raise self.exception from e
[2022-04-12T09:03:08.983Z] asyncio.exceptions.CancelledError
My suspicion is that when the call to channel.open fails in Connection.channel, the channel is not stored in self.channels, but I'm not sure why that leads to a leaked socket.
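To illustrate the suspicion, here is a minimal sketch of one defensive pattern, assuming the problem is a partly opened channel that is never registered. `FakeChannel` and `FakeConnection` are hypothetical stand-ins and do not reflect aiormq's actual implementation. The channel is closed explicitly if `open` is interrupted before it is stored.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a channel; not aiormq's class."""

    def __init__(self):
        self.closed = False

    async def open(self):
        await asyncio.sleep(3600)  # stand-in for waiting on the broker

    async def close(self):
        self.closed = True


class FakeConnection:
    """Hypothetical connection that registers channels only once opened."""

    def __init__(self):
        self.channels = {}
        self.created = []  # bookkeeping for this demo only

    async def channel(self, number):
        channel = FakeChannel()
        self.created.append(channel)
        try:
            await channel.open()
        except BaseException:
            # CancelledError derives from BaseException on Python 3.8+,
            # so "except Exception" would not catch a cancellation.
            await channel.close()
            raise
        self.channels[number] = channel
        return channel


async def demo():
    conn = FakeConnection()
    task = asyncio.create_task(conn.channel(1))
    await asyncio.sleep(0)  # let the task block inside channel.open()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return conn


conn = asyncio.run(demo())
print(conn.channels, conn.created[0].closed)
```

In this sketch the cancelled channel never reaches `channels`, so a later connection-level cleanup that iterates over `channels` would not see it. The explicit `close` in the `except` branch is what releases it. Whether the real leak follows this path is not established here.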