Draft: Port to new SDK SQL classes #1
Conversation
```python
    The connection url as a string.
"""
conf = config["snowflake"]
return URL(
```
Snowflake automatically adds `snowflakecomputing.com` to the hostname. This seems like an easy mistake to make, so maybe doing something like this here would be helpful (but in a way that's backwards compatible with Python < 3.9):

`conf["account"].removesuffix("snowflakecomputing.com")`
Also, I find Python packages' habit of explicitly defining every connection parameter instead of just taking a connection URL annoying. I might switch the config to simply `config['snowflake_url']`.
Some users may have a harder time creating their own `snowflake_url`, and there may also be some params we want to inject into that URL at runtime. It might be a good option to provide as an override of the piecemeal approach, but I think providing the parameters individually is still helpful for certain cases and users.
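A rough sketch of supporting both styles, assuming a hypothetical `snowflake_url` override key alongside the existing piecemeal settings:

```python
from snowflake.sqlalchemy import URL


def get_connection_url(config: dict) -> str:
    """Prefer an explicit URL override; otherwise build one from individual settings."""
    if config.get("snowflake_url"):  # hypothetical override key
        return config["snowflake_url"]

    conf = config["snowflake"]
    return URL(
        account=conf["account"],
        user=conf["user"],
        password=conf["password"],
        database=conf["database"],
        schema=conf["schema"],
    )
```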
```python
@property
def table_name(self) -> str:
    return self.stream_name.split("-")[-1].upper()
```
Why split on `-` and take the last part, as opposed to, say, `replace('-', '_')`? That seems like a potential for collisions (e.g. streams `users-vehicles` and `vehicles`).
@rabidaudio - I'm just replicating what I've seen with other connectors. Frankly, I'm not sure this is the right approach per se, except that we should expect to receive stream names like `db_name-schema_name-table_name` as well as bare stream names like `table_name`.

What I was thinking with this implementation is that a separate `schema_name` property would use the other parts of the stream name to determine the fully qualified target table name, but this is probably something for which we need to open up some amount of configurability.

Please feel free to suggest patterns you see working well in other targets like target-snowflake. As long as we have a reasonable default, I think we can be free to add expanded capabilities in future.
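A minimal sketch of that kind of parsing, assuming stream names arrive either as `table_name` or `db_name-schema_name-table_name` (the helper name is hypothetical):

```python
from typing import Optional, Tuple


def parse_stream_name(stream_name: str) -> Tuple[Optional[str], Optional[str], str]:
    """Split a stream name into (database, schema, table) parts.

    "users"                  -> (None, None, "USERS")
    "analytics-public-users" -> ("ANALYTICS", "PUBLIC", "USERS")
    """
    parts = stream_name.upper().split("-")
    if len(parts) >= 3:
        return parts[0], parts[1], "_".join(parts[2:])
    if len(parts) == 2:
        return None, parts[0], parts[1]
    return None, None, parts[0]
```

Joining any extra parts with `_` would also avoid the `users-vehicles` vs. `vehicles` collision raised above.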
```python
self.table_schema = target.table_schema

def prepare_sink(self) -> None:
    self.connector.connection.execute(
        f'CREATE SCHEMA IF NOT EXISTS "{self.schema_name}"'
```
So much this. It's so annoying that the other targets expect the schema to be created out-of-band. 👍
"""Loads all files from the table's built-in stage.""" | ||
self.connector.connection.execute( | ||
f"COPY INTO {full_table_name} " | ||
f"FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '\\t' SKIP_HEADER = 1);" |
File formats can be a bit tricky, especially with null handling (as I learned the hard way). I'll test these and see if there are any issues.
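For reference, a sketch of the extra CSV file-format options that usually matter for null handling (the specific option values here are illustrative, not taken from this PR):

```python
def _load_csv_files_from_table_stage(self, full_table_name: str) -> None:
    """COPY from the table stage with explicit null-handling options."""
    self.connector.connection.execute(
        f"COPY INTO {full_table_name} "
        "FILE_FORMAT = ("
        "  TYPE = CSV"
        "  FIELD_DELIMITER = '\\t'"
        "  SKIP_HEADER = 1"
        "  FIELD_OPTIONALLY_ENCLOSED_BY = '\"'"  # keep delimiters inside quoted values intact
        "  NULL_IF = ('')"                       # load empty fields as SQL NULL
        "  EMPTY_FIELD_AS_NULL = TRUE"
        ");"
    )
```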
Also if the stream has primary keys we'll need to do the more complicated MERGE operation:
https://github.com/transferwise/pipelinewise-target-snowflake/blob/master/target_snowflake/file_formats/csv.py#L37
Yeah - the MERGE operation I admittedly have not tackled yet. In theory, there's a connector method `SQLConnector.merge_upsert_from_table(target_table_name: str, from_table_name: str, join_keys: List[str])` created for this purpose, but I haven't fully tested or implemented a solution with that method yet. (Nothing references it as of yet.)

The default implementation for tables with primary keys would (in theory) be something like:

If inserting directly (no PKs, or deduping+staging is disabled):

- run `SQLConnector.bulk_insert_records()` to load the main table.

If staging and deduping (such as when PKs are defined):

- (optionally) clean up from past failed runs
- run `SQLConnector.bulk_insert_records()` to load a temp table.
- run some means of deduping the temp table according to primary key (possibly as part of the method below).
- run `SQLConnector.merge_upsert_from_table()` to merge the temp table into the main table.
- (in a `finally` block) clean up the temp table.

(A sketch of how these pieces might fit together follows below.)
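A minimal sketch of that staged flow, assuming the SDK methods named above plus hypothetical helpers `_drop_if_exists()` and `_dedupe_temp_table()` (argument names for `bulk_insert_records()` are approximate):

```python
from typing import Any, Dict, List


def load_with_optional_merge(self, records: List[Dict[str, Any]]) -> None:
    """Insert directly when there are no PKs; otherwise stage, dedupe, and merge."""
    if not self.key_properties:
        # No primary keys: append straight into the main table.
        self.connector.bulk_insert_records(
            self.full_table_name, schema=self.schema, records=records
        )
        return

    temp_table = f"{self.full_table_name}__TEMP"  # hypothetical naming scheme
    try:
        self._drop_if_exists(temp_table)  # optional cleanup from past failed runs
        self.connector.bulk_insert_records(
            temp_table, schema=self.schema, records=records
        )
        self._dedupe_temp_table(temp_table)  # keep one row per primary key
        self.connector.merge_upsert_from_table(
            target_table_name=self.full_table_name,
            from_table_name=temp_table,
            join_keys=self.key_properties,
        )
    finally:
        self._drop_if_exists(temp_table)  # always clean up the temp table
```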
```python
def _upload_csv_to_table_stage(self, file_path: str, full_table_name: str) -> None:
    """Upload data to the table's built-in stage."""
    self.connector.connection.execute(
        f"PUT file://{file_path} @%{full_table_name};"
```
I was thinking of having the target `CREATE STAGE IF NOT EXISTS` its own internal stage, but I suppose there's nothing wrong with table stages instead.
@rabidaudio - Yeah, I am on the same page with you. Creating a named stage is okay too, but it opens a new can of worms: either the user has to provide a valid fully-qualified stage name, or, if the name is auto-generated, we have to reasonably find a DB and SCHEMA where we have `CREATE` access.

I felt like using table stages was a reasonable "config-less" default.
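For comparison, a sketch of the named-stage alternative being discussed (the `stage_name` config key and its default are assumptions, not part of the PR):

```python
def _upload_csv_to_named_stage(self, file_path: str) -> None:
    """Upload data to a dedicated internal stage instead of the table stage."""
    # Hypothetical config key; requires CREATE STAGE rights in the target schema.
    stage_name = self.config.get("stage_name", "TARGET_SNOWFLAKE_STAGE")
    self.connector.connection.execute(
        f"CREATE STAGE IF NOT EXISTS {self.schema_name}.{stage_name};"
    )
    self.connector.connection.execute(
        f"PUT file://{file_path} @{self.schema_name}.{stage_name};"
    )
```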
```python
result = result.replace(replacement_pattern, val)

if self.config.get("output_path_prefix", None) is not None:
    result = f"{self.config['output_path_prefix']}{result}"
```
This is still going to have the tempfile path in between the prefix and the filename, right? Is that a mistake?
Very likely this was an oversight on my side. Feel free to fix/tweak.
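One possible fix, assuming the intent is to apply the prefix to the bare filename rather than to the full tempfile path (that intent is my assumption; the helper name is illustrative):

```python
import os


def apply_output_prefix(config: dict, result: str) -> str:
    """Apply the configured prefix to the bare filename, not the tempfile path."""
    prefix = config.get("output_path_prefix")
    if prefix is not None:
        return f"{prefix}{os.path.basename(result)}"
    return result
```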
```python
self.connector.prepare_table(
    self.full_table_name, schema=self.schema, primary_keys=self.key_properties
)
self.stage = self.stage_class(self)

@property
def max_size(self):
    return self.config["batch_size_rows"]
```
This is probably fine for now, but Snowflake suggests that for optimal loading you break your files into 100-250 MB (compressed) chunks. Allowing this would require some significant changes to `BatchSink`, I think.
@rabidaudio - This shouldn't (in theory) be too difficult. Instead of overriding `max_size` (which is a row count), we can override `is_full()`.

The default `is_full` implementation is:

```python
@property
def is_full(self) -> bool:
    """Check against size limit.

    Returns:
        True if the sink needs to be drained.
    """
    return self.current_size >= self.max_size
```

So, if there were a way to estimate the file size (as opposed to row counts), you could put that logic here.

Or, alternatively, it looks like you could change the meaning of `current_size` and `max_size` to be an (integer) bytes calculation rather than a number of rows. Then your `current_size` property would just need to report back a size estimate instead of returning `self.batch_records_read`. If we went this path, we'd probably want to add something into the SDK docstrings like:

> By default, `current_size` and `max_size` refer to the number of records in the batch, but this can be overridden by the developer to be bytes, MB, compressed bytes, etc.
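A minimal sketch of that byte-based variant, assuming the sink keeps a running byte estimate as records arrive (the class name, attribute, and `batch_size_bytes` config key are illustrative, not part of the PR or SDK):

```python
class BytesBudgetSinkMixin:
    """Override the size accounting to count (approximate) bytes instead of rows."""

    # Aim for roughly 100-250 MB per file, per Snowflake's loading guidance.
    DEFAULT_MAX_BATCH_BYTES = 100 * 1024 * 1024

    _approx_batch_bytes: int = 0  # updated wherever records are serialized/counted

    @property
    def current_size(self) -> int:
        """Report the batch size as estimated bytes rather than a record count."""
        return self._approx_batch_bytes

    @property
    def max_size(self) -> int:
        """Drain the sink once the estimated batch size reaches the byte budget."""
        return self.config.get("batch_size_bytes", self.DEFAULT_MAX_BATCH_BYTES)
```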
```python
# (optional) archive the file
self._write_csv(output_file, records)
self._upload_csv_to_table_stage(output_file, full_table_name)
self._load_csv_files_from_table_stage(full_table_name)
```
One thing I wanted to try to work into this target was smarter handling of connections. Snowflake warehouses can be spun down after a period of inactivity, but only when all the connections are closed. If you spend most of your time writing to a local CSV file, you're paying to keep that server running for no reason.

Another thing I ran into was connection timeouts. The Snowflake connection times out after a period of 4 hours. I had a tap for an API with very aggressive throttling regularly time out while writing until we lowered the batch size to something tiny. Ideally we'd connect early and set up all the table schemas, start loading the data into CSVs while letting the connection close after a few minutes of inactivity, and reconnect when it's time to run `PUT`/`COPY INTO` (/`ACTIVATE_VERSION`).
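A rough sketch of the connect-per-statement idea using plain SQLAlchemy (whether and how the SDK's connector would expose its engine like this is an assumption; the function names are illustrative):

```python
import sqlalchemy
from sqlalchemy.pool import NullPool


def make_lazy_engine(url: str) -> sqlalchemy.engine.Engine:
    """Create an engine that holds no idle connections between statements."""
    # NullPool closes each connection as soon as it is returned, so the
    # warehouse can auto-suspend while the target is busy writing local CSVs,
    # and no session stays open long enough to hit the 4-hour timeout.
    return sqlalchemy.create_engine(url, poolclass=NullPool)


def run_statement(engine: sqlalchemy.engine.Engine, statement: str) -> None:
    """Open a connection only for the duration of a single PUT / COPY INTO."""
    with engine.connect() as conn:
        conn.execute(sqlalchemy.text(statement))
```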
@aaronsteers Thanks!! I read through the SDK side too and I think generally it makes sense. Looking at it, I definitely prefer the idea of letting SQLAlchemy handle normalizing dialects over targets maintaining their own SQL logic. I left some thoughts, mostly notes for myself. I'm going to rebase this and play with it and let you know if I find anything else.
```python
@property
def schema_name(self) -> Optional[str]:
    """Return the target schema name."""
    return self.config["snowflake"]["schema"].upper()
```
According to `snowflake-sqlalchemy`, we should probably actually use lower-case for these:
@rabidaudio - Good catch! 👍
Thanks so much! This feedback is super helpful.
@rabidaudio - I wanted to use your snowflake target project as a test of the new SDK interfaces. This PR is the result of that work, still in progress...
Summary:

- Handles `ARRAY`, `VARIANT`, and `TIMESTAMP_TZ` in `to_sql_type()`; the other types are handled by the SDK base class.
- Related SDK MR (WIP): https://gitlab.com/meltano/sdk/-/merge_requests/200
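For reference, a minimal sketch of what that `to_sql_type()` mapping might look like using the `snowflake-sqlalchemy` custom types (the JSON Schema checks and the VARCHAR fallback are simplifications, not necessarily what the PR or the SDK base class does):

```python
import sqlalchemy
from snowflake.sqlalchemy import ARRAY, TIMESTAMP_TZ, VARIANT


def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
    """Map JSON Schema types to Snowflake-specific SQLAlchemy types."""
    json_types = jsonschema_type.get("type", [])
    if isinstance(json_types, str):
        json_types = [json_types]

    if "array" in json_types:
        return ARRAY()
    if "object" in json_types:
        return VARIANT()
    if jsonschema_type.get("format") == "date-time":
        return TIMESTAMP_TZ()

    # Everything else would defer to the SDK base class in the real connector.
    return sqlalchemy.types.VARCHAR()
```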