Skip to content

Commit

Permalink
Implemented batching in wilson uploader (#1955)
Browse files Browse the repository at this point in the history
  • Loading branch information
domwst authored Feb 16, 2024
1 parent 75f69cf commit 126ee0c
Showing 1 changed file with 167 additions and 50 deletions.
217 changes: 167 additions & 50 deletions ydb/library/actors/wilson/wilson_uploader.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "wilson_uploader.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
Expand All @@ -7,7 +8,9 @@
#include <library/cpp/string_utils/url/url.h>
#include <util/stream/file.h>
#include <util/string/hex.h>

#include <chrono>
#include <queue>

namespace NWilson {

Expand All @@ -18,11 +21,83 @@ namespace NWilson {

namespace {

// A single span waiting to be placed into a batch, together with the bookkeeping
// the uploader keeps for it.
struct TSpan {
// Monotonic deadline assigned when the span is received (receive time + MaxSpanTimeInQueue).
TMonotonic ExpirationTimestamp;
// The span message itself; TBatch::Add swaps it into the outgoing request.
NTraceProto::Span Span;
// Value of Span.ByteSizeLong() captured when the span was received; used for byte accounting.
size_t Size;
};

class TBatch {
private:
ui64 MaxSpansInBatch;
ui64 MaxBytesInBatch;

NServiceProto::ExportTraceServiceRequest Request;
NTraceProto::ScopeSpans* ScopeSpans;
ui64 SizeBytes = 0;
TMonotonic ExpirationTimestamp = TMonotonic::Zero();

public:
struct TData {
NServiceProto::ExportTraceServiceRequest Request;
ui64 SizeBytes;
ui64 SizeSpans;
TMonotonic ExpirationTimestamp;
};

TBatch(ui64 maxSpansInBatch, ui64 maxBytesInBatch, TString serviceName)
: MaxSpansInBatch(maxSpansInBatch)
, MaxBytesInBatch(maxBytesInBatch)
{
auto *rspan = Request.add_resource_spans();
auto *serviceNameAttr = rspan->mutable_resource()->add_attributes();
serviceNameAttr->set_key("service.name");
serviceNameAttr->mutable_value()->set_string_value(std::move(serviceName));
ScopeSpans = rspan->add_scope_spans();
}

size_t SizeSpans() const {
return ScopeSpans->spansSize();
}

bool IsEmpty() const {
return SizeSpans() == 0;
}

bool Add(TSpan& span) {
if (SizeBytes + span.Size > MaxBytesInBatch || SizeSpans() == MaxSpansInBatch) {
return false;
}
SizeBytes += span.Size;
span.Span.Swap(ScopeSpans->add_spans());
ExpirationTimestamp = span.ExpirationTimestamp;
return true;
}

TData Complete() && {
return TData {
.Request = std::move(Request),
.SizeBytes = SizeBytes,
.SizeSpans = SizeSpans(),
.ExpirationTimestamp = ExpirationTimestamp,
};
}
};

class TWilsonUploader
: public TActorBootstrapped<TWilsonUploader>
{
static constexpr size_t WILSON_SERVICE_ID = 430;

ui64 MaxPendingSpanBytes = 100'000'000;
ui64 MaxSpansInBatch = 150;
ui64 MaxBytesInBatch = 20'000'000;
TDuration MaxBatchAccumulation = TDuration::Seconds(1);
ui32 MaxSpansPerSecond = 10;
TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);

bool WakeupScheduled = false;

TString CollectorUrl;
TString ServiceName;

Expand All @@ -36,26 +111,20 @@ namespace NWilson {
NServiceProto::ExportTraceServiceResponse Response;
grpc::Status Status;

struct TSpanQueueItem {
TMonotonic ExpirationTimestamp;
NTraceProto::Span Span;
ui32 Size;
};

std::deque<TSpanQueueItem> Spans;
ui64 SpansSize = 0;
TBatch CurrentBatch;
std::queue<TBatch::TData> BatchQueue;
ui64 SpansSizeBytes = 0;
TMonotonic NextSendTimestamp;
ui32 MaxSpansAtOnce = 25;
ui32 MaxSpansPerSecond = 10;
TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);

bool WakeupScheduled = false;
bool BatchCompletionScheduled = false;
TMonotonic NextBatchCompletion;

public:
// Takes ownership of the collector URL, service name and gRPC signer from params and creates
// the initial empty batch.
// CurrentBatch's initializer reads MaxSpansInBatch, MaxBytesInBatch and ServiceName, so those
// members must stay declared before CurrentBatch in the class (members are initialized in
// declaration order). ServiceName is passed by value, i.e. the batch receives its own copy.
TWilsonUploader(WilsonUploaderParams params)
: CollectorUrl(std::move(params.CollectorUrl))
, ServiceName(std::move(params.ServiceName))
, GrpcSigner(std::move(params.GrpcSigner))
, CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName)
{}

~TWilsonUploader() {
Expand Down Expand Up @@ -87,28 +156,69 @@ namespace NWilson {
}

// Accepts a span delivered in a TEvWilson event.
// The span is dropped with an error log when the pending byte total has reached
// MaxPendingSpanBytes, or when the span alone is larger than MaxBytesInBatch. Otherwise it is
// added to CurrentBatch; if the current batch has no room, that batch is completed and queued
// first and the span goes into the fresh one.
// NOTE(review): this text looks like a rendered diff without +/- markers — lines that refer to
// Spans / SpansSize / TSpanQueueItem and the duplicated `if` / `expirationTimestamp` lines
// appear to belong to the removed implementation. Confirm against the actual file.
void Handle(TEvWilson::TPtr ev) {
if (SpansSize >= 100'000'000) {
if (SpansSizeBytes >= MaxPendingSpanBytes) {
LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "dropped span due to overflow");
} else {
const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue;
const TMonotonic now = TActivationContext::Monotonic();
const TMonotonic expirationTimestamp = now + MaxSpanTimeInQueue;
auto& span = ev->Get()->Span;
const ui32 size = span.ByteSizeLong();
Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size});
SpansSize += size;
// A span larger than a whole batch can never be added, so it is rejected up front.
if (size > MaxBytesInBatch) {
ALOG_ERROR(WILSON_SERVICE_ID, "dropped span of size " << size << ", which exceeds max batch size " << MaxBytesInBatch);
return;
}
TSpan spanItem {
.ExpirationTimestamp = expirationTimestamp,
.Span = std::move(span),
.Size = size,
};
SpansSizeBytes += size;
// The first span of a batch starts the accumulation timer.
if (CurrentBatch.IsEmpty()) {
ScheduleBatchCompletion(now);
}
if (CurrentBatch.Add(spanItem)) {
return;
}
// Current batch is full: queue it, try to send, then start a new batch with this span.
CompleteCurrentBatch();
TryMakeProgress();
Y_ABORT_UNLESS(CurrentBatch.Add(spanItem), "failed to add span to empty batch");
ScheduleBatchCompletion(now);
}
}

void ScheduleBatchCompletionEvent() {
Y_ABORT_UNLESS(!BatchCompletionScheduled);
auto cookie = NextBatchCompletion.GetValue();
TActivationContext::Schedule(NextBatchCompletion, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, cookie));
ALOG_TRACE(WILSON_SERVICE_ID, "scheduling batch completion w/ cookie=" << cookie);
BatchCompletionScheduled = true;
}

// Moves the batch-completion deadline to now + MaxBatchAccumulation and makes sure a
// completion event is pending.
void ScheduleBatchCompletion(TMonotonic now) {
    NextBatchCompletion = now + MaxBatchAccumulation;
    if (BatchCompletionScheduled) {
        // An event is already pending; HandleWakeup compares its cookie with the updated
        // deadline and re-schedules when they differ.
        return;
    }
    ScheduleBatchCompletionEvent();
}

// Moves the accumulated batch (if it holds any spans) to the back of BatchQueue and
// replaces CurrentBatch with a fresh empty one. Does nothing for an empty batch.
void CompleteCurrentBatch() {
    if (!CurrentBatch.IsEmpty()) {
        BatchQueue.push(std::move(CurrentBatch).Complete());
        // The completed batch has given up its request, so start over with a new one.
        CurrentBatch = TBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName);
    }
}

void TryToSend() {
const TMonotonic now = TActivationContext::Monotonic();

ui32 numSpansDropped = 0;
while (!Spans.empty()) {
const TSpanQueueItem& item = Spans.front();
while (!BatchQueue.empty()) {
const TBatch::TData& item = BatchQueue.front();
if (item.ExpirationTimestamp <= now) {
SpansSize -= item.Size;
Spans.pop_front();
++numSpansDropped;
SpansSizeBytes -= item.SizeBytes;
numSpansDropped += item.SizeSpans;
BatchQueue.pop();
} else {
break;
}
Expand All @@ -119,42 +229,36 @@ namespace NWilson {
"dropped " << numSpansDropped << " span(s) due to expiration");
}

if (Context || Spans.empty()) {
if (Context || BatchQueue.empty()) {
return;
} else if (now < NextSendTimestamp) {
ScheduleWakeup(NextSendTimestamp);
return;
}

NServiceProto::ExportTraceServiceRequest request;
auto *rspan = request.add_resource_spans();
auto *serviceNameAttr = rspan->mutable_resource()->add_attributes();
serviceNameAttr->set_key("service.name");
serviceNameAttr->mutable_value()->set_string_value(ServiceName);
auto *sspan = rspan->add_scope_spans();

NextSendTimestamp = now;
for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) {
auto& item = Spans.front();
auto& s = item.Span;

LOG_DEBUG_S(*TlsActivationContext, WILSON_SERVICE_ID, "exporting span"
<< " TraceId# " << HexEncode(s.trace_id())
<< " SpanId# " << HexEncode(s.span_id())
<< " ParentSpanId# " << HexEncode(s.parent_span_id())
<< " Name# " << s.name());

SpansSize -= item.Size;
s.Swap(sspan->add_spans());
NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond);

TBatch::TData batch = std::move(BatchQueue.front());
BatchQueue.pop();

ALOG_DEBUG(WILSON_SERVICE_ID, "exporting batch of " << batch.SizeSpans << " spans, total spans size: " << batch.SizeBytes);
Y_ABORT_UNLESS(batch.Request.resource_spansSize() == 1 && batch.Request.resource_spans(0).scope_spansSize() == 1);
for (const auto& span : batch.Request.resource_spans(0).scope_spans(0).spans()) {
ALOG_DEBUG(WILSON_SERVICE_ID, "exporting span"
<< " TraceId# " << HexEncode(span.trace_id())
<< " SpanId# " << HexEncode(span.span_id())
<< " ParentSpanId# " << HexEncode(span.parent_span_id())
<< " Name# " << span.name());
}

NextSendTimestamp = now + TDuration::MicroSeconds((batch.SizeSpans * 1'000'000) / MaxSpansPerSecond);
SpansSizeBytes -= batch.SizeBytes;

ScheduleWakeup(NextSendTimestamp);
Context = std::make_unique<grpc::ClientContext>();
if (GrpcSigner) {
GrpcSigner->SignClientContext(*Context);
}
Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ);
Reader = Stub->AsyncExport(Context.get(), std::move(batch.Request), &CQ);
Reader->Finish(&Response, &Status, nullptr);
}

Expand All @@ -179,15 +283,28 @@ namespace NWilson {
// Schedules a Wakeup event (cookie 0) for this actor at the given deadline, unless one is
// already marked as pending via WakeupScheduled.
// NOTE(review): two Schedule calls with identical arguments appear below; this looks like the
// removed and the re-wrapped side of a formatting-only diff rendered together. Confirm that
// the real file contains only one of them — two calls would deliver two cookie-0 wakeups for
// a single WakeupScheduled flag.
template<typename T>
void ScheduleWakeup(T&& deadline) {
if (!WakeupScheduled) {
TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {},
nullptr, 0));
TActivationContext::Schedule(deadline,
new IEventHandle(TEvents::TSystem::Wakeup, 0,
SelfId(), {}, nullptr, 0));
WakeupScheduled = true;
}
}

void HandleWakeup() {
Y_ABORT_UNLESS(WakeupScheduled);
WakeupScheduled = false;
// Dispatches a Wakeup event by its cookie.
//  - cookie == 0: the send-throttling wakeup; clears WakeupScheduled.
//  - any other cookie: a batch-completion wakeup; clears BatchCompletionScheduled and either
//    completes the current batch (cookie matches the current deadline) or re-schedules the
//    event for the updated deadline.
// In every case TryMakeProgress() is called afterwards.
void HandleWakeup(TEvents::TEvWakeup::TPtr& ev) {
    const auto wakeupCookie = ev->Cookie;
    ALOG_TRACE(WILSON_SERVICE_ID, "wakeup received w/ cookie=" << wakeupCookie);

    const bool isBatchCompletion = wakeupCookie != 0;
    if (isBatchCompletion) {
        Y_ABORT_UNLESS(BatchCompletionScheduled);
        BatchCompletionScheduled = false;
        if (wakeupCookie != NextBatchCompletion.GetValue()) {
            // The deadline moved after this event was scheduled; wait for the new one.
            ScheduleBatchCompletionEvent();
        } else {
            CompleteCurrentBatch();
        }
    } else {
        Y_ABORT_UNLESS(WakeupScheduled);
        WakeupScheduled = false;
    }

    TryMakeProgress();
}

Expand All @@ -198,7 +315,7 @@ namespace NWilson {

// StateWork state function: lists Handle as the handler for TEvWilson and HandleWakeup as the
// handler for wakeup events.
// NOTE(review): both a cFunc and an hFunc entry for the wakeup event appear below; these look
// like the removed and added lines of a diff rendered together — confirm that only the entry
// matching HandleWakeup's current signature exists in the real file.
STRICT_STFUNC(StateWork,
hFunc(TEvWilson, Handle);
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
hFunc(TEvents::TEvWakeup, HandleWakeup);
);

STRICT_STFUNC(StateBroken,
Expand Down

0 comments on commit 126ee0c

Please sign in to comment.