[GOBBLIN-1891] Create selftuning buffered ORC writer #3751

Merged: 19 commits from the create-selftuning-orc-writer branch into apache:master on Sep 1, 2023

Conversation

Contributor

@Will-Lo Will-Lo commented Aug 25, 2023

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):

The current ORCWriter, which converts Avro to ORC, frequently runs into OOM issues on large schemas. This is theorized to be partially due to the way the converter allocates memory for large lists and maps: it uses a resize algorithm that multiplies the previous array size by 3. The extra space this leaves, on top of the large records already stored in the buffer and the file writer, can cause memory issues.
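
For illustration only (this is not the converter's actual code), the following sketch shows the kind of multiply-by-3 growth described above: for a large list column, one element over capacity triples the allocation and leaves most of it unused.

public final class TripleResizeExample {

  private TripleResizeExample() {}

  static int resize(int currentCapacity, int requiredCapacity) {
    int newCapacity = currentCapacity;
    while (newCapacity < requiredCapacity) {
      newCapacity *= 3; // each resize triples the previous array size
    }
    return newCapacity;
  }

  public static void main(String[] args) {
    int capacity = 100_000;      // current element capacity of a list column
    int required = capacity + 1; // one record slightly larger than before
    int resized = resize(capacity, required);
    System.out.printf("capacity %d -> %d (%d unused slots)%n",
        capacity, resized, resized - required);
  }
}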

This PR introduces a few components/ideas to manage memory:

  1. Have the converter also estimate the size of each record, since it already needs to traverse the record in order to perform the conversion.
  2. The internal buffer of the GobblinBaseOrcWriter should account for the memory available in the JVM (obtainable through the Java Runtime APIs), minus the memory held by records stored in the underlying file writer and by the Avro-to-ORC converter's resized buffers. That remainder is then divided by the average record size to get the buffer size (see the sketch after this list).
  3. There was a conscious decision not to re-initialize the underlying ORCWriter every time a tune is performed, because that would create a new file and could lead to a large number of files in the end. Since rows are compressed as they are added to this writer, it should generally perform well enough if it is tuned once at each writer initialization; in Fast Ingest this occurs every 5 minutes.
  4. The average record size and the size allocated to the converter are stored in the Gobblin state, so each writer initialization uses the previous run's calculations instead of slowly tuning up from scratch.
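
A minimal sketch of the batch sizing described in point 2, assuming illustrative names (estimateBatchSize, converterBufferBytes, and so on) rather than the PR's actual API:

public final class OrcBatchSizeEstimator {

  private OrcBatchSizeEstimator() {}

  /**
   * @param avgRecordSizeBytes   running average size of a converted record
   * @param converterBufferBytes memory currently held by the Avro-to-ORC converter's column buffers
   * @param fileWriterBytes      estimated memory retained by the underlying ORC file writer
   * @param maxBatchSize         upper bound on the buffered row count
   */
  public static int estimateBatchSize(long avgRecordSizeBytes, long converterBufferBytes,
      long fileWriterBytes, int maxBatchSize) {
    Runtime runtime = Runtime.getRuntime();
    // Memory the JVM can still grow into, plus what is currently free in the heap.
    long availableBytes = runtime.maxMemory() - runtime.totalMemory() + runtime.freeMemory();
    long usableBytes = availableBytes - converterBufferBytes - fileWriterBytes;
    if (usableBytes <= 0 || avgRecordSizeBytes <= 0) {
      return 1; // fall back to a minimal batch rather than risk an OOM
    }
    long batchSize = usableBytes / avgRecordSizeBytes;
    return (int) Math.max(1L, Math.min(batchSize, maxBatchSize));
  }

  public static void main(String[] args) {
    // Example: 2 KB average records, 64 MB held by the converter, 128 MB in the file writer.
    int batch = estimateBatchSize(2_048L, 64L << 20, 128L << 20, 10_000);
    System.out.println("Estimated batch size: " + batch);
  }
}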

Future work / improvements around this:

  1. Expose more knobs around configurability once we can benchmark performance.
  2. Refactor all of the converter buffer resizing into the memory manager, then fix the smartResize algorithm so that it is effectively a bounded exponential-decay function, so that records marginally larger than the previously calculated ones do not cause the converter to balloon out of control. This should improve performance. (A rough sketch of such a bounded resize follows this list.)
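
For illustration only, here is one possible reading of the bounded exponential-decay resize mentioned in point 2: the growth factor shrinks toward 1.0 as the buffer gets larger, and the result is capped, so slightly larger records no longer trigger 3x growth on big buffers.

public final class BoundedResize {

  private BoundedResize() {}

  public static int smartResize(int currentSize, int requiredSize, int maxSize) {
    if (requiredSize <= currentSize) {
      return currentSize; // already large enough, no resize needed
    }
    // Growth factor decays exponentially with the current size:
    // small buffers may still triple, large buffers grow only slightly.
    double decay = Math.exp(-currentSize / 100_000.0);
    double growthFactor = 1.0 + 2.0 * decay; // ranges from 3.0 down toward 1.0
    long proposed = (long) Math.ceil(currentSize * growthFactor);
    long resized = Math.max(proposed, requiredSize); // never below what is required
    return (int) Math.min(resized, maxSize);         // and never above the bound
  }

  public static void main(String[] args) {
    System.out.println(smartResize(1_000, 1_200, 1_000_000));     // small buffer: near 3x growth
    System.out.println(smartResize(500_000, 510_000, 1_000_000)); // large buffer: small step
  }
}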

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Unit tests around memory manager
    Tested with Kafka ingestion pipelines

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"


codecov-commenter commented Aug 25, 2023

Codecov Report

Merging #3751 (eb6435c) into master (7dbf7b6) will increase coverage by 0.07%.
Report is 14 commits behind head on master.
The diff coverage is 72.36%.

@@             Coverage Diff              @@
##             master    #3751      +/-   ##
============================================
+ Coverage     47.08%   47.15%   +0.07%     
- Complexity    10864    10895      +31     
============================================
  Files          2147     2148       +1     
  Lines         84825    85032     +207     
  Branches       9412     9440      +28     
============================================
+ Hits          39936    40095     +159     
- Misses        41264    41294      +30     
- Partials       3625     3643      +18     
Files Changed Coverage Δ
.../gobblin/writer/GenericRecordToOrcValueWriter.java 67.50% <67.64%> (+7.28%) ⬆️
...rg/apache/gobblin/writer/GobblinBaseOrcWriter.java 72.22% <73.49%> (+1.53%) ⬆️
...ache/gobblin/writer/OrcConverterMemoryManager.java 73.52% <73.52%> (ø)
...g/apache/gobblin/writer/PartitionedDataWriter.java 64.95% <100.00%> (+0.16%) ⬆️

... and 37 files with indirect coverage changes


Contributor

@ZihanLi58 ZihanLi58 left a comment


Initial pass and some early feedback

@Will-Lo Will-Lo marked this pull request as ready for review August 29, 2023 17:04
@Will-Lo Will-Lo force-pushed the create-selftuning-orc-writer branch from d2fc245 to f9c0fde Compare August 29, 2023 17:53
@Will-Lo Will-Lo changed the title [DRAFT] Create selftuning buffered ORC writer [GOBBLIN-1891] Create selftuning buffered ORC writer Aug 29, 2023
if (this.orcFileWriter == null) {
  initializeOrcFileWriter();
}
this.flush();
Contributor


We don't need to flush when we've just initialized the writer, do we?

Contributor Author

@Will-Lo Will-Lo Aug 29, 2023


If the rowBatch is empty then the flush is a no-op; this is mostly because I reuse the tuneBatchSize logic both during the first initialization and at every tune interval.

this.flush();
this.rowBatch.ensureSize(this.batchSize);
}
if (this.orcFileWriter == null) {
Contributor


duplicate code?

Contributor Author


The issue is that because I don't lazy-init the writer during flush, I need to initialize the writer during the first tune. Essentially I need to initialize before the first flush; maybe this logic makes more sense in the flush function.

Contributor


Oh, I guess you can remove lines 280-282 and change line 286 to:
if (this.orcFileWriter == null) {
  initializeOrcFileWriter();
} else {
  this.flush();
}

Contributor Author


I think, to handle every edge case (e.g. scenarios where low-volume writers get closed before the first tune), I'll always lazy-init inside of the flush function instead; this way we won't need to duplicate the logic.
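
A minimal sketch of the lazy-init-in-flush approach described above, using illustrative names and a simplified class rather than the PR's actual GobblinBaseOrcWriter:

import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.Writer;

// Hedged sketch: the file writer is created on the first real flush,
// so the tune and close paths need no duplicated null checks.
abstract class LazyInitOrcWriterSketch {
  protected Writer orcFileWriter;        // created lazily on the first real flush
  protected VectorizedRowBatch rowBatch; // rows buffered since the last flush

  // Hypothetical hook that would create the underlying ORC file writer.
  protected abstract void initializeOrcFileWriter() throws IOException;

  protected void flush() throws IOException {
    if (this.rowBatch.size == 0) {
      return; // nothing buffered; also covers flushes issued right after a tune
    }
    if (this.orcFileWriter == null) {
      initializeOrcFileWriter(); // lazy initialization, whichever path called flush
    }
    this.orcFileWriter.addRowBatch(this.rowBatch);
    this.rowBatch.reset();
  }
}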

long converterBufferColSize = 0;
if (col instanceof ListColumnVector) {
  ListColumnVector listColumnVector = (ListColumnVector) col;
  converterBufferColSize += listColumnVector.child.isNull.length;
Contributor


I still have some questions here; likely I'm misunderstanding something:

  1. Does listColumnVector.child.isNull.length mean the length of the current list? I'm confused by the "child" here.
  2. On line 52, why is it + and not *?
  3. It seems like you're getting the length here, but aren't we interested in the memory size?

Contributor Author

@Will-Lo Will-Lo Aug 30, 2023


  1. It should be the length of the current list; child refers to the element vector within the ListColumnVector. Since this is columnar, you need to think in terms of the child schema: how many of that child element are stored within this column.
  2. Ah, good point. I guess this will catch scenarios of a list within a list; will modify. Originally I thought we had to multiply, but the columnar nature of ORC means you just need to ensure that this particular column's child (the element vector of the array) can hold all of the items across the arrays' lengths, so addition is fine here.
  3. I wanted to measure the space consumed by the resizes, which is primarily represented by the growth of a boolean array; I'm not sure whether Java accounts for nulls in an array as the size of the expected object times the length. Edited: after looking into the library implementation, most of the primitive ORC types use a null value that maps to a default Java primitive, so there is an associated size that can be roughly estimated. I will use that as the benchmark, which should improve accuracy. (A rough sketch of this kind of estimation follows.)
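
A rough sketch of this kind of allocated-size estimation over column vectors; the PR's OrcConverterMemoryManager may differ, and the recursion over child vectors here is purely illustrative.

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;

public final class ColumnVectorSizeSketch {

  private ColumnVectorSizeSketch() {}

  public static long estimateAllocatedLength(ColumnVector col) {
    long size = col.isNull.length; // every vector carries an isNull array of its allocated length
    if (col instanceof ListColumnVector) {
      // A list's elements live in a single flattened child vector.
      size += estimateAllocatedLength(((ListColumnVector) col).child);
    } else if (col instanceof MapColumnVector) {
      MapColumnVector map = (MapColumnVector) col;
      size += estimateAllocatedLength(map.keys) + estimateAllocatedLength(map.values);
    }
    return size;
  }

  public static void main(String[] args) {
    // A list column whose child (element) vector was resized to hold 3,072 longs.
    ListColumnVector list = new ListColumnVector(1024, new LongColumnVector(3072));
    System.out.println("Allocated slots: " + estimateAllocatedLength(list));
  }
}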

Contributor

@ZihanLi58 ZihanLi58 left a comment


one small comment

Contributor

@ZihanLi58 ZihanLi58 left a comment


+1 thanks for the work here!

@Will-Lo Will-Lo merged commit a0787aa into apache:master Sep 1, 2023
6 checks passed