Skip to content

Commit

Permalink
related to #389
Browse files Browse the repository at this point in the history
  • Loading branch information
Marco Baldinetti committed Feb 3, 2022
1 parent 90a0beb commit 3716a91
Showing 1 changed file with 107 additions and 87 deletions.
194 changes: 107 additions & 87 deletions platformio/stima_v3/stima/src/stima.ino
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,19 @@ void loop() {
}
#endif

#if (USE_MQTT && (MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_ETH || MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_ETH))
if (is_event_mqtt_yield && is_mqtt_connected) {
is_event_mqtt_yield = false;
mqtt_client.yield(1);
wdt_reset();
}
#endif

if (is_mqtt_connected) {
mqtt_client.yield(1);
wdt_reset();
}

if (is_event_time) {
time_task();
wdt_reset();
Expand Down Expand Up @@ -310,6 +323,10 @@ void init_tasks() {
is_event_mqtt = false;
is_event_mqtt_paused = false;
mqtt_state = MQTT_INIT;
is_mqtt_subscribed = false;
is_mqtt_connected = false;
mqtt_error_count = 0;
setted_ptr_time_data = 0;
mqtt_session_present=false;
#endif

Expand Down Expand Up @@ -656,6 +673,7 @@ bool mqttConnect(char *username, char *password) {
MQTTPacket_connectData options = MQTTPacket_connectData_initializer;
MQTT::connackData data ;
options.MQTTVersion = 4; // Version of MQTT to be used. 3 = 3.1 4 = 3.1.1
// options.keepAliveInterval = 60;
options.will.topicName.cstring = maint_topic;
options.will.message.cstring = (char *) MQTT_ON_ERROR_MESSAGE;
options.will.retained = true;
Expand Down Expand Up @@ -1386,19 +1404,10 @@ void interrupt_task_1s () {
#endif
interrupts();
}

if (is_time_set && now() >= next_ptr_time_for_testing_sensors && next_ptr_time_for_testing_sensors) {
setNextTime((time_t *) &next_ptr_time_for_testing_sensors, SENSORS_TESTING_DELAY_S);
noInterrupts();
if (!is_event_sensors_reading) {
is_test = !is_first_test;
is_event_sensors_reading = true;
ready_tasks_count++;
}
interrupts();
}
#endif

i2c_error++;

noInterrupts();
if (!is_event_rtc) {
is_event_rtc = true;
Expand Down Expand Up @@ -1575,10 +1584,9 @@ void supervisor_task() {
if (is_supervisor_first_run && is_time_set) {
if (readable_configuration.report_seconds) {
setNextTime((time_t *) &next_ptr_time_for_sensors_reading, readable_configuration.report_seconds);

// testing sensors
setNextTime((time_t *) &next_ptr_time_for_testing_sensors, SENSORS_TESTING_DELAY_S);

setNextTime((time_t *) &next_ptr_time_for_mqtt_tx, readable_configuration.transmit_seconds);
setNextTime((time_t *) &next_ptr_time_for_mqtt_yield, MQTT_YIELD_S);
LOGN(F("Acquisition scheduling..."));
LOGN(F("--> observations every %d minutes"), OBSERVATIONS_MINUTES);
}
Expand Down Expand Up @@ -1613,13 +1621,10 @@ void supervisor_task() {
if (next_ptr_time_for_testing_sensors) {
LOGN(F("--> testing at: %d:%d:%d"), hour(next_ptr_time_for_testing_sensors), minute(next_ptr_time_for_testing_sensors), second(next_ptr_time_for_testing_sensors));
}
}
else if (is_supervisor_first_run && !is_time_set) {
if (is_rtc_first_time_get) {
LOGN(F("--> There is only RTC time. Doing reboot and retry"));

if (next_ptr_time_for_mqtt_tx) {
LOGN(F("--> sending at: %d:%d:%d"), hour(next_ptr_time_for_mqtt_tx), minute(next_ptr_time_for_mqtt_tx), second(next_ptr_time_for_mqtt_tx));
}
// it's not possible start station without date and time
have_to_reboot = true;
}
#endif
#if (USE_LCD)
Expand Down Expand Up @@ -1732,10 +1737,22 @@ void supervisor_task() {

void rtc_task() {
if (is_time_set) {
noInterrupts();
is_event_rtc = false;
ready_tasks_count--;
interrupts();
if (now() >= next_ptr_time_for_testing_sensors && next_ptr_time_for_testing_sensors) {
setNextTime((time_t *) &next_ptr_time_for_testing_sensors, SENSORS_TESTING_DELAY_S);

noInterrupts();
if (!is_event_sensors_reading) {
is_test = !is_first_test;
is_event_sensors_reading = true;
ready_tasks_count++;
}
interrupts();
}

if (now() >= next_ptr_time_for_mqtt_yield && next_ptr_time_for_mqtt_yield) {
setNextTime((time_t *) &next_ptr_time_for_mqtt_yield, MQTT_YIELD_S);
is_event_mqtt_yield = true;
}

#if (MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_ETH || MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_GSM || MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_ETH || MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_GSM)
if (is_time_for_sensors_reading_updated) {
Expand All @@ -1752,6 +1769,11 @@ void rtc_task() {
#endif
}
#endif

noInterrupts();
is_event_rtc = false;
ready_tasks_count--;
interrupts();
}
}

Expand Down Expand Up @@ -1920,7 +1942,6 @@ void time_task() {

if (is_set_rtc_ok) {
LOGN(F("RTC set... [ %s ]"), OK_STRING);
i2c_error = 0;
retry = 0;
time_state = TIME_SET_PROVIDER;
LOGV(F("TIME_SET_RTC --> TIME_SET_PROVIDER"));
Expand Down Expand Up @@ -2867,7 +2888,13 @@ void data_saving_task() {
#endif

noInterrupts();
if (!is_event_supervisor) {
if (!is_event_supervisor && (now() >= next_ptr_time_for_mqtt_tx) && next_ptr_time_for_mqtt_tx) {
setNextTime((time_t *) &next_ptr_time_for_mqtt_tx, readable_configuration.transmit_seconds);
LOGN(F("Next MQTT TX scheduled at: %d:%d:%d\r\n"), hour(next_ptr_time_for_mqtt_tx), minute(next_ptr_time_for_mqtt_tx), second(next_ptr_time_for_mqtt_tx));
is_event_supervisor = true;
ready_tasks_count++;
}
else if (!is_event_supervisor && is_event_mqtt_paused) {
is_event_supervisor = true;
ready_tasks_count++;
}
Expand Down Expand Up @@ -3051,15 +3078,16 @@ void mqtt_task() {
read_bytes_count = mqtt_ptr_file.read(&ptr_time_data, sizeof(time_t));

// found
if (read_bytes_count == sizeof(time_t) && ptr_time_data < now()) {
if ((read_bytes_count == sizeof(time_t)) && (ptr_time_data < now())) {
LOGN(F("Data pointer... [ %d/%d/%d %d:%d:%d ] [ TEST ]"), day(ptr_time_data), month(ptr_time_data), year(ptr_time_data), hour(ptr_time_data), minute(ptr_time_data), second(ptr_time_data));
is_ptr_found = true;
mqtt_state = MQTT_PTR_FOUND;
LOGV(F("MQTT_PTR_READ ---> MQTT_PTR_FOUND"));
}
// not found (no sdcard error): find it by starting from 1th January of this year
else if (read_bytes_count >= 0) {
LOGN(F("Data pointer... [ FIND ]"));
datetime.Year = CalendarYrToTm(year(now()));
datetime.Year = CalendarYrToTm(year());
datetime.Month = 1;
datetime.Day = 1;
datetime.Hour = 0;
Expand Down Expand Up @@ -3329,6 +3357,8 @@ void mqtt_task() {
else if (is_mqtt_processing_json) {
mqtt_state = MQTT_RPC_DELAY;
LOGV(F("MQTT_SENSORS_LOOP ---> MQTT_RPC_DELAY"));
//mqtt_state = MQTT_ON_DISCONNECT;
//SERIAL_TRACE(F("MQTT_SENSORS_LOOP ---> MQTT_ON_DISCONNECT\r\n"));
}
break;

Expand Down Expand Up @@ -3456,82 +3486,65 @@ void mqtt_task() {
mqtt_state = MQTT_CHECK; // fallback
LOGV(F("MQTT_OPEN_DATA_FILE ---> MQTT_CHECK"));
}
break;
break;

case MQTT_CLOSE_DATA_FILE:
if (is_mqtt_processing_sdcard) {
is_sdcard_error = !read_data_file.close();
mqtt_state = MQTT_RPC_DELAY;
LOGV(F("MQTT_CLOSE_DATA_FILE ---> MQTT_RPC_DELAY"));
//mqtt_state = MQTT_ON_DISCONNECT;
//SERIAL_TRACE(F("MQTT_CLOSE_DATA_FILE ---> MQTT_ON_DISCONNECT\r\n"));
}
break;
break;

case MQTT_RPC_DELAY:
//delay_ms = readable_configuration.report_seconds*500UL;
delay_ms = 300000UL;
start_time_ms = millis();
state_after_wait = MQTT_ON_DISCONNECT;
mqtt_state = MQTT_WAIT_STATE_RPC;

//delay_ms = readable_configuration.report_seconds*500UL;
delay_ms = 300000UL;
start_time_ms = millis();
state_after_wait = MQTT_ON_DISCONNECT;
mqtt_state = MQTT_WAIT_STATE_RPC;

LOGV(F("MQTT_WAIT_STATE_RPC: %lu"),delay_ms);
LOGV(F("MQTT_RPC_DELAY ---> MQTT_WAIT_STATE_RPC"));

break;
LOGV(F("MQTT_WAIT_STATE_RPC: %lu"), delay_ms);
LOGV(F("MQTT_RPC_DELAY ---> MQTT_WAIT_STATE_RPC"));
break;

case MQTT_ON_DISCONNECT:
#if (MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_ETH || MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_ETH)
if (is_mqtt_error) {
#endif

getFullTopic(full_topic_buffer, readable_configuration.mqtt_maint_topic, MQTT_STATUS_TOPIC);
snprintf(&message_buffer[0][0], MQTT_MESSAGE_LENGTH, MQTT_ON_DISCONNECT_MESSAGE);

if (mqttPublish(full_topic_buffer, &message_buffer[0][0]), true) {
retry = 0;
mqtt_state = MQTT_DISCONNECT;
LOGV(F("MQTT_ON_DISCONNECT ---> MQTT_DISCONNECT"));
}
// retry
else if ((++retry) < MQTT_RETRY_COUNT_MAX) {
delay_ms = MQTT_DELAY_MS;
start_time_ms = millis();
state_after_wait = MQTT_ON_DISCONNECT;
mqtt_state = MQTT_WAIT_STATE;
LOGE(F("MQTT on disconnect publish message... [ retry ]"));
LOGV(F("MQTT_ON_DISCONNECT ---> MQTT_WAIT_STATE"));
}
// fail
else {
LOGE(F("MQTT on disconnect publish message... [ %s ]"), FAIL_STRING);
retry = 0;
is_mqtt_error = true;
mqtt_state = MQTT_DISCONNECT;
LOGV(F("MQTT_ON_DISCONNECT ---> MQTT_DISCONNECT"));
}
getFullTopic(full_topic_buffer, readable_configuration.mqtt_maint_topic, MQTT_STATUS_TOPIC);
snprintf(&message_buffer[0][0], MQTT_MESSAGE_LENGTH, MQTT_ON_DISCONNECT_MESSAGE);

#if (MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_ETH || MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_ETH)
if (mqttPublish(full_topic_buffer, &message_buffer[0][0]), true) {
retry = 0;
mqtt_state = MQTT_DISCONNECT;
LOGV(F("MQTT_ON_DISCONNECT ---> MQTT_DISCONNECT"));
}
// retry
else if ((++retry) < MQTT_RETRY_COUNT_MAX) {
delay_ms = MQTT_DELAY_MS;
start_time_ms = millis();
state_after_wait = MQTT_ON_DISCONNECT;
mqtt_state = MQTT_WAIT_STATE;
LOGE(F("MQTT on disconnect publish message... [ retry ]"));
LOGV(F("MQTT_ON_DISCONNECT ---> MQTT_WAIT_STATE"));
}
// fail
else {
LOGE(F("MQTT on disconnect publish message... [ %s ]"), FAIL_STRING);
retry = 0;
is_mqtt_error = true;
mqtt_state = MQTT_DISCONNECT;
LOGV(F("MQTT_ON_DISCONNECT ---> MQTT_DISCONNECT"));
}
#endif
break;

case MQTT_DISCONNECT:
#if (MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_ETH || MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_ETH)
if (is_mqtt_error) {
#endif

mqtt_client.disconnect();
mqtt_client.setMessageHandler(comtopic, NULL); // remove handler setted
mqtt_client.disconnect();
mqtt_client.setMessageHandler(comtopic, NULL); // remove handler setted
ipstack.disconnect();
is_mqtt_connected = false;
LOGT(F("MQTT Disconnect... [ %s ]"), OK_STRING);

#if (MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_ETH || MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_ETH)
}
#elif (MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_GSM || MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_GSM)
#if (MODULE_TYPE == STIMA_MODULE_TYPE_SAMPLE_GSM || MODULE_TYPE == STIMA_MODULE_TYPE_REPORT_GSM)
// resume GSM task for closing connection
noInterrupts();
if (!is_event_gsm) {
Expand Down Expand Up @@ -3583,22 +3596,23 @@ void mqtt_task() {

case MQTT_CLOSE_PTR_FILE:
mqtt_ptr_file.close();
//mqtt_state = MQTT_CLOSE_SDCARD;
mqtt_state = MQTT_END;
LOGV(F("MQTT_CLOSE_PTR_FILE ---> MQTT_END"));
break;
mqtt_state = MQTT_CLOSE_SDCARD;
LOGV(F("MQTT_CLOSE_PTR_FILE ---> MQTT_CLOSE_SDCARD"));
break;

/*
case MQTT_CLOSE_SDCARD:
#if (!ENABLE_SDCARD_LOGGING)
is_sdcard_error = false;
is_sdcard_open = false;
#endif

mqtt_state = MQTT_END;
LOGV(F("MQTT_CLOSE_SDCARD ---> MQTT_END"));
break;
*/
break;

case MQTT_END:
if (is_mqtt_published_data) {
mqtt_error_count = 0;
LOGN(F("[ %d ] data published through mqtt... [ %s ]"), mqtt_data_count, is_mqtt_error ? ERROR_STRING : OK_STRING);
#if (USE_LCD)
// is_lcd_error |= lcd.setCursor(0, 3);
Expand All @@ -3608,6 +3622,12 @@ void mqtt_task() {
// is_lcd_error |= lcd.print(is_mqtt_error ? ERROR_STRING : OK_STRING)==0;
#endif
}
//else {
// if ((++mqtt_error_count) >= MQTT_ERROR_COUNT_FOR_REBOOT) {
// mqtt_error_count = 0;
// hard_reboot();
// }
//}

noInterrupts();
is_event_mqtt_paused = false;
Expand Down

0 comments on commit 3716a91

Please sign in to comment.