fix: Initialize max_replication_key_value via SELECT max(<replication_key>) ... before starting a native BATCH sync #976

Open
Tracked by #1
kgpayne opened this issue Sep 16, 2022 · 20 comments · May be fixed by #1894

kgpayne (Contributor) commented Sep 16, 2022

UPDATE (from @aaronsteers) per #976 (comment):

Since this is high urgency, I took an initial pass over the break here on tap-snowflake:

That work can be migrated here but I wanted to start with a 'real' implementation for testing and to prove the design approach.

Details

Singer SDK Version

0.10.0

Python Version

3.8

Bug scope

Taps (catalog, state, stream maps, etc.)

Operating System

macOS

Description

When passing previous state to a tap with batch mode enabled, that state is not made available via the documented Stream.get_starting_timestamp() and Stream.get_starting_replication_key_value() methods.

I believe this is because the starting replication value, which the methods above depend on to retrieve state, is never set up for batch syncs. This is easiest to see when comparing the Stream._sync_records() and Stream._sync_batches() methods on singer_sdk.streams.core.Stream (snippets below). I think the critical missing call is self._write_starting_replication_value(current_context).

https://github.com/meltano/sdk/blob/main/singer_sdk/streams/core.py#L1034

Code

```python
    def _sync_records(
        self,
        context: dict | None = None,
        write_messages: bool = True,
    ) -> Generator[dict, Any, Any]:
        """Sync records, emitting RECORD and STATE messages.

        Args:
            context: Stream partition or context dictionary.
            write_messages: Whether to write Singer messages to stdout.

        Raises:
            InvalidStreamSortException: TODO

        Yields:
            Each record from the source.
        """
        record_count = 0
        current_context: dict | None
        context_list: list[dict] | None
        context_list = [context] if context is not None else self.partitions
        selected = self.selected

        for current_context in context_list or [{}]:
            partition_record_count = 0
            current_context = current_context or None
            state = self.get_context_state(current_context)
            state_partition_context = self._get_state_partition_context(current_context)
            self._write_starting_replication_value(current_context)
            child_context: dict | None = (
                None if current_context is None else copy.copy(current_context)
            )

            for record_result in self.get_records(current_context):
                ...

    def _sync_batches(
        self,
        batch_config: BatchConfig,
        context: dict | None = None,
    ) -> None:
        """Sync batches, emitting BATCH messages.

        Args:
            batch_config: The batch configuration.
            context: Stream partition or context dictionary.
        """
        for encoding, manifest in self.get_batches(batch_config, context):
            self._write_batch_message(encoding=encoding, manifest=manifest)
            self._write_state_message()
```
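
For illustration, a minimal sketch of what the missing setup might look like if _sync_batches adopted the same call (this is the reporter's suggestion applied naively, not the SDK's actual implementation):

```python
    def _sync_batches(
        self,
        batch_config: BatchConfig,
        context: dict | None = None,
    ) -> None:
        """Sync batches, emitting BATCH messages (hypothetical fix sketch)."""
        # Populate the starting replication value from the incoming STATE so that
        # get_starting_replication_key_value() / get_starting_timestamp() work
        # inside get_batches(), mirroring what _sync_records() already does.
        self._write_starting_replication_value(context)

        for encoding, manifest in self.get_batches(batch_config, context):
            self._write_batch_message(encoding=encoding, manifest=manifest)
            self._write_state_message()
```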
kgpayne added the kind/Bug and valuestream/SDK labels on Sep 16, 2022
kgpayne (Contributor, Author) commented Sep 16, 2022

@edgarrmondragon I guess incremental replication with BATCH is still an open question, but it seems to me that this is worth adding for cases where users i) first enable BATCH after having previously run incrementally, or ii) self-manage a state.json (as I have done for testing). It is so close to 'just working', at least on the select side 😅

kgpayne changed the title to [Bug]: _sync_batch does not handle STATE on Sep 16, 2022
edgarrmondragon (Collaborator) commented:

Yeah, so the implementation of Stream._sync_batches is missing the following:

  • Initial state via Stream._write_starting_replication_value
  • State update via State._increment_stream_state(record, context=...

They should be called at some point, but the latter relies on a record-by-record sync, so it's not really useful here...

kgpayne (Contributor, Author) commented Sep 16, 2022

Makes sense 🙂 In terms of the mechanism to retrieve the bookmark (because it cannot be automatically inferred from the batch itself without decompressing and reading the last line of the last file 😱), what do you think of allowing the user to return it? Something like:

```python
for encoding, manifest, state in self.get_batches(batch_config, context):
    ...
```

where state is just the max replication key value for that given manifest (defaulting to None)? For tap-snowflake I would then start a transaction and retrieve the max value before executing the copy command 🚀
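
To make the shape of that proposal concrete, here is a rough sketch of how the SDK's batch loop could consume such a 3-tuple (the extra state element is hypothetical; today get_batches() yields only (encoding, manifest) pairs):

```python
# Hypothetical _sync_batches loop, if get_batches() also yielded the max
# replication key value observed for each manifest (None when unknown).
for encoding, manifest, max_value in self.get_batches(batch_config, context):
    self._write_batch_message(encoding=encoding, manifest=manifest)
    if self.replication_key and max_value is not None:
        # Advance the bookmark using the max value captured alongside the batch.
        self._increment_stream_state(
            {self.replication_key: max_value}, context=context
        )
    self._write_state_message()
```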

aaronsteers (Contributor) commented Sep 19, 2022

Note to self (@aaronsteers): reply here after some thought.

@kgpayne - thanks for raising this! @edgarrmondragon and I spoke about it today in our regular 1:1.

What about this approach?

  1. Run a "min/max query" before starting the batch.
    • Before extracting a batch, we use generic SQLAlchemy methods to collect the starting_replication_key_value and ending_replication_key_value.
    • Non-SQL implementations would need their own way of initializing these values, but presumably all SQL-based implementations could use something built into the SDK, such as the query below (a generic SQLAlchemy sketch follows this list):
      SELECT 
          min({self.replication_key}) AS starting_replication_key_value,
          max({self.replication_key}) AS ending_replication_key_value
      FROM {self.table_name} AS source_table
      WHERE {self.replication_key} >= {self.get_starting_replication_key_value()}
      
    • The reason for running this before the batch executes is that running it afterwards would, in some cases, create a gap between the max value at the time of the unload and the max value at the time of the post-query. That gap could cause data loss, which we want to avoid.
  2. Use the min/max to limit the batch.
    • When extracting the batch, developers can optionally limit their output to be within the bounds of min/max (inclusive).
    • If records cannot be limited to the min/max range, then some records may arrive more than once, which is acceptable according to the spec.
      • This should affect only a small number of sources that have native BATCH support but no support for range filtering. It is certainly not ideal, but we may want to consider simply declaring that STATE isn't supported for those streams when run in BATCH mode, which would mean that BATCH on streams with no filtering capability effectively requires FULL_TABLE replication instead of INCREMENTAL.
  3. Send STATE messages as usual, but with output from the "min/max query".
    • The value of ending_replication_key_value would determine the STATE message content after the batch is transmitted.
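
A minimal sketch of the min/max query from point 1 above, using generic SQLAlchemy Core. The helper name is made up, and the way the engine is obtained from the stream's connector is an assumption that may differ between SDK versions:

```python
import sqlalchemy as sa


def _get_replication_key_bounds(self, context: dict | None = None) -> tuple:
    """Hypothetical SQLStream helper: return (starting, ending) replication key values."""
    replication_key_col = sa.column(self.replication_key)
    # NOTE: sa.table() does not handle schema qualification; a real implementation
    # would reflect the table via the stream's connector instead.
    source_table = sa.table(str(self.fully_qualified_name), replication_key_col)

    query = sa.select(
        sa.func.min(replication_key_col).label("starting_replication_key_value"),
        sa.func.max(replication_key_col).label("ending_replication_key_value"),
    ).select_from(source_table)

    start_value = self.get_starting_replication_key_value(context)
    if start_value is not None:
        query = query.where(replication_key_col >= start_value)

    # Assumption: the SQL connector can hand us a SQLAlchemy Engine.
    with self.connector.create_engine().connect() as connection:
        row = connection.execute(query).one()

    return (row.starting_replication_key_value, row.ending_replication_key_value)
```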

Other notes:

aaronsteers changed the title from "[Bug]: _sync_batch does not handle STATE" to "Bug: _sync_batch does not handle STATE" on Sep 19, 2022
aaronsteers changed the title from "Bug: _sync_batch does not handle STATE" to "feat: Add STATE support for BATCH messages in SQLStreams" on Sep 19, 2022
kgpayne (Contributor, Author) commented Sep 26, 2022

@aaronsteers approach sounds good 👍 What are the next steps? Would you like me to work on this, or will we ship tap-snowflake without incremental support in batch mode for now?

aaronsteers (Contributor) commented Sep 26, 2022

@kgpayne - Let's ship batch messaging as soon as it's available on tap- and target-snowflake. We need the example implementation as soon as possible for community members to reference.

If we can put in placeholders, as discussed, that light up once the upstream SDK capability is implemented, that would be ideal.

Code comments and a note in the readme, with cross-links to the logged issue(s), would be much appreciated.

kgpayne self-assigned this on Oct 4, 2022
kgpayne (Contributor, Author) commented Oct 26, 2022

@aaronsteers based on your comment above, how does:

> Other notes:

square with #1011? In my mind, they are the same feature - i.e. min, max and limit values are passed in the get_records/get_batches signatures, and in the incremental case the value of max is derived from a SQLAlchemy MAX(replication_key) 🤔 The tap would then i) only get records <= the max and ii) emit the max as its state value.

It would be good to get to a complete spec for tap-snowflake, as it sounded from our 1:1 like having INCREMENTAL support sooner rather than later is preferable, not least to set the overall pattern for anyone wanting to build their own BigQuery/Redshift equivalents with incremental support 😅

@edgarrmondragon FYI

aaronsteers (Contributor) commented Oct 26, 2022

@kgpayne - Following on from our past conversations, I logged a new proposal suggesting a StreamFilter object that can be passed to get_records(), get_batches(), get_url_params(), etc.

This would eliminate the need for the developer to directly call get_starting_replication_key_value(), get_replication_key_signpost(), or get_starting_timestamp(). The developer could then also completely ignore self.replication_method and self.replication_key, since all constraints would be precalculated and baked into the filter object passed to the method.

This also bakes in the max_record_limit constraint discussed elsewhere.
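
As a purely illustrative sketch (none of these names exist in the SDK yet), the proposed object might look roughly like this:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class StreamFilter:
    """Hypothetical container for precalculated sync constraints."""

    starting_replication_key_value: Any | None = None
    ending_replication_key_value: Any | None = None  # a.k.a. the signpost
    max_record_limit: int | None = None
```

Developer code would then read the bounds off the filter argument instead of calling the bookmark helpers directly.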

cc @edgarrmondragon

aaronsteers (Contributor) commented Oct 27, 2022

@kgpayne and @edgarrmondragon - The above discussion regarding how to communicate constraints to get_records() and get_batches() is important, but I don't think it should necessarily be a blocker here.

The critical-path item here, I think, is something like the following (sketched in code after the list), which captures the max_replication_key_value even though we never touch any records ourselves:

  1. Before the SDK calls SQLStream.get_batches(), we use SQLAlchemy to run the equivalent of SELECT MIN(replication_key), MAX(replication_key) FROM source_table.
    • The min value has no function as of now, so we can omit it from the query if we want.
  2. We set the internal signpost value to the max replication key value.
    • This has no critical function and is purely cosmetic for our purposes. Unlike what we previously thought, it does not filter records, because it is unsafe for us to do so without ensuring we then do another 'catch-up sync' afterwards.
  3. After the batch messages are sent, the same signpost value (the MAX(replication_key) from our original query) should be set as the max_replication_key_value, and it would then be sent automatically in the subsequent STATE message for that stream.
  4. On subsequent executions: any implementation of SQLStream.get_batches() that consults either get_starting_replication_key_value() or the new/proposed StreamFilter argument will immediately get incremental support.

Does this sound right?

@kgpayne - If this sounds reasonable, feel free to pick it up as you have availability. Delivering sooner will mean less rework for those who build on the tap-snowflake example over the coming weeks.

aaronsteers changed the title from "feat: Add STATE support for BATCH messages in SQLStreams" to "fix: Initialize max_replication_key_value via SELECT max(<replication_key>) ... before starting BATCH sync" on Dec 30, 2022
aaronsteers changed the title from "fix: Initialize max_replication_key_value via SELECT max(<replication_key>) ... before starting BATCH sync" to "fix: Initialize max_replication_key_value via SELECT max(<replication_key>) ... before starting a native BATCH sync" on Dec 30, 2022
aaronsteers (Contributor) commented:

@kgpayne - Do I remember correctly that you had this fixed, either in the SDK or in the Snowflake implementation?

kgpayne (Contributor, Author) commented Feb 16, 2023

@aaronsteers I think you might be thinking of this PR 🙂:

aaronsteers (Contributor) commented Feb 17, 2023

@kgpayne Thanks. I didn't realize until now that you'd added the commits to the existing PR. We should definitely try to merge that and get it into the SDK. I'll see if I can get it merged while you are out next week, and maybe Derek can assist with adapting it to the SDK.

kgpayne (Contributor, Author) commented Apr 6, 2023

As the fix (linked above) has now been merged into tap-snowflake, the next step is to port it to the SDK.

luisvicenteatprima commented:

What's the progress on this?

tayloramurphy (Collaborator) commented:

ping @kgpayne since this is assigned to you. I've added it to the SDK v1 issue as well.

kgpayne (Contributor, Author) commented Jul 5, 2023

@tayloramurphy @luisvicenteatprima this is implemented in tap-snowflake but not yet ported to the SDK. It's marked as v1, so it should hopefully be picked up in this cadence 👍

@edgarrmondragon FYI, this was one of my next-up items and would be good to hand over 🐣

kgpayne removed their assignment on Jul 5, 2023
edgarrmondragon self-assigned this on Jul 5, 2023
edgarrmondragon (Collaborator) commented Aug 1, 2023

This is on my TODO list for the week.

Update: PR in #1894

luisvicenteatprima commented:

Any progress on this? There haven't been any updates in the PR for almost 2 months.

luisvicenteatprima commented:

@edgarrmondragon

cooley-hi commented:

Bumping to see if there's any progress on this. Thank you!
