feat: Avoid small batches in Exchange #12010

Closed · wants to merge 1 commit
14 changes: 14 additions & 0 deletions velox/core/QueryConfig.h
@@ -113,6 +113,15 @@ class QueryConfig {
static constexpr const char* kMaxMergeExchangeBufferSize =
"merge_exchange.max_buffer_size";

/// The minimum number of bytes to accumulate in the ExchangeQueue
/// before unblocking a consumer. This is used to avoid creating tiny
/// batches, which can hurt performance when the cost of creating vectors
/// is high (for example, when there are many columns). To avoid latency
/// degradation, the exchange client unblocks a consumer once the smaller
/// of this value and 1% of the data size observed so far has accumulated.
static constexpr const char* kMinExchangeOutputBatchBytes =
"min_exchange_output_batch_bytes";

static constexpr const char* kMaxPartialAggregationMemory =
"max_partial_aggregation_memory";

@@ -594,6 +603,11 @@ class QueryConfig {
return get<uint64_t>(kMaxMergeExchangeBufferSize, kDefault);
}

uint64_t minExchangeOutputBatchBytes() const {
static constexpr uint64_t kDefault = 2UL << 20;
return get<uint64_t>(kMinExchangeOutputBatchBytes, kDefault);
}

uint64_t preferredOutputBatchBytes() const {
static constexpr uint64_t kDefault = 10UL << 20;
return get<uint64_t>(kPreferredOutputBatchBytes, kDefault);
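For reference, the threshold described above is the smaller of the configured value and 1% of the bytes received so far. A minimal standalone sketch (illustrative only, not Velox code) of that rule:

#include <algorithm>
#include <cstdint>

// Number of bytes the queue waits for before unblocking a consumer, following
// the semantics documented for min_exchange_output_batch_bytes.
int64_t unblockThresholdBytes(
    uint64_t minExchangeOutputBatchBytes, // query config, 2MB by default
    int64_t receivedBytes,                // bytes observed by the queue so far
    bool atEnd) {                         // all producers have finished
  if (atEnd) {
    return 0; // Never hold data back once the exchange is complete.
  }
  return std::min<int64_t>(minExchangeOutputBatchBytes, receivedBytes / 100);
}

// Example: with the 2MB default and only 10MB received so far, a consumer is
// unblocked after 100KB; once 200MB or more has been received, the full 2MB
// threshold applies.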
11 changes: 9 additions & 2 deletions velox/docs/configs.rst
@@ -89,6 +89,13 @@ Generic Configuration
- Size of buffer in the exchange client that holds data fetched from other nodes before it is processed.
A larger buffer can increase network throughput for larger clusters and thus decrease query processing time
at the expense of reducing the amount of memory available for other usage.
* - min_exchange_output_batch_bytes
- integer
- 2MB
- The minimum number of bytes to accumulate in the ExchangeQueue before unblocking a consumer. This is used to avoid
creating tiny batches, which can hurt performance when the cost of creating vectors is high (for example, when there
are many columns). To avoid latency degradation, the exchange client unblocks a consumer once the smaller of this
value and 1% of the data size observed so far has accumulated.
* - merge_exchange.max_buffer_size
- integer
- 128MB
@@ -670,13 +677,13 @@ Each query can override the config by setting corresponding query session proper
- Default AWS secret key to use.
* - hive.s3.endpoint
- string
-
-
- The S3 storage endpoint server. This can be used to connect to an S3-compatible storage system instead of AWS.
* - hive.s3.endpoint.region
- string
- us-east-1
- The S3 storage endpoint server region. Default is set by the AWS SDK. If not configured, region will be attempted
to be parsed from the hive.s3.endpoint value.
to be parsed from the hive.s3.endpoint value.
* - hive.s3.path-style-access
- bool
- false
5 changes: 3 additions & 2 deletions velox/exec/Exchange.cpp
@@ -52,6 +52,7 @@ Exchange::Exchange(
operatorCtx_->driverCtx()->queryConfig(),
serdeKind_)},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
driverId_{driverCtx->driverId},
Contributor: nit: we can just fetch from operatorCtx_->driverCtx()->driverId and don't need to save a copy of it.

Contributor (Author): Just wanted to save a couple of extra dereferences :-)

exchangeClient_{std::move(exchangeClient)} {}

void Exchange::addTaskIds(std::vector<std::string>& taskIds) {
@@ -111,8 +112,8 @@ BlockingReason Exchange::isBlocked(ContinueFuture* future) {
}

ContinueFuture dataFuture;
currentPages_ =
exchangeClient_->next(preferredOutputBatchBytes_, &atEnd_, &dataFuture);
currentPages_ = exchangeClient_->next(
driverId_, preferredOutputBatchBytes_, &atEnd_, &dataFuture);
if (!currentPages_.empty() || atEnd_) {
if (atEnd_ && noMoreSplits_) {
const auto numSplits = stats_.rlock()->numSplits;
2 changes: 2 additions & 0 deletions velox/exec/Exchange.h
@@ -89,6 +89,8 @@ class Exchange : public SourceOperator {
/// and passing these to ExchangeClient.
const bool processSplits_;

const int driverId_;

bool noMoreSplits_ = false;

std::shared_ptr<ExchangeClient> exchangeClient_;
14 changes: 11 additions & 3 deletions velox/exec/ExchangeClient.cpp
@@ -118,10 +118,14 @@ folly::F14FastMap<std::string, RuntimeMetric> ExchangeClient::stats() const {
return stats;
}

std::vector<std::unique_ptr<SerializedPage>>
ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
std::vector<std::unique_ptr<SerializedPage>> ExchangeClient::next(
int consumerId,
uint32_t maxBytes,
bool* atEnd,
ContinueFuture* future) {
std::vector<RequestSpec> requestSpecs;
std::vector<std::unique_ptr<SerializedPage>> pages;
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (closed_) {
@@ -130,7 +134,8 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
}

*atEnd = false;
pages = queue_->dequeueLocked(maxBytes, atEnd, future);
pages = queue_->dequeueLocked(
consumerId, maxBytes, atEnd, future, &stalePromise);
if (*atEnd) {
return pages;
}
@@ -143,6 +148,9 @@ }
}

// Outside of lock
if (stalePromise.valid()) {
stalePromise.setValue();
}
request(std::move(requestSpecs));
return pages;
}
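The stalePromise added above is fulfilled only after the queue mutex is released, so any continuation attached to the consumer's future never runs under the lock. A generic sketch of that pattern using only the standard library (not the Velox ContinuePromise type):

#include <future>
#include <mutex>
#include <optional>

std::mutex mu;
std::optional<std::promise<void>> waiting; // a consumer's stale promise

void wakeConsumer() {
  std::optional<std::promise<void>> stale;
  {
    std::lock_guard<std::mutex> l(mu);
    stale.swap(waiting); // Take ownership of the promise while holding the lock.
  }
  if (stale) {
    stale->set_value(); // Fulfill outside the lock so callbacks run lock-free.
  }
}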
8 changes: 6 additions & 2 deletions velox/exec/ExchangeClient.h
@@ -33,14 +33,18 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
std::string taskId,
int destination,
int64_t maxQueuedBytes,
int32_t numberOfConsumers,
uint64_t minOutputBatchBytes,
memory::MemoryPool* pool,
folly::Executor* executor)
: taskId_{std::move(taskId)},
destination_(destination),
maxQueuedBytes_{maxQueuedBytes},
pool_(pool),
executor_(executor),
queue_(std::make_shared<ExchangeQueue>()) {
queue_(std::make_shared<ExchangeQueue>(
numberOfConsumers,
minOutputBatchBytes)) {
VELOX_CHECK_NOT_NULL(pool_);
VELOX_CHECK_NOT_NULL(executor_);
// NOTE: the executor is used to run async response callback from the
@@ -91,7 +95,7 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
/// The data may be compressed, in which case 'maxBytes' applies to compressed
/// size.
std::vector<std::unique_ptr<SerializedPage>>
next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future);
next(int consumerId, uint32_t maxBytes, bool* atEnd, ContinueFuture* future);

std::string toString() const;

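A hedged usage sketch of the extended constructor, assuming an existing memory::MemoryPool* pool and folly::Executor* executor: one queue shared by four consuming drivers that unblocks a consumer only after roughly 2MB (or 1% of the data observed so far) has accumulated.

auto client = std::make_shared<facebook::velox::exec::ExchangeClient>(
    "example-task-id",               // taskId (placeholder)
    /*destination=*/0,
    /*maxQueuedBytes=*/32 << 20,     // 32MB exchange buffer
    /*numberOfConsumers=*/4,         // drivers sharing the ExchangeQueue
    /*minOutputBatchBytes=*/2 << 20, // 2MB minimum output batch
    pool,                            // assumed memory::MemoryPool*
    executor);                       // assumed folly::Executor*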
56 changes: 50 additions & 6 deletions velox/exec/ExchangeQueue.cpp
@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/ExchangeQueue.h"
#include <algorithm>

namespace facebook::velox::exec {

@@ -64,6 +65,15 @@ void ExchangeQueue::close() {
clearPromises(promises);
}

int64_t ExchangeQueue::minOutputBatchBytesLocked() const {
// Always allow unblocking when at end.
if (atEnd_) {
return 0;
}
// Use at most 1% of the bytes received so far to minimize latency for small exchanges.
return std::min<int64_t>(minOutputBatchBytes_, receivedBytes_ / 100);
}

void ExchangeQueue::enqueueLocked(
std::unique_ptr<SerializedPage>&& page,
std::vector<ContinuePromise>& promises) {
@@ -86,17 +96,45 @@ void ExchangeQueue::enqueueLocked(
receivedBytes_ += page->size();

queue_.push_back(std::move(page));
if (!promises_.empty()) {
const auto minBatchSize = minOutputBatchBytesLocked();
while (!promises_.empty()) {
VELOX_CHECK_LE(promises_.size(), numberOfConsumers_);
const int32_t unblockedConsumers = numberOfConsumers_ - promises_.size();
const int64_t unassignedBytes =
totalBytes_ - unblockedConsumers * minBatchSize;
if (unassignedBytes < minBatchSize) {
break;
}
// Resume one of the waiting drivers.
promises.push_back(std::move(promises_.back()));
promises_.pop_back();
auto it = promises_.begin();
promises.push_back(std::move(it->second));
promises_.erase(it);
}
}

void ExchangeQueue::addPromiseLocked(
int consumerId,
ContinueFuture* future,
ContinuePromise* stalePromise) {
ContinuePromise promise{"ExchangeQueue::dequeue"};
*future = promise.getSemiFuture();
auto it = promises_.find(consumerId);
if (it != promises_.end()) {
// Resolve stale promises outside the lock to avoid broken promises.
*stalePromise = std::move(it->second);
it->second = std::move(promise);
} else {
promises_[consumerId] = std::move(promise);
}
VELOX_CHECK_LE(promises_.size(), numberOfConsumers_);
}

std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(
int consumerId,
uint32_t maxBytes,
bool* atEnd,
ContinueFuture* future) {
ContinueFuture* future,
ContinuePromise* stalePromise) {
VELOX_CHECK_NOT_NULL(future);
if (!error_.empty()) {
*atEnd = true;
@@ -105,15 +143,21 @@

*atEnd = false;

// If we don't have enough bytes to return, wait for more data to become
// available.
if (totalBytes_ < minOutputBatchBytesLocked()) {
addPromiseLocked(consumerId, future, stalePromise);
return {};
}

std::vector<std::unique_ptr<SerializedPage>> pages;
uint32_t pageBytes = 0;
for (;;) {
if (queue_.empty()) {
if (atEnd_) {
*atEnd = true;
} else if (pages.empty()) {
promises_.emplace_back("ExchangeQueue::dequeue");
*future = promises_.back().getSemiFuture();
addPromiseLocked(consumerId, future, stalePromise);
}
return pages;
}
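To make the unblocking arithmetic in enqueueLocked() concrete: a waiting consumer is resumed only while the bytes not yet earmarked for already-unblocked consumers still cover another minimum batch. A standalone sketch (illustrative only, not Velox code):

#include <cstdint>

int consumersToUnblock(
    int64_t totalBytes,     // bytes currently buffered in the queue
    int64_t minBatchBytes,  // minOutputBatchBytesLocked()
    int32_t numWaiting,     // promises_.size()
    int32_t numConsumers) { // numberOfConsumers_
  int unblocked = 0;
  while (unblocked < numWaiting) {
    const int32_t alreadyUnblocked = (numConsumers - numWaiting) + unblocked;
    const int64_t unassignedBytes = totalBytes - alreadyUnblocked * minBatchBytes;
    if (unassignedBytes < minBatchBytes) {
      break;
    }
    ++unblocked;
  }
  return unblocked;
}

// Example: 4 consumers all waiting, 5MB buffered, 2MB minimum batch: two
// consumers are resumed and the other two keep waiting for more data.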
49 changes: 46 additions & 3 deletions velox/exec/ExchangeQueue.h
@@ -81,6 +81,18 @@ class SerializedPage {
/// for input.
class ExchangeQueue {
public:
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
explicit ExchangeQueue() : ExchangeQueue(1, 0) {}
#endif

explicit ExchangeQueue(
int32_t numberOfConsumers,
uint64_t minOutputBatchBytes)
: numberOfConsumers_{numberOfConsumers},
minOutputBatchBytes_{minOutputBatchBytes} {
VELOX_CHECK_GE(numberOfConsumers, 1);
}

~ExchangeQueue() {
clearAllPromises();
}
@@ -119,8 +131,20 @@
///
/// The data may be compressed, in which case 'maxBytes' applies to compressed
/// size.
std::vector<std::unique_ptr<SerializedPage>> dequeueLocked(
int consumerId,
uint32_t maxBytes,
bool* atEnd,
ContinueFuture* future,
ContinuePromise* stalePromise);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
std::vector<std::unique_ptr<SerializedPage>>
dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future);
dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
return dequeueLocked(0, maxBytes, atEnd, future, &stalePromise);
}
#endif

/// Returns the total bytes held by SerializedPages in 'this'.
int64_t totalBytes() const {
@@ -166,6 +190,11 @@ class ExchangeQueue {
return {};
}

void addPromiseLocked(
int consumerId,
ContinueFuture* future,
ContinuePromise* stalePromise);

void clearAllPromises() {
std::vector<ContinuePromise> promises;
{
@@ -176,7 +205,14 @@
}

std::vector<ContinuePromise> clearAllPromisesLocked() {
return std::move(promises_);
std::vector<ContinuePromise> promises;
promises.reserve(promises_.size());
auto it = promises_.begin();
while (it != promises_.end()) {
promises.push_back(std::move(it->second));
it = promises_.erase(it);
}
VELOX_CHECK(promises_.empty());
return promises;
}

static void clearPromises(std::vector<ContinuePromise>& promises) {
@@ -185,14 +221,21 @@
}
}

int64_t minOutputBatchBytesLocked() const;

const int32_t numberOfConsumers_;
const uint64_t minOutputBatchBytes_;

int numCompleted_{0};
int numSources_{0};
bool noMoreSources_{false};
bool atEnd_{false};

std::mutex mutex_;
std::deque<std::unique_ptr<SerializedPage>> queue_;
std::vector<ContinuePromise> promises_;
// Map from consumer id to that consumer's waiting promise.
folly::F14FastMap<int, ContinuePromise> promises_;

// When set, all promises will be realized and the next dequeue will
// throw an exception with this message.
std::string error_;
5 changes: 4 additions & 1 deletion velox/exec/MergeSource.cpp
@@ -128,6 +128,9 @@ class MergeExchangeSource : public MergeSource {
mergeExchange->taskId(),
destination,
maxQueuedBytes,
1,
// Deliver right away to avoid blocking other sources
0,
pool,
executor)) {
client_->addRemoteTaskId(taskId);
@@ -146,7 +149,7 @@
}

if (!currentPage_) {
auto pages = client_->next(1, &atEnd_, future);
auto pages = client_->next(0, 1, &atEnd_, future);
VELOX_CHECK_LE(pages.size(), 1);
currentPage_ = pages.empty() ? nullptr : std::move(pages.front());

8 changes: 6 additions & 2 deletions velox/exec/Task.cpp
@@ -970,7 +970,8 @@ void Task::initializePartitionOutput() {
// exchange client for each merge source to fetch data as we can't mix
// the data from different sources for merging.
if (auto exchangeNodeId = factory->needsExchangeClient()) {
createExchangeClientLocked(pipeline, exchangeNodeId.value());
createExchangeClientLocked(
pipeline, exchangeNodeId.value(), factory->numDrivers);
}
}
}
@@ -2982,7 +2983,8 @@ bool Task::pauseRequested(ContinueFuture* future) {

void Task::createExchangeClientLocked(
int32_t pipelineId,
const core::PlanNodeId& planNodeId) {
const core::PlanNodeId& planNodeId,
int32_t numberOfConsumers) {
VELOX_CHECK_NULL(
getExchangeClientLocked(pipelineId),
"Exchange client has been created at pipeline: {} for planNode: {}",
@@ -2998,6 +3000,8 @@ void Task::createExchangeClientLocked(
taskId_,
destination_,
queryCtx()->queryConfig().maxExchangeBufferSize(),
numberOfConsumers,
queryCtx()->queryConfig().minExchangeOutputBatchBytes(),
addExchangeClientPool(planNodeId, pipelineId),
queryCtx()->executor());
exchangeClientByPlanNode_.emplace(planNodeId, exchangeClients_[pipelineId]);
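Since the new knob is a regular query config (read via queryConfig().minExchangeOutputBatchBytes() above), it can be tuned per query through whatever string-keyed config map the embedding application already passes to its query context. The key is the one defined in QueryConfig.h; the 4MB value below is only an illustrative override.

#include <string>
#include <unordered_map>

std::unordered_map<std::string, std::string> configOverrides{
    {"min_exchange_output_batch_bytes", "4194304"}, // 4MB instead of the 2MB default
};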
3 changes: 2 additions & 1 deletion velox/exec/Task.h
@@ -1003,7 +1003,8 @@ class Task : public std::enable_shared_from_this<Task> {
// pipeline.
void createExchangeClientLocked(
int32_t pipelineId,
const core::PlanNodeId& planNodeId);
const core::PlanNodeId& planNodeId,
int32_t numberOfConsumers);

// Get a shared reference to the exchange client with the specified exchange
// plan node 'planNodeId'. The function returns null if there is no client