diff --git a/docs/flex/interactive/data_import.md b/docs/flex/interactive/data_import.md index 93e460f356f0..7b7b454b5aa6 100644 --- a/docs/flex/interactive/data_import.md +++ b/docs/flex/interactive/data_import.md @@ -234,6 +234,7 @@ The table below offers a detailed breakdown of each configuration item. In this | loading_config.format.metadata.double_quote | true | Whether a quote inside a value is double-quoted | No | | loading_config.format.metadata.escaping | false | Whether escaping is used | No | | loading_config.format.metadata.escape_char | '\\' | Escaping character (if `escaping` is true) | No | +| loading_config.format.metadata.null_values | [] | Recognized spellings for null values | No | | loading_config.format.metadata.batch_size | 4MB | The size of batch for reading from files | No | | loading_config.x_csr_params.parallelism | 1 | Number of threads used for bulk loading | No | | loading_config.x_csr_params.build_csr_in_mem | false | Whether to build csr fully in memory | No | @@ -274,9 +275,10 @@ The **loading_config** section defines the primary settings for data loading. Th - loading_config.format.metadata.quoting: Whether quoting is used - loading_config.format.metadata.quote_char: Quoting character (if `quoting` is true) - loading_config.format.metadata.double_quote: Whether a quote inside a value is double-quoted - - loading_config.format.metadata.escape_char: Whether escaping is used + - loading_config.format.metadata.escaping: Whether escaping is used - loading_config.format.metadata.escape_char: Escaping character (if `escaping` is true) - loading_config.format.metadata.block_size: The size of batch for reading from files + - loading_config.format.metadata.null_values: Recognized spellings for null values ### Vertex Mappings The **vertex_mappings** section outlines how raw data maps to graph vertices based on the defined schema. 
diff --git a/flex/interactive/examples/modern_graph/bulk_load.yaml b/flex/interactive/examples/modern_graph/bulk_load.yaml index 6bda64715771..5e3999691b7c 100644 --- a/flex/interactive/examples/modern_graph/bulk_load.yaml +++ b/flex/interactive/examples/modern_graph/bulk_load.yaml @@ -16,6 +16,7 @@ loading_config: escaping: false block_size: 4MB batch_reader: true + null_values: [""] vertex_mappings: - type_name: person # must align with the schema diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc index dbb50b8886a0..40bee19a14a7 100644 --- a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc @@ -214,7 +214,8 @@ static void put_column_names_option(const LoadingConfig& loading_config, bool header_row, const std::string& file_path, char delimiter, - arrow::csv::ReadOptions& read_options) { + arrow::csv::ReadOptions& read_options, + size_t len) { std::vector<std::string> all_column_names; if (header_row) { all_column_names = read_header(file_path, delimiter, loading_config); @@ -242,12 +243,7 @@ static void put_column_names_option(const LoadingConfig& loading_config, << gs::to_string(all_column_names); } else { // just get the number of columns. 
- size_t num_cols = 0; - { - auto tmp = read_header(file_path, delimiter, loading_config); - num_cols = tmp.size(); - } - all_column_names.resize(num_cols); + all_column_names.resize(len); for (size_t i = 0; i < all_column_names.size(); ++i) { all_column_names[i] = std::string("f") + std::to_string(i); } @@ -257,6 +253,14 @@ static void put_column_names_option(const LoadingConfig& loading_config, << gs::to_string(all_column_names); } +static void put_null_values(const LoadingConfig& loading_config, + arrow::csv::ConvertOptions& convert_options) { + auto null_values = loading_config.GetNullValues(); + for (auto& null_value : null_values) { + convert_options.null_values.emplace_back(null_value); + } +} + std::shared_ptr CSVFragmentLoader::Make( const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config) { @@ -384,11 +388,14 @@ void CSVFragmentLoader::fillVertexReaderMeta( put_delimiter_option(loading_config_, parse_options); bool header_row = put_skip_rows_option(loading_config_, read_options); + auto property_names = schema_.get_vertex_property_names(v_label); put_column_names_option(loading_config_, header_row, v_file, - parse_options.delimiter, read_options); + parse_options.delimiter, read_options, + property_names.size() + 1); put_escape_char_option(loading_config_, parse_options); put_quote_char_option(loading_config_, parse_options); put_block_size_option(loading_config_, read_options); + put_null_values(loading_config_, convert_options); // parse all column_names @@ -522,11 +529,16 @@ void CSVFragmentLoader::fillEdgeReaderMeta( put_delimiter_option(loading_config_, parse_options); bool header_row = put_skip_rows_option(loading_config_, read_options); + auto edge_prop_names = + schema_.get_edge_property_names(src_label_id, dst_label_id, label_id); + put_column_names_option(loading_config_, header_row, e_file, - parse_options.delimiter, read_options); + parse_options.delimiter, read_options, + edge_prop_names.size() + 2); 
 put_escape_char_option(loading_config_, parse_options); put_quote_char_option(loading_config_, parse_options); put_block_size_option(loading_config_, read_options); + put_null_values(loading_config_, convert_options); auto src_dst_cols = loading_config_.GetEdgeSrcDstCol(src_label_id, dst_label_id, label_id); @@ -558,7 +570,11 @@ void CSVFragmentLoader::fillEdgeReaderMeta( schema_.get_edge_property_names(src_label_id, dst_label_id, label_id); for (size_t i = 0; i < edge_prop_names.size(); ++i) { auto property_name = edge_prop_names[i]; - included_col_names.emplace_back(property_name); + if (loading_config_.GetHasHeaderRow()) { + included_col_names.emplace_back(property_name); + } else { + included_col_names.emplace_back(read_options.column_names[i + 2]); + } mapped_property_names.emplace_back(property_name); } } else { @@ -581,7 +597,11 @@ void CSVFragmentLoader::fillEdgeReaderMeta( << read_options.column_names[col_id]; } } - included_col_names.emplace_back(col_name); + if (loading_config_.GetHasHeaderRow()) { + included_col_names.emplace_back(col_name); + } else { + included_col_names.emplace_back(read_options.column_names[col_id]); + } mapped_property_names.emplace_back(property_name); } } diff --git a/flex/storages/rt_mutable_graph/loading_config.cc b/flex/storages/rt_mutable_graph/loading_config.cc index ce8109f73065..b1adbde24484 100644 --- a/flex/storages/rt_mutable_graph/loading_config.cc +++ b/flex/storages/rt_mutable_graph/loading_config.cc @@ -618,6 +618,10 @@ Status parse_bulk_load_config_yaml(const YAML::Node& root, const Schema& schema, auto block_size = parse_block_size(block_size_str); load_config.metadata_[reader_options::BATCH_SIZE_KEY] = std::to_string(block_size); + } else if (key == reader_options::NULL_VALUES) { + // special case for null values + auto null_values = it->second.as<std::vector<std::string>>(); + load_config.null_values_ = null_values; } else { load_config.metadata_[key] = it->second.as<std::string>(); } @@ -844,6 +848,10 @@ bool LoadingConfig::GetIsDoubleQuoting() 
 const { return str == "true" || str == "True" || str == "TRUE"; } +const std::vector<std::string>& LoadingConfig::GetNullValues() const { + return null_values_; +} + int32_t LoadingConfig::GetBatchSize() const { if (metadata_.find(reader_options::BATCH_SIZE_KEY) == metadata_.end()) { return reader_options::DEFAULT_BLOCK_SIZE; diff --git a/flex/storages/rt_mutable_graph/loading_config.h b/flex/storages/rt_mutable_graph/loading_config.h index 38e72927cd26..bb662ca45299 100644 --- a/flex/storages/rt_mutable_graph/loading_config.h +++ b/flex/storages/rt_mutable_graph/loading_config.h @@ -53,11 +53,12 @@ static const char* BATCH_SIZE_KEY = "batch_size"; // whether or not to use record batch reader. If true, the reader will read // data in batches, otherwise, the reader will read data row by row. static const char* BATCH_READER = "batch_reader"; +static const char* NULL_VALUES = "null_values"; static const std::unordered_set<std::string> CSV_META_KEY_WORDS = { DELIMITER, HEADER_ROW, INCLUDE_COLUMNS, COLUMN_TYPES, ESCAPING, ESCAPE_CHAR, QUOTING, QUOTE_CHAR, - DOUBLE_QUOTE, BATCH_SIZE_KEY, BATCH_READER}; + DOUBLE_QUOTE, BATCH_SIZE_KEY, BATCH_READER, NULL_VALUES}; } // namespace reader_options @@ -131,6 +132,7 @@ class LoadingConfig { char GetQuotingChar() const; bool GetIsQuoting() const; bool GetIsDoubleQuoting() const; + const std::vector<std::string>& GetNullValues() const; int32_t GetBatchSize() const; bool GetIsBatchReader() const; std::string GetMetaData(const std::string& key) const; @@ -179,6 +181,8 @@ class LoadingConfig { bool build_csr_in_mem_; // Whether to build csr in memory bool use_mmap_vector_; // Whether to use mmap vector + std::vector<std::string> null_values_; + // meta_data, stores all the meta info about loading std::unordered_map<std::string, std::string> metadata_;