From e1f89b09d963220259d8c0ecc76b1db6d9797bec Mon Sep 17 00:00:00 2001 From: Alexey Efimov Date: Tue, 17 Dec 2024 16:02:53 +0100 Subject: [PATCH] get rid of sync http mon (#12637) --- .../pdisk/blobstorage_pdisk_ut_run.cpp | 10 +- ydb/core/blobstorage/ut_vdisk/lib/astest.h | 18 +- ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp | 6 +- ydb/core/driver_lib/run/run.cpp | 13 +- ydb/core/http_proxy/ut/datastreams_fixture.h | 6 +- ydb/core/mon/async_http_mon.cpp | 941 ----------------- ydb/core/mon/async_http_mon.h | 54 - ydb/core/mon/mon.cpp | 979 +++++++++++++++++- ydb/core/mon/mon.h | 53 +- ydb/core/mon/sync_http_mon.cpp | 120 --- ydb/core/mon/sync_http_mon.h | 37 - ydb/core/mon/ya.make | 4 - ydb/core/persqueue/ut/counters_ut.cpp | 2 +- ydb/core/protos/feature_flags.proto | 2 +- ydb/core/testlib/actors/test_runtime.cpp | 21 +- .../persqueue_new_schemecache_ut.cpp | 10 +- ydb/services/persqueue_v1/persqueue_ut.cpp | 6 +- 17 files changed, 1046 insertions(+), 1236 deletions(-) delete mode 100644 ydb/core/mon/async_http_mon.cpp delete mode 100644 ydb/core/mon/async_http_mon.h delete mode 100644 ydb/core/mon/sync_http_mon.cpp delete mode 100644 ydb/core/mon/sync_http_mon.h diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp index 9dab031de000..a63f61128a26 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp @@ -4,7 +4,7 @@ #include "blobstorage_pdisk_ut_base_test.h" #include -#include +#include #include #include #include @@ -142,17 +142,21 @@ void Run(TVector tests, TTestRunConfig runCfg) { if (IsMonitoringEnabled) { // Monitoring startup - monitoring.Reset(new NActors::TSyncHttpMon({ + monitoring.Reset(new NActors::TMon({ .Port = pm.GetPort(8081), .Title = "TestYard monitoring" })); appData.Mon = monitoring.Get(); monitoring->RegisterCountersPage("counters", "Counters", mainCounters); - monitoring->Start(); } actorSystem1->Start(); + + if (IsMonitoringEnabled) { + monitoring->Start(actorSystem1.Get()); + } + Sleep(TDuration::MilliSeconds(runCfg.BeforeTestSleepMs)); VERBOSE_COUT("Sending TEvBoot to test"); diff --git a/ydb/core/blobstorage/ut_vdisk/lib/astest.h b/ydb/core/blobstorage/ut_vdisk/lib/astest.h index fe8c6b0e124b..8d74c586a99a 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/astest.h +++ b/ydb/core/blobstorage/ut_vdisk/lib/astest.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include #include @@ -82,6 +82,7 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) { NKikimrServices::EServiceKikimr_Name ); TString explanation; + logSettings->SetLevel(NLog::PRI_TRACE, NActorsServices::EServiceCommon::HTTP, explanation); //logSettings->SetLevel(NLog::PRI_INFO, NKikimrServices::BS_SKELETON, explanation); //logSettings->SetLevel(NLog::PRI_INFO, NKikimrServices::BS_HULLCOMP, explan NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(logSettings, @@ -92,12 +93,6 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) { setup1->LocalServices.push_back(std::move(loggerActorPair)); ////////////////////////////////////////////////////////////////////////////// - ///////////////////////// SETUP TEST ACTOR /////////////////////////////////// - NActors::TActorId testActorId = NActors::TActorId(1, "test123"); - TActorSetupCmd testActorSetup(testActor, TMailboxType::Simple, 0); - setup1->LocalServices.push_back(std::pair(testActorId, std::move(testActorSetup))); - ////////////////////////////////////////////////////////////////////////////// - ///////////////////////// TYPE REGISTRY ////////////////////////////////////// TIntrusivePtr typeRegistry(new NKikimr::NScheme::TKikimrTypeRegistry()); ////////////////////////////////////////////////////////////////////////////// @@ -106,14 +101,13 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) { if (!MonPort) { MonPort = pm.GetPort(MonPort); } - Monitoring.reset(new NActors::TSyncHttpMon({ + Monitoring.reset(new NActors::TMon({ .Port = MonPort, .Title = "at" })); NMonitoring::TIndexMonPage *actorsMonPage = Monitoring->RegisterIndexPage("actors", "Actors"); Y_UNUSED(actorsMonPage); Monitoring->RegisterCountersPage("counters", "Counters", Counters); - Monitoring->Start(); loggerActor->Log(Now(), NKikimr::NLog::PRI_NOTICE, NActorsServices::TEST, "Monitoring settings set up"); ////////////////////////////////////////////////////////////////////////////// @@ -126,11 +120,15 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) { ActorSystem1.reset(new TActorSystem(setup1, AppData.get(), logSettings)); loggerActor->Log(Now(), NKikimr::NLog::PRI_NOTICE, NActorsServices::TEST, "Actor system created"); - ActorSystem1->Start(); LOG_NOTICE(*ActorSystem1, NActorsServices::TEST, "Actor system started"); + Monitoring->Start(ActorSystem1.get()).wait(); + ///////////////////////// SETUP TEST ACTOR /////////////////////////////////// + NActors::TActorId testActorId = NActors::TActorId(1, "test123"); + ActorSystem1->RegisterLocalService(testActorId, ActorSystem1->Register(testActor, TMailboxType::Simple, 0)); + ////////////////////////////////////////////////////////////////////////////// DoneEvent.Wait(); ActorSystem1->Stop(); diff --git a/ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp b/ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp index cbc459a4cc8b..c692fb26663c 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp +++ b/ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp @@ -9,7 +9,7 @@ #include #include -#include +#include #include #include @@ -368,13 +368,12 @@ void TConfiguration::Prepare(IVDiskSetup *vdiskSetup, bool newPDisks, bool runRe ////////////////////////////////////////////////////////////////////////////// ///////////////////////// MONITORING SETTINGS ///////////////////////////////// - Monitoring.reset(new NActors::TSyncHttpMon({ + Monitoring.reset(new NActors::TMon({ .Port = 8088, .Title = "at" })); NMonitoring::TIndexMonPage *actorsMonPage = Monitoring->RegisterIndexPage("actors", "Actors"); Monitoring->RegisterCountersPage("counters", "Counters", Counters); - Monitoring->Start(); loggerActor->Log(Now(), NKikimr::NLog::PRI_NOTICE, NActorsServices::TEST, "Monitoring settings set up"); ////////////////////////////////////////////////////////////////////////////// @@ -391,6 +390,7 @@ void TConfiguration::Prepare(IVDiskSetup *vdiskSetup, bool newPDisks, bool runRe loggerActor->Log(Now(), NKikimr::NLog::PRI_NOTICE, NActorsServices::TEST, "Actor system created"); ActorSystem1->Start(); + Monitoring->Start(ActorSystem1.get()); LOG_NOTICE(*ActorSystem1, NActorsServices::TEST, "Actor system started"); } diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 788e82d98cc5..9eea1050c656 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -35,8 +35,7 @@ #include #include #include -#include -#include +#include #include #include @@ -473,11 +472,7 @@ void TKikimrRunner::InitializeMonitoring(const TKikimrRunConfig& runConfig, bool if (ModuleFactories && ModuleFactories->MonitoringFactory) { Monitoring = ModuleFactories->MonitoringFactory(std::move(monConfig), appConfig); } else { - if (appConfig.GetFeatureFlags().GetEnableAsyncHttpMon()) { - Monitoring = new NActors::TAsyncHttpMon(std::move(monConfig)); - } else { - Monitoring = new NActors::TSyncHttpMon(std::move(monConfig)); - } + Monitoring = new NActors::TMon(std::move(monConfig)); } if (Monitoring) { Monitoring->RegisterCountersPage("counters", "Counters", Counters); @@ -1783,10 +1778,10 @@ void TKikimrRunner::KikimrStop(bool graceful) { if (enableReleaseNodeNameOnGracefulShutdown) { using namespace NKikimr::NNodeBroker; using TEvent = TEvNodeBroker::TEvGracefulShutdownRequest; - + const ui32 nodeId = ActorSystem->NodeId; bool isDynamicNode = AppData->DynamicNameserviceConfig->MinDynamicNodeId <= nodeId; - + if (isDynamicNode) { NTabletPipe::TClientConfig pipeConfig; pipeConfig.RetryPolicy = {.RetryLimitCount = 10}; diff --git a/ydb/core/http_proxy/ut/datastreams_fixture.h b/ydb/core/http_proxy/ut/datastreams_fixture.h index ed8ab1c05d24..af00bc662093 100644 --- a/ydb/core/http_proxy/ut/datastreams_fixture.h +++ b/ydb/core/http_proxy/ut/datastreams_fixture.h @@ -17,7 +17,7 @@ #include #include #include -#include +#include #include #include @@ -839,7 +839,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { MonPort = TPortManager().GetPort(); Counters = new NMonitoring::TDynamicCounters(); - Monitoring.Reset(new NActors::TSyncHttpMon({ + Monitoring.Reset(new NActors::TMon({ .Port = MonPort, .Address = "127.0.0.1", .Threads = 3, @@ -847,7 +847,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { .Host = "127.0.0.1", })); Monitoring->RegisterCountersPage("counters", "Counters", Counters); - Monitoring->Start(); + Monitoring->Start(ActorRuntime->GetAnyNodeActorSystem()); Sleep(TDuration::Seconds(1)); diff --git a/ydb/core/mon/async_http_mon.cpp b/ydb/core/mon/async_http_mon.cpp deleted file mode 100644 index b7a92e6dfacf..000000000000 --- a/ydb/core/mon/async_http_mon.cpp +++ /dev/null @@ -1,941 +0,0 @@ -#include "async_http_mon.h" -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include - -#include -#include - -#include "mon_impl.h" - -namespace NActors { - -struct TEvMon { - enum { - EvMonitoringRequest = NActors::NMon::HttpInfo + 10, - EvMonitoringResponse, - End - }; - - static_assert(EvMonitoringRequest > NMon::End, "expect EvMonitoringRequest > NMon::End"); - static_assert(End < EventSpaceEnd(NActors::TEvents::ES_MON), "expect End < EventSpaceEnd(NActors::TEvents::ES_MON)"); - - struct TEvMonitoringRequest : TEventPB { - TEvMonitoringRequest() = default; - }; - - struct TEvMonitoringResponse : TEventPB { - TEvMonitoringResponse() = default; - }; -}; - -// compatibility layer -class THttpMonRequest : public NMonitoring::IMonHttpRequest { -public: - NHttp::THttpIncomingRequestPtr Request; - TStringStream& Response; - NMonitoring::IMonPage* Page; - TString PathInfo; - mutable std::unique_ptr Headers; - mutable std::unique_ptr Params; - mutable std::unique_ptr PostParams; - - THttpMonRequest(NHttp::THttpIncomingRequestPtr request, TStringStream& response, NMonitoring::IMonPage* page, const TString& pathInfo) - : Request(request) - , Response(response) - , Page(page) - , PathInfo(pathInfo) - { - } - - static TStringBuf GetPathFromUrl(TStringBuf url) { - return url.Before('?'); - } - - static TStringBuf GetPathInfoFromUrl(NMonitoring::IMonPage* page, TStringBuf url) { - TString path = GetPageFullPath(page); - url.SkipPrefix(path); - return GetPathFromUrl(url); - } - - virtual IOutputStream& Output() override { - return Response; - } - - virtual HTTP_METHOD GetMethod() const override { - if (Request->Method == "GET") { - return HTTP_METHOD_GET; - } - if (Request->Method == "OPTIONS") { - return HTTP_METHOD_OPTIONS; - } - if (Request->Method == "POST") { - return HTTP_METHOD_POST; - } - if (Request->Method == "HEAD") { - return HTTP_METHOD_HEAD; - } - if (Request->Method == "PUT") { - return HTTP_METHOD_PUT; - } - if (Request->Method == "DELETE") { - return HTTP_METHOD_DELETE; - } - return HTTP_METHOD_UNDEFINED; - } - - virtual TStringBuf GetPath() const override { - return GetPathFromUrl(Request->URL); - } - - virtual TStringBuf GetPathInfo() const override { - return PathInfo; - } - - virtual TStringBuf GetUri() const override { - return Request->URL; - } - - virtual const TCgiParameters& GetParams() const override { - if (!Params) { - Params = std::make_unique(Request->URL.After('?')); - } - return *Params; - } - - virtual const TCgiParameters& GetPostParams() const override { - if (!PostParams) { - PostParams = std::make_unique(Request->Body); - } - return *PostParams; - } - - virtual TStringBuf GetPostContent() const override { - return Request->Body; - } - - virtual const THttpHeaders& GetHeaders() const override { - if (!Headers) { - TString strHeaders(Request->Headers); - TStringInput headers(strHeaders); - Headers = std::make_unique(&headers); - } - return *Headers; - } - - virtual TStringBuf GetHeader(TStringBuf name) const override { - auto header = GetHeaders().FindHeader(name); - if (header) { - return header->Value(); - } - return {}; - } - - bool AcceptsJsonResponse() { - TStringBuf acceptHeader = GetHeader("Accept"); - return acceptHeader.find(TStringBuf("application/json")) != TStringBuf::npos; - } - - virtual TStringBuf GetCookie(TStringBuf name) const override { - NHttp::TCookies cookies(GetHeader("Cookie")); - return cookies.Get(name); - } - - virtual TString GetRemoteAddr() const override { - if (Request->Address) { - return Request->Address->ToString(); - } - return {}; - } - - virtual TString GetServiceTitle() const override { - return {}; - } - - virtual NMonitoring::IMonPage* GetPage() const override { - return Page; - } - - virtual IMonHttpRequest* MakeChild(NMonitoring::IMonPage* page, const TString& pathInfo) const override { - return new THttpMonRequest(Request, Response, page, pathInfo); - } -}; - -// container for legacy requests -class THttpMonRequestContainer : public TStringStream, public THttpMonRequest { -public: - THttpMonRequestContainer(NHttp::THttpIncomingRequestPtr request, NMonitoring::IMonPage* index) - : THttpMonRequest(request, *this, index, TString(GetPathInfoFromUrl(index, request->URL))) - { - } -}; - -// handles actor communication -class THttpMonLegacyActorRequest : public TActorBootstrapped { -public: - NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event; - THttpMonRequestContainer Container; - TIntrusivePtr ActorMonPage; - - THttpMonLegacyActorRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, TIntrusivePtr actorMonPage) - : Event(std::move(event)) - , Container(Event->Get()->Request, actorMonPage.Get()) - , ActorMonPage(actorMonPage) - {} - - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::HTTP_MON_LEGACY_ACTOR_REQUEST; - } - - void Bootstrap() { - if (Event->Get()->Request->Method == "OPTIONS") { - return ReplyOptionsAndPassAway(); - } - Become(&THttpMonLegacyActorRequest::StateFunc); - if (ActorMonPage->Authorizer) { - NActors::IEventHandle* handle = ActorMonPage->Authorizer(SelfId(), Container); - if (handle) { - TActivationContext::Send(handle); - return; - } - } - SendRequest(); - } - void ReplyWith(NHttp::THttpOutgoingResponsePtr response) { - if (response->Status.StartsWith("2")) { - TString url(Event->Get()->Request->URL.Before('?')); - TString status(response->Status); - NMonitoring::THistogramPtr ResponseTimeHgram = NKikimr::GetServiceCounters(NKikimr::AppData()->Counters, - ActorMonPage->MonServiceName) - ->GetSubgroup("subsystem", "mon") - ->GetSubgroup("url", url) - ->GetSubgroup("status", status) - ->GetHistogram("ResponseTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1)); - ResponseTimeHgram->Collect(Event->Get()->Request->Timer.Passed() * 1000); - } - Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); - } - - void ReplyOptionsAndPassAway() { - NHttp::THttpIncomingRequestPtr request = Event->Get()->Request; - TString url(request->URL.Before('?')); - TString type = mimetypeByExt(url.data()); - if (type.empty()) { - type = "application/json"; - } - NHttp::THeaders headers(request->Headers); - TString origin = TString(headers["Origin"]); - if (origin.empty()) { - origin = "*"; - } - TStringBuilder response; - response << "HTTP/1.1 204 No Content\r\n" - "Access-Control-Allow-Origin: " << origin << "\r\n" - "Access-Control-Allow-Credentials: true\r\n" - "Access-Control-Allow-Headers: Content-Type,Authorization,Origin,Accept,X-Trace-Verbosity,X-Want-Trace,traceparent\r\n" - "Access-Control-Expose-Headers: traceresponse,X-Worker-Name\r\n" - "Access-Control-Allow-Methods: OPTIONS,GET,POST,PUT,DELETE\r\n" - "Content-Type: " << type << "\r\n" - "Connection: keep-alive\r\n\r\n"; - ReplyWith(request->CreateResponseString(response)); - PassAway(); - } - - bool CredentialsProvided() { - return Container.GetCookie("ydb_session_id") || Container.GetHeader("Authorization"); - } - - TString YdbToHttpError(Ydb::StatusIds::StatusCode status) { - switch (status) { - case Ydb::StatusIds::UNAUTHORIZED: - // YDB status UNAUTHORIZED is used for both access denied case and if no credentials were provided. - return CredentialsProvided() ? "403 Forbidden" : "401 Unauthorized"; - case Ydb::StatusIds::INTERNAL_ERROR: - return "500 Internal Server Error"; - case Ydb::StatusIds::UNAVAILABLE: - return "503 Service Unavailable"; - case Ydb::StatusIds::OVERLOADED: - return "429 Too Many Requests"; - case Ydb::StatusIds::TIMEOUT: - return "408 Request Timeout"; - case Ydb::StatusIds::PRECONDITION_FAILED: - return "412 Precondition Failed"; - default: - return "400 Bad Request"; - } - } - - void ReplyErrorAndPassAway(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result) { - ReplyErrorAndPassAway(result.Status, result.Issues, true); - } - - void ReplyErrorAndPassAway(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues, bool addAccessControlHeaders) { - NHttp::THttpIncomingRequestPtr request = Event->Get()->Request; - TStringBuilder response; - TStringBuilder body; - TStringBuf contentType; - const TString httpError = YdbToHttpError(status); - - if (Container.AcceptsJsonResponse()) { - contentType = "application/json"; - NJson::TJsonValue json; - TString message; - MakeJsonErrorReply(json, message, issues, NYdb::EStatus(status)); - NJson::WriteJson(&body.Out, &json); - } else { - contentType = "text/html"; - body << "

" << httpError << "

"; - if (issues) { - body << "

" << issues.ToString() << "

"; - } - body << ""; - } - - response << "HTTP/1.1 " << httpError << "\r\n"; - if (addAccessControlHeaders) { - NHttp::THeaders headers(request->Headers); - TString origin = TString(headers["Origin"]); - if (origin.empty()) { - origin = "*"; - } - response << "Access-Control-Allow-Origin: " << origin << "\r\n"; - response << "Access-Control-Allow-Credentials: true\r\n"; - response << "Access-Control-Allow-Headers: Content-Type,Authorization,Origin,Accept\r\n"; - response << "Access-Control-Allow-Methods: OPTIONS, GET, POST, PUT, DELETE\r\n"; - } - - response << "Content-Type: " << contentType << "\r\n"; - response << "Content-Length: " << body.size() << "\r\n"; - response << "\r\n"; - response << body; - ReplyWith(request->CreateResponseString(response)); - PassAway(); - } - - void ReplyForbiddenAndPassAway(const TString& error = {}) { - NYql::TIssues issues; - issues.AddIssue(error); - ReplyErrorAndPassAway(Ydb::StatusIds::UNAUTHORIZED, issues, false); - } - - void SendRequest(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult* result = nullptr) { - NHttp::THttpIncomingRequestPtr request = Event->Get()->Request; - if (ActorMonPage->Authorizer) { - TString user = (result && result->UserToken) ? result->UserToken->GetUserSID() : "anonymous"; - LOG_NOTICE_S(*TlsActivationContext, NActorsServices::HTTP, - (request->Address ? request->Address->ToString() : "") - << " " << user - << " " << request->Method - << " " << request->URL); - } - TString serializedToken; - if (result && result->UserToken) { - serializedToken = result->UserToken->GetSerializedToken(); - } - Send(ActorMonPage->TargetActorId, new NMon::TEvHttpInfo( - Container, serializedToken), IEventHandle::FlagTrackDelivery); - } - - void HandleUndelivered(TEvents::TEvUndelivered::TPtr&) { - NHttp::THttpIncomingRequestPtr request = Event->Get()->Request; - ReplyWith(request->CreateResponseServiceUnavailable( - TStringBuilder() << "Actor " << ActorMonPage->TargetActorId << " is not available")); - PassAway(); - } - - void HandleResponse(NMon::IEvHttpInfoRes::TPtr& ev) { - if (ev->Get()->GetContentType() == NMon::IEvHttpInfoRes::Html) { - THtmlResultMonPage resultPage(ActorMonPage->Path, ActorMonPage->Title, ActorMonPage->Host, ActorMonPage->PreTag, *(ev->Get())); - resultPage.Parent = ActorMonPage->Parent; - resultPage.Output(Container); - } else { - ev->Get()->Output(Container); - } - ReplyWith(Event->Get()->Request->CreateResponseString(Container.Str())); - PassAway(); - } - - void Handle(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult::TPtr& ev) { - const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result(*ev->Get()); - if (result.Status != Ydb::StatusIds::SUCCESS) { - return ReplyErrorAndPassAway(result); - } - bool found = false; - if (result.UserToken) { - for (const TString& sid : ActorMonPage->AllowedSIDs) { - if (result.UserToken->IsExist(sid)) { - found = true; - break; - } - } - } - if (found || ActorMonPage->AllowedSIDs.empty() || !result.UserToken) { - SendRequest(&result); - } else { - return ReplyForbiddenAndPassAway("SID is not allowed"); - } - } - - STATEFN(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvents::TEvUndelivered, HandleUndelivered); - hFunc(NMon::IEvHttpInfoRes, HandleResponse); - hFunc(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult, Handle); - } - } -}; - -// handles all indexes and static data in synchronous way -class THttpMonLegacyIndexRequest : public TActorBootstrapped { -public: - NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event; - THttpMonRequestContainer Container; - - THttpMonLegacyIndexRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, NMonitoring::IMonPage* index) - : Event(std::move(event)) - , Container(Event->Get()->Request, index) - {} - - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::HTTP_MON_LEGACY_INDEX_REQUEST; - } - - void Bootstrap() { - ProcessRequest(); - } - - void ProcessRequest() { - Container.Page->Output(Container); - NHttp::THttpOutgoingResponsePtr response = Event->Get()->Request->CreateResponseString(Container.Str()); - Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); - PassAway(); - } -}; - -// receives all requests for one actor page and converts them to request-actors -class THttpMonServiceLegacyActor : public TActorBootstrapped { -public: - THttpMonServiceLegacyActor(TIntrusivePtr actorMonPage) - : ActorMonPage(std::move(actorMonPage)) - { - } - - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::HTTP_MON_LEGACY_ACTOR_SERVICE; - } - - void Bootstrap() { - Become(&THttpMonServiceLegacyActor::StateWork); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) { - Register(new THttpMonLegacyActorRequest(std::move(ev), ActorMonPage)); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); - cFunc(TEvents::TSystem::Poison, PassAway); - } - } - - TIntrusivePtr ActorMonPage; -}; - -// receives everyhing not related to actor communcation, converts them to request-actors -class THttpMonServiceLegacyIndex : public TActor { -public: - THttpMonServiceLegacyIndex(TIntrusivePtr indexMonPage, const TString& redirectRoot = {}) - : TActor(&THttpMonServiceLegacyIndex::StateWork) - , IndexMonPage(std::move(indexMonPage)) - , RedirectRoot(redirectRoot) - { - } - - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::HTTP_MON_LEGACY_INDEX_SERVICE; - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) { - bool redirect = false; - if (RedirectRoot && ev->Get()->Request->URL == "/") { - NHttp::THeaders headers(ev->Get()->Request->Headers); - if (!headers.Has("Referer")) { - redirect = true; - } - } - if (redirect) { - TStringBuilder response; - response << "HTTP/1.1 302 Found\r\nLocation: " << RedirectRoot << "\r\n\r\n"; - Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response))); - return; - } else if (!ev->Get()->Request->URL.ends_with("/") && ev->Get()->Request->URL.find('?') == TStringBuf::npos) { - TString url(ev->Get()->Request->URL); - bool index = false; - auto itPage = IndexPages.find(url); - if (itPage == IndexPages.end()) { - auto page = IndexMonPage->FindPageByAbsolutePath(url); - if (page) { - index = page->IsIndex(); - IndexPages[url] = index; - } - } else { - index = itPage->second; - } - if (index) { - TStringBuilder response; - auto p = url.rfind('/'); - if (p != TString::npos) { - url = url.substr(p + 1); - } - url += '/'; - response << "HTTP/1.1 302 Found\r\nLocation: " << url << "\r\n\r\n"; - Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response))); - return; - } - } - Register(new THttpMonLegacyIndexRequest(std::move(ev), IndexMonPage.Get())); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); - cFunc(TEvents::TSystem::Poison, PassAway); - } - } - - TIntrusivePtr IndexMonPage; - TString RedirectRoot; - std::unordered_map IndexPages; -}; - -inline TActorId MakeNodeProxyId(ui32 node) { - char x[12] = "nodeproxy"; - return TActorId(node, TStringBuf(x, 12)); -} - -class THttpMonServiceNodeRequest : public TActorBootstrapped { -public: - std::shared_ptr Endpoint; - TEvMon::TEvMonitoringRequest::TPtr Event; - TActorId HttpProxyActorId; - - THttpMonServiceNodeRequest(std::shared_ptr endpoint, TEvMon::TEvMonitoringRequest::TPtr event, TActorId httpProxyActorId) - : Endpoint(std::move(endpoint)) - , Event(std::move(event)) - , HttpProxyActorId(httpProxyActorId) - {} - - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::HTTP_MON_SERVICE_NODE_REQUEST; - } - - static void FromProto(NHttp::THttpConfig::SocketAddressType& address, const NKikimrMonProto::TSockAddr& proto) { - switch (proto.GetFamily()) { - case AF_INET: - address = std::make_shared(proto.GetAddress().data(), proto.GetPort()); - break; - case AF_INET6: - address = std::make_shared(proto.GetAddress().data(), proto.GetPort()); - break; - } - } - - TString RewriteWithForwardedFromNode(const TString& response) { - NHttp::THttpParser parser(response); - - NHttp::THeadersBuilder headers(parser.Headers); - headers.Set("X-Forwarded-From-Node", TStringBuilder() << Event->Sender.NodeId()); - - NHttp::THttpRenderer renderer; - renderer.InitRequest(parser.Method, parser.URL, parser.Protocol, parser.Version); - renderer.Set(headers); - if (parser.HaveBody()) { - renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea - } - renderer.Finish(); - return renderer.AsString(); - } - - void Bootstrap() { - NHttp::THttpConfig::SocketAddressType address; - FromProto(address, Event->Get()->Record.GetAddress()); - NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(RewriteWithForwardedFromNode(Event->Get()->Record.GetHttpRequest()), Endpoint, address); - TStringBuilder prefix; - prefix << "/node/" << TActivationContext::ActorSystem()->NodeId; - if (request->URL.SkipPrefix(prefix)) { - Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(std::move(request))); - Become(&THttpMonServiceNodeRequest::StateWork); - } else { - auto response = std::make_unique(); - auto httpResponse = request->CreateResponseBadRequest(); - response->Record.SetHttpResponse(httpResponse->AsString()); - Send(Event->Sender, response.release(), 0, Event->Cookie); - PassAway(); - } - } - - TString RewriteLocationWithNode(const TString& response) { - NHttp::THttpParser parser(response); - - NHttp::THeadersBuilder headers(parser.Headers); - headers.Set("Location", TStringBuilder() << "/node/" << TActivationContext::ActorSystem()->NodeId << headers["Location"]); - - NHttp::THttpRenderer renderer; - renderer.InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message); - renderer.Set(headers); - if (parser.HaveBody()) { - renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea - } - renderer.Finish(); - return renderer.AsString(); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr& ev) { - TString httpResponse = ev->Get()->Response->AsString(); - switch (FromStringWithDefault(ev->Get()->Response->Status)) { - case 301: - case 303: - case 307: - case 308: - if (!NHttp::THeaders(ev->Get()->Response->Headers).Get("Location").starts_with("/node/")) { - httpResponse = RewriteLocationWithNode(httpResponse); - } - break; - } - auto response = std::make_unique(); - response->Record.SetHttpResponse(httpResponse); - Send(Event->Sender, response.release(), 0, Event->Cookie); - PassAway(); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); - } - } -}; - -class THttpMonServiceMonRequest : public TActorBootstrapped { -public: - NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event; - ui32 NodeId; - - THttpMonServiceMonRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, ui32 nodeId) - : Event(std::move(event)) - , NodeId(nodeId) - {} - - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::HTTP_MON_SERVICE_MON_REQUEST; - } - - static void ToProto(NKikimrMonProto::TSockAddr& proto, const NHttp::THttpConfig::SocketAddressType& address) { - if (address) { - switch (address->SockAddr()->sa_family) { - case AF_INET: { - proto.SetFamily(AF_INET); - sockaddr_in* addr = (sockaddr_in*)address->SockAddr(); - char ip[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, (void*)&addr->sin_addr, ip, INET_ADDRSTRLEN); - proto.SetAddress(ip); - proto.SetPort(htons(addr->sin_port)); - } - break; - case AF_INET6: { - proto.SetFamily(AF_INET6); - sockaddr_in6* addr = (sockaddr_in6*)address->SockAddr(); - char ip6[INET6_ADDRSTRLEN]; - inet_ntop(AF_INET6, (void*)&addr->sin6_addr, ip6, INET6_ADDRSTRLEN); - proto.SetAddress(ip6); - proto.SetPort(htons(addr->sin6_port)); - } - break; - } - } - } - - void Bootstrap() { - TActorId monServiceNodeProxy = MakeNodeProxyId(NodeId); - auto request = std::make_unique(); - request->Record.SetHttpRequest(Event->Get()->Request->AsString()); - ToProto(*request->Record.MutableAddress(), Event->Get()->Request->Address); - Send(monServiceNodeProxy, request.release(), IEventHandle::FlagTrackDelivery); - Become(&THttpMonServiceMonRequest::StateWork); - } - - void Handle(TEvents::TEvUndelivered::TPtr& ev) { - TString reason; - switch (ev->Get()->Reason) { - case TEvents::TEvUndelivered::ReasonUnknown: - reason = "ReasonUnknown"; - break; - case TEvents::TEvUndelivered::ReasonActorUnknown: - reason = "ReasonActorUnknown"; - break; - case TEvents::TEvUndelivered::Disconnected: - reason = "Disconnected"; - break; - } - Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(Event->Get()->Request->CreateResponseServiceUnavailable(reason)), 0, Event->Cookie); - PassAway(); - } - - void Handle(TEvMon::TEvMonitoringResponse::TPtr& ev) { - TString responseTxt = ev->Get()->Record.GetHttpResponse(); - NHttp::THttpOutgoingResponsePtr responseObj = Event->Get()->Request->CreateResponseString(responseTxt); - if (responseObj->Status == "301" || responseObj->Status == "302") { - NHttp::THttpParser parser(responseTxt); - NHttp::THeadersBuilder headers(parser.Headers); - if (headers["Location"].starts_with('/')) { - NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(Event->Get()->Request); - response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message); - - headers.Set("Location", TStringBuilder() << "/node/" << NodeId << headers["Location"]); - - response->Set(headers); - if (parser.HaveBody()) { - response->SetBody(parser.Body); - } - responseObj = response; - } - } - - Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(responseObj.Release()), 0, Event->Cookie); - PassAway(); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvMon::TEvMonitoringResponse, Handle); - hFunc(TEvents::TEvUndelivered, Handle); - } - } -}; - -// receives requests to another nodes -class THttpMonServiceNodeProxy : public TActor { -public: - THttpMonServiceNodeProxy(TActorId httpProxyActorId) - : TActor(&THttpMonServiceNodeProxy::StateWork) - , HttpProxyActorId(httpProxyActorId) - , Endpoint(std::make_shared()) - { - } - - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::HTTP_MON_SERVICE_NODE_PROXY; - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) { - TStringBuf url = ev->Get()->Request->URL; - TStringBuf node; - ui32 nodeId; - if (url.SkipPrefix("/node/") && url.NextTok('/', node) && TryFromStringWithDefault(node, nodeId)) { - Register(new THttpMonServiceMonRequest(std::move(ev), nodeId)); - return; - } - Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseBadRequest("bad request")), 0, ev->Cookie); - } - - void Handle(TEvMon::TEvMonitoringRequest::TPtr& ev) { - Register(new THttpMonServiceNodeRequest(Endpoint, ev, HttpProxyActorId)); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); - hFunc(TEvMon::TEvMonitoringRequest, Handle); - cFunc(TEvents::TSystem::Poison, PassAway); - } - } - -protected: - TActorId HttpProxyActorId; - std::shared_ptr Endpoint; -}; - -TAsyncHttpMon::TAsyncHttpMon(TConfig config) - : Config(std::move(config)) - , IndexMonPage(new NMonitoring::TIndexMonPage("", Config.Title)) -{ -} - -void TAsyncHttpMon::Start(TActorSystem* actorSystem) { - if (actorSystem) { - TGuard g(Mutex); - ActorSystem = actorSystem; - Register(new TIndexRedirectMonPage(IndexMonPage)); - Register(new NMonitoring::TVersionMonPage); - Register(new NMonitoring::TBootstrapCssMonPage); - Register(new NMonitoring::TTablesorterCssMonPage); - Register(new NMonitoring::TBootstrapJsMonPage); - Register(new NMonitoring::TJQueryJsMonPage); - Register(new NMonitoring::TTablesorterJsMonPage); - Register(new NMonitoring::TBootstrapFontsEotMonPage); - Register(new NMonitoring::TBootstrapFontsSvgMonPage); - Register(new NMonitoring::TBootstrapFontsTtfMonPage); - Register(new NMonitoring::TBootstrapFontsWoffMonPage); - NLwTraceMonPage::RegisterPages(IndexMonPage.Get()); - NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(ACTORLIB_PROVIDER)); - NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(MONITORING_PROVIDER)); - HttpProxyActorId = ActorSystem->Register( - NHttp::CreateHttpProxy(), - TMailboxType::ReadAsFilled, - ActorSystem->AppData()->UserPoolId); - HttpMonServiceActorId = ActorSystem->Register( - new THttpMonServiceLegacyIndex(IndexMonPage, Config.RedirectMainPageTo), - TMailboxType::ReadAsFilled, - ActorSystem->AppData()->UserPoolId); - auto nodeProxyActorId = ActorSystem->Register( - new THttpMonServiceNodeProxy(HttpProxyActorId), - TMailboxType::ReadAsFilled, - ActorSystem->AppData()->UserPoolId); - NodeProxyServiceActorId = MakeNodeProxyId(ActorSystem->NodeId); - ActorSystem->RegisterLocalService(NodeProxyServiceActorId, nodeProxyActorId); - - TStringBuilder workerName; - workerName << FQDNHostName() << ":" << Config.Port; - auto addPort = std::make_unique(); - addPort->Port = Config.Port; - addPort->WorkerName = workerName; - addPort->Address = Config.Address; - addPort->CompressContentTypes = { - "text/plain", - "text/html", - "text/css", - "text/javascript", - "application/javascript", - "application/json", - "application/yaml", - }; - addPort->SslCertificatePem = Config.Certificate; - addPort->Secure = !Config.Certificate.empty(); - addPort->MaxRequestsPerSecond = Config.MaxRequestsPerSecond; - ActorSystem->Send(HttpProxyActorId, addPort.release()); - ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/", HttpMonServiceActorId)); - ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/node", NodeProxyServiceActorId)); - for (auto& pageInfo : ActorMonPages) { - if (pageInfo.Page) { - RegisterActorMonPage(pageInfo); - } else if (pageInfo.Handler) { - ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(pageInfo.Path, pageInfo.Handler)); - } - } - ActorMonPages.clear(); - } -} - -void TAsyncHttpMon::Stop() { - IndexMonPage->ClearPages(); // it's required to avoid loop-reference - if (ActorSystem) { - TGuard g(Mutex); - for (const auto& [path, actorId] : ActorServices) { - ActorSystem->Send(actorId, new TEvents::TEvPoisonPill); - } - ActorSystem->Send(NodeProxyServiceActorId, new TEvents::TEvPoisonPill); - ActorSystem->Send(HttpMonServiceActorId, new TEvents::TEvPoisonPill); - ActorSystem->Send(HttpProxyActorId, new TEvents::TEvPoisonPill); - ActorSystem = nullptr; - } -} - -void TAsyncHttpMon::Register(NMonitoring::IMonPage* page) { - IndexMonPage->Register(page); -} - -NMonitoring::TIndexMonPage* TAsyncHttpMon::RegisterIndexPage(const TString& path, const TString& title) { - auto page = IndexMonPage->RegisterIndexPage(path, title); - IndexMonPage->SortPages(); - return page; -} - -void TAsyncHttpMon::RegisterActorMonPage(const TActorMonPageInfo& pageInfo) { - if (ActorSystem) { - TActorMonPage* actorMonPage = static_cast(pageInfo.Page.Get()); - auto& actorId = ActorServices[pageInfo.Path]; - if (actorId) { - ActorSystem->Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0)); - } - actorId = ActorSystem->Register( - new THttpMonServiceLegacyActor(actorMonPage), - TMailboxType::ReadAsFilled, - ActorSystem->AppData()->UserPoolId); - ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(pageInfo.Path, actorId)); - } -} - -NMonitoring::IMonPage* TAsyncHttpMon::RegisterActorPage(TRegisterActorPageFields fields) { - TGuard g(Mutex); - NMonitoring::TMonPagePtr page = new TActorMonPage( - fields.RelPath, - fields.Title, - Config.Host, - fields.PreTag, - fields.ActorSystem, - fields.ActorId, - fields.AllowedSIDs ? fields.AllowedSIDs : Config.AllowedSIDs, - fields.UseAuth ? Config.Authorizer : TRequestAuthorizer(), - fields.MonServiceName); - if (fields.Index) { - fields.Index->Register(page); - if (fields.SortPages) { - fields.Index->SortPages(); - } - } else { - Register(page.Get()); - } - - TActorMonPageInfo pageInfo = { - .Page = page, - .Path = GetPageFullPath(page.Get()), - }; - - if (ActorSystem && HttpProxyActorId) { - RegisterActorMonPage(pageInfo); - } else { - ActorMonPages.emplace_back(pageInfo); - } - - return page.Get(); -} - -NMonitoring::IMonPage* TAsyncHttpMon::RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) { - TDynamicCountersPage* page = new TDynamicCountersPage(path, title, counters); - page->SetUnknownGroupPolicy(EUnknownGroupPolicy::Ignore); - Register(page); - return page; -} - -void TAsyncHttpMon::RegisterHandler(const TString& path, const TActorId& handler) { - if (ActorSystem) { - ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(path, handler)); - } else { - TGuard g(Mutex); - ActorMonPages.emplace_back(TActorMonPageInfo{ - .Handler = handler, - .Path = path, - }); - } -} - -NMonitoring::IMonPage* TAsyncHttpMon::FindPage(const TString& relPath) { - return IndexMonPage->FindPage(relPath); -} - -} diff --git a/ydb/core/mon/async_http_mon.h b/ydb/core/mon/async_http_mon.h deleted file mode 100644 index 0f0abc1ec78c..000000000000 --- a/ydb/core/mon/async_http_mon.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "mon.h" - -namespace NActors { - -class TAsyncHttpMon : public TMon { -public: - TAsyncHttpMon(TConfig config); - - void Start(TActorSystem* actorSystem) override; - void Stop() override; - - void Register(NMonitoring::IMonPage* page) override; - NMonitoring::TIndexMonPage* RegisterIndexPage(const TString& path, const TString& title) override; - NMonitoring::IMonPage* RegisterActorPage(TRegisterActorPageFields fields) override; - NMonitoring::IMonPage* RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) override; - NMonitoring::IMonPage* FindPage(const TString& relPath) override; - void RegisterHandler(const TString& path, const TActorId& handler) override; - -protected: - TConfig Config; - TIntrusivePtr IndexMonPage; - TActorSystem* ActorSystem = {}; - TActorId HttpProxyActorId; - TActorId HttpMonServiceActorId; - TActorId NodeProxyServiceActorId; - - struct TActorMonPageInfo { - NMonitoring::TMonPagePtr Page; - TActorId Handler; - TString Path; - }; - - TMutex Mutex; - std::vector ActorMonPages; - THashMap ActorServices; - - void RegisterActorMonPage(const TActorMonPageInfo& pageInfo); -}; - -} // NActors diff --git a/ydb/core/mon/mon.cpp b/ydb/core/mon/mon.cpp index 311132a04fda..e6926d9782a8 100644 --- a/ydb/core/mon/mon.cpp +++ b/ydb/core/mon/mon.cpp @@ -1,24 +1,55 @@ #include "mon.h" - +#include +#include #include -#include #include +#include -#include - -#include #include +#include #include +#include +#include +#include +#include -#include +#include +#include +#include +#include + +#include + +#include +#include + +#include "mon_impl.h" namespace NActors { -using namespace NMonitoring; -using namespace NKikimr; +struct TEvMon { + enum { + EvMonitoringRequest = NActors::NMon::HttpInfo + 10, + EvMonitoringResponse, + End + }; + + static_assert(EvMonitoringRequest > NMon::End, "expect EvMonitoringRequest > NMon::End"); + static_assert(End < EventSpaceEnd(NActors::TEvents::ES_MON), "expect End < EventSpaceEnd(NActors::TEvents::ES_MON)"); + + struct TEvMonitoringRequest : TEventPB { + TEvMonitoringRequest() = default; + }; + + struct TEvMonitoringResponse : TEventPB { + TEvMonitoringResponse() = default; + }; +}; namespace { +using namespace NKikimr; + bool HasJsonContent(NMonitoring::IMonHttpRequest& request) { const TStringBuf header = request.GetHeader("Content-Type"); return header.empty() || AsciiEqualsIgnoreCase(header, "application/json"); // by default we will try to parse json, no error will be generated if parsing fails @@ -29,9 +60,8 @@ TString GetDatabase(NMonitoring::IMonHttpRequest& request) { return dbIt->second; } if (request.GetMethod() == HTTP_METHOD_POST && HasJsonContent(request)) { - static NJson::TJsonReaderConfig JsonConfig; NJson::TJsonValue requestData; - if (NJson::ReadJsonTree(request.GetPostContent(), &JsonConfig, &requestData)) { + if (NJson::ReadJsonTree(request.GetPostContent(), &requestData)) { return requestData["database"].GetString(); // empty if not string or no such key } } @@ -153,4 +183,933 @@ NActors::IEventHandle* TMon::DefaultAuthorizer(const NActors::TActorId& owner, N return GetAuthorizeTicketResult(owner); } +// compatibility layer +class THttpMonRequest : public NMonitoring::IMonHttpRequest { +public: + NHttp::THttpIncomingRequestPtr Request; + TStringStream& Response; + NMonitoring::IMonPage* Page; + TString PathInfo; + mutable std::unique_ptr Headers; + mutable std::unique_ptr Params; + mutable std::unique_ptr PostParams; + + THttpMonRequest(NHttp::THttpIncomingRequestPtr request, TStringStream& response, NMonitoring::IMonPage* page, const TString& pathInfo) + : Request(request) + , Response(response) + , Page(page) + , PathInfo(pathInfo) + { + } + + static TStringBuf GetPathFromUrl(TStringBuf url) { + return url.Before('?'); + } + + static TStringBuf GetPathInfoFromUrl(NMonitoring::IMonPage* page, TStringBuf url) { + TString path = GetPageFullPath(page); + url.SkipPrefix(path); + return GetPathFromUrl(url); + } + + virtual IOutputStream& Output() override { + return Response; + } + + virtual HTTP_METHOD GetMethod() const override { + if (Request->Method == "GET") { + return HTTP_METHOD_GET; + } + if (Request->Method == "OPTIONS") { + return HTTP_METHOD_OPTIONS; + } + if (Request->Method == "POST") { + return HTTP_METHOD_POST; + } + if (Request->Method == "HEAD") { + return HTTP_METHOD_HEAD; + } + if (Request->Method == "PUT") { + return HTTP_METHOD_PUT; + } + if (Request->Method == "DELETE") { + return HTTP_METHOD_DELETE; + } + return HTTP_METHOD_UNDEFINED; + } + + virtual TStringBuf GetPath() const override { + return GetPathFromUrl(Request->URL); + } + + virtual TStringBuf GetPathInfo() const override { + return PathInfo; + } + + virtual TStringBuf GetUri() const override { + return Request->URL; + } + + virtual const TCgiParameters& GetParams() const override { + if (!Params) { + Params = std::make_unique(Request->URL.After('?')); + } + return *Params; + } + + virtual const TCgiParameters& GetPostParams() const override { + if (!PostParams) { + PostParams = std::make_unique(Request->Body); + } + return *PostParams; + } + + virtual TStringBuf GetPostContent() const override { + return Request->Body; + } + + virtual const THttpHeaders& GetHeaders() const override { + if (!Headers) { + TString strHeaders(Request->Headers); + TStringInput headers(strHeaders); + Headers = std::make_unique(&headers); + } + return *Headers; + } + + virtual TStringBuf GetHeader(TStringBuf name) const override { + auto header = GetHeaders().FindHeader(name); + if (header) { + return header->Value(); + } + return {}; + } + + bool AcceptsJsonResponse() { + TStringBuf acceptHeader = GetHeader("Accept"); + return acceptHeader.find(TStringBuf("application/json")) != TStringBuf::npos; + } + + virtual TStringBuf GetCookie(TStringBuf name) const override { + NHttp::TCookies cookies(GetHeader("Cookie")); + return cookies.Get(name); + } + + virtual TString GetRemoteAddr() const override { + if (Request->Address) { + return Request->Address->ToString(); + } + return {}; + } + + virtual TString GetServiceTitle() const override { + return {}; + } + + virtual NMonitoring::IMonPage* GetPage() const override { + return Page; + } + + virtual IMonHttpRequest* MakeChild(NMonitoring::IMonPage* page, const TString& pathInfo) const override { + return new THttpMonRequest(Request, Response, page, pathInfo); + } +}; + +// container for legacy requests +class THttpMonRequestContainer : public TStringStream, public THttpMonRequest { +public: + THttpMonRequestContainer(NHttp::THttpIncomingRequestPtr request, NMonitoring::IMonPage* index) + : THttpMonRequest(request, *this, index, TString(GetPathInfoFromUrl(index, request->URL))) + { + } +}; + +// handles actor communication +class THttpMonLegacyActorRequest : public TActorBootstrapped { +public: + NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event; + THttpMonRequestContainer Container; + TIntrusivePtr ActorMonPage; + + THttpMonLegacyActorRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, TIntrusivePtr actorMonPage) + : Event(std::move(event)) + , Container(Event->Get()->Request, actorMonPage.Get()) + , ActorMonPage(actorMonPage) + {} + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::HTTP_MON_LEGACY_ACTOR_REQUEST; + } + + void Bootstrap() { + if (Event->Get()->Request->Method == "OPTIONS") { + return ReplyOptionsAndPassAway(); + } + Become(&THttpMonLegacyActorRequest::StateFunc); + if (ActorMonPage->Authorizer) { + NActors::IEventHandle* handle = ActorMonPage->Authorizer(SelfId(), Container); + if (handle) { + TActivationContext::Send(handle); + return; + } + } + SendRequest(); + } + void ReplyWith(NHttp::THttpOutgoingResponsePtr response) { + if (response->Status.StartsWith("2")) { + TString url(Event->Get()->Request->URL.Before('?')); + TString status(response->Status); + NMonitoring::THistogramPtr ResponseTimeHgram = NKikimr::GetServiceCounters(NKikimr::AppData()->Counters, + ActorMonPage->MonServiceName) + ->GetSubgroup("subsystem", "mon") + ->GetSubgroup("url", url) + ->GetSubgroup("status", status) + ->GetHistogram("ResponseTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1)); + ResponseTimeHgram->Collect(Event->Get()->Request->Timer.Passed() * 1000); + } + Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + } + + void ReplyOptionsAndPassAway() { + NHttp::THttpIncomingRequestPtr request = Event->Get()->Request; + TString url(request->URL.Before('?')); + TString type = mimetypeByExt(url.data()); + if (type.empty()) { + type = "application/json"; + } + NHttp::THeaders headers(request->Headers); + TString origin = TString(headers["Origin"]); + if (origin.empty()) { + origin = "*"; + } + TStringBuilder response; + response << "HTTP/1.1 204 No Content\r\n" + "Access-Control-Allow-Origin: " << origin << "\r\n" + "Access-Control-Allow-Credentials: true\r\n" + "Access-Control-Allow-Headers: Content-Type,Authorization,Origin,Accept,X-Trace-Verbosity,X-Want-Trace,traceparent\r\n" + "Access-Control-Expose-Headers: traceresponse,X-Worker-Name\r\n" + "Access-Control-Allow-Methods: OPTIONS,GET,POST,PUT,DELETE\r\n" + "Content-Type: " << type << "\r\n" + "Connection: keep-alive\r\n\r\n"; + ReplyWith(request->CreateResponseString(response)); + PassAway(); + } + + bool CredentialsProvided() { + return Container.GetCookie("ydb_session_id") || Container.GetHeader("Authorization"); + } + + TString YdbToHttpError(Ydb::StatusIds::StatusCode status) { + switch (status) { + case Ydb::StatusIds::UNAUTHORIZED: + // YDB status UNAUTHORIZED is used for both access denied case and if no credentials were provided. + return CredentialsProvided() ? "403 Forbidden" : "401 Unauthorized"; + case Ydb::StatusIds::INTERNAL_ERROR: + return "500 Internal Server Error"; + case Ydb::StatusIds::UNAVAILABLE: + return "503 Service Unavailable"; + case Ydb::StatusIds::OVERLOADED: + return "429 Too Many Requests"; + case Ydb::StatusIds::TIMEOUT: + return "408 Request Timeout"; + case Ydb::StatusIds::PRECONDITION_FAILED: + return "412 Precondition Failed"; + default: + return "400 Bad Request"; + } + } + + void ReplyErrorAndPassAway(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result) { + ReplyErrorAndPassAway(result.Status, result.Issues, true); + } + + void ReplyErrorAndPassAway(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues, bool addAccessControlHeaders) { + NHttp::THttpIncomingRequestPtr request = Event->Get()->Request; + TStringBuilder response; + TStringBuilder body; + TStringBuf contentType; + const TString httpError = YdbToHttpError(status); + + if (Container.AcceptsJsonResponse()) { + contentType = "application/json"; + NJson::TJsonValue json; + TString message; + MakeJsonErrorReply(json, message, issues, NYdb::EStatus(status)); + NJson::WriteJson(&body.Out, &json); + } else { + contentType = "text/html"; + body << "

" << httpError << "

"; + if (issues) { + body << "

" << issues.ToString() << "

"; + } + body << ""; + } + + response << "HTTP/1.1 " << httpError << "\r\n"; + if (addAccessControlHeaders) { + NHttp::THeaders headers(request->Headers); + TString origin = TString(headers["Origin"]); + if (origin.empty()) { + origin = "*"; + } + response << "Access-Control-Allow-Origin: " << origin << "\r\n"; + response << "Access-Control-Allow-Credentials: true\r\n"; + response << "Access-Control-Allow-Headers: Content-Type,Authorization,Origin,Accept\r\n"; + response << "Access-Control-Allow-Methods: OPTIONS, GET, POST, PUT, DELETE\r\n"; + } + + response << "Content-Type: " << contentType << "\r\n"; + response << "Content-Length: " << body.size() << "\r\n"; + response << "\r\n"; + response << body; + ReplyWith(request->CreateResponseString(response)); + PassAway(); + } + + void ReplyForbiddenAndPassAway(const TString& error = {}) { + NYql::TIssues issues; + issues.AddIssue(error); + ReplyErrorAndPassAway(Ydb::StatusIds::UNAUTHORIZED, issues, false); + } + + void SendRequest(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult* result = nullptr) { + NHttp::THttpIncomingRequestPtr request = Event->Get()->Request; + if (ActorMonPage->Authorizer) { + TString user = (result && result->UserToken) ? result->UserToken->GetUserSID() : "anonymous"; + LOG_NOTICE_S(*TlsActivationContext, NActorsServices::HTTP, + (request->Address ? request->Address->ToString() : "") + << " " << user + << " " << request->Method + << " " << request->URL); + } + TString serializedToken; + if (result && result->UserToken) { + serializedToken = result->UserToken->GetSerializedToken(); + } + Send(ActorMonPage->TargetActorId, new NMon::TEvHttpInfo( + Container, serializedToken), IEventHandle::FlagTrackDelivery); + } + + void HandleUndelivered(TEvents::TEvUndelivered::TPtr&) { + NHttp::THttpIncomingRequestPtr request = Event->Get()->Request; + ReplyWith(request->CreateResponseServiceUnavailable( + TStringBuilder() << "Actor " << ActorMonPage->TargetActorId << " is not available")); + PassAway(); + } + + void HandleResponse(NMon::IEvHttpInfoRes::TPtr& ev) { + if (ev->Get()->GetContentType() == NMon::IEvHttpInfoRes::Html) { + THtmlResultMonPage resultPage(ActorMonPage->Path, ActorMonPage->Title, ActorMonPage->Host, ActorMonPage->PreTag, *(ev->Get())); + resultPage.Parent = ActorMonPage->Parent; + resultPage.Output(Container); + } else { + ev->Get()->Output(Container); + } + ReplyWith(Event->Get()->Request->CreateResponseString(Container.Str())); + PassAway(); + } + + void Handle(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult::TPtr& ev) { + const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result(*ev->Get()); + if (result.Status != Ydb::StatusIds::SUCCESS) { + return ReplyErrorAndPassAway(result); + } + bool found = false; + if (result.UserToken) { + for (const TString& sid : ActorMonPage->AllowedSIDs) { + if (result.UserToken->IsExist(sid)) { + found = true; + break; + } + } + } + if (found || ActorMonPage->AllowedSIDs.empty() || !result.UserToken) { + SendRequest(&result); + } else { + return ReplyForbiddenAndPassAway("SID is not allowed"); + } + } + + STATEFN(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvUndelivered, HandleUndelivered); + hFunc(NMon::IEvHttpInfoRes, HandleResponse); + hFunc(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult, Handle); + } + } +}; + +// handles all indexes and static data in synchronous way +class THttpMonLegacyIndexRequest : public TActorBootstrapped { +public: + NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event; + THttpMonRequestContainer Container; + + THttpMonLegacyIndexRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, NMonitoring::IMonPage* index) + : Event(std::move(event)) + , Container(Event->Get()->Request, index) + {} + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::HTTP_MON_LEGACY_INDEX_REQUEST; + } + + void Bootstrap() { + ProcessRequest(); + } + + void ProcessRequest() { + Container.Page->Output(Container); + NHttp::THttpOutgoingResponsePtr response = Event->Get()->Request->CreateResponseString(Container.Str()); + Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + PassAway(); + } +}; + +// receives all requests for one actor page and converts them to request-actors +class THttpMonServiceLegacyActor : public TActorBootstrapped { +public: + THttpMonServiceLegacyActor(TIntrusivePtr actorMonPage) + : ActorMonPage(std::move(actorMonPage)) + { + } + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::HTTP_MON_LEGACY_ACTOR_SERVICE; + } + + void Bootstrap() { + Become(&THttpMonServiceLegacyActor::StateWork); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) { + Register(new THttpMonLegacyActorRequest(std::move(ev), ActorMonPage)); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + cFunc(TEvents::TSystem::Poison, PassAway); + } + } + + TIntrusivePtr ActorMonPage; +}; + +// receives everyhing not related to actor communcation, converts them to request-actors +class THttpMonServiceLegacyIndex : public TActor { +public: + THttpMonServiceLegacyIndex(TIntrusivePtr indexMonPage, const TString& redirectRoot = {}) + : TActor(&THttpMonServiceLegacyIndex::StateWork) + , IndexMonPage(std::move(indexMonPage)) + , RedirectRoot(redirectRoot) + { + } + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::HTTP_MON_LEGACY_INDEX_SERVICE; + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) { + bool redirect = false; + if (RedirectRoot && ev->Get()->Request->URL == "/") { + NHttp::THeaders headers(ev->Get()->Request->Headers); + if (!headers.Has("Referer")) { + redirect = true; + } + } + if (redirect) { + TStringBuilder response; + response << "HTTP/1.1 302 Found\r\nLocation: " << RedirectRoot << "\r\n\r\n"; + Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response))); + return; + } else if (!ev->Get()->Request->URL.ends_with("/") && ev->Get()->Request->URL.find('?') == TStringBuf::npos) { + TString url(ev->Get()->Request->URL); + bool index = false; + auto itPage = IndexPages.find(url); + if (itPage == IndexPages.end()) { + auto page = IndexMonPage->FindPageByAbsolutePath(url); + if (page) { + index = page->IsIndex(); + IndexPages[url] = index; + } + } else { + index = itPage->second; + } + if (index) { + TStringBuilder response; + auto p = url.rfind('/'); + if (p != TString::npos) { + url = url.substr(p + 1); + } + url += '/'; + response << "HTTP/1.1 302 Found\r\nLocation: " << url << "\r\n\r\n"; + Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseString(response))); + return; + } + } + Register(new THttpMonLegacyIndexRequest(std::move(ev), IndexMonPage.Get())); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + cFunc(TEvents::TSystem::Poison, PassAway); + } + } + + TIntrusivePtr IndexMonPage; + TString RedirectRoot; + std::unordered_map IndexPages; +}; + +inline TActorId MakeNodeProxyId(ui32 node) { + char x[12] = "nodeproxy"; + return TActorId(node, TStringBuf(x, 12)); +} + +class THttpMonServiceNodeRequest : public TActorBootstrapped { +public: + std::shared_ptr Endpoint; + TEvMon::TEvMonitoringRequest::TPtr Event; + TActorId HttpProxyActorId; + + THttpMonServiceNodeRequest(std::shared_ptr endpoint, TEvMon::TEvMonitoringRequest::TPtr event, TActorId httpProxyActorId) + : Endpoint(std::move(endpoint)) + , Event(std::move(event)) + , HttpProxyActorId(httpProxyActorId) + {} + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::HTTP_MON_SERVICE_NODE_REQUEST; + } + + static void FromProto(NHttp::THttpConfig::SocketAddressType& address, const NKikimrMonProto::TSockAddr& proto) { + switch (proto.GetFamily()) { + case AF_INET: + address = std::make_shared(proto.GetAddress().data(), proto.GetPort()); + break; + case AF_INET6: + address = std::make_shared(proto.GetAddress().data(), proto.GetPort()); + break; + } + } + + TString RewriteWithForwardedFromNode(const TString& response) { + NHttp::THttpParser parser(response); + + NHttp::THeadersBuilder headers(parser.Headers); + headers.Set("X-Forwarded-From-Node", TStringBuilder() << Event->Sender.NodeId()); + + NHttp::THttpRenderer renderer; + renderer.InitRequest(parser.Method, parser.URL, parser.Protocol, parser.Version); + renderer.Set(headers); + if (parser.HaveBody()) { + renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea + } + renderer.Finish(); + return renderer.AsString(); + } + + void Bootstrap() { + NHttp::THttpConfig::SocketAddressType address; + FromProto(address, Event->Get()->Record.GetAddress()); + NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(RewriteWithForwardedFromNode(Event->Get()->Record.GetHttpRequest()), Endpoint, address); + TStringBuilder prefix; + prefix << "/node/" << TActivationContext::ActorSystem()->NodeId; + if (request->URL.SkipPrefix(prefix)) { + Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(std::move(request))); + Become(&THttpMonServiceNodeRequest::StateWork); + } else { + auto response = std::make_unique(); + auto httpResponse = request->CreateResponseBadRequest(); + response->Record.SetHttpResponse(httpResponse->AsString()); + Send(Event->Sender, response.release(), 0, Event->Cookie); + PassAway(); + } + } + + TString RewriteLocationWithNode(const TString& response) { + NHttp::THttpParser parser(response); + + NHttp::THeadersBuilder headers(parser.Headers); + headers.Set("Location", TStringBuilder() << "/node/" << TActivationContext::ActorSystem()->NodeId << headers["Location"]); + + NHttp::THttpRenderer renderer; + renderer.InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message); + renderer.Set(headers); + if (parser.HaveBody()) { + renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea + } + renderer.Finish(); + return renderer.AsString(); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr& ev) { + TString httpResponse = ev->Get()->Response->AsString(); + switch (FromStringWithDefault(ev->Get()->Response->Status)) { + case 301: + case 303: + case 307: + case 308: + if (!NHttp::THeaders(ev->Get()->Response->Headers).Get("Location").starts_with("/node/")) { + httpResponse = RewriteLocationWithNode(httpResponse); + } + break; + } + auto response = std::make_unique(); + response->Record.SetHttpResponse(httpResponse); + Send(Event->Sender, response.release(), 0, Event->Cookie); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); + } + } +}; + +class THttpMonServiceMonRequest : public TActorBootstrapped { +public: + NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event; + ui32 NodeId; + + THttpMonServiceMonRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, ui32 nodeId) + : Event(std::move(event)) + , NodeId(nodeId) + {} + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::HTTP_MON_SERVICE_MON_REQUEST; + } + + static void ToProto(NKikimrMonProto::TSockAddr& proto, const NHttp::THttpConfig::SocketAddressType& address) { + if (address) { + switch (address->SockAddr()->sa_family) { + case AF_INET: { + proto.SetFamily(AF_INET); + sockaddr_in* addr = (sockaddr_in*)address->SockAddr(); + char ip[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, (void*)&addr->sin_addr, ip, INET_ADDRSTRLEN); + proto.SetAddress(ip); + proto.SetPort(htons(addr->sin_port)); + } + break; + case AF_INET6: { + proto.SetFamily(AF_INET6); + sockaddr_in6* addr = (sockaddr_in6*)address->SockAddr(); + char ip6[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, (void*)&addr->sin6_addr, ip6, INET6_ADDRSTRLEN); + proto.SetAddress(ip6); + proto.SetPort(htons(addr->sin6_port)); + } + break; + } + } + } + + void Bootstrap() { + TActorId monServiceNodeProxy = MakeNodeProxyId(NodeId); + auto request = std::make_unique(); + request->Record.SetHttpRequest(Event->Get()->Request->AsString()); + ToProto(*request->Record.MutableAddress(), Event->Get()->Request->Address); + Send(monServiceNodeProxy, request.release(), IEventHandle::FlagTrackDelivery); + Become(&THttpMonServiceMonRequest::StateWork); + } + + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + TString reason; + switch (ev->Get()->Reason) { + case TEvents::TEvUndelivered::ReasonUnknown: + reason = "ReasonUnknown"; + break; + case TEvents::TEvUndelivered::ReasonActorUnknown: + reason = "ReasonActorUnknown"; + break; + case TEvents::TEvUndelivered::Disconnected: + reason = "Disconnected"; + break; + } + Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(Event->Get()->Request->CreateResponseServiceUnavailable(reason)), 0, Event->Cookie); + PassAway(); + } + + void Handle(TEvMon::TEvMonitoringResponse::TPtr& ev) { + TString responseTxt = ev->Get()->Record.GetHttpResponse(); + NHttp::THttpOutgoingResponsePtr responseObj = Event->Get()->Request->CreateResponseString(responseTxt); + if (responseObj->Status == "301" || responseObj->Status == "302") { + NHttp::THttpParser parser(responseTxt); + NHttp::THeadersBuilder headers(parser.Headers); + if (headers["Location"].starts_with('/')) { + NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(Event->Get()->Request); + response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message); + + headers.Set("Location", TStringBuilder() << "/node/" << NodeId << headers["Location"]); + + response->Set(headers); + if (parser.HaveBody()) { + response->SetBody(parser.Body); + } + responseObj = response; + } + } + + Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(responseObj.Release()), 0, Event->Cookie); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvMon::TEvMonitoringResponse, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + } + } +}; + +// receives requests to another nodes +class THttpMonServiceNodeProxy : public TActor { +public: + THttpMonServiceNodeProxy(TActorId httpProxyActorId) + : TActor(&THttpMonServiceNodeProxy::StateWork) + , HttpProxyActorId(httpProxyActorId) + , Endpoint(std::make_shared()) + { + } + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::HTTP_MON_SERVICE_NODE_PROXY; + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev) { + TStringBuf url = ev->Get()->Request->URL; + TStringBuf node; + ui32 nodeId; + if (url.SkipPrefix("/node/") && url.NextTok('/', node) && TryFromStringWithDefault(node, nodeId)) { + Register(new THttpMonServiceMonRequest(std::move(ev), nodeId)); + return; + } + Send(ev->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(ev->Get()->Request->CreateResponseBadRequest("bad request")), 0, ev->Cookie); + } + + void Handle(TEvMon::TEvMonitoringRequest::TPtr& ev) { + Register(new THttpMonServiceNodeRequest(Endpoint, ev, HttpProxyActorId)); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + hFunc(TEvMon::TEvMonitoringRequest, Handle); + cFunc(TEvents::TSystem::Poison, PassAway); + } + } + +protected: + TActorId HttpProxyActorId; + std::shared_ptr Endpoint; +}; + +// initializes http and waits for the result +class THttpMonInitializator : public TActorBootstrapped { +public: + THttpMonInitializator(TActorId httpProxyActorId, std::unique_ptr config, std::promise promise) + : HttpProxyActorId(httpProxyActorId) + , Config(std::move(config)) + , Promise(std::move(promise)) + { + } + + void Bootstrap() { + Send(HttpProxyActorId, Config.release()); + Become(&THttpMonInitializator::StateWork); + } + + void Handle(NHttp::TEvHttpProxy::TEvConfirmListen::TPtr& ev) { + Promise.set_value(); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvConfirmListen, Handle); + } + } + +protected: + TActorId HttpProxyActorId; + std::unique_ptr Config; + std::promise Promise; +}; + +TMon::TMon(TConfig config) + : Config(std::move(config)) + , IndexMonPage(new NMonitoring::TIndexMonPage("", Config.Title)) +{ +} + +std::future TMon::Start(TActorSystem* actorSystem) { + Y_ABORT_UNLESS(actorSystem); + TGuard g(Mutex); + ActorSystem = actorSystem; + Register(new TIndexRedirectMonPage(IndexMonPage)); + Register(new NMonitoring::TVersionMonPage); + Register(new NMonitoring::TBootstrapCssMonPage); + Register(new NMonitoring::TTablesorterCssMonPage); + Register(new NMonitoring::TBootstrapJsMonPage); + Register(new NMonitoring::TJQueryJsMonPage); + Register(new NMonitoring::TTablesorterJsMonPage); + Register(new NMonitoring::TBootstrapFontsEotMonPage); + Register(new NMonitoring::TBootstrapFontsSvgMonPage); + Register(new NMonitoring::TBootstrapFontsTtfMonPage); + Register(new NMonitoring::TBootstrapFontsWoffMonPage); + NLwTraceMonPage::RegisterPages(IndexMonPage.Get()); + NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(ACTORLIB_PROVIDER)); + NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(MONITORING_PROVIDER)); + HttpProxyActorId = ActorSystem->Register( + NHttp::CreateHttpProxy(), + TMailboxType::ReadAsFilled, + ActorSystem->AppData()->UserPoolId); + HttpMonServiceActorId = ActorSystem->Register( + new THttpMonServiceLegacyIndex(IndexMonPage, Config.RedirectMainPageTo), + TMailboxType::ReadAsFilled, + ActorSystem->AppData()->UserPoolId); + auto nodeProxyActorId = ActorSystem->Register( + new THttpMonServiceNodeProxy(HttpProxyActorId), + TMailboxType::ReadAsFilled, + ActorSystem->AppData()->UserPoolId); + NodeProxyServiceActorId = MakeNodeProxyId(ActorSystem->NodeId); + ActorSystem->RegisterLocalService(NodeProxyServiceActorId, nodeProxyActorId); + + TStringBuilder workerName; + workerName << FQDNHostName() << ":" << Config.Port; + auto addPort = std::make_unique(); + addPort->Port = Config.Port; + addPort->WorkerName = workerName; + addPort->Address = Config.Address; + addPort->CompressContentTypes = { + "text/plain", + "text/html", + "text/css", + "text/javascript", + "application/javascript", + "application/json", + "application/yaml", + }; + addPort->SslCertificatePem = Config.Certificate; + addPort->Secure = !Config.Certificate.empty(); + addPort->MaxRequestsPerSecond = Config.MaxRequestsPerSecond; + + std::promise promise; + std::future future = promise.get_future(); + ActorSystem->Register(new THttpMonInitializator(HttpProxyActorId, std::move(addPort), std::move(promise))); + ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/", HttpMonServiceActorId)); + ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/node", NodeProxyServiceActorId)); + for (auto& pageInfo : ActorMonPages) { + if (pageInfo.Page) { + RegisterActorMonPage(pageInfo); + } else if (pageInfo.Handler) { + ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(pageInfo.Path, pageInfo.Handler)); + } + } + ActorMonPages.clear(); + return future; +} + +void TMon::Stop() { + IndexMonPage->ClearPages(); // it's required to avoid loop-reference + if (ActorSystem) { + TGuard g(Mutex); + for (const auto& [path, actorId] : ActorServices) { + ActorSystem->Send(actorId, new TEvents::TEvPoisonPill); + } + ActorSystem->Send(NodeProxyServiceActorId, new TEvents::TEvPoisonPill); + ActorSystem->Send(HttpMonServiceActorId, new TEvents::TEvPoisonPill); + ActorSystem->Send(HttpProxyActorId, new TEvents::TEvPoisonPill); + ActorSystem = nullptr; + } +} + +void TMon::Register(NMonitoring::IMonPage* page) { + IndexMonPage->Register(page); +} + +NMonitoring::TIndexMonPage* TMon::RegisterIndexPage(const TString& path, const TString& title) { + auto page = IndexMonPage->RegisterIndexPage(path, title); + IndexMonPage->SortPages(); + return page; +} + +void TMon::RegisterActorMonPage(const TActorMonPageInfo& pageInfo) { + if (ActorSystem) { + TActorMonPage* actorMonPage = static_cast(pageInfo.Page.Get()); + auto& actorId = ActorServices[pageInfo.Path]; + if (actorId) { + ActorSystem->Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0)); + } + actorId = ActorSystem->Register( + new THttpMonServiceLegacyActor(actorMonPage), + TMailboxType::ReadAsFilled, + ActorSystem->AppData()->UserPoolId); + ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(pageInfo.Path, actorId)); + } +} + +NMonitoring::IMonPage* TMon::RegisterActorPage(TRegisterActorPageFields fields) { + TGuard g(Mutex); + NMonitoring::TMonPagePtr page = new TActorMonPage( + fields.RelPath, + fields.Title, + Config.Host, + fields.PreTag, + fields.ActorSystem, + fields.ActorId, + fields.AllowedSIDs ? fields.AllowedSIDs : Config.AllowedSIDs, + fields.UseAuth ? Config.Authorizer : TRequestAuthorizer(), + fields.MonServiceName); + if (fields.Index) { + fields.Index->Register(page); + if (fields.SortPages) { + fields.Index->SortPages(); + } + } else { + Register(page.Get()); + } + + TActorMonPageInfo pageInfo = { + .Page = page, + .Path = GetPageFullPath(page.Get()), + }; + + if (ActorSystem && HttpProxyActorId) { + RegisterActorMonPage(pageInfo); + } else { + ActorMonPages.emplace_back(pageInfo); + } + + return page.Get(); +} + +NMonitoring::IMonPage* TMon::RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) { + TDynamicCountersPage* page = new TDynamicCountersPage(path, title, counters); + page->SetUnknownGroupPolicy(EUnknownGroupPolicy::Ignore); + Register(page); + return page; +} + +void TMon::RegisterHandler(const TString& path, const TActorId& handler) { + if (ActorSystem) { + ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler(path, handler)); + } else { + TGuard g(Mutex); + ActorMonPages.emplace_back(TActorMonPageInfo{ + .Handler = handler, + .Path = path, + }); + } +} + +NMonitoring::IMonPage* TMon::FindPage(const TString& relPath) { + return IndexMonPage->FindPage(relPath); +} + } diff --git a/ydb/core/mon/mon.h b/ydb/core/mon/mon.h index a7781cd7c77a..0fcbecf004b3 100644 --- a/ydb/core/mon/mon.h +++ b/ydb/core/mon/mon.h @@ -1,29 +1,26 @@ #pragma once -#include +#include #include #include +#include #include #include #include #include #include -#include #include +#include #include #include -namespace NActors { +#include "mon.h" -IEventHandle* SelectAuthorizationScheme(const NActors::TActorId& owner, NMonitoring::IMonHttpRequest& request); -IEventHandle* GetAuthorizeTicketResult(const NActors::TActorId& owner); +namespace NActors { -void MakeJsonErrorReply(NJson::TJsonValue& jsonResponse, TString& message, const NYql::TIssues& issues, NYdb::EStatus status); void MakeJsonErrorReply(NJson::TJsonValue& jsonResponse, TString& message, const NYdb::TStatus& status); - -class TActorSystem; -struct TActorId; +void MakeJsonErrorReply(NJson::TJsonValue& jsonResponse, TString& message, const NYql::TIssues& issues, NYdb::EStatus status); class TMon { public: @@ -45,12 +42,14 @@ class TMon { TDuration InactivityTimeout = TDuration::Minutes(2); }; + TMon(TConfig config); virtual ~TMon() = default; - virtual void Start(TActorSystem* actorSystem = {}) = 0; - virtual void Stop() = 0; - virtual void Register(NMonitoring::IMonPage* page) = 0; - virtual NMonitoring::TIndexMonPage* RegisterIndexPage(const TString& path, const TString& title) = 0; + std::future Start(TActorSystem* actorSystem); // signals when monitoring is ready + void Stop(); + + void Register(NMonitoring::IMonPage* page); + NMonitoring::TIndexMonPage* RegisterIndexPage(const TString& path, const TString& title); struct TRegisterActorPageFields { TString Title; @@ -65,12 +64,32 @@ class TMon { TString MonServiceName = "utils"; }; - virtual NMonitoring::IMonPage* RegisterActorPage(TRegisterActorPageFields fields) = 0; + NMonitoring::IMonPage* RegisterActorPage(TRegisterActorPageFields fields); NMonitoring::IMonPage* RegisterActorPage(NMonitoring::TIndexMonPage* index, const TString& relPath, const TString& title, bool preTag, TActorSystem* actorSystem, const TActorId& actorId, bool useAuth = true, bool sortPages = true); - virtual NMonitoring::IMonPage* RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) = 0; - virtual NMonitoring::IMonPage* FindPage(const TString& relPath) = 0; - virtual void RegisterHandler(const TString& path, const TActorId& handler) = 0; + NMonitoring::IMonPage* RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters); + NMonitoring::IMonPage* FindPage(const TString& relPath); + void RegisterHandler(const TString& path, const TActorId& handler); + +protected: + TConfig Config; + TIntrusivePtr IndexMonPage; + TActorSystem* ActorSystem = {}; + TActorId HttpProxyActorId; + TActorId HttpMonServiceActorId; + TActorId NodeProxyServiceActorId; + + struct TActorMonPageInfo { + NMonitoring::TMonPagePtr Page; + TActorId Handler; + TString Path; + }; + + TMutex Mutex; + std::vector ActorMonPages; + THashMap ActorServices; + + void RegisterActorMonPage(const TActorMonPageInfo& pageInfo); }; } // NActors diff --git a/ydb/core/mon/sync_http_mon.cpp b/ydb/core/mon/sync_http_mon.cpp deleted file mode 100644 index f7ae9159df21..000000000000 --- a/ydb/core/mon/sync_http_mon.cpp +++ /dev/null @@ -1,120 +0,0 @@ -#include "sync_http_mon.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "mon_impl.h" - -namespace NActors { - - //////////////////////////////////////////////////////////////////////////////// - // TMON CLASS - //////////////////////////////////////////////////////////////////////////////// - TSyncHttpMon::TSyncHttpMon(TSyncHttpMon::TConfig config) - : TBase(config.Port, config.Address, config.Threads, config.Title) - , Config(std::move(config)) - { - } - - TSyncHttpMon::~TSyncHttpMon() { - Stop(); - } - - void TSyncHttpMon::Start(TActorSystem*) { - TBase::Register(new TIndexRedirectMonPage(IndexMonPage)); - TBase::Register(new NMonitoring::TVersionMonPage); - TBase::Register(new NMonitoring::TBootstrapCssMonPage); - TBase::Register(new NMonitoring::TTablesorterCssMonPage); - TBase::Register(new NMonitoring::TBootstrapJsMonPage); - TBase::Register(new NMonitoring::TJQueryJsMonPage); - TBase::Register(new NMonitoring::TTablesorterJsMonPage); - TBase::Register(new NMonitoring::TBootstrapFontsEotMonPage); - TBase::Register(new NMonitoring::TBootstrapFontsSvgMonPage); - TBase::Register(new NMonitoring::TBootstrapFontsTtfMonPage); - TBase::Register(new NMonitoring::TBootstrapFontsWoffMonPage); - - NLwTraceMonPage::RegisterPages(IndexMonPage.Get()); - NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(ACTORLIB_PROVIDER)); - NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(MONITORING_PROVIDER)); - TBase::Start(); - } - - void TSyncHttpMon::Stop() { - IndexMonPage->ClearPages(); // it's required to avoid loop-reference - TBase::Stop(); - } - - void TSyncHttpMon::Register(NMonitoring::IMonPage* page) { - TBase::Register(page); - TBase::SortPages(); - } - - TIndexMonPage* TSyncHttpMon::RegisterIndexPage(const TString& path, const TString& title) { - auto page = TBase::RegisterIndexPage(path, title); - TBase::SortPages(); - return page; - } - - IMonPage* TSyncHttpMon::RegisterActorPage(TMon::TRegisterActorPageFields fields) { - IMonPage* page = new TActorMonPage( - fields.RelPath, - fields.Title, - Config.Host, - fields.PreTag, - fields.ActorSystem, - fields.ActorId, - fields.AllowedSIDs ? fields.AllowedSIDs : Config.AllowedSIDs, - fields.UseAuth ? Config.Authorizer : TRequestAuthorizer(), - fields.MonServiceName); - if (fields.Index) { - fields.Index->Register(page); - if (fields.SortPages) { - fields.Index->SortPages(); - } - } else { - Register(page); - } - - return page; - } - - IMonPage* TSyncHttpMon::RegisterCountersPage(const TString &path, const TString &title, TIntrusivePtr counters) { - TDynamicCountersPage* page = new TDynamicCountersPage(path, title, counters); - page->SetUnknownGroupPolicy(EUnknownGroupPolicy::Ignore); - Register(page); - return page; - } - - void TSyncHttpMon::OutputIndexPage(IOutputStream& out) { - if (Config.RedirectMainPageTo) { - // XXX manual http response construction - out << "HTTP/1.1 302 Found\r\n" - << "Location: " << Config.RedirectMainPageTo << "\r\n" - << "Connection: Close\r\n\r\n"; - } else { - NMonitoring::TMonService2::OutputIndexPage(out); - } - } - - IMonPage* TSyncHttpMon::FindPage(const TString& relPath) { - return TBase::FindPage(relPath); - } - - void TSyncHttpMon::RegisterHandler(const TString& path, const TActorId& handler) { - ALOG_ERROR(NActorsServices::HTTP, "Cannot register actor handler " << handler << " in sync mon for " << path); - } -} // NActors diff --git a/ydb/core/mon/sync_http_mon.h b/ydb/core/mon/sync_http_mon.h deleted file mode 100644 index 5634ca582266..000000000000 --- a/ydb/core/mon/sync_http_mon.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "mon.h" - -namespace NActors { - -class TSyncHttpMon : public TMon, public NMonitoring::TMonService2 { -public: - TSyncHttpMon(TConfig config); - virtual ~TSyncHttpMon(); - void Start(TActorSystem* actorSystem = {}) override; - void Stop() override; - - void Register(NMonitoring::IMonPage *page) override; - NMonitoring::TIndexMonPage* RegisterIndexPage(const TString& path, const TString& title) override; - NMonitoring::IMonPage* RegisterActorPage(TRegisterActorPageFields fields) override; - NMonitoring::IMonPage* RegisterCountersPage(const TString& path, const TString& title, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) override; - void OutputIndexPage(IOutputStream& out) override; - NMonitoring::IMonPage* FindPage(const TString& relPath) override; - void RegisterHandler(const TString& path, const TActorId& handler) override; - -protected: - typedef NMonitoring::TMonService2 TBase; - TConfig Config; -}; - -} // NActors diff --git a/ydb/core/mon/ya.make b/ydb/core/mon/ya.make index 8d9d3e58a3c3..9edde6bd8455 100644 --- a/ydb/core/mon/ya.make +++ b/ydb/core/mon/ya.make @@ -1,12 +1,8 @@ LIBRARY() SRCS( - async_http_mon.cpp - async_http_mon.h mon.cpp mon.h - sync_http_mon.cpp - sync_http_mon.h crossref.cpp crossref.h ) diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index a2d38248a424..a8f20cb20562 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index d035ca163ac6..a97d0d80ee38 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -83,7 +83,7 @@ message TFeatureFlags { optional bool EnableImplicitScanQueryInScripts = 61 [default = true]; reserved 62; // EnablePredicateExtractForScanQueries optional bool AllowVDiskDefrag = 63 [default = true]; - optional bool EnableAsyncHttpMon = 64 [default = true]; + optional bool EnableAsyncHttpMon = 64 [default = true]; // deprecated: always true optional bool EnableChangefeeds = 65 [default = true]; reserved 66; // EnableKqpScanQueryStreamLookup optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false]; diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index fa9c522ca3ba..92a4666a748b 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -4,8 +4,7 @@ #include #include #include -#include -#include +#include #include #include #include @@ -193,19 +192,11 @@ namespace NActors { if (NeedMonitoring && !SingleSysEnv) { ui16 port = MonitoringPortOffset ? MonitoringPortOffset + nodeIndex : GetPortManager().GetPort(); - if (MonitoringTypeAsync) { - node->Mon.Reset(new NActors::TAsyncHttpMon({ - .Port = port, - .Threads = 10, - .Title = "KIKIMR monitoring" - })); - } else { - node->Mon.Reset(new NActors::TSyncHttpMon({ - .Port = port, - .Threads = 10, - .Title = "KIKIMR monitoring" - })); - } + node->Mon.Reset(new NActors::TMon({ + .Port = port, + .Threads = 10, + .Title = "KIKIMR monitoring" + })); nodeAppData->Mon = node->Mon.Get(); node->Mon->RegisterCountersPage("counters", "Counters", node->DynamicCounters); auto actorsMonPage = node->Mon->RegisterIndexPage("actors", "Actors"); diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 1c2d05b5b9ec..5705af7ba05b 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include @@ -370,7 +370,7 @@ namespace NKikimr::NPersQueueTests { const auto monPort = TPortManager().GetPort(); auto Counters = server.CleverServer->GetGRpcServerRootCounters(); - NActors::TSyncHttpMon Monitoring({ + NActors::TMon Monitoring({ .Port = monPort, .Address = "localhost", .Threads = 3, @@ -378,7 +378,7 @@ namespace NKikimr::NPersQueueTests { .Host = "localhost", }); Monitoring.RegisterCountersPage("counters", "Counters", Counters); - Monitoring.Start(); + Monitoring.Start(server.CleverServer->GetRuntime()->GetAnyNodeActorSystem()); auto ydbDriver = MakeHolder(driverCfg); auto persQueueClient = MakeHolder(*ydbDriver); @@ -562,7 +562,7 @@ namespace NKikimr::NPersQueueTests { const auto monPort = TPortManager().GetPort(); auto Counters = server.CleverServer->GetGRpcServerRootCounters(); - NActors::TSyncHttpMon Monitoring({ + NActors::TMon Monitoring({ .Port = monPort, .Address = "localhost", .Threads = 3, @@ -570,7 +570,7 @@ namespace NKikimr::NPersQueueTests { .Host = "localhost", }); Monitoring.RegisterCountersPage("counters", "Counters", Counters); - Monitoring.Start(); + Monitoring.Start(server.CleverServer->GetRuntime()->GetAnyNodeActorSystem()); auto driverCfg = NYdb::TDriverConfig() .SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort) diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 7b1c98fd41c5..3c24074969ce 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -9,7 +9,7 @@ #include #include -#include +#include #include #include #include @@ -4012,7 +4012,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { const auto monPort = TPortManager().GetPort(); auto Counters = server.CleverServer->GetGRpcServerRootCounters(); - NActors::TSyncHttpMon Monitoring({ + NActors::TMon Monitoring({ .Port = monPort, .Address = "localhost", .Threads = 3, @@ -4020,7 +4020,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { .Host = "localhost", }); Monitoring.RegisterCountersPage("counters", "Counters", Counters); - Monitoring.Start(); + Monitoring.Start(server.CleverServer->GetRuntime()->SingleSys()); server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::NET_CLASSIFIER }); server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_ERROR);