Skip to content

Commit

Permalink
pass timeout through to _sock_exact_recv when _socket_pool does not h…
Browse files Browse the repository at this point in the history
…ave timeout attribute
  • Loading branch information
rjauquet committed Nov 8, 2023
1 parent 926846c commit 0851307
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions adafruit_minimqtt/adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
rcs = []

while True:
rc = self._wait_for_msg()
rc = self._wait_for_msg(timeout=timeout)
if rc is not None:
rcs.append(rc)
if time.monotonic() - stamp > timeout:
Expand All @@ -1013,11 +1013,13 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:

return rcs if rcs else None

def _wait_for_msg(self) -> Optional[int]:
def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]:
# pylint: disable = too-many-return-statements

"""Reads and processes network events.
Return the packet type or None if there is nothing to be received.
:param float timeout: return after this timeout, in seconds.
"""
# CPython socket module contains a timeout attribute
if hasattr(self._socket_pool, "timeout"):
Expand All @@ -1027,7 +1029,7 @@ def _wait_for_msg(self) -> Optional[int]:
return None
else: # socketpool, esp32spi
try:
res = self._sock_exact_recv(1)
res = self._sock_exact_recv(1, timeout=timeout)
except OSError as error:
if error.errno in (errno.ETIMEDOUT, errno.EAGAIN):
# raised by a socket timeout if 0 bytes were present
Expand Down Expand Up @@ -1093,7 +1095,7 @@ def _recv_len(self) -> int:
return n
sh += 7

def _sock_exact_recv(self, bufsize: int) -> bytearray:
def _sock_exact_recv(self, bufsize: int, timeout: Optional[float] = None) -> bytearray:
"""Reads _exact_ number of bytes from the connected socket. Will only return
bytearray with the exact number of bytes requested.
Expand All @@ -1104,6 +1106,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
bytes is returned or trigger a timeout exception.
:param int bufsize: number of bytes to receive
:param float timeout: timeout, in seconds. Defaults to keep_alive
:return: byte array
"""
stamp = time.monotonic()
Expand All @@ -1115,7 +1118,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
to_read = bufsize - recv_len
if to_read < 0:
raise MMQTTException(f"negative number of bytes to read: {to_read}")
read_timeout = self.keep_alive
read_timeout = timeout if timeout is not None else self.keep_alive
mv = mv[recv_len:]
while to_read > 0:
recv_len = self._sock.recv_into(mv, to_read)
Expand Down

0 comments on commit 0851307

Please sign in to comment.