Skip to content

Commit

Permalink
fix(interactive): Align Data Type in GIE Physical Pb with Flex (#4367)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?
1. support flex type system in GIE physical
[proto](https://github.com/shirly121/GraphScope/blob/ir_align_type/interactive_engine/executor/ir/proto/basic_type.proto).
2. support type conversion among different type systems, we have 3 type
systems currently:
a. Groot: defined in
[proto](https://github.com/shirly121/GraphScope/blob/ir_align_type/proto/schema_common.proto)
b. Flex: defined in
[proto](https://github.com/shirly121/GraphScope/blob/ir_align_type/interactive_engine/executor/ir/proto/basic_type.proto)
c. Calcite, defined in
[java](https://github.com/shirly121/GraphScope/blob/ir_align_type/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/type/GraphTypeFactoryImpl.java)
   
The Calcite type system serves as the foundation for type inference in
the compiler. Consequently, any other type system (e.g., Groot or Flex)
must be convertible to Calcite.

The
[IrDataTypeConvertor](https://github.com/alibaba/GraphScope/pull/4367/files#diff-6844283ba782602bd37d9f17f56646af7188f4ba849abc5ac8d1f404ce122e22)
plays a key role in this process. It defines the bidirectional type
conversions between Calcite and the other two type systems.

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #4362

---------

Co-authored-by: BingqingLyu <[email protected]>
Co-authored-by: xiaolei.zl <[email protected]>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent bbc7786 commit 3b0e2f7
Show file tree
Hide file tree
Showing 115 changed files with 3,936 additions and 3,888 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,9 @@ jobs:
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/indices/
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
sed -i 's/interactive_workspace/temp_workspace/g' ./interactive_config_test.yaml
# set thread_num_per_worker to 4
sed -i 's/thread_num_per_worker: 1/thread_num_per_worker: 4/g' ./interactive_config_test.yaml
bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./interactive_config_test.yaml java
bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./interactive_config_test.yaml python
sed -i 's/temp_workspace/interactive_workspace/g' ./interactive_config_test.yaml
sed -i 's/thread_num_per_worker: 4/thread_num_per_worker: 1/g' ./interactive_config_test.yaml
- name: Robustness test
env:
Expand Down
19 changes: 16 additions & 3 deletions flex/codegen/src/codegen_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,23 @@ std::string generate_output_list(std::string input_name, int32_t input_size,
// check type consistent
bool data_type_consistent(const common::DataType& left,
const common::DataType& right) {
if (left == common::DataType::NONE || right == common::DataType::NONE) {
return true;
if (left.item_case() == common::DataType::ITEM_NOT_SET) {
return false;
}
if (left.item_case() != right.item_case()) {
return false;
}
if (left.item_case() == common::DataType::kPrimitiveType) {
return left.primitive_type() == right.primitive_type();
} else if (left.item_case() == common::DataType::kArray ||
left.item_case() == common::DataType::kMap) {
LOG(FATAL) << "Not support list or map type";
} else if (left.item_case() == common::DataType::kString) {
return true; // string type is always consistent
} else {
LOG(FATAL) << "Unexpected data type";
return false;
}
return left == right;
}

std::tuple<std::string, std::string> decode_param_from_decoder(
Expand Down
127 changes: 88 additions & 39 deletions flex/codegen/src/graph_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include <type_traits>

#include "flex/codegen/src/string_utils.h"
#include "flex/proto_generated_gie/basic_type.pb.h"
#include "flex/proto_generated_gie/common.pb.h"
#include "glog/logging.h"
#include "google/protobuf/any.h"
Expand Down Expand Up @@ -63,62 +64,109 @@ inline bool operator==(const ParamConst& lhs, const ParamConst& rhs) {

} // namespace codegen

static codegen::DataType common_data_type_pb_2_data_type(
const common::DataType& data_type) {
switch (data_type) {
case common::DataType::INT32:
static codegen::DataType primitive_type_to_data_type(
const common::PrimitiveType& type) {
switch (type) {
case common::PrimitiveType::DT_SIGNED_INT32:
return codegen::DataType::kInt32;
case common::DataType::INT64:
case common::PrimitiveType::DT_SIGNED_INT64:
return codegen::DataType::kInt64;
case common::DataType::DOUBLE:
case common::PrimitiveType::DT_FLOAT:
return codegen::DataType::kFloat;
case common::PrimitiveType::DT_DOUBLE:
return codegen::DataType::kDouble;
case common::DataType::STRING:
return codegen::DataType::kString;
case common::DataType::INT64_ARRAY:
return codegen::DataType::kInt64Array;
case common::DataType::INT32_ARRAY:
return codegen::DataType::kInt32Array;
case common::DataType::BOOLEAN:
case common::PrimitiveType::DT_BOOL:
return codegen::DataType::kBoolean;
case common::DataType::DATE32:
default:
// LOG(FATAL) << "unknown primitive type";
throw std::runtime_error(
"unknown primitive type when converting primitive type to data type:" +
std::to_string(static_cast<int>(type)));
}
}

static codegen::DataType temporal_type_to_data_type(
const common::Temporal& type) {
switch (type.item_case()) {
case common::Temporal::ItemCase::kDate:
return codegen::DataType::kDate;
case common::DataType::TIME32:
case common::Temporal::ItemCase::kTime:
return codegen::DataType::kTime;
case common::DataType::TIMESTAMP:
case common::Temporal::kTimestamp:
return codegen::DataType::kTimeStamp;
default:
throw std::runtime_error(
"unknown temporal type when converting temporal type to data type:" +
std::to_string(static_cast<int>(type.item_case())));
}
}

static codegen::DataType common_data_type_pb_2_data_type(
const common::DataType& data_type) {
switch (data_type.item_case()) {
case common::DataType::ItemCase::kPrimitiveType:
return primitive_type_to_data_type(data_type.primitive_type());
case common::DataType::ItemCase::kDecimal:
LOG(FATAL) << "Not support decimal type";
case common::DataType::ItemCase::kString:
return codegen::DataType::kString;
case common::DataType::ItemCase::kTemporal:
return temporal_type_to_data_type(data_type.temporal());
case common::DataType::ItemCase::kArray:
case common::DataType::ItemCase::kMap:
LOG(FATAL) << "Not support array or map type";
default:
// LOG(FATAL) << "unknown data type";
throw std::runtime_error(
"unknown data type when converting common_data_type to inner data "
"type:" +
std::to_string(static_cast<int>(data_type)));
data_type.DebugString());
}
}

static std::string single_common_data_type_pb_2_str(
const common::DataType& data_type) {
switch (data_type) {
case common::DataType::BOOLEAN:
return "bool";
case common::DataType::INT32:
static std::string primitive_type_to_str(const common::PrimitiveType& type) {
switch (type) {
case common::PrimitiveType::DT_SIGNED_INT32:
return "int32_t";
case common::DataType::INT64:
case common::PrimitiveType::DT_UNSIGNED_INT32:
return "uint32_t";
case common::PrimitiveType::DT_SIGNED_INT64:
return "int64_t";
case common::DataType::DOUBLE:
case common::PrimitiveType::DT_UNSIGNED_INT64:
return "uint64_t";
case common::PrimitiveType::DT_FLOAT:
return "float";
case common::PrimitiveType::DT_DOUBLE:
return "double";
case common::DataType::STRING:
case common::PrimitiveType::DT_BOOL:
return "bool";
default:
// LOG(FATAL) << "unknown primitive type";
throw std::runtime_error(
"unknown primitive type when converting primitive type to string:" +
std::to_string(static_cast<int>(type)));
}
}

static std::string single_common_data_type_pb_2_str(
const common::DataType& data_type) {
switch (data_type.item_case()) {
case common::DataType::ItemCase::kPrimitiveType:
return primitive_type_to_str(data_type.primitive_type());
case common::DataType::ItemCase::kDecimal:
LOG(FATAL) << "Not support decimal type";
case common::DataType::ItemCase::kString:
return "std::string_view";
case common::DataType::INT64_ARRAY:
return "std::vector<int64_t>";
case common::DataType::INT32_ARRAY:
return "std::vector<int32_t>";
case common::DataType::DATE32:
return "Date";
case common::DataType::ItemCase::kTemporal:
LOG(FATAL) << "Not support temporal type";
case common::DataType::ItemCase::kArray:
case common::DataType::ItemCase::kMap:
LOG(FATAL) << "Not support array or map type";
// TODO: support time32 and timestamp
default:
throw std::runtime_error(
"unknown data type when convert common data type to string:" +
std::to_string(static_cast<int>(data_type)));
data_type.DebugString());
}
}

Expand Down Expand Up @@ -266,21 +314,22 @@ static std::string data_type_2_rust_string(const codegen::DataType& data_type) {
}

static common::DataType common_value_2_data_type(const common::Value& value) {
common::DataType ret;
switch (value.item_case()) {
case common::Value::kI32:
return common::DataType::INT32;
ret.set_primitive_type(common::PrimitiveType::DT_SIGNED_INT32);
case common::Value::kI64:
return common::DataType::INT64;
ret.set_primitive_type(common::PrimitiveType::DT_SIGNED_INT64);
case common::Value::kBoolean:
return common::DataType::BOOLEAN;
ret.set_primitive_type(common::PrimitiveType::DT_BOOL);
case common::Value::kF64:
return common::DataType::DOUBLE;
ret.set_primitive_type(common::PrimitiveType::DT_DOUBLE);
case common::Value::kStr:
return common::DataType::STRING;
ret.mutable_string()->mutable_long_text();
default:
LOG(FATAL) << "unknown value" << value.DebugString();
}
return common::DataType::NONE;
return ret;
}

static void parse_param_const_from_pb(
Expand Down
5 changes: 2 additions & 3 deletions flex/codegen/src/hqps/hqps_case_when_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ class CaseWhenBuilder : public ExprBuilder {

std::string str = formater.str();

return std::make_tuple(
class_name_, construct_params_, tag_selectors_, str,
std::vector{common::DataType::DataType_INT_MIN_SENTINEL_DO_NOT_USE_});
return std::make_tuple(class_name_, construct_params_, tag_selectors_, str,
std::vector{common::DataType()});
}

protected:
Expand Down
4 changes: 3 additions & 1 deletion flex/codegen/src/hqps/hqps_edge_expand_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ static void BuildExprFromPredicate(BuildingContext& ctx,
std::string& func_construct_params_str,
std::string& property_selectors_str) {
auto expr_builder = ExprBuilder(ctx);
expr_builder.set_return_type(common::DataType::BOOLEAN);
common::DataType type;
type.set_primitive_type(common::PrimitiveType::DT_BOOL);
expr_builder.set_return_type(type);
expr_builder.AddAllExprOpr(expr.operators());
std::string expr_code;
std::vector<codegen::ParamConst> func_call_param_const;
Expand Down
4 changes: 3 additions & 1 deletion flex/codegen/src/hqps/hqps_get_v_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ class GetVOpBuilder {

auto& expr_oprs = expr.operators();
expr_builder.AddAllExprOpr(expr_oprs);
expr_builder.set_return_type(common::DataType::BOOLEAN);
common::DataType data_type;
data_type.set_primitive_type(common::PrimitiveType::DT_BOOL);
expr_builder.set_return_type(data_type);
std::vector<common::DataType> unused_expr_ret_type;
if (!expr_builder.empty()) {
std::tie(expr_name_, expr_call_param_, tag_properties_, expr_code_,
Expand Down
4 changes: 3 additions & 1 deletion flex/codegen/src/hqps/hqps_scan_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ class ScanOpBuilder {

// TODO: make expr_builder a member of ScanOpBuilder
// auto expr_builder = ExprBuilder(ctx_);
expr_builder_.set_return_type(common::DataType::BOOLEAN);
common::DataType type;
type.set_primitive_type(common::PrimitiveType::DT_BOOL);
expr_builder_.set_return_type(common::DataType(type));
// Add extra (, ) to wrap the code, since we may append index_predicate
// afterwards.
common::ExprOpr left_brace, right_brace;
Expand Down
4 changes: 3 additions & 1 deletion flex/codegen/src/hqps/hqps_select_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ class SelectOpBuilder {

SelectOpBuilder& expr(const common::Expression expr) {
ExprBuilder expr_builder(ctx_);
expr_builder.set_return_type(common::DataType::BOOLEAN);
common::DataType data_type;
data_type.set_primitive_type(common::PrimitiveType::DT_BOOL);
expr_builder.set_return_type(data_type);
expr_builder.AddAllExprOpr(expr.operators());

std::string func_code;
Expand Down
29 changes: 19 additions & 10 deletions flex/codegen/src/pegasus/pegasus_order_by_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,29 @@ class OrderByOpBuilder {
ss << ".then(";
}
std::string cmp_type;
switch (data_type) {
case common::DataType::BOOLEAN:
case common::DataType::INT32:
case common::DataType::INT64:
case common::DataType::STRING: {
cmp_type = "cmp";
break;
switch (data_type.item_case()) {
case common::DataType::kPrimitiveType: {
switch (data_type.primitive_type()) {
case common::PrimitiveType::DT_BOOL:
case common::PrimitiveType::DT_SIGNED_INT32:
case common::PrimitiveType::DT_SIGNED_INT64:
cmp_type = "cmp";
break;
case common::PrimitiveType::DT_DOUBLE: {
cmp_type = "partial_cmp";
break;
}
default:
LOG(FATAL) << "Unsupported type "
<< static_cast<int32_t>(data_type.primitive_type());
}
}
case common::DataType::DOUBLE: {
cmp_type = "partial_cmp";
case common::DataType::kString: {
cmp_type = "cmp";
break;
}
default:
LOG(FATAL) << "Unsupported type " << data_type;
LOG(FATAL) << "Unsupported type " << data_type.DebugString();
}
std::string reverse_str;
if (ordering_pair_[i].order() == algebra::OrderBy_OrderingPair_Order::
Expand Down
24 changes: 17 additions & 7 deletions flex/codegen/src/pegasus/pegasus_project_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,24 @@ class ProjectOpBuilder {
ctx_.SetOutput(i, data_types);
} else if (column_meta.type().type_case() ==
common::IrDataType::kDataType) {
switch (column_meta.type().data_type()) {
case common::DataType::INT64: {
std::vector<codegen::DataType> data_types;
data_types.push_back(codegen::DataType::kInt64);
ctx_.SetOutput(i, data_types);
break;
switch (column_meta.type().data_type().item_case()) {
case common::DataType::kPrimitiveType: {
auto data_type = column_meta.type().data_type().primitive_type();
switch (data_type) {
case common::PrimitiveType::DT_SIGNED_INT64: {
std::vector<codegen::DataType> data_types;
data_types.push_back(codegen::DataType::kInt64);
ctx_.SetOutput(i, data_types);
break;
}
default: {
std::vector<codegen::DataType> data_types;
data_types.push_back(codegen::DataType::kString);
ctx_.SetOutput(i, data_types);
}
}
}
case common::DataType::STRING: {
case common::DataType::kString: {
std::vector<codegen::DataType> data_types;
data_types.push_back(codegen::DataType::kString);
ctx_.SetOutput(i, data_types);
Expand Down
Loading

0 comments on commit 3b0e2f7

Please sign in to comment.