diff --git a/c_src/queuecallbacksdispatcher.cc b/c_src/queuecallbacksdispatcher.cc
index 540ffed..69c8957 100644
--- a/c_src/queuecallbacksdispatcher.cc
+++ b/c_src/queuecallbacksdispatcher.cc
@@ -5,6 +5,9 @@
 #include
 
 namespace {
+
+    const uint64_t kMaxUInt64 = std::numeric_limits<uint64_t>::max();
+
     void consumers_event_callback(rd_kafka_t *rk, void *qev_opaque)
     {
         static_cast<QueueCallbacksDispatcher*>(qev_opaque)->signal(rk);
@@ -17,8 +20,7 @@ namespace {
 }
 
 QueueCallbacksDispatcher::QueueCallbacksDispatcher():
-    running_(true),
-    poll_timeout_us_(-1)
+    running_(true)
 {
     thread_callbacks_ = std::thread(&QueueCallbacksDispatcher::process_callbacks, this);
 }
@@ -39,26 +41,18 @@ void QueueCallbacksDispatcher::watch(rd_kafka_t* instance, bool is_consumer)
     const rd_kafka_conf_t* conf = rd_kafka_conf(instance);
     rd_kafka_conf_get(conf, "max.poll.interval.ms", buffer, &buffer_size);
     ASSERT(buffer_size > 0);
-    uint64_t max_poll_interval_ms = static_cast<uint64_t>(std::stoull(buffer)/2);
 
     {
         CritScope ss(&crt_);
-        objects_[instance] = item(is_consumer, max_poll_interval_ms, now_ms());
-
-        // poll timeout is the minimum value of all instances
-
-        for(auto& it: objects_)
-        {
-            if(it.second.max_poll_interval_ms < max_poll_interval_ms)
-                max_poll_interval_ms = it.second.max_poll_interval_ms;
-        }
-
-        poll_timeout_us_ = static_cast<int64_t>(max_poll_interval_ms*1000);
+        objects_[instance] = item(is_consumer, static_cast<uint64_t>(std::stoull(buffer)/2), now_ms());
     }
 
     rd_kafka_queue_t* queue = is_consumer ? rd_kafka_queue_get_consumer(instance): rd_kafka_queue_get_main(instance);
     rd_kafka_queue_cb_event_enable(queue, consumers_event_callback, this);
     rd_kafka_queue_destroy(queue);
+
+    // force rebuilding next poll time
+    events_.enqueue(nullptr);
 }
 
 bool QueueCallbacksDispatcher::remove(rd_kafka_t* instance)
@@ -75,30 +69,14 @@ bool QueueCallbacksDispatcher::remove(rd_kafka_t* instance)
 
         is_consumer = it->second.is_consumer;
         objects_.erase(it);
-
-        // recalculate the minimum polling time.
-
-        if(objects_.empty() == false)
-        {
-            uint64_t max_poll_interval_ms = std::numeric_limits<uint64_t>::max();
-
-            for(auto& it: objects_)
-            {
-                if(it.second.max_poll_interval_ms < max_poll_interval_ms)
-                    max_poll_interval_ms = it.second.max_poll_interval_ms;
-            }
-
-            poll_timeout_us_ = static_cast<int64_t>(max_poll_interval_ms*1000);
-        }
-        else
-        {
-            poll_timeout_us_ = -1;
-        }
     }
 
     rd_kafka_queue_t* queue = is_consumer ? rd_kafka_queue_get_consumer(instance): rd_kafka_queue_get_main(instance);
     rd_kafka_queue_cb_event_enable(queue, NULL, nullptr);
     rd_kafka_queue_destroy(queue);
+
+    // force rebuilding next poll time
+    events_.enqueue(nullptr);
 
     return true;
 }
@@ -111,37 +89,49 @@ void QueueCallbacksDispatcher::process_callbacks()
 {
     rd_kafka_t* obj = nullptr;
 
+    int64_t poll_timeout = -1;
+
     while (running_)
     {
-        if(events_.wait_dequeue_timed(obj, poll_timeout_us_) == false)
+        if(events_.wait_dequeue_timed(obj, poll_timeout) == false)
         {
             uint64_t now = now_ms();
             CritScope ss(&crt_);
-            check_max_poll_interval_ms(now);
+            poll_timeout = check_max_poll_interval_ms(now);
             continue;
         }
 
-        if(obj == nullptr)
-            continue;
-
         uint64_t now = now_ms();
-        CritScope ss(&crt_);
 
-        auto it = objects_.find(obj);
+        if(obj != nullptr)
+        {
+            CritScope ss(&crt_);
 
-        if(it != objects_.end())
+            auto it = objects_.find(obj);
+
+            if(it != objects_.end())
+            {
+                it->second.last_poll_ms = now;
+                do_poll(obj, it->second.is_consumer);
+            }
+
+            poll_timeout = check_max_poll_interval_ms(now);
+        }
+        else
         {
-            it->second.last_poll_ms = now;
-            do_poll(obj, it->second.is_consumer);
+            CritScope ss(&crt_);
+            poll_timeout = check_max_poll_interval_ms(now);
         }
-
-        check_max_poll_interval_ms(now);
     }
 }
 
-void QueueCallbacksDispatcher::check_max_poll_interval_ms(uint64_t now)
+int64_t QueueCallbacksDispatcher::check_max_poll_interval_ms(uint64_t now)
 {
+    if(objects_.empty())
+        return -1;
+
+    uint64_t next_schedule = kMaxUInt64;
+
     for(auto& it: objects_)
     {
         if(now - it.second.last_poll_ms >= it.second.max_poll_interval_ms)
@@ -149,7 +139,11 @@
             it.second.last_poll_ms = now;
             do_poll(it.first, it.second.is_consumer);
         }
+
+        next_schedule = std::min(next_schedule, it.second.max_poll_interval_ms+(now - it.second.last_poll_ms));
     }
+
+    return static_cast<int64_t>(next_schedule*1000);
 }
 
 void QueueCallbacksDispatcher::do_poll(rd_kafka_t* obj, bool is_consumer)
diff --git a/c_src/queuecallbacksdispatcher.h b/c_src/queuecallbacksdispatcher.h
index 6e06330..20e0004 100644
--- a/c_src/queuecallbacksdispatcher.h
+++ b/c_src/queuecallbacksdispatcher.h
@@ -23,7 +23,7 @@ class QueueCallbacksDispatcher
 
 private:
 
-    void check_max_poll_interval_ms(uint64_t now);
+    int64_t check_max_poll_interval_ms(uint64_t now);
     void do_poll(rd_kafka_t* obj, bool is_consumer);
 
     struct item {
@@ -41,7 +41,6 @@ class QueueCallbacksDispatcher
 
     std::thread thread_callbacks_;
    bool running_;
-    int64_t poll_timeout_us_;
     moodycamel::BlockingConcurrentQueue<rd_kafka_t*> events_;
     std::unordered_map<rd_kafka_t*, item> objects_;
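
Note (not part of the patch): a minimal standalone sketch of the scheduling idea above, for reference. The patch replaces the shared poll_timeout_us_ with a per-call return value from check_max_poll_interval_ms(), which becomes the timeout for the next wait_dequeue_timed(); the *1000 and the old _us suffix suggest that timeout is in microseconds, and each instance's budget is half of its max.poll.interval.ms. The sketch assumes the intended timeout is the time remaining until the earliest instance is due again (interval minus elapsed); the names Item and next_poll_timeout_us are illustrative, not identifiers from the patch.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

// One watched librdkafka handle, reduced to the two fields the scheduler needs.
struct Item {
    uint64_t max_poll_interval_ms;  // half of the instance's max.poll.interval.ms
    uint64_t last_poll_ms;          // when the instance was last polled
};

// Poll whatever is overdue (modelled by resetting last_poll_ms) and return the
// time, in microseconds, until the next instance becomes due, or -1 to block
// indefinitely when nothing is being watched.
int64_t next_poll_timeout_us(std::vector<Item>& items, uint64_t now_ms) {
    if (items.empty())
        return -1;

    uint64_t next_schedule_ms = std::numeric_limits<uint64_t>::max();

    for (auto& it : items) {
        if (now_ms - it.last_poll_ms >= it.max_poll_interval_ms)
            it.last_poll_ms = now_ms;  // the real code calls do_poll() here

        // Remaining time until this instance's deadline; the minimum across
        // all instances is how long the dispatcher thread may sleep.
        next_schedule_ms = std::min(next_schedule_ms,
                                    it.max_poll_interval_ms - (now_ms - it.last_poll_ms));
    }

    return static_cast<int64_t>(next_schedule_ms * 1000);
}

int main() {
    // Two instances at t=120000 ms: one due in 130 s, one due in 20 s -> wait 20 s (20000000 us).
    std::vector<Item> items = {{150000, 100000}, {30000, 110000}};
    std::cout << next_poll_timeout_us(items, 120000) << "\n";
}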