Skip to content

Commit

Permalink
change drop table
Browse files Browse the repository at this point in the history
  • Loading branch information
hsyyid committed Nov 22, 2024
1 parent 49b0137 commit 8293ac5
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 7 deletions.
40 changes: 40 additions & 0 deletions target_mssql/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,46 @@ class mssqlConnector(SQLConnector):
allow_column_alter: bool = True # Whether altering column types is supported.
allow_merge_upsert: bool = True # Whether MERGE UPSERT is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.
dropped_tables = dict()

def prepare_table(
self,
full_table_name: str,
schema: dict,
primary_keys: list[str],
partition_keys: list[str] | None = None,
as_temp_table: bool = False,
) -> None:
"""Adapt target table to provided schema if possible.
Args:
full_table_name: the target table name.
schema: the JSON Schema for the table.
primary_keys: list of key properties.
partition_keys: list of partition keys.
as_temp_table: True to create a temp table.
"""
# NOTE: Force create the table
# TODO: remove this
if not self.dropped_tables.get(self.stream_name, False):
self.logger.info("Force dropping the table!")
self.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};")
self.dropped_tables[self.stream_name] = True

if not self.table_exists(full_table_name=full_table_name):
self.create_empty_table(
full_table_name=full_table_name,
schema=schema,
primary_keys=primary_keys,
partition_keys=partition_keys,
as_temp_table=as_temp_table,
)
return

for property_name, property_def in schema["properties"].items():
self.prepare_column(
full_table_name, property_name, self.to_sql_type(property_def)
)

def create_table_with_records(
self,
Expand Down
46 changes: 39 additions & 7 deletions target_mssql/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,45 @@ def column_representation(
)
return columns

def prepare_table(
self,
full_table_name: str,
schema: dict,
primary_keys: list[str],
partition_keys: list[str] | None = None,
as_temp_table: bool = False,
) -> None:
"""Adapt target table to provided schema if possible.
Args:
full_table_name: the target table name.
schema: the JSON Schema for the table.
primary_keys: list of key properties.
partition_keys: list of partition keys.
as_temp_table: True to create a temp table.
"""
# NOTE: Force create the table
# TODO: remove this
if not self.dropped_tables.get(self.stream_name, False):
self.logger.info("Force dropping the table!")
self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};")
self.dropped_tables[self.stream_name] = True

if not self.table_exists(full_table_name=full_table_name):
self.create_empty_table(
full_table_name=full_table_name,
schema=schema,
primary_keys=primary_keys,
partition_keys=partition_keys,
as_temp_table=as_temp_table,
)
return

for property_name, property_def in schema["properties"].items():
self.prepare_column(
full_table_name, property_name, self.to_sql_type(property_def)
)

def process_batch(self, context: dict) -> None:
"""Process a batch with the given batch context.
Writes a batch to the SQL target. Developers may override this method
Expand All @@ -160,13 +199,6 @@ def process_batch(self, context: dict) -> None:
if self.key_properties:
self.logger.info(f"Preparing table {self.full_table_name}")

# NOTE: Force create the table
# TODO: remove this
if not self.dropped_tables.get(self.stream_name, False):
self.logger.info("Force dropping the table!")
self.connector.connection.execute(f"DROP TABLE IF EXISTS {self.full_table_name};")
self.dropped_tables[self.stream_name] = True

conformed_schema = self.conform_schema(self.schema)
self.connector.prepare_table(
full_table_name=self.full_table_name,
Expand Down

0 comments on commit 8293ac5

Please sign in to comment.