Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug in retry-handling in the PubSub #169

Open
ManelCoutinhoSensei opened this issue Jan 31, 2025 · 0 comments
Open

Bug in retry-handling in the PubSub #169

ManelCoutinhoSensei opened this issue Jan 31, 2025 · 0 comments
Assignees

Comments

@ManelCoutinhoSensei
Copy link

Hey y'all!

It seems like the retry-functionality in the PubSub-client is not working correctly.

Reproduce Error

To reproduce the error I am closing valkey while my system is trying to get new messages in a parallel thread. Even though our connection is set up with an infinite amount of retry, the PubSub-client always crashes and never recovers.
Here is a minimal pseudo code to help reproduce the issue:

def event_handler(msg):
    print(msg)

vk_pub_sub.psubscribe(**{"test": event_handler})

def exception_handler(ex, pubsub, thread):
    print(traceback.format_exc())

vk_pub_sub.run_in_thread(sleep_time=0.1, exception_handler=exception_handler)

# Close VK connection

Error Trace

Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/retry.py", line 62, in call_with_retry
    return do()
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 860, in <lambda>
    lambda: command(*args, **kwargs),
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 877, in try_read
    if not conn.can_read(timeout=timeout):
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 538, in can_read
    return self._parser.can_read(timeout)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 80, in can_read
    return self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 91, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
valkey.exceptions.ConnectionError: Connection closed by server.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 560, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 129, in read_response
    self.read_from_socket()
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 89, in read_from_socket
    bufflen = self._sock.recv_into(self._buffer)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 1210, in run
    pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 1072, in get_message
    response = self.parse_response(block=(timeout is None), timeout=timeout)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 883, in parse_response
    response = self._execute(conn, try_read)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 859, in _execute
    return conn.retry.call_with_retry(
  File "/path/to/env/lib/python3.10/site-packages/valkey/retry.py", line 65, in call_with_retry
    fail(error)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 861, in <lambda>
    lambda error: self._disconnect_raise_connect(conn, error),
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 849, in _disconnect_raise_connect
    conn.connect()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 328, in connect
    self.on_connect()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 430, in on_connect
    self.read_response()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 568, in read_response
    raise ConnectionError(
valkey.exceptions.ConnectionError: Error while reading from localhost:6379 : (104, 'Connection reset by peer')

Possible Cause

From my debug and the traces, it seems that the issue is related to the connect method since the second part of the error trace occurs outside the retry scope.

Quick Patch

A quick workaround is to wrap everything inside a retry call, as shown below. However, this might introduce unexpected side effects:

def connect(self):
    """Connects to the Valkey server if not already connected."""
    if self._sock:
        return

    def _full_connect():
        self._sock = self._connect()
        if self.valkey_connect_func is None:
            # Use the default on_connect function
            self.on_connect()
        else:
            # Use the passed function valkey_connect_func
            self.valkey_connect_func(self)

        # run any user callbacks. right now the only internal callback
        # is for pubsub channel/pattern resubscription
        # first, remove any dead weakrefs
        self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()]
        for ref in self._connect_callbacks:
            callback = ref()
            if callback:
                callback(self)

    try:
        sock = self.retry.call_with_retry(
            lambda: _full_connect(),
            lambda error: self.disconnect(error)
        )
    except socket.timeout:
        raise TimeoutError("Timeout connecting to server")
    except OSError as e:
        raise ConnectionError(self._error_message(e))

Thanks in advance for any help 🙂

@mkmkme mkmkme self-assigned this Feb 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants