-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Configurable batch size and max wait limit for targets #1876
base: main
Are you sure you want to change the base?
feat: Configurable batch size and max wait limit for targets #1876
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #1876 +/- ##
==========================================
+ Coverage 88.47% 88.69% +0.21%
==========================================
Files 54 55 +1
Lines 4721 4857 +136
Branches 919 952 +33
==========================================
+ Hits 4177 4308 +131
- Misses 383 385 +2
- Partials 161 164 +3 ☔ View full report in Codecov by Sentry. |
…gurable-batch-size-and-max-wait-limit-for-targets
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your work on this @BuzzCutNorman!
I think this
Lines 128 to 135 in f6bbf0c
@property | |
def is_full(self) -> bool: | |
"""Check against size limit. | |
Returns: | |
True if the sink needs to be drained. | |
""" | |
return self.current_size >= self.max_size |
has to be updated to take the timer into account, wdyt?
singer_sdk/sinks/sql.py
Outdated
# Finish Line for max_size perf counter | ||
if self.sink_timer is not None: | ||
if self.sink_timer.start_time is not None: | ||
self.sink_timer.stop() | ||
self.batch_size_rows = self.sink_timer.counter_based_max_size() | ||
|
||
# Starting Line for max_size perf counter | ||
self.sink_timer.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wdyt about moving this up the stack? Perhaps to
Line 498 in f6bbf0c
sink.process_batch(draining_status) |
That way all targets, not just SQL, benefit from this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it. The code has been moved up. Please let me know if this matches up with what you had in mind.
Co-authored-by: Edgar R. M. <[email protected]>
@edgarrmondragon I tried to account for this in the following way. I reflect the timer batch size row changes in
Then have
The timer then updates
|
@edgarrmondragon I might be overselling what the option is accomplishing. When I look at the steps in the
The |
…gets' of https://github.com/BuzzCutNorman/sdk into 1626-configurable-batch-size-and-max-wait-limit-for-targets
@edgarrmondragon if you would please review this PR again when you get a chance. |
When the Meltano Target configuration option `batch_dynamic_management` is set to `True` you are asking the `Sink.sink_timer` to find the maximum rows is full mark that keeps the time to fill a batch with records and write those records to the Target's target within the time in seconds given. | ||
|
||
The `Sink.sink_timer` is passed the given `batch_size_rows` or the `DEFAULT_MAX_SIZE` constant which is `10000` if it is `None` and is also passed the given `batch_wait_limit_seconds` if present or the `WAIT_LIMIT_SECONDS_DEFAULT` constant which is `30` if it is `None`. Internally the `rows` passed turns into `Sink.sink_timer.SINK_MAX_SIZE_CEILING` which is the max size a batch can reach. The `time` in `seconds` passed turns into `Sink.sink_timer.max_perf_counter` which is the time in seconds a full cycle should take. The attribute `Sink.sink_timer.sink_max_size` starts at a predefined size of `100`. During the `Target.drain_one(Sink)` process `Sink._lap_manager` is called and the timer method `counter_based_max_size` runs and checks if `Sink.sink_timer.perf_diff`, which is `max_perf_counter` - `lap_time`, is greater than `Sink.sink_timer.perf_diff_allowed_max` or less than `Sink.sink_timer.perf_diff_allowed_min`. If `Sink.sink_timer.perf_diff` is greater than `Sink.sink_timer.perf_diff_allowed_max` the `Sink.sink_timer.sink_max_size` is increased as long as the `Sink.sink_timer.sink_max_size` is less than `Sink.sink_timer.SINK_MAX_SIZE_CEILING`. If `Sink.sink_timer.perf_diff` is less than `Sink.sink_timer.perf_diff_allowed_min` the `Sink.sink_timer.sink_max_size` is reduced. If the `Sink.sink_timer.perf_diff` is between `Sink.sink_timer.perf_diff_allowed_max` and `Sink.sink_timer.perf_diff_allowed_min` no correction to `Sink.sink_timer.sink_max_size` is made since the optimal rows size has been reached. This process is repeated when each `Sink` is initialized and starts processing records. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still finding this a bit confusing. I think we want the max batch size to have a default value, and the max wait time to default to None
so that in the default case, only the current batch size is checked.
Essentially:
flowchart TD
A[Process next row] --> B{current_size ><br/>max_size?}:::q
B -- Yes --> C[Drain]:::drain
B -- No ---> D{is max_wait_time<br/>set?}:::q
D -- No ---> A
D -- Yes --> F{elapsed ><br/>max_wait_time?}:::q
F -- No ---> A
F -- Yes --> C
C --> G[Reset counter<br/>and timer]
G --> A
classDef q fill:#FFE666
classDef drain fill:#FF6680
Does it make sense? I'm sorry, I feel like I'm missing something 🙏
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diagram makes sense. I will need to work through it again tomorrow. I think you feeling like you are missing something is a result of my poor writing. Below is the best flowchart for batch_dynamic_management
I could make.
flowchart TD
C1-->A2
D2-->A3
E3-->E2
C3-->A4
G4-->D3
F2-->A1
subgraph Target._process_record_message
A1[Process next row] --> B1{is_full<p>_drain_function calls is_full_dynamic<p>sink.current_size > = sink_timer.sink_max_size}
B1-->|True| C1[drain]
B1-->|False| A1
end
subgraph Target.drain_one
A2([start])-->B2[sink.start_drain]
B2-->C2[sink.process_batch]
C2-->D2[sink._lap_manager]
D2~~~E2[sink.mark_drained]
E2-->F2([end])
end
subgraph Sink.lap_manager
A3([start])-->B3[sink_timer.stop]
B3-->C3[sink_timer.counter_based_max_size]
C3~~~D3[sink_timer.start]
D3-->E3([end])
end
subgraph sink_timer. counter_based_max_size
A4([start])-->B4{perf_diff < self.perf_diff_allowed_min}
B4-->|True| C4[correction = decrease amount]
B4-->D4{perf_diff >= perf_diff_allowed_max<p>and<p>sink_max_size < SINK_MAX_SIZE_CEILING}
D4-->|True| E4[correction = increase amount]
D4-->F4[sink_max_size += correction]
F4-->G4([end])
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the default case,
This would be when batch_dynamic_management
is set to True
in the target config and batch_size_rows
and batch_wait_limit_seconds
are not preset in the target config?
or
Are you meaning default as in none of the three target config settings batch_size_rows
, batch_wait_limit_seconds
, and batch_dynamic_management
are present?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@edgarrmondragon When you have a free moment would you please provide me clarification on what you see the "default case" to be. 🙏😃
Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
…-configurable-batch-size-and-max-wait-limit-for-targets
CodSpeed Performance ReportMerging #1876 will not alter performanceComparing Summary
|
…-configurable-batch-size-and-max-wait-limit-for-targets
…-configurable-batch-size-and-max-wait-limit-for-targets
…-configurable-batch-size-and-max-wait-limit-for-targets
This is an attempt at implementing the batch size and wait limit for targets as outlined.
Closes #1626
📚 Documentation preview 📚: https://meltano-sdk--1876.org.readthedocs.build/en/1876/