Skip to content

Commit

Permalink
[C++] Support property filter pushdown by utilizing payload file form…
Browse files Browse the repository at this point in the history
…ats (#178)


---------

Signed-off-by: Ziy1-Tan <[email protected]>
Co-authored-by: Weibin Zeng <[email protected]>
  • Loading branch information
Ziy1-Tan and acezen authored Jul 24, 2023
1 parent f4d01b4 commit 4968568
Show file tree
Hide file tree
Showing 13 changed files with 931 additions and 77 deletions.
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,13 @@ macro(build_gar)
if(APPLE)
target_link_libraries(gar PRIVATE -Wl,-force_load gar_arrow_static
"${GAR_PARQUET_STATIC_LIB}"
"${GAR_DATASET_STATIC_LIB}"
"${GAR_ACERO_STATIC_LIB}"
"${GAR_ARROW_BUNDLED_DEPS_STATIC_LIB}")
else()
target_link_libraries(gar PRIVATE -Wl,--exclude-libs,ALL -Wl,--whole-archive gar_arrow_static
"${GAR_PARQUET_STATIC_LIB}"
"${GAR_DATASET_STATIC_LIB}"
"${GAR_ARROW_ACERO_STATIC_LIB}"
"${GAR_ARROW_BUNDLED_DEPS_STATIC_LIB}" -Wl,--no-whole-archive)
endif()
Expand Down
10 changes: 9 additions & 1 deletion cpp/cmake/apache-arrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ function(build_arrow)
set(GAR_PARQUET_STATIC_LIB_FILENAME
"${CMAKE_STATIC_LIBRARY_PREFIX}parquet${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GAR_PARQUET_STATIC_LIB "${GAR_ARROW_STATIC_LIBRARY_DIR}/${GAR_PARQUET_STATIC_LIB_FILENAME}" CACHE INTERNAL "parquet lib")
set(GAR_DATASET_STATIC_LIB_FILENAME
"${CMAKE_STATIC_LIBRARY_PREFIX}arrow_dataset${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GAR_DATASET_STATIC_LIB "${GAR_ARROW_STATIC_LIBRARY_DIR}/${GAR_DATASET_STATIC_LIB_FILENAME}" CACHE INTERNAL "arrow dataset lib")
set(GAR_ARROW_BUNDLED_DEPS_STATIC_LIB_FILENAME
"${CMAKE_STATIC_LIBRARY_PREFIX}arrow_bundled_dependencies${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GAR_ARROW_BUNDLED_DEPS_STATIC_LIB
Expand Down Expand Up @@ -83,7 +86,7 @@ function(build_arrow)
"-DARROW_S3=ON")

set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory")
set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}")
set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}" "${GAR_DATASET_STATIC_LIB}")

find_package(Threads)
find_package(Arrow QUIET)
Expand All @@ -104,16 +107,21 @@ function(build_arrow)

set(GAR_ARROW_LIBRARY_TARGET gar_arrow_static)
set(GAR_PARQUET_LIBRARY_TARGET gar_parquet_static)
set(GAR_DATASET_LIBRARY_TARGET gar_dataset_static)

file(MAKE_DIRECTORY "${GAR_ARROW_INCLUDE_DIR}")
add_library(${GAR_ARROW_LIBRARY_TARGET} STATIC IMPORTED)
add_library(${GAR_PARQUET_LIBRARY_TARGET} STATIC IMPORTED)
add_library(${GAR_DATASET_LIBRARY_TARGET} STATIC IMPORTED)
set_target_properties(${GAR_ARROW_LIBRARY_TARGET}
PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR}
IMPORTED_LOCATION ${GAR_ARROW_STATIC_LIB})
set_target_properties(${GAR_PARQUET_LIBRARY_TARGET}
PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR}
IMPORTED_LOCATION ${GAR_PARQUET_STATIC_LIB})
set_target_properties(${GAR_DATASET_LIBRARY_TARGET}
PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR}
IMPORTED_LOCATION ${GAR_DATASET_STATIC_LIB})
if (ARROW_VERSION_TO_BUILD GREATER_EQUAL "12.0.0")
set(GAR_ARROW_ACERO_STATIC_LIB_FILENAME
"${CMAKE_STATIC_LIBRARY_PREFIX}arrow_acero${CMAKE_STATIC_LIBRARY_SUFFIX}")
Expand Down
12 changes: 12 additions & 0 deletions cpp/include/gar/graph_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include <map>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include "utils/adj_list_type.h"
Expand All @@ -43,6 +44,11 @@ struct Property {
std::string name; // property name
DataType type; // property data type
bool is_primary; // primary key tag

Property() {}
explicit Property(const std::string& name) : name(name) {}
Property(const std::string& name, const DataType& type, bool is_primary)
: name(name), type(type), is_primary(is_primary) {}
};

static bool operator==(const Property& lhs, const Property& rhs) {
Expand Down Expand Up @@ -87,6 +93,7 @@ class PropertyGroup {
std::vector<std::string> names;
for (auto& property : properties_) {
names.push_back(property.name);
property_names_.insert(property.name);
}
prefix_ = util::ConcatStringWithDelimiter(names, REGULAR_SEPERATOR) + "/";
}
Expand Down Expand Up @@ -121,6 +128,10 @@ class PropertyGroup {
return properties_;
}

inline bool ContainProperty(const std::string& property_name) const {
return property_names_.find(property_name) != property_names_.end();
}

/** Get the file type of property group chunk file.
*
* @return The file type of group.
Expand All @@ -146,6 +157,7 @@ class PropertyGroup {

private:
std::vector<Property> properties_;
std::unordered_set<std::string> property_names_;
FileType file_type_;
std::string prefix_;
};
Expand Down
79 changes: 62 additions & 17 deletions cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ limitations under the License.
#include "gar/graph_info.h"
#include "gar/utils/data_type.h"
#include "gar/utils/filesystem.h"
#include "gar/utils/reader_utils.h"
#include "gar/utils/result.h"
#include "gar/utils/status.h"
#include "gar/utils/utils.h"
Expand Down Expand Up @@ -52,18 +51,22 @@ class VertexPropertyArrowChunkReader {
VertexPropertyArrowChunkReader(const VertexInfo& vertex_info,
const PropertyGroup& property_group,
const std::string& prefix,
IdType chunk_index = 0)
IdType chunk_index = 0,
const utils::FilterOptions& options = {})
: vertex_info_(vertex_info),
property_group_(property_group),
chunk_index_(chunk_index),
seek_id_(chunk_index * vertex_info.GetChunkSize()),
chunk_table_(nullptr) {
chunk_table_(nullptr),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix,
vertex_info.GetPathPrefix(property_group));
std::string base_dir = prefix_ + pg_path_prefix;
GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
utils::GetVertexChunkNum(prefix_, vertex_info));
GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
utils::GetVertexNum(prefix_, vertex_info_));
}

/**
Expand Down Expand Up @@ -126,14 +129,32 @@ class VertexPropertyArrowChunkReader {
*/
IdType GetChunkNum() const noexcept { return chunk_num_; }

/**
* @brief Apply the row filter to the table. No parameter call Filter() will
* clear the filter.
*
* @param filter Predicate expression to filter rows.
*/
void Filter(utils::Filter filter = nullptr);

/**
* @brief Apply the projection to the table to be read. No parameter call
* Select() will clear the projection.
*
* @param column_names The name of columns to be selected.
*/
void Select(utils::ColumnNames column_names = std::nullopt);

private:
VertexInfo vertex_info_;
PropertyGroup property_group_;
std::string prefix_;
IdType chunk_index_;
IdType seek_id_;
IdType chunk_num_;
IdType vertex_num_;
std::shared_ptr<arrow::Table> chunk_table_;
utils::FilterOptions filter_options_;
std::shared_ptr<FileSystem> fs_;
};

Expand Down Expand Up @@ -227,7 +248,8 @@ class AdjListArrowChunkReader {
}

/**
* @brief Return the current chunk of chunk position indicator as arrow::Table
* @brief Return the current chunk of chunk position indicator as
* arrow::Table
*/
Result<std::shared_ptr<arrow::Table>> GetChunk() noexcept;

Expand Down Expand Up @@ -420,7 +442,8 @@ class AdjListPropertyArrowChunkReader {
* @brief Initialize the AdjListPropertyArrowChunkReader.
*
* @param edge_info The edge info that describes the edge type.
* @param property_group The property group that describes the property group.
* @param property_group The property group that describes the property
* group.
* @param adj_list_type The adj list type for the edges.
* @param prefix The absolute prefix.
* @param vertex_chunk_index The vertex chunk index, default is 0.
Expand All @@ -429,15 +452,17 @@ class AdjListPropertyArrowChunkReader {
const PropertyGroup& property_group,
AdjListType adj_list_type,
const std::string prefix,
IdType vertex_chunk_index = 0)
IdType vertex_chunk_index = 0,
const utils::FilterOptions& options = {})
: edge_info_(edge_info),
property_group_(property_group),
adj_list_type_(adj_list_type),
prefix_(prefix),
vertex_chunk_index_(vertex_chunk_index),
chunk_index_(0),
seek_offset_(0),
chunk_table_(nullptr) {
chunk_table_(nullptr),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(
auto pg_path_prefix,
Expand All @@ -463,6 +488,7 @@ class AdjListPropertyArrowChunkReader {
chunk_index_(other.chunk_index_),
seek_offset_(other.seek_offset_),
chunk_table_(nullptr),
filter_options_(other.filter_options_),
vertex_chunk_num_(other.vertex_chunk_num_),
chunk_num_(other.chunk_num_),
base_dir_(other.base_dir_),
Expand Down Expand Up @@ -506,7 +532,8 @@ class AdjListPropertyArrowChunkReader {
}

/**
* @brief Return the current chunk of chunk position indicator as arrow::Table
* @brief Return the current chunk of chunk position indicator as
* arrow::Table
*/
Result<std::shared_ptr<arrow::Table>> GetChunk() noexcept;

Expand Down Expand Up @@ -564,6 +591,22 @@ class AdjListPropertyArrowChunkReader {
return Status::OK();
}

/**
* @brief Apply the row filter to the table. No parameter call Filter() will
* clear the filter.
*
* @param filter Predicate expression to filter rows.
*/
void Filter(utils::Filter filter = nullptr);

/**
* @brief Apply the projection to the table to be read. No parameter call
* Select() will clear the projection.
*
* @param column_names The name of columns to be selected.
*/
void Select(utils::ColumnNames column_names = std::nullopt);

private:
EdgeInfo edge_info_;
PropertyGroup property_group_;
Expand All @@ -572,6 +615,7 @@ class AdjListPropertyArrowChunkReader {
IdType vertex_chunk_index_, chunk_index_;
IdType seek_offset_;
std::shared_ptr<arrow::Table> chunk_table_;
utils::FilterOptions filter_options_;
IdType vertex_chunk_num_, chunk_num_;
std::string base_dir_;
std::shared_ptr<FileSystem> fs_;
Expand All @@ -587,15 +631,16 @@ class AdjListPropertyArrowChunkReader {
static inline Result<VertexPropertyArrowChunkReader>
ConstructVertexPropertyArrowChunkReader(
const GraphInfo& graph_info, const std::string& label,
const PropertyGroup& property_group) noexcept {
const PropertyGroup& property_group,
const utils::FilterOptions& options = {}) noexcept {
VertexInfo vertex_info;
GAR_ASSIGN_OR_RAISE(vertex_info, graph_info.GetVertexInfo(label));
if (!vertex_info.ContainPropertyGroup(property_group)) {
return Status::KeyError("No property group ", property_group, " in vertex ",
label, ".");
}
return VertexPropertyArrowChunkReader(vertex_info, property_group,
graph_info.GetPrefix());
graph_info.GetPrefix(), 0, options);
}

/**
Expand Down Expand Up @@ -663,12 +708,11 @@ ConstructAdjListOffsetArrowChunkReader(const GraphInfo& graph_info,
* @param adj_list_type The adj list type for the edges.
*/
static inline Result<AdjListPropertyArrowChunkReader>
ConstructAdjListPropertyArrowChunkReader(const GraphInfo& graph_info,
const std::string& src_label,
const std::string& edge_label,
const std::string& dst_label,
const PropertyGroup& property_group,
AdjListType adj_list_type) noexcept {
ConstructAdjListPropertyArrowChunkReader(
const GraphInfo& graph_info, const std::string& src_label,
const std::string& edge_label, const std::string& dst_label,
const PropertyGroup& property_group, AdjListType adj_list_type,
const utils::FilterOptions& options = {}) noexcept {
EdgeInfo edge_info;
GAR_ASSIGN_OR_RAISE(edge_info,
graph_info.GetEdgeInfo(src_label, edge_label, dst_label));
Expand All @@ -683,7 +727,8 @@ ConstructAdjListPropertyArrowChunkReader(const GraphInfo& graph_info,
AdjListTypeToString(adj_list_type), ".");
}
return AdjListPropertyArrowChunkReader(edge_info, property_group,
adj_list_type, graph_info.GetPrefix());
adj_list_type, graph_info.GetPrefix(),
0, options);
}

} // namespace GAR_NAMESPACE_INTERNAL
Expand Down
Loading

0 comments on commit 4968568

Please sign in to comment.