Skip to content

Commit

Permalink
Streaming re-organization (netdata#18941)
Browse files Browse the repository at this point in the history
split streaming into multiple files
  • Loading branch information
ktsaou authored Nov 5, 2024
1 parent 20a280a commit fd9b49b
Show file tree
Hide file tree
Showing 57 changed files with 2,483 additions and 2,415 deletions.
58 changes: 36 additions & 22 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ set(LIBH2O_FILES
set(DAEMON_FILES
src/daemon/buildinfo.c
src/daemon/buildinfo.h
src/daemon/common.c
src/daemon/h2o-common.c
src/daemon/common.h
src/daemon/daemon.c
src/daemon/daemon.h
Expand Down Expand Up @@ -1017,7 +1017,7 @@ set(H2O_FILES
src/web/server/h2o/http_server.h
src/web/server/h2o/h2o_utils.c
src/web/server/h2o/h2o_utils.h
src/web/server/h2o/streaming.c
src/web/server/h2o/rrdpush.c
src/web/server/h2o/streaming.h
src/web/server/h2o/connlist.c
src/web/server/h2o/connlist.h
Expand Down Expand Up @@ -1371,35 +1371,49 @@ set(SYSTEMD_JOURNAL_PLUGIN_FILES
)

set(STREAMING_PLUGIN_FILES
src/streaming/rrdpush.c
src/streaming/rrdpush.h
src/streaming/compression.c
src/streaming/compression.h
src/streaming/compression_brotli.c
src/streaming/compression_brotli.h
src/streaming/compression_gzip.c
src/streaming/compression_gzip.h
src/streaming/compression_lz4.c
src/streaming/compression_lz4.h
src/streaming/compression_zstd.c
src/streaming/compression_zstd.h
src/streaming/stream-compression/compression.c
src/streaming/stream-compression/compression.h
src/streaming/stream-compression/brotli.c
src/streaming/stream-compression/brotli.h
src/streaming/stream-compression/gzip.c
src/streaming/stream-compression/gzip.h
src/streaming/stream-compression/lz4.c
src/streaming/stream-compression/lz4.h
src/streaming/stream-compression/zstd.c
src/streaming/stream-compression/zstd.h
src/streaming/receiver.c
src/streaming/sender.c
src/streaming/replication.c
src/streaming/replication.h
src/streaming/common.h
src/streaming/h2o-common.h
src/streaming/protocol/command-nodeid.c
src/streaming/protocol/commands.c
src/streaming/protocol/commands.h
src/streaming/protocol/command-claimed_id.c
src/streaming/stream_path.c
src/streaming/stream_path.h
src/streaming/stream_capabilities.c
src/streaming/stream_capabilities.h
src/streaming/sender_connect.c
src/streaming/sender_internals.h
src/streaming/sender_execute.c
src/streaming/sender_commit.c
src/streaming/stream-path.c
src/streaming/stream-path.h
src/streaming/stream-capabilities.c
src/streaming/stream-capabilities.h
src/streaming/sender-connect.c
src/streaming/sender-internals.h
src/streaming/sender-execute.c
src/streaming/sender-commit.c
src/streaming/sender-destinations.c
src/streaming/stream-handshake.c
src/streaming/protocol/command-function.c
src/streaming/protocol/command-host-labels.c
src/streaming/protocol/command-chart-definition.c
src/streaming/protocol/command-begin-set-end.c
src/streaming/protocol/command-host-variables.c
src/streaming/stream-conf.c
src/streaming/stream-conf.h
src/streaming/stream-handshake.h
src/streaming/sender.h
src/streaming/sender-destinations.h
src/streaming/rrdhost-status.c
src/streaming/rrdhost-status.h
src/streaming/receiver.h
)

set(WEB_PLUGIN_FILES
Expand Down
4 changes: 2 additions & 2 deletions src/daemon/analytics.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ void analytics_gather_mutable_meta_data(void)
analytics_alarms_notifications();

analytics_set_data(
&analytics_data.netdata_config_is_parent, (rrdhost_hosts_available() > 1 || configured_as_parent()) ? "true" : "false");
&analytics_data.netdata_config_is_parent, (rrdhost_hosts_available() > 1 || stream_conf_configured_as_parent()) ? "true" : "false");

analytics_set_data(&analytics_data.netdata_host_agent_claimed, is_agent_claimed() ? "true" : "false");

Expand Down Expand Up @@ -619,7 +619,7 @@ void *analytics_main(void *ptr)
*/
void set_late_analytics_variables(struct rrdhost_system_info *system_info)
{
analytics_set_data(&analytics_data.netdata_config_stream_enabled, default_rrdpush_enabled ? "true" : "false");
analytics_set_data(&analytics_data.netdata_config_stream_enabled, stream_conf_send_enabled ? "true" : "false");
analytics_set_data_str(&analytics_data.netdata_config_memory_mode, (char *)rrd_memory_mode_name(default_rrd_memory_mode));
analytics_set_data(&analytics_data.netdata_host_cloud_enabled, "true");

Expand Down
1 change: 1 addition & 0 deletions src/daemon/analytics.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct analytics_data {
bool exporting_enabled;
};

struct rrdhost_system_info;
void set_late_analytics_variables(struct rrdhost_system_info *system_info);
void analytics_free_data(void);
void analytics_log_shell(void);
Expand Down
1 change: 0 additions & 1 deletion src/daemon/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
// streaming metrics between netdata servers
#include "streaming/rrdpush.h"


// anomaly detection
#include "ml/ml.h"

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,7 @@ int unittest_prepare_rrd(const char **user) {
fprintf(stderr, "rrd_init failed for unittest\n");
return 1;
}
default_rrdpush_enabled = 0;
stream_conf_send_enabled = 0;

return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/database/contexts/api_v2_contexts.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ static void rrdcontext_to_json_v2_rrdhost(BUFFER *wb, RRDHOST *host, struct rrdc
// stale - connected but not having live data
// reachable - connected with live data
// pruned - not connected for some time and has been removed
buffer_json_member_add_string(wb, "state", rrdhost_state_cloud_emulation(host) ? "reachable" : "stale");
buffer_json_member_add_string(wb, "state", rrdhost_is_online(host) ? "reachable" : "stale");

rrdhost_health_to_json_v2(wb, "health", &s);
agent_capabilities_to_json(wb, host, "capabilities");
Expand Down
2 changes: 1 addition & 1 deletion src/database/contexts/api_v2_contexts_agents.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now
sending++;

if(host != localhost) {
if (rrdhost_state_cloud_emulation(host))
if (rrdhost_is_online(host))
receiving++;
else
archived++;
Expand Down
14 changes: 7 additions & 7 deletions src/database/engine/dbengine-stresstest.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ static RRDHOST *dbengine_rrdhost_find_or_create(char *name) {
default_rrd_history_entries,
RRD_MEMORY_MODE_DBENGINE,
health_plugin_enabled(),
default_rrdpush_enabled,
default_rrdpush_destination,
default_rrdpush_api_key,
default_rrdpush_send_charts_matching,
default_rrdpush_enable_replication,
default_rrdpush_seconds_to_replicate,
default_rrdpush_replication_step,
stream_conf_send_enabled,
stream_conf_send_destination,
stream_conf_send_api_key,
stream_conf_send_charts_matching,
stream_conf_replication_enabled,
stream_conf_replication_period,
stream_conf_replication_step,
NULL,
0
);
Expand Down
14 changes: 7 additions & 7 deletions src/database/engine/dbengine-unittest.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ static RRDHOST *dbengine_rrdhost_find_or_create(char *name) {
default_rrd_history_entries,
RRD_MEMORY_MODE_DBENGINE,
health_plugin_enabled(),
default_rrdpush_enabled,
default_rrdpush_destination,
default_rrdpush_api_key,
default_rrdpush_send_charts_matching,
default_rrdpush_enable_replication,
default_rrdpush_seconds_to_replicate,
default_rrdpush_replication_step,
stream_conf_send_enabled,
stream_conf_send_destination,
stream_conf_send_api_key,
stream_conf_send_charts_matching,
stream_conf_replication_enabled,
stream_conf_replication_period,
stream_conf_replication_step,
NULL,
0
);
Expand Down
82 changes: 41 additions & 41 deletions src/database/rrd.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,53 @@ struct ml_metrics_statistics {
size_t silenced;
};


// use this for configuration flags, not for state control
// flags are set/unset in a manner that is not thread safe
// and may lead to missing information.
typedef enum __attribute__ ((__packed__)) rrdset_flags {
RRDSET_FLAG_DEBUG = (1 << 2), // enables or disables debugging for a chart
RRDSET_FLAG_OBSOLETE = (1 << 3), // this is marked by the collector/module as obsolete
RRDSET_FLAG_EXPORTING_SEND = (1 << 4), // if set, this chart should be sent to Prometheus web API and external databases
RRDSET_FLAG_EXPORTING_IGNORE = (1 << 5), // if set, this chart should not be sent to Prometheus web API and external databases

RRDSET_FLAG_UPSTREAM_SEND = (1 << 6), // if set, this chart should be sent upstream (streaming)
RRDSET_FLAG_UPSTREAM_IGNORE = (1 << 7), // if set, this chart should not be sent upstream (streaming)

RRDSET_FLAG_STORE_FIRST = (1 << 8), // if set, do not eliminate the first collection during interpolation
RRDSET_FLAG_HETEROGENEOUS = (1 << 9), // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers)
RRDSET_FLAG_HOMOGENEOUS_CHECK = (1 << 10), // if set, the chart should be checked to determine if the dimensions are homogeneous
RRDSET_FLAG_HIDDEN = (1 << 11), // if set, do not show this chart on the dashboard, but use it for exporting
RRDSET_FLAG_SYNC_CLOCK = (1 << 12), // if set, microseconds on next data collection will be ignored (the chart will be synced to now)
RRDSET_FLAG_OBSOLETE_DIMENSIONS = (1 << 13), // this is marked by the collector/module when a chart has obsolete dimensions

RRDSET_FLAG_METADATA_UPDATE = (1 << 14), // Mark that metadata needs to be stored
RRDSET_FLAG_ANOMALY_DETECTION = (1 << 15), // flag to identify anomaly detection charts.
RRDSET_FLAG_INDEXED_ID = (1 << 16), // the rrdset is indexed by its id
RRDSET_FLAG_INDEXED_NAME = (1 << 17), // the rrdset is indexed by its name

RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 18),

RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS = (1 << 19), // the sending side has replication in progress
RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 20), // the sending side has completed replication
RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 21), // the receiving side has replication in progress
RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 22), // the receiving side has completed replication

RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 23), // a custom variable has been updated and needs to be exposed to parent

RRDSET_FLAG_COLLECTION_FINISHED = (1 << 24), // when set, data collection is not available for this chart

RRDSET_FLAG_HAS_RRDCALC_LINKED = (1 << 25), // this chart has at least one rrdcal linked
} RRDSET_FLAGS;

#include "daemon/common.h"
#include "web/api/queries/query.h"
#include "web/api/queries/rrdr.h"
#include "health/rrdvar.h"
#include "health/rrdcalc.h"
#include "rrdlabels.h"
#include "streaming/stream_capabilities.h"
#include "streaming/stream_path.h"
#include "streaming/stream-capabilities.h"
#include "streaming/stream-path.h"
#include "streaming/rrdpush.h"
//#include "aclk/aclk_rrdhost_state.h"
#include "sqlite/sqlite_health.h"
Expand Down Expand Up @@ -664,45 +703,6 @@ STORAGE_ENGINE* storage_engine_find(const char* name);
// ----------------------------------------------------------------------------
// RRDSET - this is a chart

// use this for configuration flags, not for state control
// flags are set/unset in a manner that is not thread safe
// and may lead to missing information.

typedef enum __attribute__ ((__packed__)) rrdset_flags {
RRDSET_FLAG_DEBUG = (1 << 2), // enables or disables debugging for a chart
RRDSET_FLAG_OBSOLETE = (1 << 3), // this is marked by the collector/module as obsolete
RRDSET_FLAG_EXPORTING_SEND = (1 << 4), // if set, this chart should be sent to Prometheus web API and external databases
RRDSET_FLAG_EXPORTING_IGNORE = (1 << 5), // if set, this chart should not be sent to Prometheus web API and external databases

RRDSET_FLAG_UPSTREAM_SEND = (1 << 6), // if set, this chart should be sent upstream (streaming)
RRDSET_FLAG_UPSTREAM_IGNORE = (1 << 7), // if set, this chart should not be sent upstream (streaming)

RRDSET_FLAG_STORE_FIRST = (1 << 8), // if set, do not eliminate the first collection during interpolation
RRDSET_FLAG_HETEROGENEOUS = (1 << 9), // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers)
RRDSET_FLAG_HOMOGENEOUS_CHECK = (1 << 10), // if set, the chart should be checked to determine if the dimensions are homogeneous
RRDSET_FLAG_HIDDEN = (1 << 11), // if set, do not show this chart on the dashboard, but use it for exporting
RRDSET_FLAG_SYNC_CLOCK = (1 << 12), // if set, microseconds on next data collection will be ignored (the chart will be synced to now)
RRDSET_FLAG_OBSOLETE_DIMENSIONS = (1 << 13), // this is marked by the collector/module when a chart has obsolete dimensions

RRDSET_FLAG_METADATA_UPDATE = (1 << 14), // Mark that metadata needs to be stored
RRDSET_FLAG_ANOMALY_DETECTION = (1 << 15), // flag to identify anomaly detection charts.
RRDSET_FLAG_INDEXED_ID = (1 << 16), // the rrdset is indexed by its id
RRDSET_FLAG_INDEXED_NAME = (1 << 17), // the rrdset is indexed by its name

RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 18),

RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS = (1 << 19), // the sending side has replication in progress
RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 20), // the sending side has completed replication
RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 21), // the receiving side has replication in progress
RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 22), // the receiving side has completed replication

RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 23), // a custom variable has been updated and needs to be exposed to parent

RRDSET_FLAG_COLLECTION_FINISHED = (1 << 24), // when set, data collection is not available for this chart

RRDSET_FLAG_HAS_RRDCALC_LINKED = (1 << 25), // this chart has at least one rrdcal linked
} RRDSET_FLAGS;

#define rrdset_flag_get(st) __atomic_load_n(&((st)->flags), __ATOMIC_ACQUIRE)
#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_ACQUIRE) & (flag))
#define rrdset_flag_set(st, flag) __atomic_or_fetch(&((st)->flags), flag, __ATOMIC_RELEASE)
Expand Down
Loading

0 comments on commit fd9b49b

Please sign in to comment.