Skip to content

Commit

Permalink
add compiler actors to stats
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds committed Jan 23, 2025
1 parent 4621fcc commit 2c68427
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace {
class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public ITopicFormatHandler, public TTypeParser {
using TBase = NActors::TActor<TTopicFormatHandler>;

static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_FORMAT_HANDLER";

struct TCounters {
TCountersDesc Desc;

Expand Down
12 changes: 9 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ struct TUserPoolMetrics {
auto microsecGroup = execpoolGroup->GetSubgroup("sensor", "ElapsedMicrosecByActivity");
Session = microsecGroup->GetNamedCounter("activity", "FQ_ROW_DISPATCHER_SESSION", true);
RowDispatcher = microsecGroup->GetNamedCounter("activity", "FQ_ROW_DISPATCHER", true);
Compiler = microsecGroup->GetNamedCounter("activity", "FQ_ROW_DISPATCHER_COMPILER", true);
CompilerActor = microsecGroup->GetNamedCounter("activity", "FQ_ROW_DISPATCHER_COMPILE_ACTOR", true);
CompilerService = microsecGroup->GetNamedCounter("activity", "FQ_ROW_DISPATCHER_COMPILE_SERVICE", true);
FormatHandler = microsecGroup->GetNamedCounter("activity", "FQ_ROW_DISPATCHER_FORMAT_HANDLER", true);
}
::NMonitoring::TDynamicCounters::TCounterPtr Session;
::NMonitoring::TDynamicCounters::TCounterPtr RowDispatcher;
::NMonitoring::TDynamicCounters::TCounterPtr Compiler;
::NMonitoring::TDynamicCounters::TCounterPtr CompilerActor;
::NMonitoring::TDynamicCounters::TCounterPtr CompilerService;
::NMonitoring::TDynamicCounters::TCounterPtr FormatHandler;
};

struct TEvPrivate {
Expand Down Expand Up @@ -1145,7 +1149,9 @@ void TRowDispatcher::UpdateCpuTime() {
}
auto currentCpuTime = UserPoolMetrics.Session->Val()
+ UserPoolMetrics.RowDispatcher->Val()
+ UserPoolMetrics.Compiler->Val();
+ UserPoolMetrics.CompilerActor->Val()
+ UserPoolMetrics.CompilerService->Val()
+ UserPoolMetrics.FormatHandler->Val();
auto diff = (currentCpuTime - LastCpuTime) / Consumers.size();
for (auto& [actorId, consumer] : Consumers) {
consumer->CpuMicrosec += diff;
Expand Down
5 changes: 3 additions & 2 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import ydb.public.api.protos.draft.fq_pb2 as fq

YDS_CONNECTION = "yds"

COMPUTE_NODE_COUNT = 3

@pytest.fixture
def kikimr(request):
kikimr_conf = StreamingOverKikimrConfig(
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(3)}
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(COMPUTE_NODE_COUNT)}
)
kikimr = StreamingOverKikimr(kikimr_conf)
kikimr.compute_plane.fq_config['row_dispatcher']['enabled'] = True
Expand Down Expand Up @@ -957,6 +957,7 @@ def test_sensors(self, kikimr, client):

wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_COMPILE_SERVICE", COMPUTE_NODE_COUNT)
wait_row_dispatcher_sensor_value(kikimr, "ClientsCount", 1)
wait_row_dispatcher_sensor_value(kikimr, "RowsSent", 1, exact_match=False)
wait_row_dispatcher_sensor_value(kikimr, "IncomingRequests", 1, exact_match=False)
Expand Down

0 comments on commit 2c68427

Please sign in to comment.