Skip to content

Commit

Permalink
Better fix for processing long running messages
Browse files Browse the repository at this point in the history
  • Loading branch information
silviucpp committed Jul 6, 2021
1 parent bb95f53 commit 499dfa6
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 48 deletions.
86 changes: 40 additions & 46 deletions c_src/queuecallbacksdispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include <limits>

namespace {

const uint64_t kMaxUInt64 = std::numeric_limits<uint64_t>::max();

void consumers_event_callback(rd_kafka_t *rk, void *qev_opaque)
{
static_cast<QueueCallbacksDispatcher*>(qev_opaque)->signal(rk);
Expand All @@ -17,8 +20,7 @@ namespace {
}

QueueCallbacksDispatcher::QueueCallbacksDispatcher():
running_(true),
poll_timeout_us_(-1)
running_(true)
{
thread_callbacks_ = std::thread(&QueueCallbacksDispatcher::process_callbacks, this);
}
Expand All @@ -39,26 +41,18 @@ void QueueCallbacksDispatcher::watch(rd_kafka_t* instance, bool is_consumer)
const rd_kafka_conf_t* conf = rd_kafka_conf(instance);
rd_kafka_conf_get(conf, "max.poll.interval.ms", buffer, &buffer_size);
ASSERT(buffer_size > 0);
uint64_t max_poll_interval_ms = static_cast<uint64_t>(std::stoull(buffer)/2);

{
CritScope ss(&crt_);
objects_[instance] = item(is_consumer, max_poll_interval_ms, now_ms());

// poll timeout is the minimum value of all instances

for(auto& it: objects_)
{
if(it.second.max_poll_interval_ms < max_poll_interval_ms)
max_poll_interval_ms = it.second.max_poll_interval_ms;
}

poll_timeout_us_ = static_cast<int64_t>(max_poll_interval_ms*1000);
objects_[instance] = item(is_consumer, static_cast<uint64_t>(std::stoull(buffer)/2), now_ms());
}

rd_kafka_queue_t* queue = is_consumer ? rd_kafka_queue_get_consumer(instance): rd_kafka_queue_get_main(instance);
rd_kafka_queue_cb_event_enable(queue, consumers_event_callback, this);
rd_kafka_queue_destroy(queue);

// force rebuilding next poll time
events_.enqueue(nullptr);
}

bool QueueCallbacksDispatcher::remove(rd_kafka_t* instance)
Expand All @@ -75,30 +69,14 @@ bool QueueCallbacksDispatcher::remove(rd_kafka_t* instance)

is_consumer = it->second.is_consumer;
objects_.erase(it);

// recalculate the minimum polling time.

if(objects_.empty() == false)
{
uint64_t max_poll_interval_ms = std::numeric_limits<int64_t>::max();

for(auto& it: objects_)
{
if(it.second.max_poll_interval_ms < max_poll_interval_ms)
max_poll_interval_ms = it.second.max_poll_interval_ms;
}

poll_timeout_us_ = static_cast<int64_t>(max_poll_interval_ms*1000);
}
else
{
poll_timeout_us_ = -1;
}
}

rd_kafka_queue_t* queue = is_consumer ? rd_kafka_queue_get_consumer(instance): rd_kafka_queue_get_main(instance);
rd_kafka_queue_cb_event_enable(queue, NULL, nullptr);
rd_kafka_queue_destroy(queue);

// force rebuilding next poll time
events_.enqueue(nullptr);
return true;
}

Expand All @@ -111,45 +89,61 @@ void QueueCallbacksDispatcher::process_callbacks()
{
rd_kafka_t* obj = nullptr;

int64_t poll_timeout = -1;

while (running_)
{
if(events_.wait_dequeue_timed(obj, poll_timeout_us_) == false)
if(events_.wait_dequeue_timed(obj, poll_timeout) == false)
{
uint64_t now = now_ms();
CritScope ss(&crt_);
check_max_poll_interval_ms(now);
poll_timeout = check_max_poll_interval_ms(now);
continue;
}

if(obj == nullptr)
continue;

uint64_t now = now_ms();

CritScope ss(&crt_);
if(obj != nullptr)
{
CritScope ss(&crt_);

auto it = objects_.find(obj);
auto it = objects_.find(obj);

if(it != objects_.end())
if(it != objects_.end())
{
it->second.last_poll_ms = now;
do_poll(obj, it->second.is_consumer);
}

poll_timeout = check_max_poll_interval_ms(now);
}
else
{
it->second.last_poll_ms = now;
do_poll(obj, it->second.is_consumer);
CritScope ss(&crt_);
poll_timeout = check_max_poll_interval_ms(now);
}

check_max_poll_interval_ms(now);
}
}

void QueueCallbacksDispatcher::check_max_poll_interval_ms(uint64_t now)
int64_t QueueCallbacksDispatcher::check_max_poll_interval_ms(uint64_t now)
{
if(objects_.empty())
return -1;

uint64_t next_schedule = kMaxUInt64;

for(auto& it: objects_)
{
if(now - it.second.last_poll_ms >= it.second.max_poll_interval_ms)
{
it.second.last_poll_ms = now;
do_poll(it.first, it.second.is_consumer);
}

next_schedule = std::min(next_schedule, it.second.max_poll_interval_ms+(now - it.second.last_poll_ms));
}

return static_cast<int64_t>(next_schedule*1000);
}

void QueueCallbacksDispatcher::do_poll(rd_kafka_t* obj, bool is_consumer)
Expand Down
3 changes: 1 addition & 2 deletions c_src/queuecallbacksdispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class QueueCallbacksDispatcher

private:

void check_max_poll_interval_ms(uint64_t now);
int64_t check_max_poll_interval_ms(uint64_t now);
void do_poll(rd_kafka_t* obj, bool is_consumer);

struct item {
Expand All @@ -41,7 +41,6 @@ class QueueCallbacksDispatcher
std::thread thread_callbacks_;

bool running_;
int64_t poll_timeout_us_;
moodycamel::BlockingConcurrentQueue<rd_kafka_t*> events_;
std::unordered_map<rd_kafka_t*, item> objects_;

Expand Down

0 comments on commit 499dfa6

Please sign in to comment.