Hey y'all!
It seems that the retry functionality in the PubSub client is not working correctly.
Reproduce Error
To reproduce the error, I shut down Valkey while my system is polling for new messages in a parallel thread. Even though our connection is configured with an infinite number of retries, the PubSub client always crashes and never recovers.
Here is minimal pseudocode to help reproduce the issue:
Error Trace
    Traceback (most recent call last):
      File "/path/to/env/lib/python3.10/site-packages/valkey/retry.py", line 62, in call_with_retry
        return do()
      File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 860, in <lambda>
        lambda: command(*args, **kwargs),
      File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 877, in try_read
        if not conn.can_read(timeout=timeout):
      File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 538, in can_read
        return self._parser.can_read(timeout)
      File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 80, in can_read
        return self.read_from_socket(timeout=timeout, raise_on_timeout=False)
      File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 91, in read_from_socket
        raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
    valkey.exceptions.ConnectionError: Connection closed by server.

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 560, in read_response
        response = self._parser.read_response(disable_decoding=disable_decoding)
      File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 129, in read_response
        self.read_from_socket()
      File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 89, in read_from_socket
        bufflen = self._sock.recv_into(self._buffer)
    ConnectionResetError: [Errno 104] Connection reset by peer

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 1210, in run
        pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
      File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 1072, in get_message
        response = self.parse_response(block=(timeout is None), timeout=timeout)
      File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 883, in parse_response
        response = self._execute(conn, try_read)
      File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 859, in _execute
        return conn.retry.call_with_retry(
      File "/path/to/env/lib/python3.10/site-packages/valkey/retry.py", line 65, in call_with_retry
        fail(error)
      File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 861, in <lambda>
        lambda error: self._disconnect_raise_connect(conn, error),
      File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 849, in _disconnect_raise_connect
        conn.connect()
      File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 328, in connect
        self.on_connect()
      File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 430, in on_connect
        self.read_response()
      File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 568, in read_response
        raise ConnectionError(
    valkey.exceptions.ConnectionError: Error while reading from localhost:6379 : (104, 'Connection reset by peer')
Possible Cause
From my debugging and the traces, the issue appears to be in the connect method: the second part of the error trace (the failure in on_connect, reached via conn.connect() in _disconnect_raise_connect) occurs outside the retry scope, so it is never retried.
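To illustrate how an error raised during the reconnect can escape a retry loop, here is a minimal, self-contained sketch. It is a simplified model, not valkey-py's actual code: `call_with_retry`, `FlakyServer`, `read_message`, `reconnect`, and `run` are hypothetical stand-ins, and Python's built-in `ConnectionError` replaces `valkey.exceptions.ConnectionError`. The assumption, based on the trace (`fail(error)` called from `call_with_retry`), is that the failure callback runs inside the `except` block, so anything it raises is not retried.

```python
def call_with_retry(do, fail, retries):
    """Simplified retry loop (a model): retry `do`, calling `fail` after each error.

    A negative `retries` means "retry forever".
    """
    failures = 0
    while True:
        try:
            return do()
        except ConnectionError as error:
            failures += 1
            # `fail` runs inside the except block. An exception raised here
            # is not caught by the try above, so it leaves the loop at once.
            fail(error)
            if 0 <= retries < failures:
                raise


class FlakyServer:
    """Hypothetical server stub that rejects the first `down_for` operations."""

    def __init__(self, down_for):
        self.down_for = down_for

    def touch(self):
        if self.down_for > 0:
            self.down_for -= 1
            raise ConnectionError("Connection reset by peer")


def read_message(server):
    server.touch()
    return "message"


def reconnect(server):
    # Stands in for disconnect + connect + handshake in the failure callback.
    server.touch()


def run(down_for):
    server = FlakyServer(down_for)
    try:
        call_with_retry(
            lambda: read_message(server),
            lambda error: reconnect(server),
            retries=-1,  # infinite retries
        )
        return "recovered"
    except ConnectionError:
        return "crashed"


print(run(down_for=1))  # only the read fails, the reconnect succeeds
print(run(down_for=2))  # the read fails, then the reconnect fails too
```

In this model the second call ends in "crashed" despite the infinite retry budget, because the reconnect failure is raised from the failure callback rather than from the retried operation.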
Quick Patch
A quick workaround is to wrap the whole connection setup (socket connect, on_connect, and the connect callbacks) inside a single retry call, as shown below. However, this might introduce unexpected side effects:
    def connect(self):
        """Connects to the Valkey server if not already connected."""
        if self._sock:
            return

        def _full_connect():
            self._sock = self._connect()
            if self.valkey_connect_func is None:
                # Use the default on_connect function
                self.on_connect()
            else:
                # Use the passed function valkey_connect_func
                self.valkey_connect_func(self)
            # run any user callbacks. right now the only internal callback
            # is for pubsub channel/pattern resubscription
            # first, remove any dead weakrefs
            self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()]
            for ref in self._connect_callbacks:
                callback = ref()
                if callback:
                    callback(self)

        try:
            sock = self.retry.call_with_retry(
                lambda: _full_connect(),
                lambda error: self.disconnect(error)
            )
        except socket.timeout:
            raise TimeoutError("Timeout connecting to server")
        except OSError as e:
            raise ConnectionError(self._error_message(e))
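The idea behind the patch can be sketched in a small standalone model: if the reconnect itself runs inside its own retry scope, a handshake failure is retried instead of propagating. This is only an illustration under stated assumptions, not valkey-py's code: `call_with_retry`, `FlakyServer`, `connect_with_retry`, and `poll_once` are hypothetical names, and the built-in `ConnectionError` stands in for the library's exception.

```python
def call_with_retry(do, fail, retries):
    """Simplified retry loop (a model). A negative `retries` retries forever."""
    failures = 0
    while True:
        try:
            return do()
        except ConnectionError as error:
            failures += 1
            fail(error)
            if 0 <= retries < failures:
                raise


class FlakyServer:
    """Hypothetical server stub that rejects the first `down_for` operations."""

    def __init__(self, down_for):
        self.down_for = down_for

    def touch(self):
        if self.down_for > 0:
            self.down_for -= 1
            raise ConnectionError("Connection reset by peer")


def connect_with_retry(server):
    # Socket setup and handshake are modeled as one operation inside a single
    # retry scope. The failure callback only cleans up and never raises.
    call_with_retry(server.touch, lambda error: None, retries=-1)


def poll_once(server):
    def read():
        server.touch()
        return "message"

    # The outer failure callback reconnects, and that reconnect now retries
    # internally instead of raising out of the outer loop.
    return call_with_retry(
        read, lambda error: connect_with_retry(server), retries=-1
    )


server = FlakyServer(down_for=5)
print(poll_once(server))
```

One possible side effect, visible in this model, is nesting: the reconnect is invoked from the outer retry's failure callback, so the inner loop (and any backoff it applies) runs inside the outer one.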
Thanks in advance for any help 🙂