Skip to content

Commit

Permalink
Alert prototypes: use r/w spinlock instead of spinlock (netdata#19410)
Browse files Browse the repository at this point in the history
use r/w spinlock instead of spinlock
  • Loading branch information
ktsaou authored Jan 15, 2025
1 parent 2d803da commit 6f1655a
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 21 deletions.
7 changes: 5 additions & 2 deletions src/database/sqlite/sqlite_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ SQLITE_API int sqlite3_exec_monitored(
void *data, /* 1st argument to callback */
char **errmsg /* Error msg written here */
) {
internal_fatal(!nd_thread_runs_sql(), "THIS THREAD CANNOT RUN SQL");

int rc = sqlite3_exec(db, sql, callback, data, errmsg);
pulse_sqlite3_query_completed(rc == SQLITE_OK, rc == SQLITE_BUSY, rc == SQLITE_LOCKED);
return rc;
}

SQLITE_API int sqlite3_step_monitored(sqlite3_stmt *stmt) {
internal_fatal(!nd_thread_runs_sql(), "THIS THREAD CANNOT RUN SQL");

int rc;
int cnt = 0;

Expand Down Expand Up @@ -66,8 +70,7 @@ static bool mark_database_to_recover(sqlite3_stmt *res, sqlite3 *database, int r
return false;
}

int execute_insert(sqlite3_stmt *res)
{
int execute_insert(sqlite3_stmt *res) {
int rc;
rc = sqlite3_step_monitored(res);
if (rc == SQLITE_CORRUPT) {
Expand Down
4 changes: 2 additions & 2 deletions src/health/health_dyncfg.c
Original file line number Diff line number Diff line change
Expand Up @@ -659,11 +659,11 @@ static int dyncfg_health_prototype_job_action(BUFFER *result, DYNCFG_CMDS cmd, B
code = dyncfg_default_response(result, HTTP_RESP_OK, "already enabled");
else {
size_t matches_enabled = 0;
spinlock_lock(&ap->_internal.spinlock);
rw_spinlock_write_lock(&ap->_internal.rw_spinlock);
for(RRD_ALERT_PROTOTYPE *t = ap; t ;t = t->_internal.next)
if(t->match.enabled)
matches_enabled++;
spinlock_unlock(&ap->_internal.spinlock);
rw_spinlock_write_unlock(&ap->_internal.rw_spinlock);

if(!matches_enabled) {
code = dyncfg_default_response(result, HTTP_RESP_BAD_REQUEST, "all rules in this alert are disabled, so enabling the alert has no effect");
Expand Down
3 changes: 1 addition & 2 deletions src/health/health_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ typedef struct rrd_alert_prototype {
struct rrd_alert_config config;

struct {
uint32_t uses;
bool enabled;
bool is_on_disk;
SPINLOCK spinlock;
RW_SPINLOCK rw_spinlock;
struct rrd_alert_prototype *prev, *next;
} _internal;
} RRD_ALERT_PROTOTYPE;
Expand Down
28 changes: 13 additions & 15 deletions src/health/health_prototypes.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ static void health_prototype_cleanup_one_unsafe(RRD_ALERT_PROTOTYPE *ap) {
}

void health_prototype_cleanup(RRD_ALERT_PROTOTYPE *ap) {
spinlock_lock(&ap->_internal.spinlock);
rw_spinlock_write_lock(&ap->_internal.rw_spinlock);

while(ap->_internal.next) {
RRD_ALERT_PROTOTYPE *t = ap->_internal.next;
Expand All @@ -215,7 +215,7 @@ void health_prototype_cleanup(RRD_ALERT_PROTOTYPE *ap) {
freez(t);
}

spinlock_unlock(&ap->_internal.spinlock);
rw_spinlock_write_unlock(&ap->_internal.rw_spinlock);

health_prototype_cleanup_one_unsafe(ap);
}
Expand All @@ -228,7 +228,7 @@ void health_prototype_free(RRD_ALERT_PROTOTYPE *ap) {

void health_prototype_insert_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
RRD_ALERT_PROTOTYPE *ap = value;
spinlock_init(&ap->_internal.spinlock);
rw_spinlock_init(&ap->_internal.rw_spinlock);
if(ap->config.source_type != DYNCFG_SOURCE_TYPE_DYNCFG)
ap->_internal.is_on_disk = true;
}
Expand All @@ -253,24 +253,23 @@ bool health_prototype_conflict_cb(const DICTIONARY_ITEM *item __maybe_unused, vo
nap = callocz(1, sizeof(*nap));
memcpy(nap, new_value, sizeof(*nap));

spinlock_lock(&ap->_internal.spinlock);
rw_spinlock_write_lock(&ap->_internal.rw_spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ap->_internal.next, nap, _internal.prev, _internal.next);
spinlock_unlock(&ap->_internal.spinlock);
rw_spinlock_write_unlock(&ap->_internal.rw_spinlock);

if(nap->_internal.enabled)
ap->_internal.enabled = true;
}
}
else {
// alerts with the same name replace the existing one
spinlock_init(&nap->_internal.spinlock);
nap->_internal.uses = ap->_internal.uses;
rw_spinlock_init(&nap->_internal.rw_spinlock);

spinlock_lock(&nap->_internal.spinlock);
spinlock_lock(&ap->_internal.spinlock);
rw_spinlock_write_lock(&nap->_internal.rw_spinlock);
rw_spinlock_write_lock(&ap->_internal.rw_spinlock);
SWAP(*ap, *nap);
spinlock_unlock(&ap->_internal.spinlock);
spinlock_unlock(&nap->_internal.spinlock);
rw_spinlock_write_unlock(&ap->_internal.rw_spinlock);
rw_spinlock_write_unlock(&nap->_internal.rw_spinlock);

health_prototype_cleanup(nap);
memset(nap, 0, sizeof(*nap));
Expand Down Expand Up @@ -598,7 +597,7 @@ static void health_prototype_apply_to_rrdset(RRDSET *st, RRD_ALERT_PROTOTYPE *ap
if(!ap->_internal.enabled)
return;

spinlock_lock(&ap->_internal.spinlock);
rw_spinlock_read_lock(&ap->_internal.rw_spinlock);
for(size_t template = 0; template < 2; template++) {
bool want_template = template ? true : false;

Expand All @@ -617,11 +616,10 @@ static void health_prototype_apply_to_rrdset(RRDSET *st, RRD_ALERT_PROTOTYPE *ap
if (!prototype_matches_rrdset(st, t))
continue;

if (rrdcalc_add_from_prototype(st->rrdhost, st, t))
ap->_internal.uses++;
rrdcalc_add_from_prototype(st->rrdhost, st, t);
}
}
spinlock_unlock(&ap->_internal.spinlock);
rw_spinlock_read_unlock(&ap->_internal.rw_spinlock);
}

void health_prototype_alerts_for_rrdset_incrementally(RRDSET *st) {
Expand Down
10 changes: 10 additions & 0 deletions src/libnetdata/threads/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ static struct {

static __thread ND_THREAD *_nd_thread_info = NULL;
static __thread char _nd_thread_os_name[ND_THREAD_TAG_MAX + 1] = "";
static __thread bool _nd_thread_can_run_sql = true;

void nd_thread_can_run_sql(bool run_sql) {
_nd_thread_can_run_sql = run_sql;
}

bool nd_thread_runs_sql(void) {
return _nd_thread_can_run_sql;
}


// --------------------------------------------------------------------------------------------------------------------
// O/S abstraction
Expand Down
3 changes: 3 additions & 0 deletions src/libnetdata/threads/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,7 @@ void nd_thread_rwspinlock_write_unlocked(void);
#define nd_thread_rwspinlock_write_unlocked() debug_dummy()
#endif

void nd_thread_can_run_sql(bool exclude);
bool nd_thread_runs_sql(void);

#endif //NETDATA_THREADS_H
2 changes: 2 additions & 0 deletions src/streaming/stream-connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,8 @@ static void *stream_connector_thread(void *ptr) {
struct connector *sc = ptr;
sc->tid = gettid_cached();

nd_thread_can_run_sql(false);

worker_register("STREAMCNT");
worker_register_job_name(WORKER_SENDER_CONNECTOR_JOB_CONNECTING, "connect");
worker_register_job_name(WORKER_SENDER_CONNECTOR_JOB_CONNECTED, "connected");
Expand Down
2 changes: 2 additions & 0 deletions src/streaming/stream-thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_r
void *stream_thread(void *ptr) {
struct stream_thread *sth = ptr;

nd_thread_can_run_sql(false);

worker_register("STREAM");

// stream thread main event loop
Expand Down

0 comments on commit 6f1655a

Please sign in to comment.