Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

解决个别异常网络情况下的稳定性 #2

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 104 additions & 20 deletions mqttclient/mqttclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,22 @@ static client_state_t mqtt_get_client_state(mqtt_client_t* c)
static void mqtt_set_client_state(mqtt_client_t* c, client_state_t state)
{
platform_mutex_lock(&c->mqtt_global_lock);
c->mqtt_client_state = state;
if(c->mqtt_client_state == CLIENT_STATE_CLEAN_SESSION)
{
/*CLIENT_STATE_CLEAN_SESSION can only change to CLIENT_STATE_INVALID zhaoshimin 20220620*/
if(state == CLIENT_STATE_INVALID)
{
c->mqtt_client_state = CLIENT_STATE_INVALID;
}
else
{
/*no change the c->mqtt_client_state*/
}
}
else
{
c->mqtt_client_state = state;
}
platform_mutex_unlock(&c->mqtt_global_lock);
}

Expand Down Expand Up @@ -491,11 +506,18 @@ static void mqtt_clean_session(mqtt_client_t* c)
ack_handlers_t *ack_handler;
message_handlers_t *msg_handler;

platform_mutex_lock(&c->mqtt_write_lock);
/* release all ack_handler_list memory */
if (!(mqtt_list_is_empty(&c->mqtt_ack_handler_list))) {
LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_ack_handler_list) {
ack_handler = LIST_ENTRY(curr, ack_handlers_t, list);
if(ack_handler->handler)
{
platform_memory_free(ack_handler->handler);
ack_handler->handler = RT_NULL;
}
platform_memory_free(ack_handler);
mqtt_subtract_ack_handler_num(c);
}
mqtt_list_del_init(&c->mqtt_ack_handler_list);
}
Expand All @@ -509,6 +531,7 @@ static void mqtt_clean_session(mqtt_client_t* c)
}
mqtt_list_del_init(&c->mqtt_msg_handler_list);
}
platform_mutex_unlock(&c->mqtt_write_lock);

mqtt_set_client_state(c, CLIENT_STATE_INVALID);
}
Expand All @@ -523,6 +546,7 @@ static void mqtt_ack_list_scan(mqtt_client_t* c, uint8_t flag)
{
mqtt_list_t *curr, *next;
ack_handlers_t *ack_handler;
message_handlers_t *msg_handler;

if ((mqtt_list_is_empty(&c->mqtt_ack_handler_list)) || (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c)))
return;
Expand All @@ -543,8 +567,14 @@ static void mqtt_ack_list_scan(mqtt_client_t* c, uint8_t flag)

if(flag)
{
/*when flag == 0, rconnet the broker, resubscribe creat the type SUBACK ack_handler_list, it not been destroyed zhaoshimin 20200629*/
/*when flag == 0, reconncet the broker, resubscribe creat the type SUBACK ack_handler_list, it not been destroyed zhaoshimin 20200629*/
platform_mutex_lock(&c->mqtt_write_lock);
msg_handler = ack_handler->handler;
if(msg_handler)
{
mqtt_msg_handler_destory(msg_handler);
msg_handler = RT_NULL;
}
mqtt_ack_handler_destroy(ack_handler);
mqtt_subtract_ack_handler_num(c);
platform_mutex_unlock(&c->mqtt_write_lock);
Expand All @@ -558,7 +588,7 @@ static int mqtt_try_resubscribe(mqtt_client_t* c)
mqtt_list_t *curr, *next;
message_handlers_t *msg_handler;

KAWAII_MQTT_LOG_W("%s:%d %s()... mqtt try resubscribe ...", __FILE__, __LINE__, __FUNCTION__);
KAWAII_MQTT_LOG_I("%s:%d %s()... mqtt try resubscribe ...", __FILE__, __LINE__, __FUNCTION__);

if (mqtt_list_is_empty(&c->mqtt_msg_handler_list))
RETURN_ERROR(KAWAII_MQTT_SUCCESS_ERROR);
Expand Down Expand Up @@ -587,8 +617,10 @@ static int mqtt_try_do_reconnect(mqtt_client_t* c)
/* process these ack messages immediately after reconnecting */
mqtt_ack_list_scan(c, 0);
}

KAWAII_MQTT_LOG_D("%s:%d %s()... mqtt try connect result is -0x%04x", __FILE__, __LINE__, __FUNCTION__, -rc);
else
{
KAWAII_MQTT_LOG_E("%s:%d %s()... mqtt try connect result is -0x%04x", __FILE__, __LINE__, __FUNCTION__, -rc);
}

RETURN_ERROR(rc);
}
Expand All @@ -608,8 +640,14 @@ static int mqtt_try_reconnect(mqtt_client_t* c)
/*connect fail must delay reconnect try duration time and let cpu time go out, the lowest priority task can run */
mqtt_sleep_ms(c->mqtt_reconnect_try_duration);
RETURN_ERROR(KAWAII_MQTT_RECONNECT_TIMEOUT_ERROR);
}
else {
} else {

/* when connect server success, call the connect success callback function*/
if(c->mqtt_connect_handler)
{
c->mqtt_connect_handler(c, c->mqtt_connect_data);
}

RETURN_ERROR(rc);
}

Expand Down Expand Up @@ -832,6 +870,8 @@ static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)

switch (packet_type) {
case 0: /* timed out reading packet */
/*when mqtt closed by server, funtion mqtt_read_pacek return no delay time, so add mqtt_sleep_ms to let cpu time zhaoshimin 20200723*/
mqtt_sleep_ms(100);
break;

case CONNACK: /* has been processed */
Expand Down Expand Up @@ -939,15 +979,15 @@ static int mqtt_yield(mqtt_client_t* c, int timeout_ms)

static void mqtt_yield_thread(void *arg)
{
int rc;
int rc = 0;

mqtt_client_t *c = (mqtt_client_t *)arg;


while (1) {
rc = mqtt_yield(c, c->mqtt_cmd_timeout);
if (KAWAII_MQTT_CLEAN_SESSION_ERROR == rc) {
KAWAII_MQTT_LOG_E("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__);
KAWAII_MQTT_LOG_W("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__);
network_disconnect(c->mqtt_network);
mqtt_clean_session(c);
break;
Expand All @@ -957,6 +997,7 @@ static void mqtt_yield_thread(void *arg)
}
/*let the rtos recycles thread resources zhaoshimin 20200629*/
platform_thread_destroy(c->mqtt_thread);
c->mqtt_thread = RT_NULL;

}

Expand All @@ -966,14 +1007,25 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
int rc = KAWAII_MQTT_CONNECT_FAILED_ERROR;
platform_timer_t connect_timer;
mqtt_connack_data_t connack_data = {0};
client_state_t state;
MQTTPacket_connectData connect_data = MQTTPacket_connectData_initializer;

if (NULL == c)
{
RETURN_ERROR(KAWAII_MQTT_NULL_VALUE_ERROR);
}

if (CLIENT_STATE_CONNECTED == mqtt_get_client_state(c))
state = mqtt_get_client_state(c);
if (CLIENT_STATE_CONNECTED == state)
{
RETURN_ERROR(KAWAII_MQTT_SUCCESS_ERROR);

}
else if(CLIENT_STATE_CLEAN_SESSION == state)
{
RETURN_ERROR(KAWAII_MQTT_CLEAN_SESSION_ERROR);
}
/*protect the socket zhaoshimin 20220622 */
platform_mutex_lock(&c->mqtt_write_lock);

#ifdef KAWAII_MQTT_NETWORK_TYPE_TLS
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, c->mqtt_ca);
Expand All @@ -985,10 +1037,10 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
if (KAWAII_MQTT_SUCCESS_ERROR != rc) {
/*when connect faile, you should call network_release to release socket file descriptor zhaoshimin 20200629*/
network_release(c->mqtt_network);
platform_mutex_unlock(&c->mqtt_write_lock);
RETURN_ERROR(rc);
}

KAWAII_MQTT_LOG_I("%s:%d %s()... mqtt connect success...", __FILE__, __LINE__, __FUNCTION__);

connect_data.keepAliveInterval = c->mqtt_keep_alive_interval;
connect_data.cleansession = c->mqtt_clean_session;
Expand All @@ -1007,8 +1059,6 @@ static int mqtt_connect_with_results(mqtt_client_t* c)

platform_timer_cutdown(&c->mqtt_last_received, (c->mqtt_keep_alive_interval * 1000));

platform_mutex_lock(&c->mqtt_write_lock);

/* serialize connect packet */
if ((len = MQTTSerialize_connect(c->mqtt_write_buf, c->mqtt_write_buf_size, &connect_data)) <= 0)
goto exit;
Expand All @@ -1025,8 +1075,9 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
rc = connack_data.rc;
else
rc = KAWAII_MQTT_CONNECT_FAILED_ERROR;
} else
} else {
rc = KAWAII_MQTT_CONNECT_FAILED_ERROR;
}

exit:
if (rc == KAWAII_MQTT_SUCCESS_ERROR) {
Expand All @@ -1053,7 +1104,15 @@ static int mqtt_connect_with_results(mqtt_client_t* c)

c->mqtt_ping_outstanding = 0; /* reset ping outstanding */

/* call the connect success callback function*/
if((rc == KAWAII_MQTT_SUCCESS_ERROR) && (c->mqtt_connect_handler))
{
c->mqtt_connect_handler(c, c->mqtt_connect_data);
}

} else {
/*when server ack error, it must close the mqtt socket zhaoshimin 20200724 */
network_release(c->mqtt_network);
mqtt_set_client_state(c, CLIENT_STATE_INITIALIZED); /* connect failed */
}

Expand All @@ -1072,6 +1131,7 @@ static int mqtt_init(mqtt_client_t* c)
RETURN_ERROR(KAWAII_MQTT_MEM_NOT_ENOUGH_ERROR);
}
memset(c->mqtt_network, 0, sizeof(network_t));
c->mqtt_network->socket = -1;

c->mqtt_packet_id = 1;
c->mqtt_clean_session = 0; //no clear session by default
Expand All @@ -1092,6 +1152,8 @@ static int mqtt_init(mqtt_client_t* c)
c->mqtt_reconnect_data = NULL;
c->mqtt_reconnect_handler = NULL;
c->mqtt_interceptor_handler = NULL;
c->mqtt_connect_data = NULL;
c->mqtt_connect_handler = NULL;

/*only malloc write buf and read buf when call mqtt_init function */
if ((KAWAII_MQTT_MIN_PAYLOAD_SIZE >= c->mqtt_read_buf_size) || (KAWAII_MQTT_MAX_PAYLOAD_SIZE <= c->mqtt_read_buf_size))
Expand Down Expand Up @@ -1139,6 +1201,8 @@ KAWAII_MQTT_CLIENT_SET_DEFINE(write_buf_size, uint32_t, 0)
KAWAII_MQTT_CLIENT_SET_DEFINE(reconnect_try_duration, uint32_t, 0)
KAWAII_MQTT_CLIENT_SET_DEFINE(reconnect_handler, reconnect_handler_t, NULL)
KAWAII_MQTT_CLIENT_SET_DEFINE(interceptor_handler, interceptor_handler_t, NULL)
KAWAII_MQTT_CLIENT_SET_DEFINE(connect_handler, connect_handler_t, NULL)
KAWAII_MQTT_CLIENT_SET_DEFINE(connect_data, void*, NULL)

void mqtt_sleep_ms(int ms)
{
Expand Down Expand Up @@ -1252,6 +1316,20 @@ int mqtt_disconnect(mqtt_client_t* c)

platform_timer_cutdown(&timer, c->mqtt_cmd_timeout);

if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c))
{
/*��δ���ӣ�����mqtt�̻߳������е�����²�ִ�� */
if(c->mqtt_thread)
{
mqtt_set_client_state(c, CLIENT_STATE_CLEAN_SESSION);
return KAWAII_MQTT_FAILED_ERROR;
}
else
{
return KAWAII_MQTT_SUCCESS_ERROR;
}
}

platform_mutex_lock(&c->mqtt_write_lock);

/* serialize disconnect packet and send it */
Expand Down Expand Up @@ -1302,7 +1380,10 @@ int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, m
}

rc = mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler);

if(KAWAII_MQTT_SUCCESS_ERROR != rc)
{
platform_memory_free(msg_handler);
}
exit:

platform_mutex_unlock(&c->mqtt_write_lock);
Expand Down Expand Up @@ -1341,7 +1422,10 @@ int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter)
}

rc = mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler);

if(KAWAII_MQTT_SUCCESS_ERROR != rc)
{
platform_memory_free(msg_handler);
}
exit:

platform_mutex_unlock(&c->mqtt_write_lock);
Expand All @@ -1359,7 +1443,7 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg

if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c)) {
rc = KAWAII_MQTT_NOT_CONNECT_ERROR;
goto exit;
RETURN_ERROR(rc);
}

if ((NULL != msg->payload) && (0 == msg->payloadlen))
Expand Down Expand Up @@ -1402,7 +1486,8 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg

platform_mutex_unlock(&c->mqtt_write_lock);

if ((KAWAII_MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR == rc) || (KAWAII_MQTT_MEM_NOT_ENOUGH_ERROR == rc)) {
if ((KAWAII_MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR == rc) || (KAWAII_MQTT_MEM_NOT_ENOUGH_ERROR == rc) ||
(KAWAII_MQTT_SEND_PACKET_ERROR == rc)) {
KAWAII_MQTT_LOG_W("%s:%d %s()... there is not enough memory space to record...", __FILE__, __LINE__, __FUNCTION__);

/*must realse the socket file descriptor zhaoshimin 20200629*/
Expand All @@ -1418,7 +1503,6 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg

int mqtt_list_subscribe_topic(mqtt_client_t* c)
{
int i = 0;
mqtt_list_t *curr, *next;
message_handlers_t *msg_handler;

Expand Down
5 changes: 5 additions & 0 deletions mqttclient/mqttclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ typedef struct message_data {
typedef void (*interceptor_handler_t)(void* client, message_data_t* msg);
typedef void (*message_handler_t)(void* client, message_data_t* msg);
typedef void (*reconnect_handler_t)(void* client, void* reconnect_date);
typedef void (*connect_handler_t)(void* client, void* connect_date);

typedef struct message_handlers {
mqtt_list_t list;
Expand Down Expand Up @@ -94,6 +95,7 @@ typedef struct mqtt_client {
char *mqtt_port;
char *mqtt_ca;
void *mqtt_reconnect_data;
void *mqtt_connect_data;
uint8_t *mqtt_read_buf;
uint8_t *mqtt_write_buf;
uint16_t mqtt_keep_alive_interval;
Expand Down Expand Up @@ -122,6 +124,7 @@ typedef struct mqtt_client {
platform_timer_t mqtt_last_received;
reconnect_handler_t mqtt_reconnect_handler;
interceptor_handler_t mqtt_interceptor_handler;
connect_handler_t mqtt_connect_handler;
} mqtt_client_t;


Expand Down Expand Up @@ -156,6 +159,8 @@ KAWAII_MQTT_CLIENT_SET_STATEMENT(write_buf_size, uint32_t)
KAWAII_MQTT_CLIENT_SET_STATEMENT(reconnect_try_duration, uint32_t)
KAWAII_MQTT_CLIENT_SET_STATEMENT(reconnect_handler, reconnect_handler_t)
KAWAII_MQTT_CLIENT_SET_STATEMENT(interceptor_handler, interceptor_handler_t)
KAWAII_MQTT_CLIENT_SET_STATEMENT(connect_data, void*)
KAWAII_MQTT_CLIENT_SET_STATEMENT(connect_handler, connect_handler_t)

void mqtt_sleep_ms(int ms);
mqtt_client_t *mqtt_lease(void);
Expand Down
7 changes: 5 additions & 2 deletions network/nettype_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ int nettype_tcp_connect(network_t* n)

void nettype_tcp_disconnect(network_t* n)
{
if (NULL != n)
if((NULL != n) && (n->socket >= 0))
{
platform_net_socket_close(n->socket);
n->socket = -1;
n->socket = -1;
}

}
7 changes: 5 additions & 2 deletions platform/RT-Thread/platform_memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@

void *platform_memory_alloc(size_t size)
{
char *ptr;
char *ptr = RT_NULL;
ptr = rt_malloc(size);
memset(ptr, 0, size);
if(ptr)
{
memset(ptr, 0, size);
}
return (void *)ptr;
}

Expand Down
2 changes: 1 addition & 1 deletion test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static int mqtt_publish_handle1(mqtt_client_t *client)
}


int main(void)
int main_mqtt(void)
{
mqtt_client_t *client = NULL;

Expand Down