Skip to content

Commit

Permalink
Contexts Loading (netdata#19336)
Browse files Browse the repository at this point in the history
* do not load instances and dimensions for unknown contexts - instead of creating contexts and adding instances and dimensions

* node updates to NC

* unify all liveness calls

* prevent wanted cache size from getting negative
  • Loading branch information
ktsaou authored Jan 7, 2025
1 parent 2c222b2 commit 84ddf0d
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 67 deletions.
35 changes: 29 additions & 6 deletions src/aclk/aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,29 @@ void *aclk_main(void *ptr)
return NULL;
}

bool aclk_host_state_update_auto(RRDHOST *host) {
int live;
switch(rrdhost_ingestion_status(host)) {
case RRDHOST_INGEST_STATUS_ARCHIVED:
case RRDHOST_INGEST_STATUS_INITIALIZING:
case RRDHOST_INGEST_STATUS_OFFLINE:
live = 0;
break;

case RRDHOST_INGEST_STATUS_REPLICATING:
// receiving replication
// no need to send this to NC
return false;

case RRDHOST_INGEST_STATUS_ONLINE:
// currently collecting data
live = 1;
break;
}
aclk_host_state_update(host, live, 1);
return true;
}

void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
{
ND_UUID node_id;
Expand All @@ -858,7 +881,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)

node_instance_creation_t node_instance_creation = {
.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL,
.hops = rrdhost_system_info_hops(host->system_info),
.hops = rrdhost_ingestion_hops(host),
.hostname = rrdhost_hostname(host),
.machine_guid = host->machine_guid};

Expand All @@ -869,7 +892,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
create_query->data.bin_payload.msg_name = "CreateNodeInstance";
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Registering host=%s, hops=%d", host->machine_guid,
rrdhost_system_info_hops(host->system_info));
rrdhost_ingestion_hops(host));

aclk_execute_query(create_query);
return;
Expand All @@ -878,7 +901,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)

aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.hops = rrdhost_system_info_hops(host->system_info),
.hops = rrdhost_ingestion_hops(host),
.live = cmd,
.queryable = queryable,
.session_id = aclk_session_newarch
Expand All @@ -895,7 +918,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Queuing status update for node=%s, live=%d, hops=%d, queryable=%d",
(char*)node_state_update.node_id, cmd,
rrdhost_system_info_hops(host->system_info), queryable);
rrdhost_ingestion_hops(host), queryable);

freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
Expand Down Expand Up @@ -1067,7 +1090,7 @@ char *aclk_state(void)
}

buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s",
rrdhost_system_info_hops(host->system_info),
rrdhost_ingestion_hops(host),
host == localhost ? "self" : "child");

if (host != localhost)
Expand Down Expand Up @@ -1202,7 +1225,7 @@ char *aclk_state_json(void)
json_object_object_add(nodeinstance, "node-id", tmp);
}

tmp = json_object_new_int(rrdhost_system_info_hops(host->system_info));
tmp = json_object_new_int(rrdhost_ingestion_hops(host));
json_object_object_add(nodeinstance, "streaming-hops", tmp);

tmp = json_object_new_string(host == localhost ? "self" : "child");
Expand Down
2 changes: 2 additions & 0 deletions src/aclk/aclk.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ extern struct aclk_shared_state {
} aclk_shared_state;

void aclk_host_state_update(RRDHOST *host, int cmd, int queryable);
bool aclk_host_state_update_auto(RRDHOST *host);

void aclk_send_node_instances(void);

void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
Expand Down
10 changes: 3 additions & 7 deletions src/aclk/aclk_rx_msgs.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,10 @@ int create_node_instance_result(const char *msg, size_t msg_len)

RRDHOST *host = rrdhost_find_by_guid(res.machine_guid);
if (likely(host)) {
if (host == localhost) {
node_state_update.live = 1;
node_state_update.hops = 0;
} else {
node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN));
node_state_update.hops = rrdhost_system_info_hops(host->system_info);
}
node_state_update.live = rrdhost_is_local(host) ? 1 : 0;
node_state_update.hops = rrdhost_ingestion_hops(host);
node_state_update.capabilities = aclk_get_node_instance_capas(host);
schedule_node_state_update(host, 5000);
}

CLAIM_ID claim_id = claim_id_get();
Expand Down
23 changes: 9 additions & 14 deletions src/database/contexts/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static void rrdinstance_load_dimension_callback(SQL_DIMENSION_DATA *sd, void *da
RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_get_and_acquire_item(host->rrdctx.contexts, sd->context);
if(!rca) {
nd_log(NDLS_DAEMON, NDLP_ERR,
"RRDCONTEXT: context '%s' is not found in host '%s'",
"RRDCONTEXT: context '%s' is not found in host '%s' - not loading dimensions",
sd->context, rrdhost_hostname(host));
return;
}
Expand All @@ -43,7 +43,7 @@ static void rrdinstance_load_dimension_callback(SQL_DIMENSION_DATA *sd, void *da
if(!ria) {
rrdcontext_release(rca);
nd_log(NDLS_DAEMON, NDLP_ERR,
"RRDCONTEXT: instance '%s' of context '%s' is not found in host '%s'",
"RRDCONTEXT: instance '%s' of context '%s' is not found in host '%s' - not loading dimensions",
sd->chart_id, sd->context, rrdhost_hostname(host));
return;
}
Expand All @@ -67,18 +67,13 @@ static void rrdinstance_load_dimension_callback(SQL_DIMENSION_DATA *sd, void *da
static void rrdinstance_load_instance_callback(SQL_CHART_DATA *sc, void *data) {
RRDHOST *host = data;

RRDCONTEXT tc = {
.id = string_strdupz(sc->context),
.title = string_strdupz(sc->title),
.units = string_strdupz(sc->units),
.family = string_strdupz(sc->family),
.priority = sc->priority,
.chart_type = sc->chart_type,
.flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics
.rrdhost = host,
};

RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_set_and_acquire_item(host->rrdctx.contexts, string2str(tc.id), &tc, sizeof(tc));
RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_get_and_acquire_item(host->rrdctx.contexts, sc->context);
if(!rca) {
nd_log(NDLS_DAEMON, NDLP_ERR,
"RRDCONTEXT: context '%s' is not found in host '%s' - not loadings instances",
sc->context, rrdhost_hostname(host));
return;
}
RRDCONTEXT *rc = rrdcontext_acquired_value(rca);

RRDINSTANCE tri = {
Expand Down
7 changes: 5 additions & 2 deletions src/database/engine/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) {
wanted_cache_size = referenced_size + dirty;

// if we don't have enough clean pages, there is no reason to be aggressive or critical
if(wanted_cache_size < current_cache_size - clean)
if(wanted_cache_size < (current_cache_size - clean) && current_cache_size > clean)
wanted_cache_size = current_cache_size - clean;

if(cache->config.out_of_memory_protection_bytes) {
Expand All @@ -415,7 +415,10 @@ static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) {
const uint64_t min_available = cache->config.out_of_memory_protection_bytes;
if (sm.ram_available_bytes < min_available) {
// we must shrink
wanted_cache_size = current_cache_size - (min_available - sm.ram_available_bytes);
if(current_cache_size > (min_available - sm.ram_available_bytes))
wanted_cache_size = current_cache_size - (min_available - sm.ram_available_bytes);
else
wanted_cache_size = hot + dirty;
}
else if(cache->config.use_all_ram) {
// we can grow
Expand Down
8 changes: 6 additions & 2 deletions src/database/rrd.h
Original file line number Diff line number Diff line change
Expand Up @@ -1263,9 +1263,13 @@ extern RRDHOST *localhost;
#define rrdhost_sender_replicating_charts_minus_one(host) (__atomic_sub_fetch(&((host)->stream.snd.status.replication.charts), 1, __ATOMIC_RELAXED))
#define rrdhost_sender_replicating_charts_zero(host) (__atomic_store_n(&((host)->stream.snd.status.replication.charts), 0, __ATOMIC_RELAXED))

#define rrdhost_is_online(host) ( \
#define rrdhost_is_local(host) ( \
(host) == localhost || \
rrdhost_option_check(host, RRDHOST_OPTION_VIRTUAL_HOST) || \
rrdhost_option_check(host, RRDHOST_OPTION_VIRTUAL_HOST) \
)

#define rrdhost_is_online(host) ( \
rrdhost_is_local(host) || \
(rrdhost_flag_check(host, RRDHOST_FLAG_COLLECTOR_ONLINE) && !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)) \
)

Expand Down
2 changes: 1 addition & 1 deletion src/database/rrdfunctions.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ int rrd_functions_find_by_name(RRDHOST *host, BUFFER *wb, const char *name, size
rrd_collector_running(rdcf->collector) ? "yes" : "no",
host->stream.rcv.status.tid, host->stream.snd.status.tid,
state_id, rdcf->rrdhost_state_id,
rrdhost_system_info_hops(host->system_info)
rrdhost_ingestion_hops(host)
);

dictionary_acquired_item_release(host->functions, *item);
Expand Down
16 changes: 5 additions & 11 deletions src/database/sqlite/sqlite_aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,8 @@ static void node_update_timer_cb(uv_timer_t *handle)
struct aclk_sync_cfg_t *ahc = handle->data;
RRDHOST *host = ahc->host;

rrdhost_receiver_lock(host);
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
rrdhost_receiver_unlock(host);
nd_log(NDLS_ACLK, NDLP_DEBUG,"Timer: Sending node update info for %s, LIVE = %d", rrdhost_hostname(host), live);
aclk_host_state_update(host, live, 1);
if(aclk_host_state_update_auto(host))
uv_timer_stop(&ahc->timer);
}

static void close_callback(uv_handle_t *handle, void *data __maybe_unused)
Expand Down Expand Up @@ -426,19 +423,16 @@ static void aclk_synchronization(void *arg)
uv_timer_stop(&ahc->timer);

ahc->timer.data = ahc;
int rc = uv_timer_start(&ahc->timer, node_update_timer_cb, schedule_time, 0);
int rc = uv_timer_start(&ahc->timer, node_update_timer_cb, schedule_time, 5000);
if (!rc)
break; // Timer started, exit
}
}

// This is fallback if timer fails
rrdhost_receiver_lock(host);
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
rrdhost_receiver_unlock(host);
aclk_host_state_update(host, live, 1);
nd_log(NDLS_ACLK, NDLP_DEBUG,"Sending node update info for %s, LIVE = %d", rrdhost_hostname(host), live);
aclk_host_state_update_auto(host);
break;

case ACLK_DATABASE_NODE_UNREGISTER:
sql_unregister_node(cmd.param[0]);
break;
Expand Down
6 changes: 1 addition & 5 deletions src/database/sqlite/sqlite_aclk_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,23 @@ void aclk_check_node_info_and_collectors(void)
if (!wc->node_info_send_time && !wc->node_collectors_send)
continue;

bool replicating = false;

if (unlikely(rrdhost_receiver_replicating_charts(host))) {
internal_error(true, "ACLK SYNC: Host %s is still replicating in", rrdhost_hostname(host));
replicating_rcv++;
replicating_rcv_host = host->hostname;
replicating = true;
}

if (unlikely(rrdhost_sender_replicating_charts(host))) {
internal_error(true, "ACLK SYNC: Host %s is still replicating out", rrdhost_hostname(host));
replicating_snd++;
replicating_snd_host = host->hostname;
replicating = true;
}

#ifdef REPLICATION_TRACKING
replication_tracking_counters(host, &replay_counters);
#endif

if(replicating)
if(replicating_rcv)
continue;

bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue));
Expand Down
8 changes: 3 additions & 5 deletions src/database/sqlite/sqlite_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,8 @@ struct node_instance_list *get_node_list(void)

uuid_copy(node_list[row].host_id, *host_id);
node_list[row].queryable = 1;
node_list[row].live =
(host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
node_list[row].hops = host->system_info ? rrdhost_system_info_hops(host->system_info) :
uuid_eq(*host_id, localhost->host_id.uuid) ? 0 : 1;
node_list[row].live = rrdhost_ingestion_status(host) == RRDHOST_INGEST_STATUS_ONLINE ? 1 : 0;
node_list[row].hops = rrdhost_ingestion_hops(host);
node_list[row].hostname =
sqlite3_column_bytes(res, 2) ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL;
}
Expand Down Expand Up @@ -938,7 +936,7 @@ static int store_host_metadata(RRDHOST *host)
SQLITE_BIND_FAIL(bind_fail, bind_text_null(res, ++param, rrdhost_os(host), 1));
SQLITE_BIND_FAIL(bind_fail, bind_text_null(res, ++param, rrdhost_timezone(host), 1));
SQLITE_BIND_FAIL(bind_fail, bind_text_null(res, ++param, "", 1));
SQLITE_BIND_FAIL(bind_fail, sqlite3_bind_int(res, ++param, host->system_info ? rrdhost_system_info_hops(host->system_info) : 0));
SQLITE_BIND_FAIL(bind_fail, sqlite3_bind_int(res, ++param, rrdhost_ingestion_hops(host)));
SQLITE_BIND_FAIL(bind_fail, sqlite3_bind_int(res, ++param, host->rrd_memory_mode));
SQLITE_BIND_FAIL(bind_fail, bind_text_null(res, ++param, rrdhost_abbrev_timezone(host), 1));
SQLITE_BIND_FAIL(bind_fail, sqlite3_bind_int(res, ++param, host->utc_offset));
Expand Down
26 changes: 19 additions & 7 deletions src/streaming/rrdhost-status.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ static NETDATA_DOUBLE rrdhost_sender_replication_completion_unsafe(RRDHOST *host
return completion;
}

RRDHOST_INGEST_STATUS rrdhost_ingestion_status(RRDHOST *host) {
RRDHOST_STATUS status;
rrdhost_status(host, now_realtime_sec(), &status);
return status.ingest.status;
}

int16_t rrdhost_ingestion_hops(RRDHOST *host) {
if(rrdhost_is_local(host)) return 0;
if(!host->system_info) return 1;
return rrdhost_system_info_hops(host->system_info);
}

void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
memset(s, 0, sizeof(*s));

Expand Down Expand Up @@ -130,7 +142,7 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
s->ingest.reason = (online) ? STREAM_HANDSHAKE_NEVER : host->stream.rcv.status.exit_reason;

rrdhost_receiver_lock(host);
s->ingest.hops = (int16_t)(host->system_info ? rrdhost_system_info_hops(host->system_info) : (host == localhost) ? 0 : 1);
s->ingest.hops = rrdhost_ingestion_hops(host);
bool has_receiver = false;
if (host->receiver && rrdhost_flag_check(host, RRDHOST_FLAG_COLLECTOR_ONLINE)) {
has_receiver = true;
Expand All @@ -144,16 +156,20 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
}
rrdhost_receiver_unlock(host);

s->ingest.collected.metrics = __atomic_load_n(&host->collected.metrics_count, __ATOMIC_RELAXED);
s->ingest.collected.instances = __atomic_load_n(&host->collected.instances_count, __ATOMIC_RELAXED);
s->ingest.collected.contexts = __atomic_load_n(&host->collected.contexts_count, __ATOMIC_RELAXED);

if (online) {
if(s->db.status == RRDHOST_DB_STATUS_INITIALIZING)
s->ingest.status = RRDHOST_INGEST_STATUS_INITIALIZING;

else if (host == localhost || rrdhost_option_check(host, RRDHOST_OPTION_VIRTUAL_HOST)) {
else if (rrdhost_is_local(host)) {
s->ingest.status = RRDHOST_INGEST_STATUS_ONLINE;
s->ingest.since = netdata_start_time;
}

else if (s->ingest.replication.in_progress)
else if (s->ingest.replication.in_progress || !s->ingest.collected.metrics)
s->ingest.status = RRDHOST_INGEST_STATUS_REPLICATING;

else
Expand All @@ -169,10 +185,6 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
s->ingest.status = RRDHOST_INGEST_STATUS_OFFLINE;
}

s->ingest.collected.metrics = __atomic_load_n(&host->collected.metrics_count, __ATOMIC_RELAXED);
s->ingest.collected.instances = __atomic_load_n(&host->collected.instances_count, __ATOMIC_RELAXED);
s->ingest.collected.contexts = __atomic_load_n(&host->collected.contexts_count, __ATOMIC_RELAXED);

if(host == localhost)
s->ingest.type = RRDHOST_INGEST_TYPE_LOCALHOST;
else if(has_receiver)
Expand Down
12 changes: 7 additions & 5 deletions src/streaming/rrdhost-status.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ typedef enum __attribute__((packed)) {
} RRDHOST_DB_LIVENESS;

typedef enum __attribute__((packed)) {
RRDHOST_INGEST_STATUS_ARCHIVED = 0,
RRDHOST_INGEST_STATUS_INITIALIZING,
RRDHOST_INGEST_STATUS_REPLICATING,
RRDHOST_INGEST_STATUS_ONLINE,
RRDHOST_INGEST_STATUS_OFFLINE,
RRDHOST_INGEST_STATUS_ARCHIVED = 0, // an old host in the database (never connected during this session)
RRDHOST_INGEST_STATUS_INITIALIZING, // contexts are still loading
RRDHOST_INGEST_STATUS_REPLICATING, // receiving replication
RRDHOST_INGEST_STATUS_ONLINE, // currently collecting data
RRDHOST_INGEST_STATUS_OFFLINE, // a disconnected node
} RRDHOST_INGEST_STATUS;

typedef enum __attribute__((packed)) {
Expand Down Expand Up @@ -166,5 +166,7 @@ typedef struct rrdhost_status_t {
} RRDHOST_STATUS;

void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
RRDHOST_INGEST_STATUS rrdhost_ingestion_status(RRDHOST *host);
int16_t rrdhost_ingestion_hops(RRDHOST *host);

#endif //NETDATA_RRDHOST_STATUS_H
2 changes: 1 addition & 1 deletion src/streaming/stream-connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ bool stream_connect(struct sender_state *s, uint16_t default_port, time_t timeou
// make sure the socket is closed
nd_sock_close(&s->sock);

s->hops = (int16_t)(rrdhost_system_info_hops(host->system_info) + 1);
s->hops = (int16_t)(rrdhost_ingestion_hops(s->host) + 1);

// reset this to make sure we have its current value
s->sock.verify_certificate = netdata_ssl_validate_certificate_sender;
Expand Down
2 changes: 1 addition & 1 deletion src/web/api/v1/api_v1_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static inline void web_client_api_request_v1_info_mirrored_hosts_status(BUFFER *
buffer_json_add_array_item_object(wb);

buffer_json_member_add_string(wb, "hostname", rrdhost_hostname(host));
buffer_json_member_add_uint64(wb, "hops", host->system_info ? rrdhost_system_info_hops(host->system_info) : (host == localhost) ? 0 : 1);
buffer_json_member_add_int64(wb, "hops", rrdhost_ingestion_hops(host));
buffer_json_member_add_boolean(wb, "reachable", (host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)));

buffer_json_member_add_string(wb, "guid", host->machine_guid);
Expand Down

0 comments on commit 84ddf0d

Please sign in to comment.