diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 630bebdc8fc0d..2f6c3e55cfd67 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -113,6 +113,15 @@ class QueryConfig { static constexpr const char* kMaxMergeExchangeBufferSize = "merge_exchange.max_buffer_size"; + /// The minimum number of bytes to accumulate in the ExchangeQueue + /// before unblocking a consumer. This is used to avoid creating tiny + /// batches which may have a negative impact on performance when the + /// cost of creating vectors is high (for example, when there are many + /// columns). To avoid latency degradation, the exchange client unblocks a + /// consumer when 1% of the data size observed so far is accumulated. + static constexpr const char* kMinExchangeOutputBatchBytes = + "min_exchange_output_batch_bytes"; + static constexpr const char* kMaxPartialAggregationMemory = "max_partial_aggregation_memory"; @@ -594,6 +603,11 @@ class QueryConfig { return get(kMaxMergeExchangeBufferSize, kDefault); } + uint64_t minExchangeOutputBatchBytes() const { + static constexpr uint64_t kDefault = 2UL << 20; + return get(kMinExchangeOutputBatchBytes, kDefault); + } + uint64_t preferredOutputBatchBytes() const { static constexpr uint64_t kDefault = 10UL << 20; return get(kPreferredOutputBatchBytes, kDefault); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 1b3fb9c231152..aa108546e9144 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -89,6 +89,13 @@ Generic Configuration - Size of buffer in the exchange client that holds data fetched from other nodes before it is processed. A larger buffer can increase network throughput for larger clusters and thus decrease query processing time at the expense of reducing the amount of memory available for other usage. + * - min_exchange_output_batch_bytes + - integer + - 2MB + - The minimum number of bytes to accumulate in the ExchangeQueue before unblocking a consumer. This is used to avoid + creating tiny batches which may have a negative impact on performance when the cost of creating vectors is high + (for example, when there are many columns). To avoid latency degradation, the exchange client unblocks a consumer + when 1% of the data size observed so far is accumulated. * - merge_exchange.max_buffer_size - integer - 128MB @@ -670,13 +677,13 @@ Each query can override the config by setting corresponding query session proper - Default AWS secret key to use. * - hive.s3.endpoint - string - - + - - The S3 storage endpoint server. This can be used to connect to an S3-compatible storage system instead of AWS. * - hive.s3.endpoint.region - string - us-east-1 - The S3 storage endpoint server region. Default is set by the AWS SDK. If not configured, region will be attempted - to be parsed from the hive.s3.endpoint value. + to be parsed from the hive.s3.endpoint value. * - hive.s3.path-style-access - bool - false diff --git a/velox/exec/ExchangeClient.h b/velox/exec/ExchangeClient.h index 29bc3b884fdd9..946bfd93b69b6 100644 --- a/velox/exec/ExchangeClient.h +++ b/velox/exec/ExchangeClient.h @@ -33,6 +33,8 @@ class ExchangeClient : public std::enable_shared_from_this { std::string taskId, int destination, int64_t maxQueuedBytes, + int32_t numberOfConsumers, + uint64_t minOutputBatchBytes, memory::MemoryPool* pool, folly::Executor* executor) : taskId_{std::move(taskId)}, @@ -40,7 +42,9 @@ class ExchangeClient : public std::enable_shared_from_this { maxQueuedBytes_{maxQueuedBytes}, pool_(pool), executor_(executor), - queue_(std::make_shared()) { + queue_(std::make_shared( + numberOfConsumers, + minOutputBatchBytes)) { VELOX_CHECK_NOT_NULL(pool_); VELOX_CHECK_NOT_NULL(executor_); // NOTE: the executor is used to run async response callback from the diff --git a/velox/exec/ExchangeQueue.cpp b/velox/exec/ExchangeQueue.cpp index f19745191d477..eacf492addecb 100644 --- a/velox/exec/ExchangeQueue.cpp +++ b/velox/exec/ExchangeQueue.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ #include "velox/exec/ExchangeQueue.h" +#include namespace facebook::velox::exec { @@ -64,6 +65,15 @@ void ExchangeQueue::close() { clearPromises(promises); } +int64_t ExchangeQueue::minOutputBatchBytesLocked() const { + // always allow to unblock when at end + if (atEnd_) { + return 0; + } + // At most 1% of received bytes so far to minimize latency for small exchanges + return std::min(minOutputBatchBytes_, receivedBytes_ / 100); +} + void ExchangeQueue::enqueueLocked( std::unique_ptr&& page, std::vector& promises) { @@ -86,7 +96,13 @@ void ExchangeQueue::enqueueLocked( receivedBytes_ += page->size(); queue_.push_back(std::move(page)); - if (!promises_.empty()) { + const auto minBatchSize = minOutputBatchBytesLocked(); + while (!promises_.empty()) { + const auto unblockedConsumers = numberOfConsumers_ - promises_.size(); + const auto unasignedBytes = totalBytes_ - unblockedConsumers * minBatchSize; + if (unasignedBytes < minBatchSize) { + break; + } // Resume one of the waiting drivers. promises.push_back(std::move(promises_.back())); promises_.pop_back(); @@ -105,6 +121,14 @@ std::vector> ExchangeQueue::dequeueLocked( *atEnd = false; + // If we don't have enough bytes to return, we wait for more data to be + // available + if (totalBytes_ < minOutputBatchBytesLocked()) { + promises_.emplace_back("ExchangeQueue::dequeue"); + *future = promises_.back().getSemiFuture(); + return {}; + } + std::vector> pages; uint32_t pageBytes = 0; for (;;) { diff --git a/velox/exec/ExchangeQueue.h b/velox/exec/ExchangeQueue.h index 91e3a663aa06c..c6b121eceb437 100644 --- a/velox/exec/ExchangeQueue.h +++ b/velox/exec/ExchangeQueue.h @@ -81,6 +81,18 @@ class SerializedPage { /// for input. class ExchangeQueue { public: +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + explicit ExchangeQueue() : ExchangeQueue(1, 0) {} +#endif + + explicit ExchangeQueue( + int32_t numberOfConsumers, + uint64_t minOutputBatchBytes) + : numberOfConsumers_{numberOfConsumers}, + minOutputBatchBytes_{minOutputBatchBytes} { + VELOX_CHECK_GE(numberOfConsumers, 1); + } + ~ExchangeQueue() { clearAllPromises(); } @@ -185,6 +197,11 @@ class ExchangeQueue { } } + int64_t minOutputBatchBytesLocked() const; + + const int32_t numberOfConsumers_; + const uint64_t minOutputBatchBytes_; + int numCompleted_{0}; int numSources_{0}; bool noMoreSources_{false}; diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp index 786bd40045cca..8380b11d66a8d 100644 --- a/velox/exec/MergeSource.cpp +++ b/velox/exec/MergeSource.cpp @@ -128,6 +128,8 @@ class MergeExchangeSource : public MergeSource { mergeExchange->taskId(), destination, maxQueuedBytes, + 1, + 0, pool, executor)) { client_->addRemoteTaskId(taskId); diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index b0c6b3e362389..aa6f0842f5dff 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -970,7 +970,8 @@ void Task::initializePartitionOutput() { // exchange client for each merge source to fetch data as we can't mix // the data from different sources for merging. if (auto exchangeNodeId = factory->needsExchangeClient()) { - createExchangeClientLocked(pipeline, exchangeNodeId.value()); + createExchangeClientLocked( + pipeline, exchangeNodeId.value(), factory->numDrivers); } } } @@ -2982,7 +2983,8 @@ bool Task::pauseRequested(ContinueFuture* future) { void Task::createExchangeClientLocked( int32_t pipelineId, - const core::PlanNodeId& planNodeId) { + const core::PlanNodeId& planNodeId, + int32_t numberOfConsumers) { VELOX_CHECK_NULL( getExchangeClientLocked(pipelineId), "Exchange client has been created at pipeline: {} for planNode: {}", @@ -2998,6 +3000,8 @@ void Task::createExchangeClientLocked( taskId_, destination_, queryCtx()->queryConfig().maxExchangeBufferSize(), + numberOfConsumers, + queryCtx()->queryConfig().minExchangeOutputBatchBytes(), addExchangeClientPool(planNodeId, pipelineId), queryCtx()->executor()); exchangeClientByPlanNode_.emplace(planNodeId, exchangeClients_[pipelineId]); diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 253153420a835..ae843173f02f0 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -1003,7 +1003,8 @@ class Task : public std::enable_shared_from_this { // pipeline. void createExchangeClientLocked( int32_t pipelineId, - const core::PlanNodeId& planNodeId); + const core::PlanNodeId& planNodeId, + int32_t numberOfConsumers); // Get a shared reference to the exchange client with the specified exchange // plan node 'planNodeId'. The function returns null if there is no client diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 2ffcd78cbd7da..c1fee35657a26 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -33,6 +33,8 @@ namespace facebook::velox::exec { namespace { +static constexpr int32_t kDefaultMinExchangeOutputBatchBytes{2 << 20}; // 2 MB. + class ExchangeClientTest : public testing::Test, public velox::test::VectorTestBase, @@ -110,10 +112,10 @@ class ExchangeClientTest int32_t numPages) { std::vector> allPages; for (auto i = 0; i < numPages; ++i) { - bool atEnd; + bool atEnd{false}; ContinueFuture future; auto pages = client.next(1, &atEnd, &future); - if (pages.empty()) { + while (!atEnd && pages.empty()) { auto& exec = folly::QueuedImmediateExecutor::instance(); std::move(future).via(&exec).wait(); pages = client.next(1, &atEnd, &future); @@ -170,7 +172,7 @@ TEST_P(ExchangeClientTest, nonVeloxCreateExchangeSourceException) { }); auto client = std::make_shared( - "t", 1, ExchangeClient::kDefaultMaxQueuedBytes, pool(), executor()); + "t", 1, ExchangeClient::kDefaultMaxQueuedBytes, 1, 0, pool(), executor()); VELOX_ASSERT_THROW( client->addRemoteTaskId("task.1.2.3"), @@ -199,7 +201,13 @@ TEST_P(ExchangeClientTest, stats) { task, core::PartitionedOutputNode::Kind::kPartitioned, 100, 16); auto client = std::make_shared( - "t", 17, ExchangeClient::kDefaultMaxQueuedBytes, pool(), executor()); + "t", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + 1, + kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); client->addRemoteTaskId(taskId); // Enqueue 3 pages. @@ -239,7 +247,13 @@ TEST_P(ExchangeClientTest, flowControl) { // Set limit at 3.5 pages. auto client = std::make_shared( - "flow.control", 17, page->size() * 3.5, pool(), executor()); + "flow.control", + 17, + page->size() * 3.5, + 1, + kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); // Make 10 tasks. std::vector> tasks; @@ -277,10 +291,17 @@ TEST_P(ExchangeClientTest, flowControl) { TEST_P(ExchangeClientTest, largeSinglePage) { auto data = { makeRowVector({makeFlatVector(10000, folly::identity)}), - makeRowVector({makeFlatVector(1, folly::identity)}), + // second page is >1% of total payload size + makeRowVector({makeFlatVector(150, folly::identity)}), }; - auto client = - std::make_shared("test", 1, 1000, pool(), executor()); + auto client = std::make_shared( + "test", + 1, + 1000, + 1, + kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); auto task = makeTask("local://producer"); bufferManager_->initializeTask( task, core::PartitionedOutputNode::Kind::kArbitrary, 1, 1); @@ -290,18 +311,24 @@ TEST_P(ExchangeClientTest, largeSinglePage) { client->addRemoteTaskId(task->taskId()); auto pages = fetchPages(*client, 1); ASSERT_EQ(pages.size(), 1); - ASSERT_GT(pages[0]->size(), 1000); + ASSERT_GT(pages[0]->size(), 80000); pages = fetchPages(*client, 1); ASSERT_EQ(pages.size(), 1); - ASSERT_LT(pages[0]->size(), 1000); + ASSERT_LT(pages[0]->size(), 4000); task->requestCancel(); bufferManager_->removeTask(task->taskId()); client->close(); } TEST_P(ExchangeClientTest, multiPageFetch) { - auto client = - std::make_shared("test", 17, 1 << 20, pool(), executor()); + auto client = std::make_shared( + "test", + 17, + 1 << 20, + 1, + kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); { bool atEnd; @@ -353,8 +380,14 @@ TEST_P(ExchangeClientTest, multiPageFetch) { TEST_P(ExchangeClientTest, sourceTimeout) { constexpr int32_t kNumSources = 3; - auto client = - std::make_shared("test", 17, 1 << 20, pool(), executor()); + auto client = std::make_shared( + "test", + 17, + 1 << 20, + 1, + kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); bool atEnd; ContinueFuture future; @@ -434,8 +467,14 @@ TEST_P(ExchangeClientTest, sourceTimeout) { TEST_P(ExchangeClientTest, callNextAfterClose) { constexpr int32_t kNumSources = 3; common::testutil::TestValue::enable(); - auto client = - std::make_shared("test", 17, 1 << 20, pool(), executor()); + auto client = std::make_shared( + "test", + 17, + 1 << 20, + 1, + kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); bool atEnd; ContinueFuture future; @@ -497,6 +536,8 @@ TEST_P(ExchangeClientTest, acknowledge) { "local://test-acknowledge-client-task", 1, clientBufferSize, + 1, + kDefaultMinExchangeOutputBatchBytes, pool(), executor()); auto clientCloseGuard = folly::makeGuard([client]() { client->close(); }); @@ -615,6 +656,250 @@ TEST_P(ExchangeClientTest, acknowledge) { ASSERT_TRUE(atEnd); } +TEST_P(ExchangeClientTest, minOutputBatchBytesInitialBatches) { + // Initial batches should not block to avoid impacting latency of small + // exchanges + + const auto minOutputBatchBytes = 10000; + + auto client = std::make_shared( + "test", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + 1, + minOutputBatchBytes, + pool(), + executor()); + + const auto& queue = client->queue(); + addSources(*queue, 1); + + bool atEnd; + ContinueFuture future = ContinueFuture::makeEmpty(); + + // first page should unblock right away + auto pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(2000)); + ASSERT_TRUE(future.isReady()); + pages = client->next(1, &atEnd, &future); + ASSERT_EQ(1, pages.size()); + + // page larger than 1% of total should unblock right away + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(100)); + ASSERT_TRUE(future.isReady()); + pages = client->next(1, &atEnd, &future); + ASSERT_EQ(1, pages.size()); + + // small page (<1% of total) should block + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(15)); + ASSERT_FALSE(future.isReady()); + // one more small page should unblock now + enqueue(*queue, makePage(10)); + ASSERT_TRUE(future.isReady()); + pages = client->next(100, &atEnd, &future); + ASSERT_EQ(2, pages.size()); + + // Signal no-more-data. + enqueue(*queue, nullptr); + + pages = client->next(10'000, &atEnd, &future); + ASSERT_EQ(0, pages.size()); + ASSERT_TRUE(atEnd); + + client->close(); +} + +TEST_P(ExchangeClientTest, minOutputBatchBytesZero) { + // When minOutputBatchBytes is zero always unblock on the first page + + auto client = std::make_shared( + "test", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + 10, + 0, + pool(), + executor()); + + const auto& queue = client->queue(); + addSources(*queue, 1); + + bool atEnd; + ContinueFuture future = ContinueFuture::makeEmpty(); + + auto pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(1)); + ASSERT_TRUE(future.isReady()); + pages = client->next(1, &atEnd, &future); + ASSERT_EQ(1, pages.size()); + + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(1)); + ASSERT_TRUE(future.isReady()); + pages = client->next(1, &atEnd, &future); + ASSERT_EQ(1, pages.size()); + + // Signal no-more-data. + enqueue(*queue, nullptr); + + pages = client->next(10'000, &atEnd, &future); + ASSERT_EQ(0, pages.size()); + ASSERT_TRUE(atEnd); + + client->close(); +} + +TEST_P(ExchangeClientTest, minOutputBatchBytesSingleConsumer) { + const auto minOutputBatchBytes = 1000; + + auto client = std::make_shared( + "test", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + 1, + minOutputBatchBytes, + pool(), + executor()); + + const auto& queue = client->queue(); + addSources(*queue, 1); + + bool atEnd; + ContinueFuture future = ContinueFuture::makeEmpty(); + + // first page should unblock right away + auto pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes * 150)); + ASSERT_TRUE(future.isReady()); + pages = client->next(1, &atEnd, &future); + ASSERT_EQ(1, pages.size()); + + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes / 2)); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes / 3)); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes / 3)); + ASSERT_TRUE(future.isReady()); + + pages = client->next(minOutputBatchBytes, &atEnd, &future); + ASSERT_EQ(2, pages.size()); + + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes / 2)); + ASSERT_FALSE(future.isReady()); + + // Signal no-more-data. + enqueue(*queue, nullptr); + ASSERT_TRUE(future.isReady()); + + pages = client->next(10'000, &atEnd, &future); + ASSERT_EQ(2, pages.size()); + ASSERT_TRUE(atEnd); + + client->close(); +} + +TEST_P(ExchangeClientTest, minOutputBatchBytesMultipleConsumers) { + const auto minOutputBatchBytes = 1000; + + auto client = std::make_shared( + "test", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + 3, + minOutputBatchBytes, + pool(), + executor()); + + const auto& queue = client->queue(); + addSources(*queue, 1); + + bool atEnd; + + ContinueFuture consumer1 = ContinueFuture::makeEmpty(); + ContinueFuture consumer2 = ContinueFuture::makeEmpty(); + ContinueFuture consumer3 = ContinueFuture::makeEmpty(); + + client->next(1, &atEnd, &consumer1); + ASSERT_FALSE(consumer1.isReady()); + client->next(1, &atEnd, &consumer2); + ASSERT_FALSE(consumer2.isReady()); + client->next(1, &atEnd, &consumer3); + ASSERT_FALSE(consumer3.isReady()); + + // first page should unblock right away + + enqueue(*queue, makePage(minOutputBatchBytes * 150)); + ASSERT_TRUE(consumer1.isReady()); + ASSERT_TRUE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + auto pages = client->next(1, &atEnd, &consumer1); + ASSERT_EQ(1, pages.size()); + + client->next(1, &atEnd, &consumer1); + ASSERT_FALSE(consumer1.isReady()); + client->next(1, &atEnd, &consumer2); + ASSERT_FALSE(consumer2.isReady()); + client->next(1, &atEnd, &consumer3); + ASSERT_FALSE(consumer3.isReady()); + + enqueue(*queue, makePage(minOutputBatchBytes)); + ASSERT_FALSE(consumer1.isReady()); + ASSERT_FALSE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + enqueue(*queue, makePage(minOutputBatchBytes / 2)); + ASSERT_FALSE(consumer1.isReady()); + ASSERT_FALSE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + enqueue(*queue, makePage(minOutputBatchBytes)); + ASSERT_FALSE(consumer1.isReady()); + ASSERT_TRUE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + pages = client->next(1, &atEnd, &consumer3); + ASSERT_EQ(1, pages.size()); + pages = client->next(1, &atEnd, &consumer2); + ASSERT_EQ(1, pages.size()); + + client->next(1, &atEnd, &consumer1); + ASSERT_FALSE(consumer1.isReady()); + client->next(1, &atEnd, &consumer2); + ASSERT_FALSE(consumer2.isReady()); + client->next(1, &atEnd, &consumer3); + ASSERT_FALSE(consumer3.isReady()); + + enqueue(*queue, makePage(minOutputBatchBytes / 2)); + ASSERT_FALSE(consumer1.isReady()); + ASSERT_FALSE(consumer2.isReady()); + ASSERT_FALSE(consumer3.isReady()); + + // Signal no-more-data. + enqueue(*queue, nullptr); + ASSERT_TRUE(consumer1.isReady()); + ASSERT_TRUE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + pages = client->next(10'000, &atEnd, &consumer1); + ASSERT_EQ(1, pages.size()); + ASSERT_TRUE(atEnd); + + client->close(); +} + VELOX_INSTANTIATE_TEST_SUITE_P( ExchangeClientTest, ExchangeClientTest, diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index 3902ead5ffc4e..2920411eb6dc9 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -1451,7 +1451,7 @@ TEST_P(OutputBufferManagerWithDifferentSerdeKindsTest, outOfOrderAcks) { } TEST_F(OutputBufferManagerTest, errorInQueue) { - auto queue = std::make_shared(); + auto queue = std::make_shared(1, 0); queue->setError("Forced failure"); std::lock_guard l(queue->mutex()); @@ -1473,7 +1473,7 @@ TEST_P( auto page = std::make_unique(std::move(iobuf)); - auto queue = std::make_shared(); + auto queue = std::make_shared(1, 0); std::vector promises; { std::lock_guard l(queue->mutex());