Skip to content

Commit

Permalink
Added "packet id" report for selected operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed Dec 18, 2023
1 parent 8e6c80b commit 444afbc
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 10 deletions.
3 changes: 3 additions & 0 deletions client/lib/include/cc_mqtt5_client/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ typedef struct
const char* m_reasonStr; ///< "Reason String" property, can be NULL
const CC_Mqtt5UserProp* m_userProps; ///< Pointer to "User Properties" array, can be NULL
unsigned m_userPropsCount; ///< Amount of elements in the "User Properties" array.
unsigned m_packetId; ///< "Packet Identifier" of the "subscribe" operation.
} CC_Mqtt5SubscribeResponse;

/// @brief Topic filter configuration structure of the "unsubscribe" operation.
Expand All @@ -409,6 +410,7 @@ typedef struct
const char* m_reasonStr; ///< "Reason String" property, can be NULL
const CC_Mqtt5UserProp* m_userProps; ///< Pointer to "User Properties" array, can be NULL
unsigned m_userPropsCount; ///< Amount of elements in the "User Properties" array.
unsigned m_packetId; ///< "Packet Identifier" of the "unsubscribe" operation.
} CC_Mqtt5UnsubscribeResponse;

/// @brief Received message information
Expand Down Expand Up @@ -466,6 +468,7 @@ typedef struct
const char* m_reasonStr; ///< "Reason String" property, can be NULL.
const CC_Mqtt5UserProp* m_userProps; ///< Pointer to array of "User Properties", can be NULL
unsigned m_userPropsCount; ///< Number of elements in "User Properties" array.
unsigned m_packetId; ///< "Packet Identifier" of the "publish" operation.
} CC_Mqtt5PublishResponse;

/// @brief Callback used to request time measurement.
Expand Down
29 changes: 21 additions & 8 deletions client/lib/src/op/SendOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ SendOp::~SendOp()

void SendOp::handle(PubackMsg& msg)
{
if (m_pubMsg.field_packetId().field().value() != msg.field_packetId().value()) {
auto packetId = msg.field_packetId().value();
if (m_pubMsg.field_packetId().field().value() != packetId) {
return;
}

Expand All @@ -58,6 +59,8 @@ void SendOp::handle(PubackMsg& msg)
UserPropsList userProps; // Will be referenced in response
auto response = CC_Mqtt5PublishResponse();

response.m_packetId = packetId;

auto completeOpOnExit =
comms::util::makeScopeGuard(
[this, &status, &response]()
Expand Down Expand Up @@ -123,7 +126,8 @@ void SendOp::handle(PubackMsg& msg)

void SendOp::handle(PubrecMsg& msg)
{
if (m_pubMsg.field_packetId().field().value() != msg.field_packetId().value()) {
auto packetId = msg.field_packetId().value();
if (m_pubMsg.field_packetId().field().value() != packetId) {
return;
}

Expand All @@ -141,6 +145,8 @@ void SendOp::handle(PubrecMsg& msg)
UserPropsList userProps; // Will be referenced in response
auto response = CC_Mqtt5PublishResponse();

response.m_packetId = packetId;

auto completeOpOnExit =
comms::util::makeScopeGuard(
[this, &status, &response]()
Expand Down Expand Up @@ -230,7 +236,8 @@ void SendOp::handle(PubrecMsg& msg)

void SendOp::handle(PubcompMsg& msg)
{
if (m_pubMsg.field_packetId().field().value() != msg.field_packetId().value()) {
auto packetId = msg.field_packetId().value();
if (m_pubMsg.field_packetId().field().value() != packetId) {
return;
}

Expand All @@ -248,6 +255,8 @@ void SendOp::handle(PubcompMsg& msg)
UserPropsList userProps; // Will be referenced in response
auto response = CC_Mqtt5PublishResponse();

response.m_packetId = packetId;

auto completeOpOnExit =
comms::util::makeScopeGuard(
[this, &status, &response]()
Expand Down Expand Up @@ -408,6 +417,10 @@ CC_Mqtt5ErrorCode SendOp::configBasic(const CC_Mqtt5PublishBasicConfig& config)

m_pubMsg.transportField_flags().field_retain().setBitValue_bit(config.m_retain);
m_pubMsg.transportField_flags().field_qos().setValue(config.m_qos);

if (config.m_qos > CC_Mqtt5QoS_AtMostOnceDelivery) {
m_pubMsg.field_packetId().field().setValue(allocPacketId());
}

if (mustAssignTopic) {
auto& topicStr = m_pubMsg.field_topic().value();
Expand Down Expand Up @@ -596,11 +609,6 @@ CC_Mqtt5ErrorCode SendOp::send(CC_Mqtt5PublishCompleteCb cb, void* cbData)
m_cb = cb;
m_cbData = cbData;

using Qos = PublishMsg::TransportField_flags::Field_qos::ValueType;
if (m_pubMsg.transportField_flags().field_qos().value() > Qos::AtMostOnceDelivery) {
m_pubMsg.field_packetId().field().setValue(allocPacketId());
}

m_pubMsg.doRefresh(); // Update packetId presence

m_sendAttempts = 0U;
Expand Down Expand Up @@ -628,6 +636,11 @@ CC_Mqtt5ErrorCode SendOp::cancel()
return CC_Mqtt5ErrorCode_Success;
}

unsigned SendOp::getPacketId() const
{
return m_pubMsg.field_packetId().field().getValue();
}

Op::Type SendOp::typeImpl() const
{
return Type_Send;
Expand Down
1 change: 1 addition & 0 deletions client/lib/src/op/SendOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class SendOp final : public Op
unsigned getResendAttempts() const;
CC_Mqtt5ErrorCode send(CC_Mqtt5PublishCompleteCb cb, void* cbData);
CC_Mqtt5ErrorCode cancel();
unsigned getPacketId() const;

protected:
virtual Type typeImpl() const override;
Expand Down
9 changes: 8 additions & 1 deletion client/lib/src/op/SubscribeOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ SubscribeOp::SubscribeOp(ClientImpl& client) :
Base(client),
m_timer(client.timerMgr().allocTimer())
{
m_subMsg.field_packetId().setValue(allocPacketId());
}

SubscribeOp::~SubscribeOp()
Expand Down Expand Up @@ -163,7 +164,6 @@ CC_Mqtt5ErrorCode SubscribeOp::send(CC_Mqtt5SubscribeCompleteCb cb, void* cbData
m_cb = cb;
m_cbData = cbData;

m_subMsg.field_packetId().setValue(allocPacketId());
auto result = client().sendMessage(m_subMsg);
if (result != CC_Mqtt5ErrorCode_Success) {
return result;
Expand All @@ -181,6 +181,11 @@ CC_Mqtt5ErrorCode SubscribeOp::cancel()
return CC_Mqtt5ErrorCode_Success;
}

unsigned SubscribeOp::getPacketId() const
{
return m_subMsg.field_packetId().getValue();
}

void SubscribeOp::handle(SubackMsg& msg)
{
auto packetId = msg.field_packetId().value();
Expand All @@ -196,6 +201,8 @@ void SubscribeOp::handle(SubackMsg& msg)
UserPropsList userProps; // Will be referenced in response
auto response = CC_Mqtt5SubscribeResponse();

response.m_packetId = packetId;

auto terminationReason = DisconnectReason::ProtocolError;
auto terminateOnExit =
comms::util::makeScopeGuard(
Expand Down
1 change: 1 addition & 0 deletions client/lib/src/op/SubscribeOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class SubscribeOp final : public Op
CC_Mqtt5ErrorCode addUserProp(const CC_Mqtt5UserProp& prop);
CC_Mqtt5ErrorCode send(CC_Mqtt5SubscribeCompleteCb cb, void* cbData);
CC_Mqtt5ErrorCode cancel();
unsigned getPacketId() const;

using Base::handle;
virtual void handle(SubackMsg& msg) override;
Expand Down
10 changes: 9 additions & 1 deletion client/lib/src/op/UnsubscribeOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ UnsubscribeOp::UnsubscribeOp(ClientImpl& client) :
Base(client),
m_timer(client.timerMgr().allocTimer())
{
m_unsubMsg.field_packetId().setValue(allocPacketId());
}

UnsubscribeOp::~UnsubscribeOp()
Expand Down Expand Up @@ -137,7 +138,6 @@ CC_Mqtt5ErrorCode UnsubscribeOp::send(CC_Mqtt5UnsubscribeCompleteCb cb, void* cb
m_cb = cb;
m_cbData = cbData;

m_unsubMsg.field_packetId().setValue(allocPacketId());
auto result = client().sendMessage(m_unsubMsg);
if (result != CC_Mqtt5ErrorCode_Success) {
return result;
Expand All @@ -155,8 +155,14 @@ CC_Mqtt5ErrorCode UnsubscribeOp::cancel()
return CC_Mqtt5ErrorCode_Success;
}

unsigned UnsubscribeOp::getPacketId() const
{
return m_unsubMsg.field_packetId().getValue();
}

void UnsubscribeOp::handle(UnsubackMsg& msg)
{
auto packetId = msg.field_packetId().value();
if (msg.field_packetId().value() != m_unsubMsg.field_packetId().value()) {
return;
}
Expand All @@ -169,6 +175,8 @@ void UnsubscribeOp::handle(UnsubackMsg& msg)
UserPropsList userProps; // Will be referenced in response
auto response = CC_Mqtt5UnsubscribeResponse();

response.m_packetId = packetId;

auto terminationReason = DisconnectReason::ProtocolError;
auto terminateOnExit =
comms::util::makeScopeGuard(
Expand Down
1 change: 1 addition & 0 deletions client/lib/src/op/UnsubscribeOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class UnsubscribeOp final : public Op
CC_Mqtt5ErrorCode addUserProp(const CC_Mqtt5UserProp& prop);
CC_Mqtt5ErrorCode send(CC_Mqtt5UnsubscribeCompleteCb cb, void* cbData);
CC_Mqtt5ErrorCode cancel();
unsigned getPacketId() const;

using Base::handle;
virtual void handle(UnsubackMsg& msg) override;
Expand Down
27 changes: 27 additions & 0 deletions client/lib/templ/client.cpp.templ
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,15 @@ CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_subscribe_cancel(CC_Mqtt5SubscribeHand
return subscribeOpFromHandle(handle)->cancel();
}

unsigned cc_mqtt5_##NAME##client_subscribe_get_packet_id(CC_Mqtt5SubscribeHandle handle)
{
if (handle == nullptr) {
return 0U;
}

return subscribeOpFromHandle(handle)->getPacketId();
}

CC_Mqtt5UnsubscribeHandle cc_mqtt5_##NAME##client_unsubscribe_prepare(CC_Mqtt5ClientHandle handle, CC_Mqtt5ErrorCode* ec)
{
if (handle == nullptr) {
Expand Down Expand Up @@ -648,6 +657,15 @@ CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_unsubscribe_cancel(CC_Mqtt5Unsubscribe
return unsubscribeOpFromHandle(handle)->cancel();
}

unsigned cc_mqtt5_##NAME##client_unsubscribe_get_packet_id(CC_Mqtt5UnsubscribeHandle handle)
{
if (handle == nullptr) {
return CC_Mqtt5ErrorCode_BadParam;
}

return unsubscribeOpFromHandle(handle)->getPacketId();
}

CC_Mqtt5PublishHandle cc_mqtt5_##NAME##client_publish_prepare(CC_Mqtt5ClientHandle handle, CC_Mqtt5ErrorCode* ec)
{
if (handle == nullptr) {
Expand Down Expand Up @@ -752,6 +770,15 @@ CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_publish_cancel(CC_Mqtt5PublishHandle h
return sendOpFromHandle(handle)->cancel();
}

unsigned cc_mqtt5_##NAME##client_publish_get_packet_id(CC_Mqtt5PublishHandle handle)
{
if (handle == nullptr) {
return 0U;
}

return sendOpFromHandle(handle)->getPacketId();
}

CC_Mqtt5ReauthHandle cc_mqtt5_##NAME##client_reauth_prepare(CC_Mqtt5ClientHandle handle, CC_Mqtt5ErrorCode* ec)
{
if (handle == nullptr) {
Expand Down
39 changes: 39 additions & 0 deletions client/lib/templ/client.h.templ
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_connect_send(CC_Mqtt5ConnectHandle han
/// @ingroup connect
CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_connect_cancel(CC_Mqtt5ConnectHandle handle);


/// @brief Check the inner state of the library of whether it's connected to the broker.
/// @param[in] handle Handle returned by @ref cc_mqtt5_##NAME##client_alloc() function.
/// @ingroup connect
Expand Down Expand Up @@ -431,6 +432,16 @@ CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_subscribe_send(CC_Mqtt5SubscribeHandle
/// @ingroup subscribe
CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_subscribe_cancel(CC_Mqtt5SubscribeHandle handle);

/// @brief Retrieve "Packet Identifier" of the "subscribe" operation.
/// @details The retrieved value can be used to differentiate between the "subscribe" operations
/// in case they use the same callback when calling the
/// @ref cc_mqtt5_##NAME##client_subscribe_send() function.
/// @param[in] handle Handle returned by @ref cc_mqtt5_##NAME##client_subscribe_prepare() function.
/// @pre The function can be called while the handle is valid, i.e. until the callback
/// of the operation completion is called or until the operation is cancelled.
/// @return "Packet Identifier" value
unsigned cc_mqtt5_##NAME##client_subscribe_get_packet_id(CC_Mqtt5SubscribeHandle handle);

/// @brief Prepare "unsubscribe" operation.
/// @param[in] handle Handle returned by @ref cc_mqtt5_##NAME##client_alloc() function.
/// @param[out] ec Error code reporting result of the operation. Can be NULL.
Expand Down Expand Up @@ -494,6 +505,16 @@ CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_unsubscribe_send(CC_Mqtt5UnsubscribeHa
/// @ingroup unsubscribe
CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_unsubscribe_cancel(CC_Mqtt5UnsubscribeHandle handle);

/// @brief Retrieve "Packet Identifier" of the "unsubscribe" operation.
/// @details The retrieved value can be used to differentiate between the "unsubscribe" operations
/// in case they use the same callback when calling the
/// @ref cc_mqtt5_##NAME##client_unsubscribe_send() function.
/// @param[in] handle Handle returned by @ref cc_mqtt5_##NAME##client_unsubscribe_prepare() function.
/// @pre The function can be called while the handle is valid, i.e. until the callback
/// of the operation completion is called or until the operation is cancelled.
/// @return "Packet Identifier" value
unsigned cc_mqtt5_##NAME##client_unsubscribe_get_packet_id(CC_Mqtt5UnsubscribeHandle handle);

/// @brief Prepare "publish" operation.
/// @param[in] handle Handle returned by @ref cc_mqtt5_##NAME##client_alloc() function.
/// @param[out] ec Error code reporting result of the operation. Can be NULL.
Expand Down Expand Up @@ -586,6 +607,24 @@ CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_publish_send(CC_Mqtt5PublishHandle han
/// @ingroup publish
CC_Mqtt5ErrorCode cc_mqtt5_##NAME##client_publish_cancel(CC_Mqtt5PublishHandle handle);

/// @brief Retrieve "Packet Identifier" of the "publish" operation.
/// @details The retrieved value can be used to differentiate between the "publish" operations
/// in case they use the same callback when calling the
/// @ref cc_mqtt5_##NAME##client_publish_send() function.
/// @param[in] handle Handle returned by @ref cc_mqtt5_##NAME##client_publish_prepare() function.
/// @pre The function is expected to be called after the @ref cc_mqtt5_##NAME##client_publish_config_basic()
/// which sets the QoS value. In case the QoS value is @ref CC_Mqtt5QoS_AtMostOnceDelivery the
/// "Packet Identifier" is not allocated and @b 0 is returned
/// @pre The function can be called while the handle is valid, i.e. until the callback
/// of the operation completion is called or until the operation is cancelled.
/// @note When QoS is @b 0 (@ref CC_Mqtt5QoS_AtMostOnceDelivery), the callback function is
//// invoked immediately from within the @ref cc_mqtt5_##NAME##client_publish_send()
/// invocation immediatelly invalidating the handle, i.e. in such case the
/// @ref cc_mqtt5_##NAME##client_publish_get_packet_id() cannot be called after the
/// @ref cc_mqtt5_##NAME##client_publish_send().
/// @return "Packet Identifier" value when allocated, @b 0 otherwise.
unsigned cc_mqtt5_##NAME##client_publish_get_packet_id(CC_Mqtt5PublishHandle handle);

/// @brief Prepare "reauth" operation.
/// @param[in] handle Handle returned by @ref cc_mqtt5_##NAME##client_alloc() function.
/// @param[out] ec Error code reporting result of the operation. Can be NULL.
Expand Down
3 changes: 3 additions & 0 deletions client/lib/test/unit/UnitTestBmBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const UnitTestBmBase::LibFuncs& UnitTestBmBase::getFuncs()
funcs.m_subscribe_add_user_prop = &cc_mqtt5_bm_client_subscribe_add_user_prop;
funcs.m_subscribe_send = &cc_mqtt5_bm_client_subscribe_send;
funcs.m_subscribe_cancel = &cc_mqtt5_bm_client_subscribe_cancel;
funcs.m_subscribe_get_packet_id = &cc_mqtt5_bm_client_subscribe_get_packet_id;
funcs.m_unsubscribe_prepare = &cc_mqtt5_bm_client_unsubscribe_prepare;
funcs.m_unsubscribe_set_response_timeout = &cc_mqtt5_bm_client_unsubscribe_set_response_timeout;
funcs.m_unsubscribe_get_response_timeout = &cc_mqtt5_bm_client_unsubscribe_get_response_timeout;
Expand All @@ -67,6 +68,7 @@ const UnitTestBmBase::LibFuncs& UnitTestBmBase::getFuncs()
funcs.m_unsubscribe_add_user_prop = &cc_mqtt5_bm_client_unsubscribe_add_user_prop;
funcs.m_unsubscribe_send = &cc_mqtt5_bm_client_unsubscribe_send;
funcs.m_unsubscribe_cancel = &cc_mqtt5_bm_client_unsubscribe_cancel;
funcs.m_unsubscribe_get_packet_id = &cc_mqtt5_bm_client_unsubscribe_get_packet_id;
funcs.m_publish_prepare = &cc_mqtt5_bm_client_publish_prepare;
funcs.m_publish_init_config_basic = &cc_mqtt5_bm_client_publish_init_config_basic;
funcs.m_publish_init_config_extra = &cc_mqtt5_bm_client_publish_init_config_extra;
Expand All @@ -79,6 +81,7 @@ const UnitTestBmBase::LibFuncs& UnitTestBmBase::getFuncs()
funcs.m_publish_add_user_prop = &cc_mqtt5_bm_client_publish_add_user_prop;
funcs.m_publish_send = &cc_mqtt5_bm_client_publish_send;
funcs.m_publish_cancel = &cc_mqtt5_bm_client_publish_cancel;
funcs.m_publish_get_packet_id = &cc_mqtt5_bm_client_publish_get_packet_id;
funcs.m_reauth_prepare = &cc_mqtt5_bm_client_reauth_prepare;
funcs.m_reauth_init_config_auth = &cc_mqtt5_bm_client_reauth_init_config_auth;
funcs.m_reauth_set_response_timeout = &cc_mqtt5_bm_client_reauth_set_response_timeout;
Expand Down
Loading

0 comments on commit 444afbc

Please sign in to comment.