[fix] UcToSnowflakeOperator multiple grantees and avoid timestamp range error (#176)

* Enabled multiple grantees for UcToSnowflakeOperator

* Fix to avoid collect issues on invalid dates

* Moved strip for readability

* Fixed formatting

* Updated documentation for UcToSnowflakeOperator
shinga authored Nov 13, 2024
1 parent 05134ef commit 3bfd80a
Showing 3 changed files with 37 additions and 31 deletions.
26 changes: 16 additions & 10 deletions brickflow_plugins/databricks/uc_to_snowflake_operator.py
@@ -317,6 +317,7 @@ class UcToSnowflakeOperator(SnowflakeOperator):
 sf_database (optional): database name in snowflake
 sf_schema (required): snowflake schema in the database provided as part of scope
 sf_table (required): name of the table in snowflake to which we want to append or overwrite
+sf_grantee_roles (optional): downstream roles to which we want to grant access to the table, can be comma separated
 incremental_filter (optional): mandatory parameter for incremental load type to delete existing data in snowflake table
 dbx_data_filter (optional): parameter to filter databricks table if different from snowflake filter
 sf_cluster_keys (optional): list of keys to cluster the data in snowflake
@@ -376,17 +377,22 @@ def get_sf_poststeps(self):
         return queries
 
     def get_sf_postgrants(self):
-        post_grantee_role = (
-            self.parameters["sf_grantee_roles"]
+        post_grantee_roles = (
+            [role.strip() for role in self.parameters["sf_grantee_roles"].split(",")]
             if "sf_grantee_roles" in self.parameters.keys()
-            else self.role
-        )
-        queries = """ GRANT SELECT ON TABLE {sfDatabase}.{sfSchema}.{sfTable} TO ROLE {sfGrantee_roles};""".format(
-            sfSchema=self.parameters["sf_schema"],
-            sfTable=self.parameters["sf_table"],
-            sfGrantee_roles=post_grantee_role,
-            sfDatabase=self.sf_database,
+            else [self.role]
         )
+
+        queries = ""
+        for role in post_grantee_roles:
+            query = """GRANT SELECT ON TABLE {sfDatabase}.{sfSchema}.{sfTable} TO ROLE {sfGrantee_role}; """.format(
+                sfSchema=self.parameters["sf_schema"],
+                sfTable=self.parameters["sf_table"],
+                sfGrantee_role=role,
+                sfDatabase=self.sf_database,
+            )
+            queries += query
+        queries = queries.strip()
         return queries
 
     def validate_input_params(self):
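For illustration, a minimal standalone sketch of what the updated grant logic produces when `sf_grantee_roles` is a comma-separated string; the database, schema, table, and role names below are hypothetical placeholders and are not part of this commit:

```python
# Hypothetical inputs mirroring the operator's parameters; only the splitting and
# per-role GRANT construction correspond to the change above.
parameters = {
    "sf_grantee_roles": "downstream_read_role1, downstream_read_role2",
    "sf_schema": "stage",
    "sf_table": "SF_OPERATOR_1",
}
sf_database = "sample_database"
default_role = "sample_write_role"  # fallback when sf_grantee_roles is not provided

grantee_roles = (
    [role.strip() for role in parameters["sf_grantee_roles"].split(",")]
    if "sf_grantee_roles" in parameters
    else [default_role]
)

# One GRANT statement per grantee role, concatenated into a single string.
queries = " ".join(
    "GRANT SELECT ON TABLE {db}.{schema}.{table} TO ROLE {role};".format(
        db=sf_database,
        schema=parameters["sf_schema"],
        table=parameters["sf_table"],
        role=role,
    )
    for role in grantee_roles
)
print(queries)
# GRANT SELECT ON TABLE sample_database.stage.SF_OPERATOR_1 TO ROLE downstream_read_role1; GRANT SELECT ON TABLE sample_database.stage.SF_OPERATOR_1 TO ROLE downstream_read_role2;
```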
@@ -508,7 +514,7 @@ def load_snowflake(self, source_df, target_table):
         if self.authenticator is not None:
             sf_options["sfAuthenticator"] = self.authenticator
         self.log.info("snowflake package and options defined...!!!")
-        if len(source_df.take(1)) == 0:
+        if source_df.isEmpty():
             self.write_mode = "Append"
         if len(self.sf_cluster_keys) == 0:
             # Without order by clause compared to above
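A brief, hedged note on the `take(1)` to `isEmpty()` switch above: `take(1)` pulls a full row back to the driver just to test for emptiness, so a source row with an out-of-range date can fail during value conversion, while `DataFrame.isEmpty()` (available since PySpark 3.3) answers the same question without returning row values. A minimal sketch, assuming a local SparkSession and a stand-in DataFrame:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stand-in for the operator's Databricks source DataFrame (hypothetical data).
source_df = spark.createDataFrame([("2023-10-22", 1)], ["dt", "value"])

# Old check: materializes an entire row on the driver just to test for emptiness.
is_empty_old = len(source_df.take(1)) == 0

# New check (PySpark 3.3+): same answer without bringing row values back to Python.
is_empty_new = source_df.isEmpty()

assert is_empty_old == is_empty_new  # both False for the sample data
```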
2 changes: 1 addition & 1 deletion docs/faq/faq.md
@@ -114,7 +114,7 @@ def copy_from_uc_sf(*args):
     secret_scope = "your_databricks secrets scope name",
     parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema',
                   'dbx_table':'sf_operator_1', 'sf_schema':'stage','sf_table':'SF_OPERATOR_1',
-                  'sf_grantee_roles':'downstream_read_role', 'incremental_filter':"dt='2023-10-22'",
+                  'sf_grantee_roles':'downstream_read_role1,downstream_read_role2', 'incremental_filter':"dt='2023-10-22'",
                   'sf_cluster_keys':''}
 )
 uc_to_sf_copy.execute()
40 changes: 20 additions & 20 deletions docs/tasks.md
@@ -749,28 +749,28 @@ def run_snowflake_files(*args):
 copy data from databricks to snowflake
 
 As databricks secrets is a key-value store, the code expects the secret scope to contain the exact keys below
-    username : user id created for connecting to snowflake for ex: sample_user
-    password : password information for the user for ex: P@$$word
-    account : snowflake account information, not entire url for ex: sample_enterprise
-    warehouse: warehouse/cluster information that the user has access to for ex: sample_warehouse
-    database : default database that we want to connect to for ex: sample_database
-    role : role to which the user has write access for ex: sample_write_role
+- `username` : user id created for connecting to snowflake for ex: sample_user
+- `password` : password information for the user for ex: P@$$word
+- `account` : snowflake account information, not entire url for ex: sample_enterprise
+- `warehouse`: warehouse/cluster information that the user has access to for ex: sample_warehouse
+- `database` : default database that we want to connect to for ex: sample_database
+- `role` : role to which the user has write access for ex: sample_write_role

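As a purely illustrative sketch (the scope name below is a placeholder), these are the literal key names the operator expects to find; in a Databricks notebook or job you could check them with the built-in `dbutils.secrets` utility:

```python
# Hypothetical scope name; the operator only requires that this exact set of keys
# exists in whatever scope is passed as secret_scope.
scope = "your_databricks_secrets_scope_name"

snowflake_connection = {
    key: dbutils.secrets.get(scope=scope, key=key)  # dbutils is injected by Databricks
    for key in ["username", "password", "account", "warehouse", "database", "role"]
}
```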
 UcToSnowflakeOperator expects the following inputs in parameters to copy data.
 Either dbx_sql or (dbx_catalog, dbx_database, dbx_table) needs to be provided
-    load_type (required): type of data load, acceptable values full or incremental
-    dbx_catalog (optional) : name of the databricks catalog in which object resides
-    dbx_database (optional): name of the databricks schema in which object is available
-    dbx_table (optional) : name of the databricks object we want to copy to snowflake
-    dbx_sql (optional) : Custom sql to extract data from databricks Unity Catalog
-    sf_database (optional) : name of the snowflake database if different from the one in secret_scope
-    sf_schema (required): name of the snowflake schema in which we want to copy the data
-    sf_table (required) : name of the snowflake object to which we want to copy from databricks
-    incremental_filter (required for incremental mode) : condition to manage data before writing to snowflake
-    dbx_data_filter (optional): filter condition on databricks source for full or incremental (if different from incremental_filter)
-    sf_grantee_roles (optional) : snowflake roles to which we want to grant select/read access
-    sf_cluster_keys (optional) : list of keys we want to cluster our snowflake table.
-    write_mode (optional) : write mode to write into snowflake table ( overwrite, append etc)
+- `load_type`: (required) type of data load, acceptable values full or incremental
+- `dbx_catalog`: (optional) name of the databricks catalog in which the object resides
+- `dbx_database`: (optional) name of the databricks schema in which the object is available
+- `dbx_table`: (optional) name of the databricks object we want to copy to snowflake
+- `dbx_sql`: (optional) custom sql to extract data from databricks Unity Catalog
+- `sf_database`: (optional) name of the snowflake database if different from the one in secret_scope
+- `sf_schema`: (required) name of the snowflake schema in which we want to copy the data
+- `sf_table`: (required) name of the snowflake object to which we want to copy from databricks
+- `incremental_filter`: (required for incremental mode) condition to manage data before writing to snowflake
+- `dbx_data_filter`: (optional) filter condition on the databricks source for full or incremental load (if different from incremental_filter)
+- `sf_grantee_roles`: (optional) snowflake roles to which we want to grant select/read access, can be a comma separated string
+- `sf_cluster_keys`: (optional) list of keys we want to cluster our snowflake table on
+- `write_mode`: (optional) write mode to write into snowflake table (overwrite, append etc.)
 
 ```python title="uc_to_snowflake_operator"
 from brickflow_plugins import UcToSnowflakeOperator
@@ -784,7 +784,7 @@ def run_snowflake_queries(*args):
     write_mode ="overwrite",
     parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema',
                   'dbx_table':'sf_operator_1', 'sf_schema':'stage','sf_table':'SF_OPERATOR_1',
-                  'sf_grantee_roles':'downstream_read_role', 'incremental_filter':"dt='2023-10-22'",
+                  'sf_grantee_roles':'downstream_read_role1,downstream_read_role2', 'incremental_filter':"dt='2023-10-22'",
                   'sf_cluster_keys':'', 'dbx_sql':'Custom sql query to read data from UC'}
 )
 uc_to_sf_copy.execute()
