Skip to content

Commit

Permalink
Add ChainActor ping kind
Browse files Browse the repository at this point in the history
  • Loading branch information
eivanov89 committed Jan 23, 2025
1 parent 819d850 commit 8c0ae08
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 23 deletions.
205 changes: 205 additions & 0 deletions ydb/core/grpc_services/rpc_ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,205 @@ class TExecuteTxProxyPingRPC : public TActorBootstrapped<TExecuteTxProxyPingRPC>
std::shared_ptr<TEvTxProxyRequest> Request_;
};

////////////////////////////////////////////////////////////////////////////////

using TEvActorChainRequest = TGrpcRequestNoOperationCall<Debug::ActorChainRequest, Debug::ActorChainResponse>;

// creates a chain of TChainWorkerActor workers and replies, when they finish
class TExecuteActorChainPingRPC : public TActorBootstrapped<TExecuteActorChainPingRPC> {
private:
struct TEvPrivate {
enum EEv {
EvChainItemComplete = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvEnd
};

struct TEvChainItemComplete : TEventLocal<TEvChainItemComplete, EvChainItemComplete> {
};
};

class TChainWorkerActor : public TActorBootstrapped<TChainWorkerActor> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::OTHER;
}

TChainWorkerActor(const TActorId& prevActor, size_t chainsLeft, size_t workUsec, bool noTailChain)
: ReplyTo(prevActor)
, ChainsLeft(chainsLeft)
, WorkUsec(workUsec)
, NoTailChain(noTailChain)
{}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TThis::StateWork);

Proceed(ctx);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvPrivate::TEvChainItemComplete, Handle);
default:
UnexpectedEvent(__func__, ev);
}
} catch (const yexception& ex) {
InternalError(ex.what());
}
}

void Proceed(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, SelfId() << " chain worker with "
<< ChainsLeft << " next chains and " << ReplyTo << " prev actor bootstrapped");

if (ChainsLeft == 0) {
WorkAndDie();
return;
}

auto* nextChainActor = new TChainWorkerActor(SelfId(), ChainsLeft - 1, WorkUsec, NoTailChain);
if (NoTailChain) {
RegisterWithSameMailbox(nextChainActor);
} else {
// same mailbox + tail
TlsActivationContext->ExecutorThread.RegisterActor<ESendingType::Tail>(
nextChainActor,
&TlsActivationContext->Mailbox,
this->SelfId());
}
}

void WorkAndDie() {
// assume that 1 iteration is ~ 0.5 ns
size_t iterations = WorkUsec * 1000 / 2;
volatile size_t dummy = 0;
for (size_t i = 0; i < iterations; ++i) {
++dummy;
}

if (NoTailChain) {
Send(ReplyTo, new TEvPrivate::TEvChainItemComplete());
} else {
Send<ESendingType::Tail>(ReplyTo, new TEvPrivate::TEvChainItemComplete());
}

PassAway();
}

void Handle(TEvPrivate::TEvChainItemComplete::TPtr&, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, SelfId() << " chain worker with "
<< ChainsLeft << " next chains and " << ReplyTo << " prev actor got chain reply");

WorkAndDie();
}

void InternalError(const TString& message) {
ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
Send<ESendingType::Tail>(ReplyTo, new TEvPrivate::TEvChainItemComplete());
PassAway();
}

void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TChainWorkerActor in state " << state << " received unexpected event "
<< ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}

private:
TActorId ReplyTo;
size_t ChainsLeft;
size_t WorkUsec;
bool NoTailChain;
};

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::OTHER;
}

TExecuteActorChainPingRPC(TEvActorChainRequest* request)
: Request_(request)
{}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TThis::StateWork);

Proceed(ctx);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvPrivate::TEvChainItemComplete, Handle);
default:
UnexpectedEvent(__func__, ev);
}
} catch (const yexception& ex) {
InternalError(ex.what());
}
}

void Proceed(const TActorContext &ctx) {
const auto* req = Request_->GetProtoRequest();
size_t chainLength = req->GetChainLength();
if (chainLength == 0) {
chainLength = 10;
}
size_t workUsec = req->GetWorkUsec();
if (workUsec == 0) {
workUsec = 5;
}

bool noTailChain = req->GetNoTailChain();

LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId()
<< " sending ping to ActorChain of length " << chainLength << " with work " << workUsec << " usec");

auto* nextChainActor = new TChainWorkerActor(this->SelfId(), chainLength, workUsec, noTailChain);
TActorId nextChainActorId;
if (noTailChain) {
nextChainActorId = RegisterWithSameMailbox(nextChainActor);
} else {
// same mailbox + tail
nextChainActorId = TlsActivationContext->ExecutorThread.RegisterActor<ESendingType::Tail>(
nextChainActor,
&TlsActivationContext->Mailbox,
this->SelfId());
}

if (!nextChainActorId) {
LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to ActorChain");
ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
}
}

void Handle(TEvPrivate::TEvChainItemComplete::TPtr&, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from full ActorChain");
ReplyWithResult(StatusIds::SUCCESS, ctx);
}

private:
void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) {
Request_->ReplyWithYdbStatus(status);
Die(ctx);
}

void InternalError(const TString& message) {
ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}

void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TExecuteActorChainPingRPC in state " << state << " received unexpected event "
<< ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}

private:
std::shared_ptr<TEvActorChainRequest> Request_;
};

} // anonymous

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -248,4 +447,10 @@ void DoTxProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&
f.RegisterActor(new TExecuteTxProxyPingRPC(request));
}

void DoActorChainPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
auto* request = dynamic_cast<TEvActorChainRequest*>(p.release());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoActorChainPing");
f.RegisterActor(new TExecuteActorChainPingRPC(request));
}

} // namespace NKikimr::NGRpcService
1 change: 1 addition & 0 deletions ydb/core/grpc_services/service_debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ void DoGrpcProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider
void DoKqpPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoSchemeCachePing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoTxProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoActorChainPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);

} // namespace NKikimr::NGRpcService
1 change: 1 addition & 0 deletions ydb/core/jaeger_tracing/request_discriminator.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ enum class ERequestType: size_t {
PING_KQP,
PING_SCHEME_CACHE,
PING_TX_PROXY,
PING_ACTOR_CHAIN,

// Requests inside write session
TOPIC_STREAMWRITE,
Expand Down
1 change: 1 addition & 0 deletions ydb/public/api/grpc/ydb_debug_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ service DebugService {
rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse);
rpc PingSchemeCache(Debug.SchemeCacheRequest) returns (Debug.SchemeCacheResponse);
rpc PingTxProxy(Debug.TxProxyRequest) returns (Debug.TxProxyResponse);
rpc PingActorChain(Debug.ActorChainRequest) returns (Debug.ActorChainResponse);
}
19 changes: 19 additions & 0 deletions ydb/public/api/protos/ydb_debug.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,22 @@ message TxProxyResponse {
StatusIds.StatusCode status = 1;
repeated Ydb.Issue.IssueMessage issues = 2;
}

// Ping Actor Chain

message ActorChainRequest {

// number of actors to be created, default 10
uint32 ChainLength = 1;

// immitate work duration for each actor (approximate), default ~ 5 usec
uint32 WorkUsec = 2;

// don't use tail sends and registrations
bool NoTailChain = 3;
}

message ActorChainResponse {
StatusIds.StatusCode status = 1;
repeated Ydb.Issue.IssueMessage issues = 2;
}
30 changes: 28 additions & 2 deletions ydb/public/lib/ydb_cli/commands/ydb_latency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ void Evaluate(
} else {
++result.ErrorCount;
}
} catch (...) {
} catch (yexception ex) {
TStringStream ss;
ss << "Failed to perform request: " << ex.what() << Endl;
Cerr << ss.Str();
++result.ErrorCount;
}
}
Expand Down Expand Up @@ -143,8 +146,12 @@ TCommandLatency::TCommandLatency()
, Format(DEFAULT_FORMAT)
, RunKind(DEFAULT_RUN_KIND)
, Percentile(DEFAULT_PERCENTILE)
, ChainConfig(new NDebug::TActorChainPingSettings())
{}

TCommandLatency::~TCommandLatency() {
}

void TCommandLatency::Config(TConfig& config) {
TYdbCommand::Config(config);

Expand All @@ -164,8 +171,19 @@ void TCommandLatency::Config(TConfig& config) {
'f', "format", TStringBuilder() << "Output format. Available options: " << availableFormats)
.OptionalArgument("STRING").StoreResult(&Format).DefaultValue(DEFAULT_FORMAT);
config.Opts->AddLongOption(
'k', "kind", TStringBuilder() << "Use only specified ping kind. Available options: "<< availableKinds)
'k', "kind", TStringBuilder() << "Use only specified ping kind. Available options: " << availableKinds)
.OptionalArgument("STRING").StoreResult(&RunKind).DefaultValue(DEFAULT_RUN_KIND);

// actor chain options
config.Opts->AddLongOption(
"chain-length", TStringBuilder() << "Chain length (ActorChain kind only)")
.OptionalArgument("INT").StoreResult(&ChainConfig->ChainLength_).DefaultValue(ChainConfig->ChainLength_);
config.Opts->AddLongOption(
"chain-work-duration", TStringBuilder() << "Duration of work in usec for each actor in the chain (ActorChain kind only)")
.OptionalArgument("INT").StoreResult(&ChainConfig->WorkUsec_).DefaultValue(ChainConfig->WorkUsec_);
config.Opts->AddLongOption(
"no-tail-chain", TStringBuilder() << "Don't use Tail sends and registrations (ActorChain kind only)")
.NoArgument().SetFlag(&ChainConfig->NoTailChain_).DefaultValue(ChainConfig->NoTailChain_);
}

void TCommandLatency::Parse(TConfig& config) {
Expand Down Expand Up @@ -218,6 +236,13 @@ int TCommandLatency::Run(TConfig& config) {
};
};

auto chainConfig = *ChainConfig;
auto txActorChainPingFactory = [debugClient, chainConfig] () {
return [debugClient, chainConfig] () {
return TCommandPing::PingActorChain(*debugClient, chainConfig);
};
};

using TTaskPair = std::pair<TCommandPing::EPingKind, TCallableFactory>;
const std::vector<TTaskPair> allTasks = {
{ TCommandPing::EPingKind::PlainGrpc, plainGrpcPingFactory },
Expand All @@ -226,6 +251,7 @@ int TCommandLatency::Run(TConfig& config) {
{ TCommandPing::EPingKind::Select1, select1Factory },
{ TCommandPing::EPingKind::SchemeCache, schemeCachePingFactory },
{ TCommandPing::EPingKind::TxProxy, txProxyPingFactory },
{ TCommandPing::EPingKind::ActorChain, txActorChainPingFactory },
};

std::vector<TTaskPair> runTasks;
Expand Down
16 changes: 14 additions & 2 deletions ydb/public/lib/ydb_cli/commands/ydb_latency.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@
#include <ydb/public/lib/ydb_cli/common/format.h>
#include <ydb/public/lib/ydb_cli/common/interruptible.h>

namespace NYdb::NConsoleClient {
#include <memory>

namespace NYdb {

namespace NDebug {
struct TActorChainPingSettings;
}

namespace NConsoleClient {

class TCommandLatency
: public TYdbCommand
Expand All @@ -22,6 +30,7 @@ class TCommandLatency

public:
TCommandLatency();
~TCommandLatency();

virtual void Config(TConfig& config) override;
virtual void Parse(TConfig& config) override;
Expand All @@ -33,6 +42,9 @@ class TCommandLatency
EFormat Format;
TCommandPing::EPingKind RunKind;
double Percentile;

std::unique_ptr<NDebug::TActorChainPingSettings> ChainConfig;
};

} // NYdb::NConsoleClient
} // namespace NConsoleClient
} // namespace NYdb
Loading

0 comments on commit 8c0ae08

Please sign in to comment.