Skip to content

Commit

Permalink
ACLK timeout (netdata#19425)
Browse files Browse the repository at this point in the history
* fix aclk timeout

* added workers to aclk thread
  • Loading branch information
ktsaou authored Jan 17, 2025
1 parent 5559047 commit ccbae3a
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 13 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1900,6 +1900,7 @@ set(MQTT_WEBSOCKETS_FILES
src/aclk/mqtt_websockets/common_public.c
src/aclk/mqtt_websockets/common_public.h
src/aclk/mqtt_websockets/common_internal.h
src/aclk/mqtt_websockets/aclk_mqtt_workers.h
)

set(ACLK_PROTO_DEFS
Expand Down
37 changes: 37 additions & 0 deletions src/aclk/aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "aclk.h"

#include "mqtt_websockets/mqtt_wss_client.h"
#include "mqtt_websockets/aclk_mqtt_workers.h"
#include "aclk_otp.h"
#include "aclk_tx_msgs.h"
#include "aclk_query.h"
Expand Down Expand Up @@ -307,6 +308,7 @@ static int handle_connection(mqtt_wss_client client)
// timeout 1000 to check at least once a second
// for netdata_exit
if (mqtt_wss_service(client, 1000) < 0){
worker_is_busy(WORKER_ACLK_DISCONNECTED);
error_report("Connection Error or Dropped");
return 1;
}
Expand All @@ -315,16 +317,20 @@ static int handle_connection(mqtt_wss_client client)
const char *reason;
switch (disconnect_req) {
case ACLK_CLOUD_DISCONNECT:
worker_is_busy(WORKER_ACLK_CMD_DISCONNECT);
reason = "cloud request";
break;
case ACLK_PING_TIMEOUT:
worker_is_busy(WORKER_ACLK_CMD_TIMEOUT);
reason = "ping timeout";
schedule_node_update = true;
break;
case ACLK_RELOAD_CONF:
worker_is_busy(WORKER_ACLK_CMD_RELOAD_CONF);
reason = "reclaim";
break;
default:
worker_is_busy(WORKER_ACLK_CMD_UNKNOWN);
reason = "unknown";
break;
}
Expand Down Expand Up @@ -769,6 +775,31 @@ void *aclk_main(void *ptr)
{
struct netdata_static_thread *static_thread = ptr;

worker_register("ACLK");
worker_register_job_name(WORKER_ACLK_WAIT_CLAIMING, "wait claim");
worker_register_job_name(WORKER_ACLK_CONNECT, "connect");
worker_register_job_name(WORKER_ACLK_NODE_UPDATE, "node update");
worker_register_job_name(WORKER_ACLK_HANDLE_CONNECTION, "handle connection");
worker_register_job_name(WORKER_ACLK_DISCONNECTED, "disconnected");
worker_register_job_name(WORKER_ACLK_CMD_DISCONNECT, "cmd disconnect");
worker_register_job_name(WORKER_ACLK_CMD_TIMEOUT, "cmd timeout");
worker_register_job_name(WORKER_ACLK_CMD_RELOAD_CONF, "cmd reload");
worker_register_job_name(WORKER_ACLK_CMD_UNKNOWN, "cmd unknown");
worker_register_job_name(WORKER_ACLK_SENT_PING, "sent ping");
worker_register_job_name(WORKER_ACLK_POLL_ERROR, "poll error");
worker_register_job_name(WORKER_ACLK_POLL_OK, "poll ok");
worker_register_job_name(WORKER_ACLK_RX, "rx");
worker_register_job_name(WORKER_ACLK_RX_ERROR, "rx error");
worker_register_job_name(WORKER_ACLK_PROCESS_RAW, "p-raw");
worker_register_job_name(WORKER_ACLK_PROCESS_HANDSHAKE, "p-handshake");
worker_register_job_name(WORKER_ACLK_PROCESS_ESTABLISHED, "p-established");
worker_register_job_name(WORKER_ACLK_PROCESS_ERROR, "p-error");
worker_register_job_name(WORKER_ACLK_PROCESS_CLOSED_GRACEFULLY, "p-closed");
worker_register_job_name(WORKER_ACLK_PROCESS_UNKNOWN, "p-unknown");
worker_register_job_name(WORKER_ACLK_HANDLE_MQTT_INTERNAL, "mqtt internal");
worker_register_job_name(WORKER_ACLK_TX, "tx");
worker_register_job_name(WORKER_ACLK_TX_ERROR, "tx error");

ACLK_PROXY_TYPE proxy_type;
aclk_get_proxy(&proxy_type);
if (proxy_type == PROXY_TYPE_SOCKS5) {
Expand All @@ -779,6 +810,7 @@ void *aclk_main(void *ptr)

aclk_init_rx_msg_handlers();

worker_is_busy(WORKER_ACLK_WAIT_CLAIMING);
if (wait_till_agent_claim_ready())
goto exit;

Expand Down Expand Up @@ -809,21 +841,26 @@ void *aclk_main(void *ptr)
netdata_log_info("Starting ACLK query event loop");
aclk_query_init(mqttwss_client);
do {
worker_is_busy(WORKER_ACLK_CONNECT);
if (aclk_attempt_to_connect(mqttwss_client))
goto exit_full;

if (schedule_node_update) {
worker_is_busy(WORKER_ACLK_NODE_UPDATE);
schedule_node_state_update(localhost, 0);
schedule_node_update = false;
}

worker_is_busy(WORKER_ACLK_HANDLE_CONNECTION);
if (handle_connection(mqttwss_client)) {
worker_is_busy(WORKER_ACLK_DISCONNECTED);
last_disconnect_time = now_realtime_sec();
aclk_set_disconnected();
nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
}
} while (service_running(SERVICE_ACLK));

worker_is_busy(WORKER_ACLK_DISCONNECTED);
aclk_graceful_disconnect(mqttwss_client);

#ifdef MQTT_WSS_DEBUG
Expand Down
30 changes: 30 additions & 0 deletions src/aclk/mqtt_websockets/aclk_mqtt_workers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_ACLK_MQTT_WORKERS_H
#define NETDATA_ACLK_MQTT_WORKERS_H

// Job IDs of the "ACLK" worker.
// aclk_main() gives each ID a display name via worker_register_job_name(),
// and the ACLK / MQTT-over-websockets code passes the ID to worker_is_busy()
// when it reaches the corresponding step.

// Steps of the aclk_main() loop (DISCONNECTED is also set by handle_connection()).
#define WORKER_ACLK_WAIT_CLAIMING 0
#define WORKER_ACLK_CONNECT 1
#define WORKER_ACLK_NODE_UPDATE 2
#define WORKER_ACLK_HANDLE_CONNECTION 3
#define WORKER_ACLK_DISCONNECTED 4

// Disconnect request being handled in handle_connection(), one per disconnect_req case.
#define WORKER_ACLK_CMD_DISCONNECT 5
#define WORKER_ACLK_CMD_TIMEOUT 6
#define WORKER_ACLK_CMD_RELOAD_CONF 7
#define WORKER_ACLK_CMD_UNKNOWN 8

// mqtt_wss_service(): keep-alive ping sent, and outcome of the poll() call.
#define WORKER_ACLK_SENT_PING 9
#define WORKER_ACLK_POLL_ERROR 10
#define WORKER_ACLK_POLL_OK 11

// mqtt_wss_service(): around SSL_read() and its hard-error path.
#define WORKER_ACLK_RX 12
#define WORKER_ACLK_RX_ERROR 13

// ws_client_process(): one ID per websocket client state handled by its switch.
#define WORKER_ACLK_PROCESS_RAW 14
#define WORKER_ACLK_PROCESS_HANDSHAKE 15
#define WORKER_ACLK_PROCESS_ESTABLISHED 16
#define WORKER_ACLK_PROCESS_ERROR 17
#define WORKER_ACLK_PROCESS_CLOSED_GRACEFULLY 18
#define WORKER_ACLK_PROCESS_UNKNOWN 19

// mqtt_wss_service(): set right before handle_mqtt_internal() is called.
#define WORKER_ACLK_HANDLE_MQTT_INTERNAL 20

// mqtt_wss_service(): around SSL_write() and its hard-error path.
#define WORKER_ACLK_TX 21
#define WORKER_ACLK_TX_ERROR 22

#endif //NETDATA_ACLK_MQTT_WORKERS_H
48 changes: 37 additions & 11 deletions src/aclk/mqtt_websockets/mqtt_wss_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#endif

#include "libnetdata/libnetdata.h"
#include "aclk_mqtt_workers.h"
#include "mqtt_wss_client.h"
#include "mqtt_ng.h"
#include "ws_client.h"
Expand Down Expand Up @@ -616,7 +617,7 @@ int mqtt_wss_connect(
client->poll_fds[POLLFD_SOCKET].events = POLLIN;
// wait till MQTT connection is established
while (!client->mqtt_connected) {
if(mqtt_wss_service(client, -1)) {
if(mqtt_wss_service(client, 60 * MSEC_PER_SEC)) {
nd_log(NDLS_DAEMON, NDLP_ERR, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port);
return 2;
}
Expand Down Expand Up @@ -661,13 +662,13 @@ static const char *mqtt_wss_error_tos(int ec)

/**
 * Keep servicing the connection until the websocket write buffer is empty
 * or the time budget is used up.
 *
 * @param client      MQTT-over-WSS client whose pending output is flushed.
 * @param timeout_ms  Overall time budget, in milliseconds.
 *                    NOTE(review): negative values are not special-cased
 *                    here - confirm that no caller passes one.
 *
 * @return MWS_OK once the write buffer holds no more bytes,
 *         MWS_TIMED_OUT if the deadline passes first,
 *         MWS_ERROR if mqtt_wss_service() returns non-zero.
 */
static int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms)
{
    // The deadline is compared against boottime_usec(), so the budget must be
    // converted ms -> us (not ms -> ns, which made it 1000x too long).
    const uint64_t usec_per_msec = USEC_PER_SEC / MSEC_PER_SEC;
    const uint64_t exit_by_us = boottime_usec() + ((uint64_t)timeout_ms * usec_per_msec);

    client->poll_fds[POLLFD_SOCKET].events |= POLLOUT; // TODO when entering mqtt_wss_service use out buffer size to arm POLLOUT

    while (rbuf_bytes_available(client->ws_client->buf_write)) {
        const uint64_t now_us = boottime_usec();
        if (now_us >= exit_by_us)
            return MWS_TIMED_OUT;

        // mqtt_wss_service() takes milliseconds. Round up so a sub-millisecond
        // remainder does not turn into a zero timeout and spin the loop.
        const uint64_t remaining_ms = (exit_by_us - now_us + usec_per_msec - 1) / usec_per_msec;

        if (mqtt_wss_service(client, (int)remaining_ms))
            return MWS_ERROR;
    }

    return MWS_OK;
}
Expand Down Expand Up @@ -753,9 +754,23 @@ static int handle_mqtt_internal(mqtt_wss_client client)

/**
 * Milliseconds the caller may wait before the next MQTT keep-alive is due.
 *
 * The keep-alive is scheduled at 75% of client->mqtt_keepalive after the
 * last send time reported by mqtt_ng_last_send_time().
 *
 * @param client  MQTT-over-WSS client.
 *
 * @return 0 when the keep-alive is already due, otherwise a positive number
 *         of milliseconds, never more than 45 seconds' worth.
 */
static int t_till_next_keepalive_ms(mqtt_wss_client client)
{
    // Upper bound on the returned wait, in seconds.
    enum { MAX_WAIT_SEC = 45 };

    const time_t last_send_ts = mqtt_ng_last_send_time(client->mqtt);
    const time_t next_mqtt_keep_alive_ts = last_send_ts + client->mqtt_keepalive * 0.75;
    const time_t now_ts = now_realtime_sec();

    if (now_ts >= next_mqtt_keep_alive_ts)
        return 0;

    // Clamp while still in seconds: converting a very large gap (e.g. after
    // the wall clock jumps backwards) to int milliseconds first could wrap
    // to a negative value and end up as a near-zero timeout.
    time_t remaining_sec = next_mqtt_keep_alive_ts - now_ts;
    if (remaining_sec > MAX_WAIT_SEC)
        remaining_sec = MAX_WAIT_SEC;

    // remaining_sec is now in [1, MAX_WAIT_SEC], so the result fits an int.
    return (int)(remaining_sec * MSEC_PER_SEC);
}

#ifdef MQTT_WSS_CPUSTATS
Expand Down Expand Up @@ -784,8 +799,6 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
// Check user requested TO doesn't interfere with MQTT keep alives
if (!ping_timeout) {
int till_next_keep_alive = t_till_next_keepalive_ms(client);
if (till_next_keep_alive < 0)
till_next_keep_alive = 0;
if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) {
timeout_ms = till_next_keep_alive;
send_keepalive = 1;
Expand All @@ -797,14 +810,18 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
client->stats.time_keepalive += t2 - t1;
#endif

worker_is_idle();
if ((ret = poll(client->poll_fds, 2, timeout_ms >= 0 ? timeout_ms : -1)) < 0) {
worker_is_busy(WORKER_ACLK_POLL_ERROR);

if (errno == EINTR) {
nd_log(NDLS_DAEMON, NDLP_WARNING, "poll interrupted by EINTR");
return 0;
}
nd_log(NDLS_DAEMON, NDLP_ERR, "poll error \"%s\"", strerror(errno));
return -2;
}
worker_is_busy(WORKER_ACLK_POLL_OK);

#ifdef MQTT_WSS_CPUSTATS
t1 = mqtt_wss_now_usec();
Expand All @@ -817,6 +834,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
// MQTT keep alives
mqtt_ng_ping(client->mqtt);
ping_timeout = now + PING_TIMEOUT;
worker_is_busy(WORKER_ACLK_SENT_PING);
} else {
if (ping_timeout && ping_timeout < now) {
disconnect_req = ACLK_PING_TIMEOUT;
Expand All @@ -836,6 +854,8 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
client->poll_fds[POLLFD_SOCKET].events = 0;

if ((ptr = rbuf_get_linear_insert_range(client->ws_client->buf_read, &size))) {
worker_is_busy(WORKER_ACLK_RX);

if((ret = SSL_read(client->ssl, ptr, size)) > 0) {
spinlock_lock(&client->stat_lock);
client->stats.bytes_rx += ret;
Expand All @@ -847,6 +867,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
set_socket_pollfds(client, ret);
if (ret != SSL_ERROR_WANT_READ &&
ret != SSL_ERROR_WANT_WRITE) {
worker_is_busy(WORKER_ACLK_RX_ERROR);
nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret));
if (ret == SSL_ERROR_SYSCALL)
nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
Expand Down Expand Up @@ -882,9 +903,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
#endif

// process MQTT stuff
if(client->ws_client->state == WS_ESTABLISHED)
if(client->ws_client->state == WS_ESTABLISHED) {
worker_is_busy(WORKER_ACLK_HANDLE_MQTT_INTERNAL);
if (handle_mqtt_internal(client))
return MQTT_WSS_ERR_PROTO_MQTT;
}

if (client->mqtt_didnt_finish_write) {
client->mqtt_didnt_finish_write = 0;
Expand All @@ -897,6 +920,8 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
#endif

if ((ptr = rbuf_get_linear_read_range(client->ws_client->buf_write, &size))) {
worker_is_busy(WORKER_ACLK_TX);

if ((ret = SSL_write(client->ssl, ptr, size)) > 0) {
spinlock_lock(&client->stat_lock);
client->stats.bytes_tx += ret;
Expand All @@ -908,6 +933,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
set_socket_pollfds(client, ret);
if (ret != SSL_ERROR_WANT_READ &&
ret != SSL_ERROR_WANT_WRITE) {
worker_is_busy(WORKER_ACLK_TX_ERROR);
nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret));
if (ret == SSL_ERROR_SYSCALL)
nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
Expand Down
3 changes: 1 addition & 2 deletions src/aclk/mqtt_websockets/mqtt_wss_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

#include "common_public.h"


#define MQTT_WSS_OK 0 // All OK, call me at your earliest convenience
#define MQTT_WSS_OK_TO 1 // All OK, poll timeout you requested when calling mqtt_wss_service expired
//you might want to know if timeout
Expand Down Expand Up @@ -58,7 +57,7 @@ int mqtt_wss_connect(
int ssl_flags,
const struct mqtt_wss_proxy *proxy,
bool *fallback_ipv4);
int mqtt_wss_service(mqtt_wss_client client, int timeout_ms);
int mqtt_wss_service(mqtt_wss_client client, int t_ms);
void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms);

// we redefine this instead of using MQTT-C flags as in future
Expand Down
7 changes: 7 additions & 0 deletions src/aclk/mqtt_websockets/ws_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "ws_client.h"
#include "common_internal.h"
#include "aclk_mqtt_workers.h"

const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A"
"Host: %s\x0D\x0A"
Expand Down Expand Up @@ -623,10 +624,12 @@ int ws_client_process(ws_client *client)
int ret;
switch(client->state) {
case WS_RAW:
worker_is_busy(WORKER_ACLK_PROCESS_RAW);
if (ws_client_start_handshake(client))
return WS_CLIENT_INTERNAL_ERROR;
return WS_CLIENT_NEED_MORE_BYTES;
case WS_HANDSHAKE:
worker_is_busy(WORKER_ACLK_PROCESS_HANDSHAKE);
do {
ret = ws_client_parse_handshake_resp(client);
if (ret == WS_CLIENT_PROTOCOL_ERROR)
Expand All @@ -636,6 +639,7 @@ int ws_client_process(ws_client *client)
} while (!ret);
break;
case WS_ESTABLISHED:
worker_is_busy(WORKER_ACLK_PROCESS_ESTABLISHED);
do {
ret = ws_client_process_rx_ws(client);
switch(ret) {
Expand All @@ -655,12 +659,15 @@ int ws_client_process(ws_client *client)
} while (!ret || ret == WS_CLIENT_PARSING_DONE);
break;
case WS_ERROR:
worker_is_busy(WORKER_ACLK_PROCESS_ERROR);
nd_log(NDLS_DAEMON, NDLP_ERR, "ws_client is in error state. Restart the connection!");
return WS_CLIENT_PROTOCOL_ERROR;
case WS_CONN_CLOSED_GRACEFUL:
worker_is_busy(WORKER_ACLK_PROCESS_CLOSED_GRACEFULLY);
nd_log(NDLS_DAEMON, NDLP_ERR, "Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again.");
return WS_CLIENT_CONNECTION_CLOSED;
default:
worker_is_busy(WORKER_ACLK_PROCESS_UNKNOWN);
nd_log(NDLS_DAEMON, NDLP_CRIT, "Unknown connection state! Probably memory corruption.");
return WS_CLIENT_INTERNAL_ERROR;
}
Expand Down
1 change: 1 addition & 0 deletions src/daemon/pulse/pulse-workers.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ static struct worker_utilization all_workers_utilization[] = {
{ .name = "DBENGINE", .family = "workers dbengine instances", .priority = 1000000 },
{ .name = "LIBUV", .family = "workers libuv threadpool", .priority = 1000000 },
{ .name = "WEB", .family = "workers web server", .priority = 1000000 },
{ .name = "ACLK", .family = "workers aclk", .priority = 1000000 },
{ .name = "ACLKSYNC", .family = "workers aclk sync", .priority = 1000000 },
{ .name = "METASYNC", .family = "workers metadata sync", .priority = 1000000 },
{ .name = "PLUGINSD", .family = "workers plugins.d", .priority = 1000000 },
Expand Down

0 comments on commit ccbae3a

Please sign in to comment.