Skip to content

Commit

Permalink
feat: Avoid small batches in Exchange
Browse files Browse the repository at this point in the history
Summary:
Prevent exchange client from unblocking to early. Unblocking to early impedes
effectiveness of page merging. When the cost of creating a vector is high (for
example for data sets with high number of columns) creating small pages can
make queries significantly less efficient.

For example it was observed that when network is congested and Exchange buffers
are not filled up as fast query may experience CPU efficiency drop up to 4x: T211034421

Reviewed By: xiaoxmeng

Differential Revision: D67615570
  • Loading branch information
arhimondr authored and facebook-github-bot committed Jan 22, 2025
1 parent 05fc7f8 commit 34fbab7
Show file tree
Hide file tree
Showing 10 changed files with 386 additions and 25 deletions.
14 changes: 14 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ class QueryConfig {
static constexpr const char* kMaxMergeExchangeBufferSize =
"merge_exchange.max_buffer_size";

/// The minimum number of bytes to accumulate in the ExchangeQueue
/// before unblocking a consumer. This is used to avoid creating tiny
/// batches which may have a negative impact on performance when the
/// cost of creating vectors is high (for example, when there are many
/// columns). To avoid latency degradation, the exchange client unblocks a
/// consumer when 1% of the data size observed so far is accumulated.
static constexpr const char* kMinExchangeOutputBatchBytes =
"min_exchange_output_batch_bytes";

static constexpr const char* kMaxPartialAggregationMemory =
"max_partial_aggregation_memory";

Expand Down Expand Up @@ -594,6 +603,11 @@ class QueryConfig {
return get<uint64_t>(kMaxMergeExchangeBufferSize, kDefault);
}

uint64_t minExchangeOutputBatchBytes() const {
static constexpr uint64_t kDefault = 2UL << 20;
return get<uint64_t>(kMinExchangeOutputBatchBytes, kDefault);
}

uint64_t preferredOutputBatchBytes() const {
static constexpr uint64_t kDefault = 10UL << 20;
return get<uint64_t>(kPreferredOutputBatchBytes, kDefault);
Expand Down
11 changes: 9 additions & 2 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ Generic Configuration
- Size of buffer in the exchange client that holds data fetched from other nodes before it is processed.
A larger buffer can increase network throughput for larger clusters and thus decrease query processing time
at the expense of reducing the amount of memory available for other usage.
* - min_exchange_output_batch_bytes
- integer
- 2MB
- The minimum number of bytes to accumulate in the ExchangeQueue before unblocking a consumer. This is used to avoid
creating tiny batches which may have a negative impact on performance when the cost of creating vectors is high
(for example, when there are many columns). To avoid latency degradation, the exchange client unblocks a consumer
when 1% of the data size observed so far is accumulated.
* - merge_exchange.max_buffer_size
- integer
- 128MB
Expand Down Expand Up @@ -670,13 +677,13 @@ Each query can override the config by setting corresponding query session proper
- Default AWS secret key to use.
* - hive.s3.endpoint
- string
-
-
- The S3 storage endpoint server. This can be used to connect to an S3-compatible storage system instead of AWS.
* - hive.s3.endpoint.region
- string
- us-east-1
- The S3 storage endpoint server region. Default is set by the AWS SDK. If not configured, region will be attempted
to be parsed from the hive.s3.endpoint value.
to be parsed from the hive.s3.endpoint value.
* - hive.s3.path-style-access
- bool
- false
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/ExchangeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
std::string taskId,
int destination,
int64_t maxQueuedBytes,
int32_t numberOfConsumers,
uint64_t minOutputBatchBytes,
memory::MemoryPool* pool,
folly::Executor* executor)
: taskId_{std::move(taskId)},
destination_(destination),
maxQueuedBytes_{maxQueuedBytes},
pool_(pool),
executor_(executor),
queue_(std::make_shared<ExchangeQueue>()) {
queue_(std::make_shared<ExchangeQueue>(
numberOfConsumers,
minOutputBatchBytes)) {
VELOX_CHECK_NOT_NULL(pool_);
VELOX_CHECK_NOT_NULL(executor_);
// NOTE: the executor is used to run async response callback from the
Expand Down
28 changes: 27 additions & 1 deletion velox/exec/ExchangeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/ExchangeQueue.h"
#include <algorithm>

namespace facebook::velox::exec {

Expand Down Expand Up @@ -64,6 +65,15 @@ void ExchangeQueue::close() {
clearPromises(promises);
}

int64_t ExchangeQueue::minOutputBatchBytesLocked() const {
// always allow to unblock when at end
if (atEnd_) {
return 0;
}
// At most 1% of received bytes so far to minimize latency for small exchanges
return std::min<int64_t>(minOutputBatchBytes_, receivedBytes_ / 100);
}

void ExchangeQueue::enqueueLocked(
std::unique_ptr<SerializedPage>&& page,
std::vector<ContinuePromise>& promises) {
Expand All @@ -86,7 +96,15 @@ void ExchangeQueue::enqueueLocked(
receivedBytes_ += page->size();

queue_.push_back(std::move(page));
if (!promises_.empty()) {
const auto minBatchSize = minOutputBatchBytesLocked();
while (!promises_.empty()) {
VELOX_CHECK_LE(promises_.size(), numberOfConsumers_);
const int32_t unblockedConsumers = numberOfConsumers_ - promises_.size();
const int64_t unasignedBytes =
totalBytes_ - unblockedConsumers * minBatchSize;
if (unasignedBytes < minBatchSize) {
break;
}
// Resume one of the waiting drivers.
promises.push_back(std::move(promises_.back()));
promises_.pop_back();
Expand All @@ -105,6 +123,14 @@ std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(

*atEnd = false;

// If we don't have enough bytes to return, we wait for more data to be
// available
if (totalBytes_ < minOutputBatchBytesLocked()) {
promises_.emplace_back("ExchangeQueue::dequeue");
*future = promises_.back().getSemiFuture();
return {};
}

std::vector<std::unique_ptr<SerializedPage>> pages;
uint32_t pageBytes = 0;
for (;;) {
Expand Down
17 changes: 17 additions & 0 deletions velox/exec/ExchangeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ class SerializedPage {
/// for input.
class ExchangeQueue {
public:
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
explicit ExchangeQueue() : ExchangeQueue(1, 0) {}
#endif

explicit ExchangeQueue(
int32_t numberOfConsumers,
uint64_t minOutputBatchBytes)
: numberOfConsumers_{numberOfConsumers},
minOutputBatchBytes_{minOutputBatchBytes} {
VELOX_CHECK_GE(numberOfConsumers, 1);
}

~ExchangeQueue() {
clearAllPromises();
}
Expand Down Expand Up @@ -185,6 +197,11 @@ class ExchangeQueue {
}
}

int64_t minOutputBatchBytesLocked() const;

const int32_t numberOfConsumers_;
const uint64_t minOutputBatchBytes_;

int numCompleted_{0};
int numSources_{0};
bool noMoreSources_{false};
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/MergeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class MergeExchangeSource : public MergeSource {
mergeExchange->taskId(),
destination,
maxQueuedBytes,
1,
// Deliver right away to avoid blocking other sources
0,
pool,
executor)) {
client_->addRemoteTaskId(taskId);
Expand Down
8 changes: 6 additions & 2 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,8 @@ void Task::initializePartitionOutput() {
// exchange client for each merge source to fetch data as we can't mix
// the data from different sources for merging.
if (auto exchangeNodeId = factory->needsExchangeClient()) {
createExchangeClientLocked(pipeline, exchangeNodeId.value());
createExchangeClientLocked(
pipeline, exchangeNodeId.value(), factory->numDrivers);
}
}
}
Expand Down Expand Up @@ -2982,7 +2983,8 @@ bool Task::pauseRequested(ContinueFuture* future) {

void Task::createExchangeClientLocked(
int32_t pipelineId,
const core::PlanNodeId& planNodeId) {
const core::PlanNodeId& planNodeId,
int32_t numberOfConsumers) {
VELOX_CHECK_NULL(
getExchangeClientLocked(pipelineId),
"Exchange client has been created at pipeline: {} for planNode: {}",
Expand All @@ -2998,6 +3000,8 @@ void Task::createExchangeClientLocked(
taskId_,
destination_,
queryCtx()->queryConfig().maxExchangeBufferSize(),
numberOfConsumers,
queryCtx()->queryConfig().minExchangeOutputBatchBytes(),
addExchangeClientPool(planNodeId, pipelineId),
queryCtx()->executor());
exchangeClientByPlanNode_.emplace(planNodeId, exchangeClients_[pipelineId]);
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,8 @@ class Task : public std::enable_shared_from_this<Task> {
// pipeline.
void createExchangeClientLocked(
int32_t pipelineId,
const core::PlanNodeId& planNodeId);
const core::PlanNodeId& planNodeId,
int32_t numberOfConsumers);

// Get a shared reference to the exchange client with the specified exchange
// plan node 'planNodeId'. The function returns null if there is no client
Expand Down
Loading

0 comments on commit 34fbab7

Please sign in to comment.