Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ChainActor ping kind #12917

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/apps/ydb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* Added CREATE TABLE text suggestion on scheme error during `ydb import file csv`
* Backup and restore of changefeeds has been added to `ydb tools dump` and `ydb tools restore`. As a result, there are changes in the backup file structure: for tables with changefeeds, a subdirectory is created for each changefeed, named after the changefeed. This subdirectory contains two files: `changefeed_description.pb`, which contains the changefeed description, and `topic_description.pb`, which contains information about the underlying topic.
* Added `--skip-checksum-validation` option to `ydb import s3` command to skip server-side checksum validation.
* Added new experimental options for `ydb debug ping` command: `--chain-length`, `--chain-work-duration`, `--no-tail-chain`.


## 2.18.0 ##

Expand Down
205 changes: 205 additions & 0 deletions ydb/core/grpc_services/rpc_ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,205 @@ class TExecuteTxProxyPingRPC : public TActorBootstrapped<TExecuteTxProxyPingRPC>
std::shared_ptr<TEvTxProxyRequest> Request_;
};

////////////////////////////////////////////////////////////////////////////////

using TEvActorChainRequest = TGrpcRequestNoOperationCall<Debug::ActorChainRequest, Debug::ActorChainResponse>;

// Handles the ActorChain debug ping: registers a chain of TChainWorkerActor workers in the
// current mailbox and replies to the gRPC request once the completion event has travelled
// back through the chain to this actor.
class TExecuteActorChainPingRPC : public TActorBootstrapped<TExecuteActorChainPingRPC> {
private:
    struct TEvPrivate {
        enum EEv {
            EvChainItemComplete = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
            EvEnd
        };

        // Sent by a worker to the actor that created it once the worker has finished.
        struct TEvChainItemComplete : TEventLocal<TEvChainItemComplete, EvChainItemComplete> {
        };
    };

    // One link of the chain.
    // A worker with ChainsLeft > 0 registers the next worker and waits for its completion
    // event; a worker with ChainsLeft == 0 does its work right away. In both cases the worker
    // spins for roughly WorkUsec microseconds, notifies ReplyTo and dies.
    class TChainWorkerActor : public TActorBootstrapped<TChainWorkerActor> {
    public:
        static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
            return NKikimrServices::TActivity::OTHER;
        }

        // prevActor   - actor to notify when this worker is done
        // chainsLeft  - how many more workers have to be created after this one
        // workUsec    - approximate busy-loop duration, in microseconds
        // noTailChain - when set, plain sends/registrations are used instead of tail ones
        TChainWorkerActor(const TActorId& prevActor, size_t chainsLeft, size_t workUsec, bool noTailChain)
            : ReplyTo(prevActor)
            , ChainsLeft(chainsLeft)
            , WorkUsec(workUsec)
            , NoTailChain(noTailChain)
        {}

        void Bootstrap(const TActorContext &ctx) {
            this->Become(&TThis::StateWork);

            Proceed(ctx);
        }

    private:
        void StateWork(TAutoPtr<IEventHandle>& ev) {
            try {
                switch (ev->GetTypeRewrite()) {
                    HFunc(TEvPrivate::TEvChainItemComplete, Handle);
                    default:
                        UnexpectedEvent(__func__, ev);
                }
            } catch (const yexception& ex) {
                InternalError(ex.what());
            }
        }

        // Either finishes immediately (last link) or registers the next link of the chain.
        void Proceed(const TActorContext &ctx) {
            LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, SelfId() << " chain worker with "
                << ChainsLeft << " next chains and " << ReplyTo << " prev actor bootstrapped");

            if (ChainsLeft == 0) {
                WorkAndDie();
                return;
            }

            auto* nextChainActor = new TChainWorkerActor(SelfId(), ChainsLeft - 1, WorkUsec, NoTailChain);
            TActorId nextChainActorId;
            if (NoTailChain) {
                nextChainActorId = RegisterWithSameMailbox(nextChainActor);
            } else {
                // same mailbox + tail
                nextChainActorId = TlsActivationContext->ExecutorThread.RegisterActor<ESendingType::Tail>(
                    nextChainActor,
                    &TlsActivationContext->Mailbox,
                    this->SelfId());
            }

            // Mirrors the check done by the RPC actor: without a next link nobody would ever
            // send us TEvChainItemComplete, so report back now instead of waiting forever.
            if (!nextChainActorId) {
                InternalError("failed to register next chain worker");
            }
        }

        // Sends the completion event to the previous actor, honouring the tail/no-tail mode.
        void NotifyPrevActor() {
            if (NoTailChain) {
                Send(ReplyTo, new TEvPrivate::TEvChainItemComplete());
            } else {
                Send<ESendingType::Tail>(ReplyTo, new TEvPrivate::TEvChainItemComplete());
            }
        }

        // Imitates WorkUsec microseconds of work, notifies the previous actor and terminates.
        void WorkAndDie() {
            // assume that 1 iteration is ~ 0.5 ns
            const size_t iterations = WorkUsec * 1000 / 2;
            // volatile so that the loop is not optimized out
            volatile size_t dummy = 0;
            for (size_t i = 0; i < iterations; ++i) {
                // explicit read + write: ++ on a volatile object is deprecated since C++20
                dummy = dummy + 1;
            }

            NotifyPrevActor();

            PassAway();
        }

        void Handle(TEvPrivate::TEvChainItemComplete::TPtr&, const TActorContext& ctx) {
            LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, SelfId() << " chain worker with "
                << ChainsLeft << " next chains and " << ReplyTo << " prev actor got chain reply");

            WorkAndDie();
        }

        // Logs the error and still notifies the previous actor so that the chain can unwind.
        void InternalError(const TString& message) {
            ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
            NotifyPrevActor();
            PassAway();
        }

        void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
            InternalError(TStringBuilder() << "TChainWorkerActor in state " << state << " received unexpected event "
                << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
        }

    private:
        TActorId ReplyTo;
        size_t ChainsLeft;
        size_t WorkUsec;
        bool NoTailChain;
    };

public:
    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
        return NKikimrServices::TActivity::OTHER;
    }

    // The request pointer is stored in a std::shared_ptr member (Request_).
    explicit TExecuteActorChainPingRPC(TEvActorChainRequest* request)
        : Request_(request)
    {}

    void Bootstrap(const TActorContext &ctx) {
        this->Become(&TThis::StateWork);

        Proceed(ctx);
    }

private:
    void StateWork(TAutoPtr<IEventHandle>& ev) {
        try {
            switch (ev->GetTypeRewrite()) {
                HFunc(TEvPrivate::TEvChainItemComplete, Handle);
                default:
                    UnexpectedEvent(__func__, ev);
            }
        } catch (const yexception& ex) {
            InternalError(ex.what());
        }
    }

    // Reads the request parameters (a zero value selects the default) and registers the
    // first worker of the chain.
    void Proceed(const TActorContext &ctx) {
        const auto* req = Request_->GetProtoRequest();
        size_t chainLength = req->GetChainLength();
        if (chainLength == 0) {
            chainLength = 10;
        }
        size_t workUsec = req->GetWorkUsec();
        if (workUsec == 0) {
            workUsec = 5;
        }

        const bool noTailChain = req->GetNoTailChain();

        LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId()
            << " sending ping to ActorChain of length " << chainLength << " with work " << workUsec << " usec");

        // NOTE(review): chainLength is passed as the first worker's ChainsLeft, so it looks like
        // chainLength + 1 workers get created in total - confirm this is the intended count.
        auto* nextChainActor = new TChainWorkerActor(this->SelfId(), chainLength, workUsec, noTailChain);
        TActorId nextChainActorId;
        if (noTailChain) {
            nextChainActorId = RegisterWithSameMailbox(nextChainActor);
        } else {
            // same mailbox + tail
            nextChainActorId = TlsActivationContext->ExecutorThread.RegisterActor<ESendingType::Tail>(
                nextChainActor,
                &TlsActivationContext->Mailbox,
                this->SelfId());
        }

        if (!nextChainActorId) {
            LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to ActorChain");
            ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
        }
    }

    void Handle(TEvPrivate::TEvChainItemComplete::TPtr&, const TActorContext& ctx) {
        LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from full ActorChain");
        ReplyWithResult(StatusIds::SUCCESS, ctx);
    }

private:
    // Replies to the gRPC request with the given status and terminates this actor.
    void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) {
        Request_->ReplyWithYdbStatus(status);
        Die(ctx);
    }

    void InternalError(const TString& message) {
        ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
        ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
    }

    void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
        InternalError(TStringBuilder() << "TExecuteActorChainPingRPC in state " << state << " received unexpected event "
            << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
    }

private:
    std::shared_ptr<TEvActorChainRequest> Request_;
};

} // anonymous

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -248,4 +447,10 @@ void DoTxProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&
f.RegisterActor(new TExecuteTxProxyPingRPC(request));
}

void DoActorChainPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
auto* request = dynamic_cast<TEvActorChainRequest*>(p.release());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoActorChainPing");
f.RegisterActor(new TExecuteActorChainPingRPC(request));
}

} // namespace NKikimr::NGRpcService
1 change: 1 addition & 0 deletions ydb/core/grpc_services/service_debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ void DoGrpcProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider
void DoKqpPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoSchemeCachePing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoTxProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoActorChainPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);

} // namespace NKikimr::NGRpcService
1 change: 1 addition & 0 deletions ydb/core/jaeger_tracing/request_discriminator.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ enum class ERequestType: size_t {
PING_KQP,
PING_SCHEME_CACHE,
PING_TX_PROXY,
PING_ACTOR_CHAIN,

// Requests inside write session
TOPIC_STREAMWRITE,
Expand Down
1 change: 1 addition & 0 deletions ydb/public/api/grpc/ydb_debug_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ service DebugService {
rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse);
rpc PingSchemeCache(Debug.SchemeCacheRequest) returns (Debug.SchemeCacheResponse);
rpc PingTxProxy(Debug.TxProxyRequest) returns (Debug.TxProxyResponse);
rpc PingActorChain(Debug.ActorChainRequest) returns (Debug.ActorChainResponse);
}
19 changes: 19 additions & 0 deletions ydb/public/api/protos/ydb_debug.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,22 @@ message TxProxyResponse {
StatusIds.StatusCode status = 1;
repeated Ydb.Issue.IssueMessage issues = 2;
}

// Ping Actor Chain

// Request of DebugService.PingActorChain.
message ActorChainRequest {

// number of actors to be created, 0 selects the default of 10
// NOTE(review): the server passes this value as the first worker's ChainsLeft,
// so it may actually create ChainLength + 1 workers - confirm the intended count
uint32 ChainLength = 1;

// imitated work duration for each actor in microseconds (approximate),
// 0 selects the default of ~ 5 usec
uint32 WorkUsec = 2;

// don't use tail sends and registrations
bool NoTailChain = 3;
}

// Response of DebugService.PingActorChain.
message ActorChainResponse {
// result status of the ping
StatusIds.StatusCode status = 1;
repeated Ydb.Issue.IssueMessage issues = 2;
}
30 changes: 28 additions & 2 deletions ydb/public/lib/ydb_cli/commands/ydb_latency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ void Evaluate(
} else {
++result.ErrorCount;
}
} catch (...) {
} catch (yexception ex) {
TStringStream ss;
ss << "Failed to perform request: " << ex.what() << Endl;
Cerr << ss.Str();
++result.ErrorCount;
}
}
Expand Down Expand Up @@ -143,8 +146,12 @@ TCommandLatency::TCommandLatency()
, Format(DEFAULT_FORMAT)
, RunKind(DEFAULT_RUN_KIND)
, Percentile(DEFAULT_PERCENTILE)
, ChainConfig(new NDebug::TActorChainPingSettings())
{}

// Kept out of line: the header only forward-declares NDebug::TActorChainPingSettings,
// which is held through a std::unique_ptr member (ChainConfig).
TCommandLatency::~TCommandLatency() = default;

void TCommandLatency::Config(TConfig& config) {
TYdbCommand::Config(config);

Expand All @@ -164,8 +171,19 @@ void TCommandLatency::Config(TConfig& config) {
'f', "format", TStringBuilder() << "Output format. Available options: " << availableFormats)
.OptionalArgument("STRING").StoreResult(&Format).DefaultValue(DEFAULT_FORMAT);
config.Opts->AddLongOption(
'k', "kind", TStringBuilder() << "Use only specified ping kind. Available options: "<< availableKinds)
'k', "kind", TStringBuilder() << "Use only specified ping kind. Available options: " << availableKinds)
.OptionalArgument("STRING").StoreResult(&RunKind).DefaultValue(DEFAULT_RUN_KIND);

// actor chain options
config.Opts->AddLongOption(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New CLI options should be addressed in changelog.md

"chain-length", TStringBuilder() << "Chain length (ActorChain kind only)")
.OptionalArgument("INT").StoreResult(&ChainConfig->ChainLength_).DefaultValue(ChainConfig->ChainLength_);
config.Opts->AddLongOption(
"chain-work-duration", TStringBuilder() << "Duration of work in usec for each actor in the chain (ActorChain kind only)")
.OptionalArgument("INT").StoreResult(&ChainConfig->WorkUsec_).DefaultValue(ChainConfig->WorkUsec_);
config.Opts->AddLongOption(
"no-tail-chain", TStringBuilder() << "Don't use Tail sends and registrations (ActorChain kind only)")
.NoArgument().SetFlag(&ChainConfig->NoTailChain_).DefaultValue(ChainConfig->NoTailChain_);
}

void TCommandLatency::Parse(TConfig& config) {
Expand Down Expand Up @@ -218,6 +236,13 @@ int TCommandLatency::Run(TConfig& config) {
};
};

auto chainConfig = *ChainConfig;
auto txActorChainPingFactory = [debugClient, chainConfig] () {
return [debugClient, chainConfig] () {
return TCommandPing::PingActorChain(*debugClient, chainConfig);
};
};

using TTaskPair = std::pair<TCommandPing::EPingKind, TCallableFactory>;
const std::vector<TTaskPair> allTasks = {
{ TCommandPing::EPingKind::PlainGrpc, plainGrpcPingFactory },
Expand All @@ -226,6 +251,7 @@ int TCommandLatency::Run(TConfig& config) {
{ TCommandPing::EPingKind::Select1, select1Factory },
{ TCommandPing::EPingKind::SchemeCache, schemeCachePingFactory },
{ TCommandPing::EPingKind::TxProxy, txProxyPingFactory },
{ TCommandPing::EPingKind::ActorChain, txActorChainPingFactory },
};

std::vector<TTaskPair> runTasks;
Expand Down
16 changes: 14 additions & 2 deletions ydb/public/lib/ydb_cli/commands/ydb_latency.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@
#include <ydb/public/lib/ydb_cli/common/format.h>
#include <ydb/public/lib/ydb_cli/common/interruptible.h>

namespace NYdb::NConsoleClient {
#include <memory>

namespace NYdb {

namespace NDebug {
struct TActorChainPingSettings;
}

namespace NConsoleClient {

class TCommandLatency
: public TYdbCommand
Expand All @@ -22,6 +30,7 @@ class TCommandLatency

public:
TCommandLatency();
~TCommandLatency();

virtual void Config(TConfig& config) override;
virtual void Parse(TConfig& config) override;
Expand All @@ -33,6 +42,9 @@ class TCommandLatency
EFormat Format;
TCommandPing::EPingKind RunKind;
double Percentile;

std::unique_ptr<NDebug::TActorChainPingSettings> ChainConfig;
};

} // NYdb::NConsoleClient
} // namespace NConsoleClient
} // namespace NYdb
Loading
Loading