diff --git a/core/common/timer/Timer.cpp b/core/common/timer/Timer.cpp index ed6af6ba14..a7316943a5 100644 --- a/core/common/timer/Timer.cpp +++ b/core/common/timer/Timer.cpp @@ -66,12 +66,15 @@ void Timer::Run() { mCV.wait_for(threadLock, timeout); break; } else { + auto e = std::move(const_cast&>(mQueue.top())); + mQueue.pop(); + queueLock.unlock(); if (!e->IsValid()) { LOG_INFO(sLogger, ("invalid timer event", "task is cancelled")); } else { e->Execute(); } - mQueue.pop(); + queueLock.lock(); } } } diff --git a/core/common/timer/Timer.h b/core/common/timer/Timer.h index 1d6adf2c60..6825e8443d 100644 --- a/core/common/timer/Timer.h +++ b/core/common/timer/Timer.h @@ -52,6 +52,7 @@ class Timer { #ifdef APSARA_UNIT_TEST_MAIN friend class TimerUnittest; + friend class ScrapeSchedulerUnittest; #endif }; diff --git a/core/plugin/input/InputPrometheus.h b/core/plugin/input/InputPrometheus.h index e0ef5a8b10..9ae51128b6 100644 --- a/core/plugin/input/InputPrometheus.h +++ b/core/plugin/input/InputPrometheus.h @@ -17,7 +17,7 @@ class InputPrometheus : public Input { bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; bool Start() override; bool Stop(bool isPipelineRemoving) override; - bool SupportAck() const override { return false; } + bool SupportAck() const override { return true; } private: bool CreateInnerProcessors(const Json::Value& inputConfig); diff --git a/core/prometheus/async/PromFuture.cpp b/core/prometheus/async/PromFuture.cpp index ca7339d319..46185a004a 100644 --- a/core/prometheus/async/PromFuture.cpp +++ b/core/prometheus/async/PromFuture.cpp @@ -1,28 +1,38 @@ #include "prometheus/async/PromFuture.h" #include "common/Lock.h" +#include "common/http/HttpResponse.h" namespace logtail { -void PromFuture::Process(const HttpResponse& response, uint64_t timestampMilliSec) { +template +bool PromFuture::Process(Args... args) { WriteLock lock(mStateRWLock); if (mState == PromFutureState::New) { for (auto& callback : mDoneCallbacks) { - callback(response, timestampMilliSec); + if (!callback(std::forward(args)...)) { + mState = PromFutureState::Done; + return false; + } } mState = PromFutureState::Done; - } else { - return; } + + return true; } -void PromFuture::AddDoneCallback(std::function&& callback) { +template +void PromFuture::AddDoneCallback(CallbackSignature&& callback) { mDoneCallbacks.emplace_back(std::move(callback)); } -void PromFuture::Cancel() { +template +void PromFuture::Cancel() { WriteLock lock(mStateRWLock); mState = PromFutureState::Done; } +template class PromFuture; +template class PromFuture<>; + } // namespace logtail \ No newline at end of file diff --git a/core/prometheus/async/PromFuture.h b/core/prometheus/async/PromFuture.h index 63de4b59d1..f9b5a14846 100644 --- a/core/prometheus/async/PromFuture.h +++ b/core/prometheus/async/PromFuture.h @@ -1,18 +1,21 @@ #pragma once +#include + #include "common/Lock.h" -#include "common/http/HttpResponse.h" namespace logtail { enum class PromFutureState { New, Processing, Done }; +template class PromFuture { public: + using CallbackSignature = std::function; // Process should support oneshot and streaming mode. - void Process(const HttpResponse&, uint64_t timestampMilliSec); + bool Process(Args...); - void AddDoneCallback(std::function&& callback); + void AddDoneCallback(CallbackSignature&&); void Cancel(); @@ -20,7 +23,7 @@ class PromFuture { PromFutureState mState = {PromFutureState::New}; ReadWriteLock mStateRWLock; - std::vector> mDoneCallbacks; + std::vector mDoneCallbacks; #ifdef APSARA_UNIT_TEST_MAIN friend class ScrapeSchedulerUnittest; diff --git a/core/prometheus/async/PromHttpRequest.cpp b/core/prometheus/async/PromHttpRequest.cpp index 9cfe68452e..151ebfabe8 100644 --- a/core/prometheus/async/PromHttpRequest.cpp +++ b/core/prometheus/async/PromHttpRequest.cpp @@ -18,17 +18,24 @@ PromHttpRequest::PromHttpRequest(const std::string& method, const std::string& body, uint32_t timeout, uint32_t maxTryCnt, - std::shared_ptr future) + std::shared_ptr> future, + std::shared_ptr> isContextValidFuture) : AsynHttpRequest(method, httpsFlag, host, port, url, query, header, body, timeout, maxTryCnt), - mFuture(std::move(future)) { + mFuture(std::move(future)), + mIsContextValidFuture(std::move(isContextValidFuture)) { } void PromHttpRequest::OnSendDone(const HttpResponse& response) { - mFuture->Process(response, - std::chrono::duration_cast(mLastSendTime.time_since_epoch()).count()); + if (mFuture != nullptr) { + mFuture->Process( + response, std::chrono::duration_cast(mLastSendTime.time_since_epoch()).count()); + } } [[nodiscard]] bool PromHttpRequest::IsContextValid() const { + if (mIsContextValidFuture != nullptr) { + return mIsContextValidFuture->Process(); + } return true; } diff --git a/core/prometheus/async/PromHttpRequest.h b/core/prometheus/async/PromHttpRequest.h index b7b8115774..8c546e5f0a 100644 --- a/core/prometheus/async/PromHttpRequest.h +++ b/core/prometheus/async/PromHttpRequest.h @@ -20,7 +20,8 @@ class PromHttpRequest : public AsynHttpRequest { const std::string& body, uint32_t timeout, uint32_t maxTryCnt, - std::shared_ptr future); + std::shared_ptr> future, + std::shared_ptr> isContextValidFuture = nullptr); PromHttpRequest(const PromHttpRequest&) = default; ~PromHttpRequest() override = default; @@ -30,7 +31,8 @@ class PromHttpRequest : public AsynHttpRequest { private: void SetNextExecTime(std::chrono::steady_clock::time_point execTime); - std::shared_ptr mFuture; + std::shared_ptr> mFuture; + std::shared_ptr> mIsContextValidFuture; }; } // namespace logtail \ No newline at end of file diff --git a/core/prometheus/schedulers/BaseScheduler.cpp b/core/prometheus/schedulers/BaseScheduler.cpp index c18d542d37..db7de4ae79 100644 --- a/core/prometheus/schedulers/BaseScheduler.cpp +++ b/core/prometheus/schedulers/BaseScheduler.cpp @@ -3,14 +3,20 @@ namespace logtail { void BaseScheduler::ExecDone() { mExecCount++; + mLatestExecTime = mFirstExecTime + std::chrono::seconds(mExecCount * mInterval); } std::chrono::steady_clock::time_point BaseScheduler::GetNextExecTime() { - return mFirstExecTime + std::chrono::seconds(mExecCount * mInterval); + return mLatestExecTime; } void BaseScheduler::SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime) { mFirstExecTime = firstExecTime; + mLatestExecTime = mFirstExecTime; +} + +void BaseScheduler::DelayExecTime(uint64_t delaySeconds) { + mLatestExecTime = mLatestExecTime + std::chrono::seconds(delaySeconds); } void BaseScheduler::Cancel() { diff --git a/core/prometheus/schedulers/BaseScheduler.h b/core/prometheus/schedulers/BaseScheduler.h index 2a83bb29a1..4203d8d730 100644 --- a/core/prometheus/schedulers/BaseScheduler.h +++ b/core/prometheus/schedulers/BaseScheduler.h @@ -3,6 +3,7 @@ #include +#include "common/http/HttpResponse.h" #include "prometheus/async/PromFuture.h" namespace logtail { @@ -18,6 +19,7 @@ class BaseScheduler { std::chrono::steady_clock::time_point GetNextExecTime(); void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime); + void DelayExecTime(uint64_t delaySeconds); virtual void Cancel(); @@ -25,11 +27,13 @@ class BaseScheduler { bool IsCancelled(); std::chrono::steady_clock::time_point mFirstExecTime; + std::chrono::steady_clock::time_point mLatestExecTime; int64_t mExecCount = 0; int64_t mInterval = 0; ReadWriteLock mLock; bool mValidState = true; - std::shared_ptr mFuture; + std::shared_ptr> mFuture; + std::shared_ptr> mIsContextValidFuture; }; } // namespace logtail \ No newline at end of file diff --git a/core/prometheus/schedulers/ScrapeScheduler.cpp b/core/prometheus/schedulers/ScrapeScheduler.cpp index 43561ecaef..1f76522eb3 100644 --- a/core/prometheus/schedulers/ScrapeScheduler.cpp +++ b/core/prometheus/schedulers/ScrapeScheduler.cpp @@ -31,6 +31,7 @@ #include "pipeline/queue/ProcessQueueManager.h" #include "pipeline/queue/QueueKey.h" #include "prometheus/Constants.h" +#include "prometheus/async/PromFuture.h" #include "prometheus/async/PromHttpRequest.h" using namespace std; @@ -99,8 +100,14 @@ void ScrapeScheduler::PushEventGroup(PipelineEventGroup&& eGroup) { auto item = make_unique(std::move(eGroup), mInputIndex); #ifdef APSARA_UNIT_TEST_MAIN mItem.push_back(std::move(item)); + return; #endif - ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)); + while (true) { + if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) == 0) { + break; + } + usleep(10 * 1000); + } } string ScrapeScheduler::GetId() const { @@ -108,21 +115,34 @@ string ScrapeScheduler::GetId() const { } void ScrapeScheduler::ScheduleNext() { - auto future = std::make_shared(); + auto future = std::make_shared>(); + auto isContextValidFuture = std::make_shared>(); future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) { this->OnMetricResult(response, timestampMilliSec); this->ExecDone(); this->ScheduleNext(); + return true; + }); + isContextValidFuture->AddDoneCallback([this]() -> bool { + if (ProcessQueueManager::GetInstance()->IsValidToPush(mQueueKey)) { + return true; + } else { + this->DelayExecTime(1); + this->ScheduleNext(); + return false; + } }); if (IsCancelled()) { mFuture->Cancel(); + mIsContextValidFuture->Cancel(); return; } { WriteLock lock(mLock); mFuture = future; + mIsContextValidFuture = isContextValidFuture; } auto event = BuildScrapeTimerEvent(GetNextExecTime()); @@ -130,9 +150,10 @@ void ScrapeScheduler::ScheduleNext() { } void ScrapeScheduler::ScrapeOnce(std::chrono::steady_clock::time_point execTime) { - auto future = std::make_shared(); + auto future = std::make_shared>(); future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) { this->OnMetricResult(response, timestampMilliSec); + return true; }); mFuture = future; auto event = BuildScrapeTimerEvent(execTime); @@ -153,15 +174,19 @@ std::unique_ptr ScrapeScheduler::BuildScrapeTimerEvent(std::chrono:: mScrapeConfigPtr->mScrapeTimeoutSeconds, mScrapeConfigPtr->mScrapeIntervalSeconds / mScrapeConfigPtr->mScrapeTimeoutSeconds, - this->mFuture); + this->mFuture, + this->mIsContextValidFuture); auto timerEvent = std::make_unique(execTime, std::move(request)); return timerEvent; } void ScrapeScheduler::Cancel() { - if (mFuture) { + if (mFuture != nullptr) { mFuture->Cancel(); } + if (mIsContextValidFuture != nullptr) { + mIsContextValidFuture->Cancel(); + } { WriteLock lock(mLock); mValidState = false; diff --git a/core/prometheus/schedulers/TargetSubscriberScheduler.cpp b/core/prometheus/schedulers/TargetSubscriberScheduler.cpp index 3ff3bc8a55..de08efa2f2 100644 --- a/core/prometheus/schedulers/TargetSubscriberScheduler.cpp +++ b/core/prometheus/schedulers/TargetSubscriberScheduler.cpp @@ -237,11 +237,12 @@ string TargetSubscriberScheduler::GetId() const { } void TargetSubscriberScheduler::ScheduleNext() { - auto future = std::make_shared(); + auto future = std::make_shared>(); future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) { this->OnSubscription(response, timestampMilliSec); this->ExecDone(); this->ScheduleNext(); + return true; }); if (IsCancelled()) { mFuture->Cancel(); @@ -267,9 +268,10 @@ void TargetSubscriberScheduler::Cancel() { } void TargetSubscriberScheduler::SubscribeOnce(std::chrono::steady_clock::time_point execTime) { - auto future = std::make_shared(); + auto future = std::make_shared>(); future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampNanoSec) { this->OnSubscription(response, timestampNanoSec); + return true; }); mFuture = future; auto event = BuildSubscriberTimerEvent(execTime); diff --git a/core/unittest/prometheus/PromAsynUnittest.cpp b/core/unittest/prometheus/PromAsynUnittest.cpp index 67984adbb8..3ab400ef21 100644 --- a/core/unittest/prometheus/PromAsynUnittest.cpp +++ b/core/unittest/prometheus/PromAsynUnittest.cpp @@ -13,7 +13,7 @@ class PromAsynUnittest : public testing::Test { }; void PromAsynUnittest::TestExecTime() { - auto future = std::make_shared(); + auto future = std::make_shared>(); auto now = std::chrono::system_clock::now(); bool exec = false; future->AddDoneCallback([&exec, now](const HttpResponse&, uint64_t timestampMilliSec) { @@ -21,6 +21,7 @@ void PromAsynUnittest::TestExecTime() { std::chrono::duration_cast(now.time_since_epoch()).count()); APSARA_TEST_TRUE(exec); + return true; }); auto request = std::make_shared( "http", false, "127.0.0.1", 8080, "/", "", map(), "", 10, 3, future); diff --git a/core/unittest/prometheus/ScrapeSchedulerUnittest.cpp b/core/unittest/prometheus/ScrapeSchedulerUnittest.cpp index e38427c226..b1ed835990 100644 --- a/core/unittest/prometheus/ScrapeSchedulerUnittest.cpp +++ b/core/unittest/prometheus/ScrapeSchedulerUnittest.cpp @@ -31,14 +31,6 @@ using namespace std; namespace logtail { -class MockTimer : public Timer { -public: - void Init() {} - void PushEvent(std::unique_ptr&& e) { mQueue.push_back(std::move(e)); } - void Stop() {} - std::vector> mQueue; -}; - class ScrapeSchedulerUnittest : public testing::Test { public: void TestInitscrapeScheduler(); @@ -47,6 +39,7 @@ class ScrapeSchedulerUnittest : public testing::Test { void TestReceiveMessage(); void TestScheduler(); + void TestQueueIsFull(); protected: void SetUp() override { @@ -166,7 +159,7 @@ void ScrapeSchedulerUnittest::TestScheduler() { Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); ScrapeScheduler event(mScrapeConfig, "localhost", 8080, labels, 0, 0); - auto timer = make_shared(); + auto timer = make_shared(); event.SetTimer(timer); event.ScheduleNext(); @@ -178,9 +171,34 @@ void ScrapeSchedulerUnittest::TestScheduler() { APSARA_TEST_TRUE(event.mFuture->mState == PromFutureState::Done); } +void ScrapeSchedulerUnittest::TestQueueIsFull() { + Labels labels; + labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); + ScrapeScheduler event(mScrapeConfig, "localhost", 8080, labels, 0, 0); + auto timer = make_shared(); + event.SetTimer(timer); + auto now = std::chrono::steady_clock::now(); + event.SetFirstExecTime(now); + event.ScheduleNext(); + + APSARA_TEST_TRUE(timer->mQueue.size() == 1); + + const auto& e = timer->mQueue.top(); + APSARA_TEST_EQUAL(now, e->GetExecTime()); + APSARA_TEST_FALSE(e->IsValid()); + timer->mQueue.pop(); + // queue is full, so it should schedule next after 1 second + APSARA_TEST_EQUAL(1UL, timer->mQueue.size()); + const auto& next = timer->mQueue.top(); + APSARA_TEST_EQUAL(now + std::chrono::seconds(1), next->GetExecTime()); +} + UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestInitscrapeScheduler) UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestProcess) UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestSplitByLines) +UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestScheduler) +UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestQueueIsFull) + } // namespace logtail