From 8293ac5921ef7541c29ece1104e1e55505c96213 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Fri, 22 Nov 2024 15:54:06 -0500 Subject: [PATCH] change drop table --- target_mssql/connector.py | 40 ++++++++++++++++++++++++++++++++++ target_mssql/sinks.py | 46 +++++++++++++++++++++++++++++++++------ 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/target_mssql/connector.py b/target_mssql/connector.py index 090cf78..35b806d 100644 --- a/target_mssql/connector.py +++ b/target_mssql/connector.py @@ -19,6 +19,46 @@ class mssqlConnector(SQLConnector): allow_column_alter: bool = True # Whether altering column types is supported. allow_merge_upsert: bool = True # Whether MERGE UPSERT is supported. allow_temp_tables: bool = True # Whether temp tables are supported. + dropped_tables = dict() + + def prepare_table( + self, + full_table_name: str, + schema: dict, + primary_keys: list[str], + partition_keys: list[str] | None = None, + as_temp_table: bool = False, + ) -> None: + """Adapt target table to provided schema if possible. + + Args: + full_table_name: the target table name. + schema: the JSON Schema for the table. + primary_keys: list of key properties. + partition_keys: list of partition keys. + as_temp_table: True to create a temp table. + """ + # NOTE: Force create the table + # TODO: remove this + if not self.dropped_tables.get(self.stream_name, False): + self.logger.info("Force dropping the table!") + self.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") + self.dropped_tables[self.stream_name] = True + + if not self.table_exists(full_table_name=full_table_name): + self.create_empty_table( + full_table_name=full_table_name, + schema=schema, + primary_keys=primary_keys, + partition_keys=partition_keys, + as_temp_table=as_temp_table, + ) + return + + for property_name, property_def in schema["properties"].items(): + self.prepare_column( + full_table_name, property_name, self.to_sql_type(property_def) + ) def create_table_with_records( self, diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index ade41a7..103cbea 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -148,6 +148,45 @@ def column_representation( ) return columns + def prepare_table( + self, + full_table_name: str, + schema: dict, + primary_keys: list[str], + partition_keys: list[str] | None = None, + as_temp_table: bool = False, + ) -> None: + """Adapt target table to provided schema if possible. + + Args: + full_table_name: the target table name. + schema: the JSON Schema for the table. + primary_keys: list of key properties. + partition_keys: list of partition keys. + as_temp_table: True to create a temp table. + """ + # NOTE: Force create the table + # TODO: remove this + if not self.dropped_tables.get(self.stream_name, False): + self.logger.info("Force dropping the table!") + self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") + self.dropped_tables[self.stream_name] = True + + if not self.table_exists(full_table_name=full_table_name): + self.create_empty_table( + full_table_name=full_table_name, + schema=schema, + primary_keys=primary_keys, + partition_keys=partition_keys, + as_temp_table=as_temp_table, + ) + return + + for property_name, property_def in schema["properties"].items(): + self.prepare_column( + full_table_name, property_name, self.to_sql_type(property_def) + ) + def process_batch(self, context: dict) -> None: """Process a batch with the given batch context. Writes a batch to the SQL target. Developers may override this method @@ -160,13 +199,6 @@ def process_batch(self, context: dict) -> None: if self.key_properties: self.logger.info(f"Preparing table {self.full_table_name}") - # NOTE: Force create the table - # TODO: remove this - if not self.dropped_tables.get(self.stream_name, False): - self.logger.info("Force dropping the table!") - self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};") - self.dropped_tables[self.stream_name] = True - conformed_schema = self.conform_schema(self.schema) self.connector.prepare_table( full_table_name=self.full_table_name,