Skip to content

Commit

Permalink
feat: Extend connector interface to support index lookup (#12187)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #12187

Extend connector interface to support create index source for index join lookup
Add IndexSource interface which provides the lookup interface between velox operator
and backend index table
Add lookup request and result data structures for lookup operation with index sourc

Reviewed By: mbasmanova

Differential Revision: D68612814

fbshipit-source-id: 0f0900829dcafee8bf6c95d58627095b74be4757
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Jan 28, 2025
1 parent cb6e652 commit 805fe4b
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 3 deletions.
122 changes: 122 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ namespace facebook::velox::config {
class ConfigBase;
}

namespace facebook::velox::core {
class ITypedExpr;
}

namespace facebook::velox::connector {

class DataSource;
Expand Down Expand Up @@ -287,6 +291,73 @@ class DataSource {
}
};

class IndexSource {
public:
virtual ~IndexSource() = default;

/// Represents a lookup request for a given input.
struct LookupRequest {
/// Contains the input column vectors used by lookup join and range
/// conditions.
RowVectorPtr input;

explicit LookupRequest(RowVectorPtr input) : input(std::move(input)) {}
};

/// Represents the lookup result for a subset of input produced by the
/// 'LookupResultIterator'.
struct LookupResult {
/// Specifies the indices of input row in the lookup request that have
/// matches in 'output'. It contains the input indices in the order
/// of the input rows in the lookup request. Any gap in the indices means
/// the input rows that has no matches in output.
///
/// Example:
/// LookupRequest: input = [0, 1, 2, 3, 4]
/// LookupResult: inputHits = [0, 0, 2, 2, 3, 4, 4, 4]
/// output = [0, 1, 2, 3, 4, 5, 6, 7]
///
/// Here is match results for each input row:
/// input row #0: match with output rows #0 and #1.
/// input row #1: no matches
/// input row #2: match with output rows #2 and #3.
/// input row #3: match with output row #4.
/// input row #4: match with output rows #5, #6 and #7.
///
/// 'LookupResultIterator' must also produce the output result in order of
/// input rows.
BufferPtr inputHits;

/// Contains the lookup result rows.
RowVectorPtr output;

LookupResult(BufferPtr _inputHits, RowVectorPtr _output)
: inputHits(std::move(_inputHits)), output(std::move(_output)) {
VELOX_CHECK_EQ(inputHits->size() / sizeof(int32_t), output->size());
}
};

/// The lookup result iterator used to fetch the lookup result in batch for a
/// given lookup request.
class LookupResultIterator {
public:
virtual ~LookupResultIterator() = default;

/// Invoked to fetch update to 'size' number of output rows. Returns nullptr
/// if all the lookup results have been fetched. Returns std::nullopt and
/// sets the 'future' if started asynchronous work and needs to wait for it
/// to complete to continue processing. The caller will wait for the
/// 'future' to complete before calling 'next' again.
virtual std::optional<std::shared_ptr<LookupResult>> next(
vector_size_t size,
velox::ContinueFuture& future) = 0;
};

virtual LookupResultIterator lookup(const LookupRequest& request) = 0;

virtual std::unordered_map<std::string, RuntimeCounter> runtimeStats() = 0;
};

/// Collection of context data for use in a DataSource, IndexSource or DataSink.
/// One instance of this per DataSource and DataSink. This may be passed between
/// threads but methods must be invoked sequentially. Serializing use is the
Expand Down Expand Up @@ -479,6 +550,57 @@ class Connector {
return false;
}

/// Creates index source for index join lookup.
/// @param inputType The list of probe-side columns that either used in
/// equi-clauses or join conditions.
/// @param numJoinKeys The number of key columns used in join equi-clauses.
/// The first 'numJoinKeys' columns in 'inputType' form a prefix of the
/// index, and the rest of the columns in inputType are expected to be used in
/// 'joinCondition'.
/// @param joinConditions The join conditions. It expects inputs columns from
/// the 'tail' of 'inputType' and from 'columnHandles'.
/// @param outputType The lookup output type from index source.
/// @param tableHandle The index table handle.
/// @param columnHandles The column handles which maps from column name
/// used in 'outputType' and 'joinConditions' to the corresponding column
/// handles in the index table.
/// @param connectorQueryCtx The query context.
///
/// Here is an example that how the lookup join operator uses index source:
///
/// SELECT t.sid, t.day_ts, u.event_value
/// FROM t LEFT JOIN u
/// ON t.sid = u.sid
/// AND contains(t.event_list, u.event_type)
/// AND t.ds BETWEEN '2024-01-01' AND '2024-01-07'
///
/// Here,
/// - 'inputType' is ROW{t.sid, t.event_list}
/// - 'numJoinKeys' is 1 since only t.sid is used in join equi-clauses.
/// - 'joinConditions' is list of one expression: contains(t.event_list,
/// u.event_type)
/// - 'outputType' is ROW{u.event_value}
/// - 'tableHandle' specifies the metadata of the index table.
/// - 'columnHandles' is a map from 'u.event_type' (in 'joinConditions') and
/// 'u.event_value' (in 'outputType') to the actual column names in the
/// index table.
/// - 'connectorQueryCtx' provide the connector query execution context.
///
virtual std::unique_ptr<IndexSource> createIndexSource(
const RowTypePtr& inputType,
size_t numJoinKeys,
const std::vector<std::shared_ptr<const core::ITypedExpr>>&
joinConditions,
const RowTypePtr& outputType,
const std::shared_ptr<ConnectorTableHandle>& tableHandle,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
VELOX_UNSUPPORTED(
"Connector {} does not support index source", connectorId());
}

virtual std::unique_ptr<DataSink> createDataSink(
RowTypePtr inputType,
std::shared_ptr<ConnectorInsertTableHandle> connectorInsertTableHandle,
Expand Down
4 changes: 1 addition & 3 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1776,9 +1776,7 @@ class MergeJoinNode : public AbstractJoinNode {
/// right side, it can only use one index column. Each index column can either
/// be a join key or a join condition once. The table scan node of the right
/// input is translated to a connector::IndexSource within
/// exec::IndexLookupJoin.
///
/// Only INNER and LEFT joins are supported.
/// exec::IndexLookupJoin. Only INNER and LEFT joins are supported.
///
/// Take the following query for example, t is left table, r is the right table
/// with indexed columns. 'sid' is the join keys. 'u.event_type in t.event_list'
Expand Down

0 comments on commit 805fe4b

Please sign in to comment.