Skip to content

Commit

Permalink
fix(interactive): fix bulkloading data from csv files without header …
Browse files Browse the repository at this point in the history
…and add null_values option for arrow (#4083)

as titled.
  • Loading branch information
liulx20 authored Jul 25, 2024
1 parent d191184 commit 4a69336
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 13 deletions.
4 changes: 3 additions & 1 deletion docs/flex/interactive/data_import.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ The table below offers a detailed breakdown of each configuration item. In this
| loading_config.format.metadata.double_quote | true | Whether a quote inside a value is double-quoted | No |
| loading_config.format.metadata.escaping | false | Whether escaping is used | No |
| loading_config.format.metadata.escape_char | '\\' | Escaping character (if `escaping` is true) | No |
| loading_config.format.metadata.null_values | [] | Recognized spellings for null values | No |
| loading_config.format.metadata.batch_size | 4MB | The size of batch for reading from files | No |
| loading_config.x_csr_params.parallelism | 1 | Number of threads used for bulk loading | No |
| loading_config.x_csr_params.build_csr_in_mem | false | Whether to build csr fully in memory | No |
Expand Down Expand Up @@ -274,9 +275,10 @@ The **loading_config** section defines the primary settings for data loading. Th
- loading_config.format.metadata.quoting: Whether quoting is used
- loading_config.format.metadata.quote_char: Quoting character (if `quoting` is true)
- loading_config.format.metadata.double_quote: Whether a quote inside a value is double-quoted
- loading_config.format.metadata.escape_char: Whether escaping is used
- loading_config.format.metadata.escaping: Whether escaping is used
- loading_config.format.metadata.escape_char: Escaping character (if `escaping` is true)
- loading_config.format.metadata.block_size: The size of batch for reading from files
- loading_config.format.metadata.null_values: Recognized spellings for null values

### Vertex Mappings
The **vertex_mappings** section outlines how raw data maps to graph vertices based on the defined schema.
Expand Down
1 change: 1 addition & 0 deletions flex/interactive/examples/modern_graph/bulk_load.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ loading_config:
escaping: false
block_size: 4MB
batch_reader: true
null_values: [""]

vertex_mappings:
- type_name: person # must align with the schema
Expand Down
42 changes: 31 additions & 11 deletions flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ static void put_column_names_option(const LoadingConfig& loading_config,
bool header_row,
const std::string& file_path,
char delimiter,
arrow::csv::ReadOptions& read_options) {
arrow::csv::ReadOptions& read_options,
size_t len) {
std::vector<std::string> all_column_names;
if (header_row) {
all_column_names = read_header(file_path, delimiter, loading_config);
Expand Down Expand Up @@ -242,12 +243,7 @@ static void put_column_names_option(const LoadingConfig& loading_config,
<< gs::to_string(all_column_names);
} else {
// just get the number of columns.
size_t num_cols = 0;
{
auto tmp = read_header(file_path, delimiter, loading_config);
num_cols = tmp.size();
}
all_column_names.resize(num_cols);
all_column_names.resize(len);
for (size_t i = 0; i < all_column_names.size(); ++i) {
all_column_names[i] = std::string("f") + std::to_string(i);
}
Expand All @@ -257,6 +253,14 @@ static void put_column_names_option(const LoadingConfig& loading_config,
<< gs::to_string(all_column_names);
}

static void put_null_values(const LoadingConfig& loading_config,
arrow::csv::ConvertOptions& convert_options) {
auto null_values = loading_config.GetNullValues();
for (auto& null_value : null_values) {
convert_options.null_values.emplace_back(null_value);
}
}

std::shared_ptr<IFragmentLoader> CSVFragmentLoader::Make(
const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config) {
Expand Down Expand Up @@ -384,11 +388,14 @@ void CSVFragmentLoader::fillVertexReaderMeta(

put_delimiter_option(loading_config_, parse_options);
bool header_row = put_skip_rows_option(loading_config_, read_options);
auto property_names = schema_.get_vertex_property_names(v_label);
put_column_names_option(loading_config_, header_row, v_file,
parse_options.delimiter, read_options);
parse_options.delimiter, read_options,
property_names.size() + 1);
put_escape_char_option(loading_config_, parse_options);
put_quote_char_option(loading_config_, parse_options);
put_block_size_option(loading_config_, read_options);
put_null_values(loading_config_, convert_options);

// parse all column_names

Expand Down Expand Up @@ -522,11 +529,16 @@ void CSVFragmentLoader::fillEdgeReaderMeta(

put_delimiter_option(loading_config_, parse_options);
bool header_row = put_skip_rows_option(loading_config_, read_options);
auto edge_prop_names =
schema_.get_edge_property_names(src_label_id, dst_label_id, label_id);

put_column_names_option(loading_config_, header_row, e_file,
parse_options.delimiter, read_options);
parse_options.delimiter, read_options,
edge_prop_names.size() + 2);
put_escape_char_option(loading_config_, parse_options);
put_quote_char_option(loading_config_, parse_options);
put_block_size_option(loading_config_, read_options);
put_null_values(loading_config_, convert_options);

auto src_dst_cols =
loading_config_.GetEdgeSrcDstCol(src_label_id, dst_label_id, label_id);
Expand Down Expand Up @@ -558,7 +570,11 @@ void CSVFragmentLoader::fillEdgeReaderMeta(
schema_.get_edge_property_names(src_label_id, dst_label_id, label_id);
for (size_t i = 0; i < edge_prop_names.size(); ++i) {
auto property_name = edge_prop_names[i];
included_col_names.emplace_back(property_name);
if (loading_config_.GetHasHeaderRow()) {
included_col_names.emplace_back(property_name);
} else {
included_col_names.emplace_back(read_options.column_names[i + 2]);
}
mapped_property_names.emplace_back(property_name);
}
} else {
Expand All @@ -581,7 +597,11 @@ void CSVFragmentLoader::fillEdgeReaderMeta(
<< read_options.column_names[col_id];
}
}
included_col_names.emplace_back(col_name);
if (loading_config_.GetHasHeaderRow()) {
included_col_names.emplace_back(col_name);
} else {
included_col_names.emplace_back(read_options.column_names[col_id]);
}
mapped_property_names.emplace_back(property_name);
}
}
Expand Down
8 changes: 8 additions & 0 deletions flex/storages/rt_mutable_graph/loading_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,10 @@ Status parse_bulk_load_config_yaml(const YAML::Node& root, const Schema& schema,
auto block_size = parse_block_size(block_size_str);
load_config.metadata_[reader_options::BATCH_SIZE_KEY] =
std::to_string(block_size);
} else if (key == reader_options::NULL_VALUES) {
// special case for null values
auto null_values = it->second.as<std::vector<std::string>>();
load_config.null_values_ = null_values;
} else {
load_config.metadata_[key] = it->second.as<std::string>();
}
Expand Down Expand Up @@ -844,6 +848,10 @@ bool LoadingConfig::GetIsDoubleQuoting() const {
return str == "true" || str == "True" || str == "TRUE";
}

const std::vector<std::string>& LoadingConfig::GetNullValues() const {
return null_values_;
}

int32_t LoadingConfig::GetBatchSize() const {
if (metadata_.find(reader_options::BATCH_SIZE_KEY) == metadata_.end()) {
return reader_options::DEFAULT_BLOCK_SIZE;
Expand Down
6 changes: 5 additions & 1 deletion flex/storages/rt_mutable_graph/loading_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ static const char* BATCH_SIZE_KEY = "batch_size";
// whether or not to use record batch reader. If true, the reader will read
// data in batches, otherwise, the reader will read data row by row.
static const char* BATCH_READER = "batch_reader";
static const char* NULL_VALUES = "null_values";

static const std::unordered_set<std::string> CSV_META_KEY_WORDS = {
DELIMITER, HEADER_ROW, INCLUDE_COLUMNS, COLUMN_TYPES,
ESCAPING, ESCAPE_CHAR, QUOTING, QUOTE_CHAR,
DOUBLE_QUOTE, BATCH_SIZE_KEY, BATCH_READER};
DOUBLE_QUOTE, BATCH_SIZE_KEY, BATCH_READER, NULL_VALUES};

} // namespace reader_options

Expand Down Expand Up @@ -131,6 +132,7 @@ class LoadingConfig {
char GetQuotingChar() const;
bool GetIsQuoting() const;
bool GetIsDoubleQuoting() const;
const std::vector<std::string>& GetNullValues() const;
int32_t GetBatchSize() const;
bool GetIsBatchReader() const;
std::string GetMetaData(const std::string& key) const;
Expand Down Expand Up @@ -179,6 +181,8 @@ class LoadingConfig {
bool build_csr_in_mem_; // Whether to build csr in memory
bool use_mmap_vector_; // Whether to use mmap vector

std::vector<std::string> null_values_;

// meta_data, stores all the meta info about loading
std::unordered_map<std::string, std::string> metadata_;

Expand Down

0 comments on commit 4a69336

Please sign in to comment.