Skip to content

Commit

Permalink
Revert "added patch from eclipse-mosquitto/mosquitto.rsmb#11 fixes so…
Browse files Browse the repository at this point in the history
…me persistence"

This reverts commit 97367b2.
  • Loading branch information
Demo User committed Jul 21, 2016
1 parent 33c4d9f commit bc8f4b2
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 68 deletions.
20 changes: 10 additions & 10 deletions rsmb/src/MQTTProtocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
Expand Down Expand Up @@ -295,13 +295,13 @@ void MQTTProtocol_update(time_t now)

sprintf(buf, "%d", bstate->clients->count);
MQTTProtocol_sys_publish("$SYS/broker/client count/connected", buf);

sprintf(buf, "%d", bstate->disconnected_clients->count);
MQTTProtocol_sys_publish("$SYS/broker/client count/disconnected", buf);

sprintf(buf, "%d", bstate->se->subs->count);
MQTTProtocol_sys_publish("$SYS/broker/subscriptions/count", buf);

sprintf(buf, "%d", bstate->se->wsubs->count);
MQTTProtocol_sys_publish("$SYS/broker/wildcard_subscriptions/count", buf);

Expand All @@ -313,7 +313,7 @@ void MQTTProtocol_update(time_t now)

sprintf(buf, "%d", bstate->max_inflight_messages);
MQTTProtocol_sys_publish("$SYS/broker/settings/max_inflight_messages", buf);

sprintf(buf, "%d", bstate->ffdc_count);
MQTTProtocol_sys_publish("$SYS/broker/ffdc/count", buf);

Expand Down Expand Up @@ -389,7 +389,7 @@ int MQTTProtocol_housekeeping(int more_work)
{
time(&(last_keepalive));
MQTTProtocol_keepalive(now);
more_work = MQTTProtocol_retry(bstate->clients, now, 1);
more_work = MQTTProtocol_retry(now, 1);
MQTTProtocol_update(now);
Socket_cleanNew(now);
}
Expand Down Expand Up @@ -633,8 +633,8 @@ int MQTTProtocol_handleConnects(void* pack, int sock, Clients* client)
Log(LOG_PROTOCOL, 26, NULL, sock, connect->clientID);/*
connect->Protocol, connect->flags.bits.cleanstart, connect->keepAliveTimer,
connect->version, connect->username, connect->password);*/
Socket_removeNew(sock);

Socket_removeNew(sock);
if (bstate->state != BROKER_RUNNING)
terminate = 1; /* don't accept new connection requests when we are shutting down */
/* Now check the version. If we don't recognize it we will not have parsed the packet,
Expand Down
8 changes: 4 additions & 4 deletions rsmb/src/MQTTProtocolClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ int MQTTProtocol_processQueued(Clients* client)
#if defined(QOS0_SEND_LIMIT)
&& qos0count < bstate->max_inflight_messages /* an arbitrary criterion - but when would we restart? */
#endif
#if 0 //defined(MQTTS)
#if defined(MQTTS)
&& (client->protocol == PROTOCOL_MQTT || client->outboundMsgs->count == 0)
#endif
)
Expand Down Expand Up @@ -854,18 +854,18 @@ void MQTTProtocol_retries(time_t now, Clients* client)
* @param doRetry boolean - retries as well as pending writes?
* @return not actually used
*/
int MQTTProtocol_retry(Tree* clients, time_t now, int doRetry)
int MQTTProtocol_retry(time_t now, int doRetry)
{
Node* current = NULL;
int rc = 0;

FUNC_ENTRY;
current = TreeNextElement(clients, current);
current = TreeNextElement(bstate->clients, current);
/* look through the outbound message list of each client, checking to see if a retry is necessary */
while (current)
{
Clients* client = (Clients*)(current->content);
current = TreeNextElement(clients, current);
current = TreeNextElement(bstate->clients, current);
if (client->connected == 0)
{
#if defined(MQTTS)
Expand Down
8 changes: 4 additions & 4 deletions rsmb/src/MQTTProtocolClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
Expand Down Expand Up @@ -54,7 +54,7 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock, Clients* client);

void MQTTProtocol_keepalive(time_t);
int MQTTProtocol_processQueued(Clients* client);
int MQTTProtocol_retry(Tree*, time_t, int);
int MQTTProtocol_retry(time_t, int);
void MQTTProtocol_retries(time_t now, Clients* client);
void MQTTProtocol_freeClient(Clients* client);
void MQTTProtocol_removeQoS0Messages(List* msgList);
Expand Down
38 changes: 16 additions & 22 deletions rsmb/src/MQTTSProtocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void MQTTSProtocol_terminate()
/**
* MQTTs protocol advertise processing.
*/
void MQTTSProtocol_housekeeping(int work)
void MQTTSProtocol_housekeeping()
{
ListElement* current = NULL;
time_t now = 0;
Expand All @@ -176,12 +176,13 @@ void MQTTSProtocol_housekeeping(int work)
/* for each listener, if the advertise parameter is set and the interval has expired,
* call send_advertise
*/
time(&(now));
while (ListNextElement(bstate->listeners, &current))
{
Listener *listener = (Listener*)current->content;
if (listener->advertise)
{
if (now == 0)
time(&(now));
if (difftime(now, listener->advertise->last) > listener->advertise->interval)
{
int rc = 0;
Expand All @@ -192,10 +193,7 @@ void MQTTSProtocol_housekeeping(int work)
}
}
}
work = MQTTProtocol_retry(bstate->mqtts_clients, now, 1);

FUNC_EXIT_RC(work);
return work;
FUNC_EXIT;
}


Expand Down Expand Up @@ -344,7 +342,7 @@ int MQTTSProtocol_handleConnects(void* pack, int sock, char* clientAddr, Clients
int terminate = 0;
Node* elem = NULL;
int rc = 0;
int existingClient = 1;
int existingClient = 0;

FUNC_ENTRY;
Log(LOG_PROTOCOL, 39, NULL, sock, clientAddr, client ? client->clientID : "", connect->flags.cleanSession);
Expand Down Expand Up @@ -396,25 +394,22 @@ int MQTTSProtocol_handleConnects(void* pack, int sock, char* clientAddr, Clients
if (elem == NULL)
{
client = TreeRemoveKey(bstate->disconnected_mqtts_clients, connect->clientID);
if (client == NULL)
if (client == NULL) /* this is a totally new connection */
{
/* Brand new client connection */
int i;

if((client = TreeRemoveKey(bstate->disconnected_clients, connect->clientID)) == NULL) /* this is a totally new connection */
{
client = malloc(sizeof(Clients));
memset(client, '\0', sizeof(Clients));
client->outboundMsgs = ListInitialize();
client->inboundMsgs = ListInitialize();
for (i = 0; i < PRIORITY_MAX; ++i)
client->queuedMsgs[i] = ListInitialize();
client->registrations = ListInitialize();
existingClient = 0;
client->clientID = connect->clientID;
connect->clientID = NULL; /* don't want to free this space as it is being used in the clients tree below */
}
client = malloc(sizeof(Clients));
memset(client, '\0', sizeof(Clients));
client->protocol = PROTOCOL_MQTTS;
client->outboundMsgs = ListInitialize();
client->inboundMsgs = ListInitialize();
for (i = 0; i < PRIORITY_MAX; ++i)
client->queuedMsgs[i] = ListInitialize();
client->registrations = ListInitialize();
client->noLocal = 0; /* (connect->version == PRIVATE_PROTOCOL_VERSION) ? 1 : 0; */
client->clientID = connect->clientID;
connect->clientID = NULL; /* don't want to free this space as it is being used in the clients tree below */
// Set Wireless Node ID if exists
if ( wirelessNodeId == NULL)
{
Expand Down Expand Up @@ -797,7 +792,6 @@ int MQTTSProtocol_handlePubacks(void* pack, int sock, char* clientAddr, Clients*
Log(LOG_WARNING, 50, NULL, "PUBACK", client->clientID, puback->msgId);
} else {
Messages* m = (Messages*)(client->outboundMsgs->current->content);
Log(LOG_PROTOCOL, 56, NULL, sock, clientAddr, client ? client->clientID : "", puback->msgId, m->qos);
if (m->qos != 1) {
Log(LOG_WARNING, 51, NULL, "PUBACK", client->clientID, puback->msgId, m->qos);
} else {
Expand Down
8 changes: 4 additions & 4 deletions rsmb/src/MQTTSProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
Expand All @@ -27,7 +27,7 @@

int MQTTSProtocol_initialize(BrokerStates* aBrokerState);
void MQTTSProtocol_terminate();
void MQTTSProtocol_housekeeping(int work);
void MQTTSProtocol_housekeeping();
void MQTTSProtocol_timeslice(int sock);

int MQTTSProtocol_handleAdvertises(void* pack, int sock, char* clientAddr, Clients* client);
Expand Down
25 changes: 11 additions & 14 deletions rsmb/src/Persistence.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
Expand Down Expand Up @@ -1190,9 +1190,6 @@ Clients* Persistence_createDefaultClient(char* clientID)
for (i = 0; i < PRIORITY_MAX; ++i)
newc->queuedMsgs[i] = ListInitialize();
FUNC_EXIT;
#if defined(MQTTS)
newc->registrations = ListInitialize();
#endif
return newc;
}

Expand Down Expand Up @@ -1233,10 +1230,10 @@ Subscriptions* Persistence_read_subscription()
s->wildcards = Topics_hasWildcards(s->topicName);
s->priority = PRIORITY_NORMAL;
success = 1;

if ((elem = TreeFind(bstate->disconnected_clients, s->clientName)) == NULL)
{
/*printf("adding sub for client %s\n", s->clientName);*/
/*printf("adding sub for client %s\n", s->clientName);*/
TreeAdd(bstate->disconnected_clients, Persistence_createDefaultClient(s->clientName),
sizeof(Clients) + strlen(s->clientName)+1 + 3*sizeof(List));
}
Expand Down Expand Up @@ -1274,24 +1271,24 @@ void Persistence_close_file(int write_error)
if (rfile)
{
char* bak1 = add_prefix(cur_backup_fn1);

int rc = fclose(rfile);

if (write_error != 0 || rc != 0)
{
char* loc = add_prefix(cur_fn);
char* bak = add_prefix(cur_backup_fn);

_unlink(loc); /* remove the erroneously written persistence file */
_unlink(loc); /* remove the erroneously written persistence file */
rename(bak, loc); /* restore the backup */
rename(bak1, bak); /* restore the 2nd backup to backup */

free_prefix(loc, cur_fn);
free_prefix(bak, cur_backup_fn);
}
else
_unlink(bak1); /* remove the second backup */

free_prefix(bak1, cur_backup_fn1);
}
}
Expand All @@ -1316,7 +1313,7 @@ int take_FFDC(char* symptoms)
#define SYMPTOM_BUFFER_SIZE 256
char symptom_buffer[SYMPTOM_BUFFER_SIZE];
snprintf(symptom_buffer, SYMPTOM_BUFFER_SIZE, "Requested by command. %s", symptoms);
symptom_buffer[SYMPTOM_BUFFER_SIZE - 1] = '\0';
symptom_buffer[SYMPTOM_BUFFER_SIZE - 1] = '\0';
Broker_recordFFDC(symptom_buffer);
return 0;
}
Expand Down
20 changes: 10 additions & 10 deletions rsmb/src/Protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
Expand Down Expand Up @@ -162,7 +162,7 @@ void Protocol_timeslice()
}
#endif
}

if (bridge_connection == 0)
#if defined(MQTTS)
{
Expand Down Expand Up @@ -191,7 +191,7 @@ void Protocol_timeslice()
Protocol_closing();
more_work = MQTTProtocol_housekeeping(more_work);
#if defined(MQTTS)
mqttsn_work = MQTTSProtocol_housekeeping(mqttsn_work);
MQTTSProtocol_housekeeping();
#endif
exit:
FUNC_EXIT;
Expand All @@ -212,7 +212,7 @@ void Protocol_processPublication(Publish* publish, char* originator)
int clean_needed = 0;

FUNC_ENTRY;

if (Topics_hasWildcards(publish->topic))
{
Log(LOG_INFO, 12, NULL, publish->topic, originator);
Expand All @@ -222,7 +222,7 @@ void Protocol_processPublication(Publish* publish, char* originator)
if ((strcmp(INTERNAL_CLIENTID, originator) != 0) && bstate->password_file && bstate->acl_file)
{
Clients* client = (Clients*)(TreeFindIndex(bstate->clients, originator, 1)->content);

if (Users_authorise(client->user, publish->topic, ACL_WRITE) == false)
{
Log(LOG_AUDIT, 149, NULL, originator, publish->topic);
Expand Down Expand Up @@ -264,10 +264,10 @@ void Protocol_processPublication(Publish* publish, char* originator)
unsigned int qos = ((Subscriptions*)(current->content))->qos;
int priority = ((Subscriptions*)(current->content))->priority;
char* clientName = ((Subscriptions*)(current->content))->clientName;

if (publish->header.bits.qos < qos) /* reduce qos if > subscribed qos */
qos = publish->header.bits.qos;

if ((curnode = TreeFindIndex(bstate->clients, clientName, 1)) == NULL)
curnode = TreeFind(bstate->disconnected_clients, clientName);
#if defined(MQTTS)
Expand All @@ -281,7 +281,7 @@ void Protocol_processPublication(Publish* publish, char* originator)
int retained = 0;
Messages* saved = NULL;
char* original_topic = publish->topic;

#if !defined(NO_BRIDGE)
if (pubclient->outbound || pubclient->noLocal)
{
Expand Down

0 comments on commit bc8f4b2

Please sign in to comment.