GH-32566: [C++] Connect parquet to the new scan node #35889
This is very much still a draft. There are still a lot of tests to add and some TODOs (column projection and row filtering), but I don't expect the overall structure to change much, in case anyone wants to take an early look.
On the bright side, I can now reach max parallelism with about 3GB of RAM, regardless of the size of the row groups (performance also looks to be about 10% better, but it is still too early to say that for certain).
… considerably. Now each fragment contains one or more scan tasks. Each scan task can yield a stream of batches. So, CSV, for example, is a single scan task that covers the entire file. Parquet, on the other hand, has a scan task per row group. This also makes explicit a lot of the logic that was implicit around sequencing and trying to figure out the correct batch index.
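The fragment-to-scan-task relationship described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: `ScanTask`, `Fragment`, `PlanCsvLike`, and `PlanParquetLike` are hypothetical names, and a scan task is reduced to a description plus a row count instead of a stream of batches.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration only; these are not Arrow types.
struct ScanTask {
  std::string description;  // what part of the fragment this task covers
  int64_t num_rows;         // rows the task's batch stream would yield
};

struct Fragment {
  std::string path;
  std::vector<int64_t> row_group_sizes;  // empty for formats without row groups
  int64_t total_rows;
};

// A CSV-like format: a single scan task covers the entire file.
std::vector<ScanTask> PlanCsvLike(const Fragment& fragment) {
  assert(fragment.total_rows >= 0);
  return {ScanTask{fragment.path + " (entire file)", fragment.total_rows}};
}

// A Parquet-like format: one scan task per row group.
std::vector<ScanTask> PlanParquetLike(const Fragment& fragment) {
  std::vector<ScanTask> tasks;
  tasks.reserve(fragment.row_group_sizes.size());
  for (std::size_t i = 0; i < fragment.row_group_sizes.size(); ++i) {
    tasks.push_back(ScanTask{fragment.path + " row group " + std::to_string(i),
                             fragment.row_group_sizes[i]});
  }
  return tasks;
}
```

Because every task has an explicit position within its fragment, the batch order follows from (fragment, task, batch within task) instead of being inferred after the fact.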
So, would this prevent the use_thread deadlock?
cpp/src/parquet/arrow/reader.cc
Outdated
@@ -16,10 +16,12 @@
// under the License.

#include "parquet/arrow/reader.h"
#include <sys/types.h>
Do we really need this?
No, thank you for noticing. I will clean this up soon.
}
if (!first) {
  // TODO(weston): Test this case
  return Status::Invalid("Unexpected empty row group");
Hmm, I think a RowGroup can currently be empty. You can easily generate a case like this using Python's write_table:
table.len() == 10000
write_table(table, 2000)
Doesn't this create a table with 5 row groups? Why would this be an empty row group?
Status WriteTable(const Table& table, int64_t chunk_size) override {
  RETURN_NOT_OK(table.Validate());

  if (chunk_size <= 0 && table.num_rows() > 0) {
    return Status::Invalid("chunk size per row_group must be greater than 0");
  } else if (!table.schema()->Equals(*schema_, false)) {
    return Status::Invalid("table schema does not match this writer's. table:'",
                           table.schema()->ToString(), "' this:'", schema_->ToString(),
                           "'");
  } else if (chunk_size > this->properties().max_row_group_length()) {
    chunk_size = this->properties().max_row_group_length();
  }

  auto WriteRowGroup = [&](int64_t offset, int64_t size) {
    RETURN_NOT_OK(NewRowGroup(size));
    for (int i = 0; i < table.num_columns(); i++) {
      RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
    }
    return Status::OK();
  };

  if (table.num_rows() == 0) {
    // Append a row group with 0 rows
    RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
    return Status::OK();
  }

  for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
    int64_t offset = chunk * chunk_size;
    RETURN_NOT_OK_ELSE(
        WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
        PARQUET_IGNORE_NOT_OK(Close()));
  }
  return Status::OK();
}
This comes from the code above. It is easy to flush a row group where row_num == 0.
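To make the chunking arithmetic in `WriteTable` easier to follow, here is a minimal standalone sketch of just the row-group sizing. `RowGroupSizes` is a hypothetical helper written for this comment, not part of the Parquet writer, and it leaves out the `max_row_group_length()` clamp and all actual writing.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: returns the number of rows that each row group would
// receive for a table of `num_rows` rows written with `chunk_size`.
std::vector<int64_t> RowGroupSizes(int64_t num_rows, int64_t chunk_size) {
  assert(num_rows >= 0);
  assert(chunk_size > 0 || num_rows == 0);
  if (num_rows == 0) {
    // Same special case as WriteTable: a single row group with 0 rows.
    return std::vector<int64_t>(1, 0);
  }
  std::vector<int64_t> sizes;
  for (int64_t offset = 0; offset < num_rows; offset += chunk_size) {
    // The last row group takes whatever rows are left over.
    sizes.push_back(std::min(chunk_size, num_rows - offset));
  }
  return sizes;
}
```

Under these assumptions, 10000 rows with a chunk size of 2000 gives five row groups of 2000 rows each, and the only input that produces a zero-row row group through this path is a table with no rows at all.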
I manually created some empty row groups. It turns out this branch should be unreachable because, further up, we will have noticed that "remaining rows" is 0 and returned an end marker. I've updated this code and added a test case for this scenario in #36779
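The reasoning above can be illustrated with a small sketch, assuming a reader that checks its remaining row count before producing each batch. `RowGroupCursor` and its members are hypothetical names, not the reader code in this PR, and `std::nullopt` stands in for the end marker.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical cursor over one row group; a batch is reduced to its row count.
struct RowGroupCursor {
  int64_t remaining_rows;
  int64_t batch_size;

  // Returns the size of the next batch, or std::nullopt as the end marker.
  std::optional<int64_t> Next() {
    assert(batch_size > 0);
    if (remaining_rows == 0) {
      // An empty row group ends here, before any "first batch" bookkeeping.
      return std::nullopt;
    }
    const int64_t rows = std::min(batch_size, remaining_rows);
    remaining_rows -= rows;
    return rows;
  }
};
```

With this shape, an empty row group returns the end marker on the very first call, so a later check for "no batch was ever produced" is never reached for it.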
Okay, great!
Yes. Since this method is async, the caller can choose not to block. Previously we used …
I think the changes here are probably too extensive to expect review. I will be breaking this PR up into multiple PRs.
The first two of these PRs are now available:
The second PR is now available. Once the two pre-reqs merge I will undraft this.
Rationale for this change
This ended up being considerably more change than just connecting parquet to the new scan node. In order to do this I had to refactor the scan node itself somewhat. It introduces the concept of scan tasks (or maybe scan streams would be a better name) to help clarify the concept of a row group (which I didn't have to worry about with CSV). I also introduced the staging area which is a slightly different approach to sequencing that I think will be much simpler.
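As a rough illustration of what sequencing through a staging area could look like, here is a minimal sketch. It rests on an assumption that is not confirmed by this description: batches arrive tagged with a sequence index, possibly out of order, and are held until every earlier index has been delivered. `StagingArea` and `Insert` are hypothetical names, a batch is represented by a string, and the actual implementation in this PR may differ substantially.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reorder buffer; not the staging area implemented in this PR.
class StagingArea {
 public:
  // Stages `batch` under `index` and returns every batch that can now be
  // delivered in sequence order (possibly none).
  std::vector<std::string> Insert(int64_t index, std::string batch) {
    assert(index >= next_index_);  // already-delivered indices are invalid
    staged_.emplace(index, std::move(batch));
    std::vector<std::string> ready;
    auto it = staged_.begin();
    // Release the contiguous run that starts at the next expected index.
    while (it != staged_.end() && it->first == next_index_) {
      ready.push_back(std::move(it->second));
      it = staged_.erase(it);
      ++next_index_;
    }
    return ready;
  }

 private:
  int64_t next_index_ = 0;                 // next index to deliver downstream
  std::map<int64_t, std::string> staged_;  // batches waiting for their turn
};
```

In this sketch the ordering logic lives in one place: producers only need to know their own index, and the consumer sees batches strictly in order.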
What changes are included in this PR?
The new scan node now supports the parquet format.
Are these changes tested?
Yes
Are there any user-facing changes?
There are breaking changes to the scan2 node but this feature hasn't really been released yet.