From 45447d06c7d542fff02b7b678c95efa061cdf642 Mon Sep 17 00:00:00 2001 From: Evan Zou Date: Mon, 3 Feb 2025 19:08:33 -0800 Subject: [PATCH] Make CPUConcurrencyController a shared_ptr Summary: Technically this is a behavioral change, but Kirill think it should be harmless Reviewed By: sazonovkirill Differential Revision: D67947852 fbshipit-source-id: c419d10b70e7c6c4ea0556587a5e9855a3b98a48 --- .../thrift/lib/cpp2/server/ServerConfigs.h | 4 +- .../thrift/lib/cpp2/server/ThriftServer.cpp | 15 +++-- .../src/thrift/lib/cpp2/server/ThriftServer.h | 19 +++++-- .../server/overload/QpsOverloadChecker.cpp | 4 +- .../QueueConcurrencyOverloadChecker.cpp | 3 +- ...MockCPUConcurrencyControllerConfigTest.cpp | 8 ++- .../lib/cpp2/server/test/TestService.thrift | 21 +++++++ .../server/test/ThriftServerCpuCCTest.cpp | 55 +++++++++++++++++++ .../transport/core/RequestStateMachine.cpp | 10 +++- .../cpp2/transport/core/RequestStateMachine.h | 4 +- .../lib/cpp2/transport/core/ThriftRequest.cpp | 9 ++- .../core/testutil/ServerConfigsMock.h | 8 +-- 12 files changed, 133 insertions(+), 27 deletions(-) create mode 100644 third-party/thrift/src/thrift/lib/cpp2/server/test/TestService.thrift create mode 100644 third-party/thrift/src/thrift/lib/cpp2/server/test/ThriftServerCpuCCTest.cpp diff --git a/third-party/thrift/src/thrift/lib/cpp2/server/ServerConfigs.h b/third-party/thrift/src/thrift/lib/cpp2/server/ServerConfigs.h index ebc7bd80b3e43a..9ea03b9d55846f 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/server/ServerConfigs.h +++ b/third-party/thrift/src/thrift/lib/cpp2/server/ServerConfigs.h @@ -83,8 +83,8 @@ class ServerConfigs { virtual const AdaptiveConcurrencyController& getAdaptiveConcurrencyController() const = 0; - virtual CPUConcurrencyController& getCPUConcurrencyController() = 0; - virtual const CPUConcurrencyController& getCPUConcurrencyController() + virtual CPUConcurrencyController* getCPUConcurrencyController() = 0; + virtual const CPUConcurrencyController* getCPUConcurrencyController() const = 0; // @see ThriftServer::getNumIOWorkerThreads function. diff --git a/third-party/thrift/src/thrift/lib/cpp2/server/ThriftServer.cpp b/third-party/thrift/src/thrift/lib/cpp2/server/ThriftServer.cpp index 8d8297239b2842..9ddfb70fc918c4 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/server/ThriftServer.cpp +++ b/third-party/thrift/src/thrift/lib/cpp2/server/ThriftServer.cpp @@ -265,10 +265,10 @@ ThriftServer::ThriftServer() apache::thrift::detail::makeAdaptiveConcurrencyConfig(), thriftConfig_.getMaxRequests().getObserver(), detail::getThriftServerConfig(*this)}, - cpuConcurrencyController_{ + cpuConcurrencyController_{std::make_shared( makeCPUConcurrencyControllerConfigInternal(), *this, - detail::getThriftServerConfig(*this)}, + detail::getThriftServerConfig(*this))}, addresses_(1), wShutdownSocketSet_(folly::tryGetShutdownSocketSet()), lastRequestTime_( @@ -1957,7 +1957,7 @@ folly::Optional ThriftServer::checkOverload( !getMethodsBypassMaxRequestsLimit().contains(method) && static_cast(getActiveRequests()) >= maxRequests) { LoadShedder loadShedder = LoadShedder::MAX_REQUESTS; - if (getCPUConcurrencyController().requestShed( + if (notifyCPUConcurrencyControllerOnRequestLoadShed( CPUConcurrencyController::Method::MAX_REQUESTS)) { loadShedder = LoadShedder::CPU_CONCURRENCY_CONTROLLER; } else if (getAdaptiveConcurrencyController().enabled()) { @@ -1975,7 +1975,7 @@ folly::Optional ThriftServer::checkOverload( !getMethodsBypassMaxRequestsLimit().contains(method) && !qpsTokenBucket_.consume(1.0, maxQps, maxQps)) { LoadShedder loadShedder = LoadShedder::MAX_QPS; - if (getCPUConcurrencyController().requestShed( + if (notifyCPUConcurrencyControllerOnRequestLoadShed( CPUConcurrencyController::Method::MAX_QPS)) { loadShedder = LoadShedder::CPU_CONCURRENCY_CONTROLLER; } @@ -2544,4 +2544,11 @@ bool ThriftServer::getTaskExpireTimeForRequest( } return queueTimeout != taskTimeout; } + +bool ThriftServer::notifyCPUConcurrencyControllerOnRequestLoadShed( + std::optional method) { + auto* cpuConcurrencyControllerPtr = getCPUConcurrencyController(); + return cpuConcurrencyControllerPtr != nullptr && + cpuConcurrencyControllerPtr->requestShed(method); +} } // namespace apache::thrift diff --git a/third-party/thrift/src/thrift/lib/cpp2/server/ThriftServer.h b/third-party/thrift/src/thrift/lib/cpp2/server/ThriftServer.h index 65fa73a54424b9..a2fb4b4e3824b6 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/server/ThriftServer.h +++ b/third-party/thrift/src/thrift/lib/cpp2/server/ThriftServer.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -902,6 +901,11 @@ class ThriftServer : public apache::thrift::concurrency::Runnable, void disableInfoLogging() { infoLoggingEnabled_ = false; } + void setCPUConcurrencyController( + std::shared_ptr controller) { + cpuConcurrencyController_ = std::move(controller); + } + private: friend ThriftServerConfig& detail::getThriftServerConfig(ThriftServer&); @@ -1015,7 +1019,7 @@ class ThriftServer : public apache::thrift::concurrency::Runnable, mockCPUConcurrencyControllerConfig_{std::nullopt}; folly::observer::Observer makeCPUConcurrencyControllerConfigInternal(); - CPUConcurrencyController cpuConcurrencyController_; + std::shared_ptr cpuConcurrencyController_; //! The server's listening addresses std::vector addresses_; @@ -1077,14 +1081,17 @@ class ThriftServer : public apache::thrift::concurrency::Runnable, return adaptiveConcurrencyController_; } - CPUConcurrencyController& getCPUConcurrencyController() final { - return cpuConcurrencyController_; + CPUConcurrencyController* getCPUConcurrencyController() final { + return cpuConcurrencyController_.get(); } - const CPUConcurrencyController& getCPUConcurrencyController() const final { - return cpuConcurrencyController_; + const CPUConcurrencyController* getCPUConcurrencyController() const final { + return cpuConcurrencyController_.get(); } + bool notifyCPUConcurrencyControllerOnRequestLoadShed( + std::optional method); + void setMockCPUConcurrencyControllerConfig( CPUConcurrencyController::Config config) { mockCPUConcurrencyControllerConfig_.setValue(config); diff --git a/third-party/thrift/src/thrift/lib/cpp2/server/overload/QpsOverloadChecker.cpp b/third-party/thrift/src/thrift/lib/cpp2/server/overload/QpsOverloadChecker.cpp index 8be79d3797cf58..eb576e08006597 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/server/overload/QpsOverloadChecker.cpp +++ b/third-party/thrift/src/thrift/lib/cpp2/server/overload/QpsOverloadChecker.cpp @@ -28,7 +28,9 @@ folly::Optional QpsOverloadChecker::checkOverload( if (!server_.getMethodsBypassMaxRequestsLimit().contains(*params.method) && !qpsTokenBucket_.consume(1.0, maxQps, maxQps)) { LoadShedder loadShedder = LoadShedder::MAX_QPS; - if (server_.getCPUConcurrencyController().requestShed( + if (auto* cpuConcurrencyController = server_.getCPUConcurrencyController(); + cpuConcurrencyController != nullptr && + cpuConcurrencyController->requestShed( CPUConcurrencyController::Method::MAX_QPS)) { loadShedder = LoadShedder::CPU_CONCURRENCY_CONTROLLER; } diff --git a/third-party/thrift/src/thrift/lib/cpp2/server/overload/QueueConcurrencyOverloadChecker.cpp b/third-party/thrift/src/thrift/lib/cpp2/server/overload/QueueConcurrencyOverloadChecker.cpp index ce8a1b39b1da67..38273251f711dc 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/server/overload/QueueConcurrencyOverloadChecker.cpp +++ b/third-party/thrift/src/thrift/lib/cpp2/server/overload/QueueConcurrencyOverloadChecker.cpp @@ -24,7 +24,8 @@ folly::Optional QueueConcurrencyOverloadChecker::checkOverload( !server_.getMethodsBypassMaxRequestsLimit().contains(*params.method)) && static_cast(server_.getActiveRequests()) >= maxRequests) { LoadShedder loadShedder = LoadShedder::MAX_REQUESTS; - if (server_.getCPUConcurrencyController().requestShed( + if (auto* cpuConcurrencyController = server_.getCPUConcurrencyController(); + cpuConcurrencyController->requestShed( CPUConcurrencyController::Method::MAX_REQUESTS)) { loadShedder = LoadShedder::CPU_CONCURRENCY_CONTROLLER; } else if (server_.getAdaptiveConcurrencyController().enabled()) { diff --git a/third-party/thrift/src/thrift/lib/cpp2/server/test/MockCPUConcurrencyControllerConfigTest.cpp b/third-party/thrift/src/thrift/lib/cpp2/server/test/MockCPUConcurrencyControllerConfigTest.cpp index ddb5a6903acb02..de698bdb9d4800 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/server/test/MockCPUConcurrencyControllerConfigTest.cpp +++ b/third-party/thrift/src/thrift/lib/cpp2/server/test/MockCPUConcurrencyControllerConfigTest.cpp @@ -48,7 +48,9 @@ TEST(MockCpuConcurrencyControllerConfigTest, testOverride) { CPUConcurrencyController::Config{.concurrencyLowerBound = 1111}); folly::observer_detail::ObserverManager::waitForAllUpdates(); - auto config = server.getCPUConcurrencyController().config(); + auto* cpuConcurrencyController = server.getCPUConcurrencyController(); + ASSERT_NE(cpuConcurrencyController, nullptr); + auto config = cpuConcurrencyController->config(); ASSERT_EQ(config->concurrencyLowerBound, 1111); ASSERT_TRUE(kMakeCPUConcurrencyControllerConfigCalled); } @@ -57,7 +59,9 @@ TEST(MockCpuConcurrencyControllerConfigTest, testBase) { kMakeCPUConcurrencyControllerConfigCalled = false; ThriftServer server; - auto config = server.getCPUConcurrencyController().config(); + auto* cpuConcurrencyController = server.getCPUConcurrencyController(); + ASSERT_NE(cpuConcurrencyController, nullptr); + auto config = cpuConcurrencyController->config(); ASSERT_EQ(config->concurrencyLowerBound, 2222); ASSERT_TRUE(kMakeCPUConcurrencyControllerConfigCalled); } diff --git a/third-party/thrift/src/thrift/lib/cpp2/server/test/TestService.thrift b/third-party/thrift/src/thrift/lib/cpp2/server/test/TestService.thrift new file mode 100644 index 00000000000000..e288b36386fca6 --- /dev/null +++ b/third-party/thrift/src/thrift/lib/cpp2/server/test/TestService.thrift @@ -0,0 +1,21 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace cpp2 apache.thrift.test + +service TestService { + string echo(1: string str); +} diff --git a/third-party/thrift/src/thrift/lib/cpp2/server/test/ThriftServerCpuCCTest.cpp b/third-party/thrift/src/thrift/lib/cpp2/server/test/ThriftServerCpuCCTest.cpp new file mode 100644 index 00000000000000..99d5f1e6fa70d9 --- /dev/null +++ b/third-party/thrift/src/thrift/lib/cpp2/server/test/ThriftServerCpuCCTest.cpp @@ -0,0 +1,55 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace ::testing; +using namespace apache::thrift; + +namespace { + +class TestServicehandler + : public apache::thrift::ServiceHandler { + folly::coro::Task> co_echo( + std::unique_ptr str) override { + co_return std::move(str); + } +}; + +} // namespace + +CO_TEST(ThriftServerCpuCCTest, CPUConcurrencyControllerCanBeNull) { + auto handler = std::make_shared(); + auto server = std::make_shared( + std::move(handler), [&](ThriftServer& thriftServer) { + thriftServer.setCPUConcurrencyController(nullptr); + }); + EXPECT_EQ(server->getThriftServer().getCPUConcurrencyController(), nullptr); + + std::unique_ptr> client = + server->newClient>(); + + std::string result = co_await client->co_echo("hello"); + EXPECT_EQ(result, "hello"); +} diff --git a/third-party/thrift/src/thrift/lib/cpp2/transport/core/RequestStateMachine.cpp b/third-party/thrift/src/thrift/lib/cpp2/transport/core/RequestStateMachine.cpp index 762bff410b7b22..6089294052fa4c 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/transport/core/RequestStateMachine.cpp +++ b/third-party/thrift/src/thrift/lib/cpp2/transport/core/RequestStateMachine.cpp @@ -25,13 +25,15 @@ namespace apache::thrift { RequestStateMachine::RequestStateMachine( bool includeInRecentRequests, AdaptiveConcurrencyController& controller, - CPUConcurrencyController& cpuController) + CPUConcurrencyController* cpuController) : includeInRecentRequests_(includeInRecentRequests), adaptiveConcurrencyController_(controller), cpuController_(cpuController) { if (includeInRecentRequests_) { adaptiveConcurrencyController_.requestStarted(started()); - cpuController_.requestStarted(); + if (cpuController_ != nullptr) { + cpuController_->requestStarted(); + } } } @@ -65,7 +67,9 @@ RequestStateMachine::~RequestStateMachine() { [[nodiscard]] bool RequestStateMachine::tryStopProcessing() { if (!startProcessingOrQueueTimeout_.exchange( true, std::memory_order_relaxed)) { - cpuController_.requestShed(); + if (cpuController_ != nullptr) { + cpuController_->requestShed(); + } dequeued_.store( std::chrono::steady_clock::now(), std::memory_order_relaxed); return true; diff --git a/third-party/thrift/src/thrift/lib/cpp2/transport/core/RequestStateMachine.h b/third-party/thrift/src/thrift/lib/cpp2/transport/core/RequestStateMachine.h index 0e8b0f897eb81a..55d3b5eb0f828e 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/transport/core/RequestStateMachine.h +++ b/third-party/thrift/src/thrift/lib/cpp2/transport/core/RequestStateMachine.h @@ -36,7 +36,7 @@ class RequestStateMachine { RequestStateMachine( bool includeInRecentRequests, AdaptiveConcurrencyController& controller, - CPUConcurrencyController& cpuController); + CPUConcurrencyController* cpuController); ~RequestStateMachine(); @@ -97,7 +97,7 @@ class RequestStateMachine { std::atomic dequeued_{ std::chrono::steady_clock::time_point::min()}; AdaptiveConcurrencyController& adaptiveConcurrencyController_; - CPUConcurrencyController& cpuController_; + CPUConcurrencyController* cpuController_; }; } // namespace apache::thrift diff --git a/third-party/thrift/src/thrift/lib/cpp2/transport/core/ThriftRequest.cpp b/third-party/thrift/src/thrift/lib/cpp2/transport/core/ThriftRequest.cpp index 64434a6cbd8e9b..6aa919e0a02804 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/transport/core/ThriftRequest.cpp +++ b/third-party/thrift/src/thrift/lib/cpp2/transport/core/ThriftRequest.cpp @@ -243,8 +243,13 @@ ThriftRequestCore::LogRequestSampleCallback::buildRequestLoggingContext( requestLoggingContext.clientTimeoutMs = thriftRequest.clientTimeout_; // CPUConcurrencyController mode - requestLoggingContext.cpuConcurrencyControllerMode = static_cast( - serverConfigs_.getCPUConcurrencyController().config()->mode); + if (serverConfigs_.getCPUConcurrencyController() != nullptr) { + requestLoggingContext.cpuConcurrencyControllerMode = static_cast( + serverConfigs_.getCPUConcurrencyController()->config()->mode); + } else { + requestLoggingContext.cpuConcurrencyControllerMode = + static_cast(CPUConcurrencyController::Mode::DISABLED); + } return requestLoggingContext; } diff --git a/third-party/thrift/src/thrift/lib/cpp2/transport/core/testutil/ServerConfigsMock.h b/third-party/thrift/src/thrift/lib/cpp2/transport/core/testutil/ServerConfigsMock.h index 2761bcb570300a..29e7edaf3a64d1 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/transport/core/testutil/ServerConfigsMock.h +++ b/third-party/thrift/src/thrift/lib/cpp2/transport/core/testutil/ServerConfigsMock.h @@ -85,14 +85,14 @@ class ServerConfigsMock : public ServerConfigs { return adaptiveConcurrencyController_; } - apache::thrift::CPUConcurrencyController& getCPUConcurrencyController() + apache::thrift::CPUConcurrencyController* getCPUConcurrencyController() override { - return cpuConcurrencyController_; + return &cpuConcurrencyController_; } - const apache::thrift::CPUConcurrencyController& getCPUConcurrencyController() + const apache::thrift::CPUConcurrencyController* getCPUConcurrencyController() const override { - return cpuConcurrencyController_; + return &cpuConcurrencyController_; } uint32_t getMaxRequests() const override {