Skip to content

Commit

Permalink
stream-thread fix memory corruption (netdata#19367)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktsaou authored Jan 10, 2025
1 parent 1ec8323 commit 32c5b0a
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions src/streaming/stream-thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,16 @@ static int set_pipe_size(int pipe_fd __maybe_unused, int new_size) {

// --------------------------------------------------------------------------------------------------------------------

static void stream_thread_messages_resize_unsafe(struct stream_thread *sth) {
static void stream_thread_messages_resize(struct stream_thread *sth) {
internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ );

if(sth->nodes_count * 2 >= sth->messages.size) {
spinlock_lock(&sth->messages.spinlock);
size_t new_size = MAX(sth->messages.size * 2, sth->nodes_count * 2);
sth->messages.array = reallocz(sth->messages.array, new_size * sizeof(*sth->messages.array));
sth->messages.copy = reallocz(sth->messages.copy, new_size * sizeof(*sth->messages.copy));
sth->messages.size = new_size;
spinlock_unlock(&sth->messages.spinlock);
}
}

Expand Down Expand Up @@ -513,11 +515,11 @@ void *stream_thread(void *ptr) {
if(now_ut - last_dequeue_ut >= 100 * USEC_PER_MS) {
worker_is_busy(WORKER_STREAM_JOB_DEQUEUE);

stream_thread_messages_resize(sth);

// move any pending hosts in the inbound queue, to the running list
spinlock_lock(&sth->queue.spinlock);

stream_thread_messages_resize_unsafe(sth);

stream_thread_process_waiting_list_unsafe(sth, now_ut);
// stream_receiver_move_entire_queue_to_running_unsafe(sth);

Expand Down

0 comments on commit 32c5b0a

Please sign in to comment.