Skip to content

Commit

Permalink
Merge pull request #795 from eclipse/fix_is_connected
Browse files Browse the repository at this point in the history
Fix is_connected property when not using loop_forever
  • Loading branch information
PierreF authored Jan 20, 2024
2 parents 2c73adb + a66ef8f commit fb42d41
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 4 deletions.
4 changes: 4 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ v2.0.0 - 2023-xx-xx
- Fix loading too weak TLS CA file but setting allowed ciphers before loading CA. Closes #676.
- Allow to manually ack QoS > 0 messages. Closes #753 & #348.
- Improve tests & linters. Modernize build (drop setup.py, use pyproject.toml)
- Fix is_connected property to correctly return False when connection is lost
and loop_start/loop_forever isn't used. Closes #525.



v1.6.1 - 2021-10-21
===================
Expand Down
13 changes: 13 additions & 0 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,10 +1290,19 @@ def _loop(self, timeout: float = 1.0) -> MQTTErrorCode:
socklist = select.select(rlist, wlist, [], timeout)
except TypeError:
# Socket isn't correct type, in likelihood connection is lost
# ... or we called disconnect(). In that case the socket will
# be closed but some loop (like loop_forever) will continue to
# call _loop(). We still want to break that loop by returning an
# rc != MQTT_ERR_SUCCESS and we don't want state to change from
# mqtt_cs_disconnecting.
if self._state != ConnectionState.MQTT_CS_DISCONNECTING:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
return MQTTErrorCode.MQTT_ERR_CONN_LOST
except ValueError:
# Can occur if we just reconnected but rlist/wlist contain a -1 for
# some reason.
if self._state != ConnectionState.MQTT_CS_DISCONNECTING:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
return MQTTErrorCode.MQTT_ERR_CONN_LOST
except Exception:
# Note that KeyboardInterrupt, etc. can still terminate since they
Expand Down Expand Up @@ -1768,6 +1777,7 @@ def loop_misc(self) -> MQTTErrorCode:
if self._state == mqtt_cs_disconnecting:
rc = MQTTErrorCode.MQTT_ERR_SUCCESS
else:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
rc = MQTTErrorCode.MQTT_ERR_KEEPALIVE

self._do_on_disconnect(rc)
Expand Down Expand Up @@ -2580,6 +2590,9 @@ def _loop_rc_handle(

self._do_on_disconnect(rc, properties)

if rc == MQTT_ERR_CONN_LOST:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST

return rc

def _packet_read(self) -> MQTTErrorCode:
Expand Down
1 change: 1 addition & 0 deletions src/paho/mqtt/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ConnectionState(enum.IntEnum):
MQTT_CS_CONNECTED = 1
MQTT_CS_DISCONNECTING = 2
MQTT_CS_CONNECT_ASYNC = 3
MQTT_CS_CONNECTION_LOST = 4


class MessageState(enum.IntEnum):
Expand Down
116 changes: 112 additions & 4 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import threading
import time
import unicodedata

import paho.mqtt.client as client
import pytest
from paho.mqtt.enums import MQTTErrorCode, MQTTProtocolVersion
from paho.mqtt.reasoncodes import ReasonCodes

import tests.paho_test as paho_test

# Import test fixture
from tests.testsupport.broker import fake_broker # noqa: F401
from tests.testsupport.broker import FakeBroker, fake_broker # noqa: F401


@pytest.mark.parametrize("proto_ver", [
(client.MQTTv31),
(client.MQTTv311),
(MQTTProtocolVersion.MQTTv31),
(MQTTProtocolVersion.MQTTv311),
])
class Test_connect:
"""
Expand Down Expand Up @@ -100,7 +102,7 @@ class Test_connect_v5:

def test_01_broker_no_support(self, fake_broker):
mqttc = client.Client(
"01-broker-no-support", protocol=client.MQTTv5)
"01-broker-no-support", protocol=MQTTProtocolVersion.MQTTv5)

def on_connect(mqttc, obj, flags, reason, properties):
assert reason == 132
Expand Down Expand Up @@ -137,6 +139,112 @@ def on_connect(mqttc, obj, flags, reason, properties):
mqttc.loop_stop()


class TestConnectionLost:
def test_with_loop_start(self, fake_broker: FakeBroker):
mqttc = client.Client(
"test_with_loop_start",
protocol=MQTTProtocolVersion.MQTTv311,
reconnect_on_failure=False,
)

on_connect_reached = threading.Event()
on_disconnect_reached = threading.Event()


def on_connect(mqttc, obj, flags, rc):
assert rc == 0
on_connect_reached.set()

def on_disconnect(*args):
on_disconnect_reached.set()

mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect

mqttc.connect_async("localhost", fake_broker.port)
mqttc.loop_start()

try:
fake_broker.start()

connect_packet = paho_test.gen_connect(
"test_with_loop_start", keepalive=60,
proto_ver=MQTTProtocolVersion.MQTTv311)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

assert on_connect_reached.wait(1)
assert mqttc.is_connected()

fake_broker.finish()

assert on_disconnect_reached.wait(1)
assert not mqttc.is_connected()

finally:
mqttc.loop_stop()

def test_with_loop(self, fake_broker: FakeBroker):
mqttc = client.Client(
"test_with_loop",
clean_session=True,
)

on_connect_reached = threading.Event()
on_disconnect_reached = threading.Event()


def on_connect(mqttc, obj, flags, rc):
assert rc == 0
on_connect_reached.set()

def on_disconnect(*args):
on_disconnect_reached.set()

mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect

mqttc.connect("localhost", fake_broker.port)

fake_broker.start()

# not yet connected, packet are not yet processed by loop()
assert not mqttc.is_connected()

# connect packet is sent during connect() call
connect_packet = paho_test.gen_connect(
"test_with_loop", keepalive=60,
proto_ver=MQTTProtocolVersion.MQTTv311)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

# call loop() to process the connack packet
assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_SUCCESS

assert on_connect_reached.wait(1)
assert mqttc.is_connected()

fake_broker.finish()

# call loop() to detect the connection lost
assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_CONN_LOST

assert on_disconnect_reached.wait(1)
assert not mqttc.is_connected()


class TestPublishBroker2Client:

def test_invalid_utf8_topic(self, fake_broker):
Expand Down

0 comments on commit fb42d41

Please sign in to comment.