Draft: Port to new SDK SQL classes #1

aaronsteers wants to merge 2 commits into main
Conversation

@aaronsteers commented Dec 17, 2021

@rabidaudio - I wanted to use your snowflake target project as a test of the new SDK interfaces. This PR is the result of that work, still in progress...

Summary:

  • The migration functions and the connection functions all now live in the SnowflakeConnector class (subclass of SQLConnector).
  • Flattening is not built yet.
  • The actual upload of files is not tested at all but I put in some generic (theoretically viable) Snowflake SQL for the "PUT" and "COPY" functions. (My sample code uses table stages for now, but really you could change this to anything and/or support multiple options.)
  • Since most types are ANSI-compatible, we only have to override the Snowflake-specific ones (ARRAY, VARIANT, and TIMESTAMP_TZ) in to_sql_type(); the other types are handled by the SDK base class. (A rough sketch follows below.)

Related SDK MR (WIP): https://gitlab.com/meltano/sdk/-/merge_requests/200
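
A rough, untested sketch of that to_sql_type() override; the singer_sdk and snowflake.sqlalchemy import paths are assumptions based on the WIP SDK branch and may differ:

# Sketch only: assumes the WIP SDK exposes SQLConnector and that
# snowflake-sqlalchemy exposes its custom types at the package root.
import sqlalchemy
from singer_sdk import SQLConnector
from snowflake.sqlalchemy import ARRAY, TIMESTAMP_TZ, VARIANT

class SnowflakeConnector(SQLConnector):
    """Snowflake connection and migration logic."""

    @staticmethod
    def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
        """Map the Snowflake-specific types; defer everything else to the SDK."""
        json_type = jsonschema_type.get("type", [])
        if "array" in json_type:
            return ARRAY()
        if "object" in json_type:
            return VARIANT()
        if jsonschema_type.get("format") == "date-time":
            return TIMESTAMP_TZ()
        # All other (ANSI-compatible) types use the SDK's default mapping.
        return SQLConnector.to_sql_type(jsonschema_type)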

@aaronsteers marked this pull request as draft on December 17, 2021 17:51
The connection url as a string.
"""
conf = config["snowflake"]
return URL(
rabidaudio (Owner):

Snowflake automatically adds snowflakecomputing.com to the hostname. This seems like an easy mistake to make, so maybe doing something like this here would be helpful (but in a way that's backwards compatible with Python < 3.9):

conf["account"].removesuffix("snowflakecomputing.com")

Also, I find Python packages' habit of explicitly defining every connection parameter instead of just taking a connection URL annoying. I might switch the config to simply config['snowflake_url'].

@aaronsteers (Author):

Some users may have a harder time creating their own snowflake_url, and there may also be some params we want to inject into that url at runtime. It might be a good option to offer as an override of the piecemeal approach, but I think providing the parameters individually is still helpful for certain cases and users.


@property
def table_name(self) -> str:
return self.stream_name.split("-")[-1].upper()
rabidaudio (Owner):

Why split on - and take the last piece, as opposed to, say, replace('-', '_')? That seems like it has potential for collisions (e.g. streams users-vehicles and vehicles).

@aaronsteers (Author) commented Jan 4, 2022:

@rabidaudio - I'm just replicating what I've seen with other connectors. Frankly, I'm not sure this is the right approach per se, except that we should expect to get stream names like db_name-schema_name-table_name as well as stream names like table_name on their own.

What I was thinking with this implementation is that a separate schema_name property would use the other parts of the stream name to determine the fully qualified target table name, but this is probably something for which we may need to open up some amount of configurability. (A rough sketch follows below.)

Please feel free to suggest patterns you see working well in other targets like target-snowflake. As long as we have a reasonable default, I think we can be free to add expanded capabilities in the future.
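
For illustration, a hedged sketch of that idea; the schema_name fallback and config key are assumptions, not part of this PR:

# Sketch only. Assumes: from typing import Optional
@property
def table_name(self) -> str:
    """Last segment of a 'db_name-schema_name-table_name' style stream name."""
    return self.stream_name.split("-")[-1].upper()

@property
def schema_name(self) -> Optional[str]:
    """Second-to-last segment when present, else the configured default schema."""
    parts = self.stream_name.split("-")
    if len(parts) >= 2:
        return parts[-2].upper()
    return self.config["snowflake"]["schema"].upper()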

self.table_schema = target.table_schema
def prepare_sink(self) -> None:
self.connector.connection.execute(
f'CREATE SCHEMA IF NOT EXISTS "{self.schema_name}"'
rabidaudio (Owner):

So much this. It's so annoying that the other targets expect the schema to be created out-of-band. 👍

"""Loads all files from the table's built-in stage."""
self.connector.connection.execute(
f"COPY INTO {full_table_name} "
f"FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '\\t' SKIP_HEADER = 1);"
rabidaudio (Owner):

Formats can be a bit tricky, especially with null handling (as I learned the hard way).

I'll test these and see if there are any issues.
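
For example, a hedged variant with more explicit CSV handling; the FIELD_OPTIONALLY_ENCLOSED_BY and NULL_IF values here are illustrative and not tested against this PR:

# Sketch only: whether '' should map to NULL depends on how _write_csv
# serializes missing values, which is not verified here.
self.connector.connection.execute(
    f"COPY INTO {full_table_name} "
    f"FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '\\t' SKIP_HEADER = 1 "
    f"FIELD_OPTIONALLY_ENCLOSED_BY = '\"' NULL_IF = (''));"
)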

rabidaudio (Owner):

Also if the stream has primary keys we'll need to do the more complicated MERGE operation:
https://github.com/transferwise/pipelinewise-target-snowflake/blob/master/target_snowflake/file_formats/csv.py#L37

@aaronsteers (Author) commented Jan 4, 2022:
Yeah, I admittedly have not tackled the MERGE operation yet. In theory, there's a connector method SQLConnector.merge_upsert_from_table(target_table_name: str, from_table_name: str, join_keys: List[str]) created for this purpose, but I haven't fully tested or implemented a solution with that method yet. (Nothing references it as of yet.)

The default implementation for tables with primary keys would (in theory) be something like:

If inserting directly (no PKs or deduping+staging is disabled):

  1. run SQLConnector.bulk_insert_records() to load the main table.

If staging and deduping (such as when PKs are defined), as sketched below:

  1. (optionally clean up from past failed runs)
  2. run SQLConnector.bulk_insert_records() to load a temp table.
  3. run some means of deduping the temp table according to primary key (possibly as part of the method below).
  4. run SQLConnector.merge_upsert_from_table() to merge the temp table into the main table.
  5. (in a finally block:) clean up the temp table.
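
A hedged sketch of that staged/deduped path; the method names follow the WIP SDK signature quoted above (and may change), and the temp-table naming and cleanup statements are assumptions:

def _load_with_merge(self, full_table_name: str, records) -> None:
    """Sketch only: stage into a temp table, then merge by primary key."""
    temp_table = f"{full_table_name}__TMP"  # hypothetical naming scheme
    connection = self.connector.connection
    # (1) optional cleanup from a previously failed run
    connection.execute(f"DROP TABLE IF EXISTS {temp_table};")
    try:
        # (2) bulk-load the batch into the temp table
        self.connector.bulk_insert_records(temp_table, self.schema, records)
        # (3) dedupe by primary key, then (4) merge into the main table;
        #     deduping could also live inside merge_upsert_from_table().
        self.connector.merge_upsert_from_table(
            target_table_name=full_table_name,
            from_table_name=temp_table,
            join_keys=self.key_properties,
        )
    finally:
        # (5) always clean up the temp table
        connection.execute(f"DROP TABLE IF EXISTS {temp_table};")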

def _upload_csv_to_table_stage(self, file_path: str, full_table_name: str) -> None:
"""Upload data to the table's built-in stage."""
self.connector.connection.execute(
f"PUT file://{file_path} @%{full_table_name};"
rabidaudio (Owner):

I was thinking of having the target CREATE STAGE IF NOT EXISTS its own internal stage, but I suppose there's nothing wrong with table stages instead.

@aaronsteers (Author):

@rabidaudio - Yeah, I'm on the same page with you. Creating the stage is okay too, but it opens a new can of worms: the user would have to provide a valid fully-qualified stage name, or, if the name is auto-generated, we'd have to reasonably find a DB and SCHEMA where we have CREATE access.

I felt like using table stages was a reasonable "config-less" default.
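
For comparison, a named-stage variant might look roughly like this; the stage name and location are assumptions, untested:

# Hypothetical named-stage variant of _upload_csv_to_table_stage (sketch only).
stage_name = f'"{self.schema_name}".TARGET_SNOWFLAKE_STAGE'
self.connector.connection.execute(f"CREATE STAGE IF NOT EXISTS {stage_name};")
self.connector.connection.execute(f"PUT file://{file_path} @{stage_name};")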

result = result.replace(replacement_pattern, val)

if self.config.get("output_path_prefix", None) is not None:
result = f"{self.config['output_path_prefix']}{result}"
rabidaudio (Owner):

This is still going to have the tempfile path in between the prefix and the filename, right? Is that a mistake?

@aaronsteers (Author):

Very likely this was an oversight on my side. Feel free to fix/tweak.
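
One possible fix, as a sketch; this assumes result holds the full tempfile path at this point:

import os

if self.config.get("output_path_prefix") is not None:
    # Strip the temp directory so the prefix is applied to the bare filename.
    result = f"{self.config['output_path_prefix']}{os.path.basename(result)}"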

self.connector.prepare_table(
self.full_table_name, schema=self.schema, primary_keys=self.key_properties
)
self.stage = self.stage_class(self)

@property
def max_size(self):
return self.config["batch_size_rows"]
rabidaudio (Owner):

This is probably fine for now, but Snowflake suggests that for optimal loading you break your files into 100-250MB compressed chunks. Allowing this would require some significant changes to BatchSink, I think.

@aaronsteers (Author) commented Jan 4, 2022:
@rabidaudio - This shouldn't (in theory) be too difficult. Instead of overriding max_size (which is a row count), we can override is_full().

The default is_full implementation is:

@property
def is_full(self) -> bool:
    """Check against size limit.

    Returns:
        True if the sink needs to be drained.
    """
    return self.current_size >= self.max_size

So, if there were a way to estimate the file size (as opposed to row counts), you could put that logic here.

Or, alternatively, it looks like you could change the meaning of current_size and max_size to be an (integer) bytes calculation rather than a number of rows. Then your current_size property would just need to report back a size estimate instead of returning self._batch_records_read. If we went down this path, we'd probably want to add something to the SDK docstrings like:

By default, current_size and max_size refer to the number of records in the batch, but this can be overridden by the developer to be bytes, MB, compressed bytes, etc.
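
A hedged sketch of that bytes-based variant; the output_file attribute and the 200MB threshold are assumptions, not part of the PR or the SDK:

import os

MAX_BATCH_BYTES = 200 * 1024 * 1024  # within Snowflake's 100-250MB (compressed) guidance

@property
def is_full(self) -> bool:
    """Drain the sink once the local batch file reaches the target size."""
    # self.output_file is hypothetical: wherever the sink buffers its CSV batch.
    return os.path.getsize(self.output_file) >= MAX_BATCH_BYTES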

# (optional) archive the file
self._write_csv(output_file, records)
self._upload_csv_to_table_stage(output_file, full_table_name)
self._load_csv_files_from_table_stage(full_table_name)
rabidaudio (Owner):

One thing I wanted to try to work into this target was smarter handling of connections. Snowflake warehouses can be spun down after a period of inactivity, but only when all the connections are closed. If you spend most of your time writing to a local CSV file, you're paying to keep that server running for no reason.

Another thing I ran into was connection timeouts. The Snowflake connection times out after 4 hours. I had a tap for an API with very aggressive throttling that regularly timed out on writes until we lowered the batch size to something tiny. Ideally we'd connect early and set up all the table schemas, start loading the data into CSVs while letting the connection close after a few minutes of inactivity, and reconnect when it's time to run PUT/COPY INTO (/ACTIVATE_VERSION).
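
A minimal sketch of that lazy-connection idea using plain SQLAlchemy; the engine handling here is an assumption, not how the SDK currently manages connections:

from contextlib import contextmanager

@contextmanager
def short_lived_connection(engine):
    """Open a connection just for PUT / COPY INTO, then release it so an idle
    warehouse can auto-suspend while records are buffered to CSV."""
    connection = engine.connect()
    try:
        yield connection
    finally:
        connection.close()
        engine.dispose()  # drop pooled connections instead of keeping them idle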

@rabidaudio (Owner) commented:

@aaronsteers Thanks!! I read through the SDK side too and I think generally it makes sense. Looking at it I definitely prefer the idea of letting SQLAlchemy handle normalizing dialects over targets maintaining their own SQL logic.

I left some thoughts, mostly notes for myself. I'm going to rebase this and play with it and let you know if I find anything else.

@property
def schema_name(self) -> Optional[str]:
"""Return the target schema name."""
return self.config["snowflake"]["schema"].upper()
rabidaudio (Owner):

According to snowflake-sqlalchemy we should probably actually use lower-case for these:

https://github.com/snowflakedb/snowflake-sqlalchemy/tree/9118cf8f18a0039f9cb5d3892ff2b1e5c82a05e0#object-name-case-handling

@aaronsteers (Author):

@rabidaudio - Good catch! 👍
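
Per the linked case-handling docs, the lower-cased form might look like this (sketch only):

@property
def schema_name(self) -> Optional[str]:
    """Return the target schema name, lower-cased per snowflake-sqlalchemy docs."""
    # assumes: from typing import Optional
    return self.config["snowflake"]["schema"].lower()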

@aaronsteers (Author) commented:

> @aaronsteers Thanks!! I read through the SDK side too and I think generally it makes sense. Looking at it I definitely prefer the idea of letting SQLAlchemy handle normalizing dialects over targets maintaining their own SQL logic.
>
> I left some thoughts, mostly notes for myself. I'm going to rebase this and play with it and let you know if I find anything else.

Thanks so much! This feedback is super helpful.
