From b2f6e6701283f49fd060a9d1c029d859b654c1e7 Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Wed, 13 May 2020 11:20:23 +0200
Subject: [PATCH 01/12] Hardcode TRUNCATECOLUMNS to see if that helps

---
 target_redshift/redshift.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_redshift/redshift.py b/target_redshift/redshift.py
index 2ddbb35..3aba74a 100644
--- a/target_redshift/redshift.py
+++ b/target_redshift/redshift.py
@@ -155,7 +155,7 @@ def persist_csv_rows(self,
         aws_secret_access_key= credentials.get('aws_secret_access_key')
         aws_session_token = credentials.get('aws_session_token')
 
-        copy_sql = sql.SQL('COPY {}.{} ({}) FROM {} CREDENTIALS {} FORMAT AS CSV NULL AS {}').format(
+        copy_sql = sql.SQL('COPY {}.{} ({}) FROM {} CREDENTIALS {} FORMAT AS CSV NULL AS {} TRUNCATECOLUMNS').format(
             sql.Identifier(self.postgres_schema),
             sql.Identifier(temp_table_name),
             sql.SQL(', ').join(map(sql.Identifier, columns)),

From 55bf591e0c6cacededa02e822ee0297769492772 Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Wed, 13 May 2020 11:34:52 +0200
Subject: [PATCH 02/12] Try to make this configurable

---
 target_redshift/__init__.py | 3 ++-
 target_redshift/redshift.py | 8 ++++++--
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/target_redshift/__init__.py b/target_redshift/__init__.py
index a962d66..01dabb2 100644
--- a/target_redshift/__init__.py
+++ b/target_redshift/__init__.py
@@ -40,7 +40,8 @@ def main(config, input_stream=None):
         redshift_schema=config.get('redshift_schema', 'public'),
         logging_level=config.get('logging_level'),
         default_column_length=config.get('default_column_length', 1000),
-        persist_empty_tables=config.get('persist_empty_tables')
+        persist_empty_tables=config.get('persist_empty_tables'),
+        truncate_columns=config.get('truncate_columns')
     )
 
     if input_stream:
diff --git a/target_redshift/redshift.py b/target_redshift/redshift.py
index 3aba74a..10420c7 100644
--- a/target_redshift/redshift.py
+++ b/target_redshift/redshift.py
@@ -44,12 +44,14 @@ class RedshiftTarget(PostgresTarget):
     MAX_VARCHAR = 65535
     CREATE_TABLE_INITIAL_COLUMN = '_sdc_target_redshift_create_table_placeholder'
     CREATE_TABLE_INITIAL_COLUMN_TYPE = 'BOOLEAN'
+    TRUNCATE_COLUMNS_LITERAL = 'TRUNCATECOLUMNS'
 
     def __init__(self, connection, s3, *args,
                  redshift_schema='public',
                  logging_level=None,
                  default_column_length=DEFAULT_COLUMN_LENGTH,
                  persist_empty_tables=False,
+                 truncate_columns=False
                  **kwargs):
 
         self.LOGGER.info(
@@ -58,6 +60,7 @@ def __init__(self, connection, s3, *args,
 
         self.s3 = s3
         self.default_column_length = default_column_length
+        self.truncate_columns = truncate_columns
 
         PostgresTarget.__init__(self, connection, postgres_schema=redshift_schema, logging_level=logging_level,
                                 persist_empty_tables=persist_empty_tables, add_upsert_indexes=False)
@@ -155,7 +158,7 @@ def persist_csv_rows(self,
         aws_secret_access_key= credentials.get('aws_secret_access_key')
         aws_session_token = credentials.get('aws_session_token')
 
-        copy_sql = sql.SQL('COPY {}.{} ({}) FROM {} CREDENTIALS {} FORMAT AS CSV NULL AS {} TRUNCATECOLUMNS').format(
+        copy_sql = sql.SQL('COPY {}.{} ({}) FROM {} CREDENTIALS {} FORMAT AS CSV NULL AS {} {}').format(
             sql.Identifier(self.postgres_schema),
             sql.Identifier(temp_table_name),
             sql.SQL(', ').join(map(sql.Identifier, columns)),
@@ -165,7 +168,8 @@ def persist_csv_rows(self,
                 aws_secret_access_key,
                 ";token={}".format(aws_session_token) if aws_session_token else '',
            )),
-            sql.Literal(RESERVED_NULL_DEFAULT))
+            sql.Literal(RESERVED_NULL_DEFAULT),
+            sql.Literal(self.TRUNCATE_COLUMNS_LITERAL) if self.truncate_columns else '')
 
         cur.execute(copy_sql)
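For context, `TRUNCATECOLUMNS` makes Redshift silently truncate VARCHAR data that exceeds the column width instead of failing the whole load. A standalone sketch (not code from this patch) of the statement shape the hardcoded flag produces; the schema, table, columns, S3 key, credentials, and NULL sentinel below are all made-up placeholders:

```python
# Standalone sketch of the COPY statement shape; every identifier, the S3
# location, and the credentials string here are made-up placeholders.
from psycopg2 import sql

copy_sql = sql.SQL(
    'COPY {}.{} ({}) FROM {} CREDENTIALS {} FORMAT AS CSV NULL AS {} TRUNCATECOLUMNS'
).format(
    sql.Identifier('public'),
    sql.Identifier('tmp_cats'),
    sql.SQL(', ').join(map(sql.Identifier, ['id', 'name', 'description'])),
    sql.Literal('s3://example-bucket/example-key.csv'),
    sql.Literal('aws_access_key_id=...;aws_secret_access_key=...'),
    sql.Literal('null_sentinel'),
)
# Rendered (as one line), this reads roughly:
#   COPY "public"."tmp_cats" ("id", "name", "description")
#   FROM 's3://example-bucket/example-key.csv'
#   CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...'
#   FORMAT AS CSV NULL AS 'null_sentinel' TRUNCATECOLUMNS
```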
From 9ce1edfa9f9e458e9d008ccd19a05e3c9a8fe587 Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Wed, 13 May 2020 11:40:26 +0200
Subject: [PATCH 03/12] Add documentation

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index e84bcd1..67cbb3e 100644
--- a/README.md
+++ b/README.md
@@ -80,6 +80,7 @@ here.
 | `max_buffer_size` | `["integer", "null"]` | `104857600` (100MB in bytes) | The maximum number of bytes to buffer in memory before writing to the destination table in Redshift
 | `batch_detection_threshold` | `["integer", "null"]` | `5000`, or 1/40th `max_batch_rows` | How often, in rows received, to count the buffered rows and bytes to check if a flush is necessary. There's a slight performance penalty to checking the buffered records count or bytesize, so this controls how often this is polled in order to mitigate the penalty. This value is usually not necessary to set as the default is dynamically adjusted to check reasonably often.
 | `persist_empty_tables` | `["boolean", "null"]` | `False` | Whether the Target should create tables which have no records present in Remote. |
+| `truncate_columns` | `["boolean", "null"]` | `False` | Whether the Target should truncate values longer than `default_column_length` when loading into Redshift. Adds the `TRUNCATECOLUMNS` data conversion parameter to Redshift's COPY command.
 | `default_column_length` | `["integer", "null"]` | `1000` | All columns with the VARCHAR(CHARACTER VARYING) type will be have this length.Range: 1-65535. |
 | `state_support` | `["boolean", "null"]` | `True` | Whether the Target should emit `STATE` messages to stdout for further consumption. In this mode, which is on by default, STATE messages are buffered in memory until all the records that occurred before them are flushed according to the batch flushing schedule the target is configured with. |
 | `target_s3` | `["object"]` | `N/A` | See `S3` below

From 4f7df3b450faffabaae9901256efd701747582ed Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Wed, 13 May 2020 11:42:03 +0200
Subject: [PATCH 04/12] Add links

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 67cbb3e..32f283e 100644
--- a/README.md
+++ b/README.md
@@ -80,7 +80,7 @@ here.
 | `max_buffer_size` | `["integer", "null"]` | `104857600` (100MB in bytes) | The maximum number of bytes to buffer in memory before writing to the destination table in Redshift
 | `batch_detection_threshold` | `["integer", "null"]` | `5000`, or 1/40th `max_batch_rows` | How often, in rows received, to count the buffered rows and bytes to check if a flush is necessary. There's a slight performance penalty to checking the buffered records count or bytesize, so this controls how often this is polled in order to mitigate the penalty. This value is usually not necessary to set as the default is dynamically adjusted to check reasonably often.
 | `persist_empty_tables` | `["boolean", "null"]` | `False` | Whether the Target should create tables which have no records present in Remote. |
-| `truncate_columns` | `["boolean", "null"]` | `False` | Whether the Target should truncate values longer than `default_column_length` when loading into Redshift. Adds the `TRUNCATECOLUMNS` data conversion parameter to Redshift's COPY command.
+| `truncate_columns` | `["boolean", "null"]` | `False` | Whether the Target should truncate values longer than `default_column_length` when loading into Redshift. Adds the [`TRUNCATECOLUMNS` data conversion parameter](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-truncatecolumns) to Redshift's [COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html).
 | `default_column_length` | `["integer", "null"]` | `1000` | All columns with the VARCHAR(CHARACTER VARYING) type will be have this length.Range: 1-65535. |
 | `state_support` | `["boolean", "null"]` | `True` | Whether the Target should emit `STATE` messages to stdout for further consumption. In this mode, which is on by default, STATE messages are buffered in memory until all the records that occurred before them are flushed according to the batch flushing schedule the target is configured with. |
 | `target_s3` | `["object"]` | `N/A` | See `S3` below
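With patches 02 through 04 combined, enabling the documented behavior is one more key in the target's config. A hypothetical fragment, in the dict form the test suite's CONFIG uses (all values illustrative):

```python
# Hypothetical config fragment; only 'truncate_columns' is new in this series.
config = {
    'redshift_schema': 'public',
    'default_column_length': 1000,
    'truncate_columns': True,  # appends TRUNCATECOLUMNS to the COPY statement
}
```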
From ed95e8a27b12a75eea98d2c3e325965c1a2cd2fd Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Wed, 13 May 2020 11:47:10 +0200
Subject: [PATCH 05/12] :facepalm:

---
 target_redshift/redshift.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_redshift/redshift.py b/target_redshift/redshift.py
index 10420c7..20f2c2d 100644
--- a/target_redshift/redshift.py
+++ b/target_redshift/redshift.py
@@ -51,7 +51,7 @@ def __init__(self, connection, s3, *args,
                  logging_level=None,
                  default_column_length=DEFAULT_COLUMN_LENGTH,
                  persist_empty_tables=False,
-                 truncate_columns=False
+                 truncate_columns=False,
                  **kwargs):
 
         self.LOGGER.info(

From 770973089d16b4d689084e7192950322d5eee530 Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Wed, 13 May 2020 11:53:12 +0200
Subject: [PATCH 06/12] Not literal

---
 target_redshift/redshift.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_redshift/redshift.py b/target_redshift/redshift.py
index 20f2c2d..2f4e15e 100644
--- a/target_redshift/redshift.py
+++ b/target_redshift/redshift.py
@@ -169,7 +169,7 @@ def persist_csv_rows(self,
                 ";token={}".format(aws_session_token) if aws_session_token else '',
             )),
             sql.Literal(RESERVED_NULL_DEFAULT),
-            sql.Literal(self.TRUNCATE_COLUMNS_LITERAL) if self.truncate_columns else '')
+            self.TRUNCATE_COLUMNS_LITERAL if self.truncate_columns else '')
 
         cur.execute(copy_sql)

From 20514f523b32b889f49bbd7906946671929cceee Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Wed, 13 May 2020 11:54:31 +0200
Subject: [PATCH 07/12] At least SQL?

---
 target_redshift/redshift.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_redshift/redshift.py b/target_redshift/redshift.py
index 2f4e15e..02e159d 100644
--- a/target_redshift/redshift.py
+++ b/target_redshift/redshift.py
@@ -169,7 +169,7 @@ def persist_csv_rows(self,
                 ";token={}".format(aws_session_token) if aws_session_token else '',
             )),
             sql.Literal(RESERVED_NULL_DEFAULT),
-            self.TRUNCATE_COLUMNS_LITERAL if self.truncate_columns else '')
+            sql.SQL(self.TRUNCATE_COLUMNS_LITERAL) if self.truncate_columns else '')
 
         cur.execute(copy_sql)

From 30ba62ff66d7b824fd5e5edc8f9d6524492b9e21 Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Fri, 15 May 2020 11:01:35 +0200
Subject: [PATCH 08/12] Include else branch in interpolation

---
 target_redshift/redshift.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_redshift/redshift.py b/target_redshift/redshift.py
index 02e159d..0b842f7 100644
--- a/target_redshift/redshift.py
+++ b/target_redshift/redshift.py
@@ -169,7 +169,7 @@ def persist_csv_rows(self,
                 ";token={}".format(aws_session_token) if aws_session_token else '',
             )),
             sql.Literal(RESERVED_NULL_DEFAULT),
-            sql.SQL(self.TRUNCATE_COLUMNS_LITERAL) if self.truncate_columns else '')
+            sql.SQL(self.TRUNCATE_COLUMNS_LITERAL if self.truncate_columns else ''))
 
         cur.execute(copy_sql)
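Patches 06 through 08 above are all about how psycopg2's `sql` module composes the optional keyword: `sql.Literal` renders its argument as a quoted SQL value, so the earlier version produced `'TRUNCATECOLUMNS'` (with quotes) where COPY expects a bare keyword, while `.format()` accepts only `Composable` objects, so the plain strings in patch 06 and in patch 07's untouched `else ''` branch would be rejected at runtime; patch 08 moves the conditional inside `sql.SQL` so both branches are composed. A minimal standalone illustration (the DSN is a placeholder, and `as_string()` needs an open connection to render):

```python
# Minimal illustration of sql.Literal vs sql.SQL; the DSN is a placeholder.
import psycopg2
from psycopg2 import sql

conn = psycopg2.connect('dbname=example')  # placeholder connection

quoted = sql.SQL('COPY t FROM s {}').format(sql.Literal('TRUNCATECOLUMNS'))
bare = sql.SQL('COPY t FROM s {}').format(sql.SQL('TRUNCATECOLUMNS'))

print(quoted.as_string(conn))  # COPY t FROM s 'TRUNCATECOLUMNS' (quoted value: invalid here)
print(bare.as_string(conn))    # COPY t FROM s TRUNCATECOLUMNS (bare keyword: what COPY expects)

# A plain Python str is not a Composable and is rejected outright:
# sql.SQL('{}').format('TRUNCATECOLUMNS')  # raises TypeError
```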
--- target_redshift/redshift.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_redshift/redshift.py b/target_redshift/redshift.py
index 2f4e15e..02e159d 100644
(see the reconstructed patches 07 and 08 above)

From 68c837ccae659d19543033445ae99134df238120 Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Fri, 15 May 2020 11:02:03 +0200
Subject: [PATCH 09/12] Add test for new config option

This test adds 100 cats with a long description and asserts that they
all insert correctly (Redshift bails if the content is too long and the
TRUNCATECOLUMNS option is not set) and that the longest value in that
column equals the maximum column length.

Tested using the docker setup for this project:

    source /code/venv--target-redshift/bin/activate
    pytest tests/test_target_redshift.py -k 'test_truncate_columns'
---
 tests/fixtures.py             | 13 +++++++++++++
 tests/test_target_redshift.py | 31 ++++++++++++++++++++++++++++++-
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/tests/fixtures.py b/tests/fixtures.py
index 73e3b0a..aafcca8 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -65,6 +65,9 @@
         'age': {
             'type': ['null', 'integer']
         },
+        'description': {
+            'type': ['null', 'string']
+        },
         'adoption': {
             'type': ['object', 'null'],
             'properties': {
@@ -217,6 +220,16 @@ def generate_record(self):
         }
 
 
+class LongCatStream(CatStream):
+    def generate_record(self):
+        record = CatStream.generate_record(self)
+
+        # add some seriously long text
+        record['description'] = fake.paragraph(nb_sentences=1000)
+
+        return record
+
+
 class InvalidCatStream(CatStream):
     def generate_record(self):
         record = CatStream.generate_record(self)
diff --git a/tests/test_target_redshift.py b/tests/test_target_redshift.py
index b2f1b67..0eed750 100644
--- a/tests/test_target_redshift.py
+++ b/tests/test_target_redshift.py
@@ -6,7 +6,7 @@
 import psycopg2.extras
 import pytest
 
-from fixtures import CatStream, CONFIG, db_prep, MultiTypeStream, NestedStream, TEST_DB
+from fixtures import CatStream, CONFIG, db_prep, MultiTypeStream, NestedStream, TEST_DB, LongCatStream
 from target_postgres import singer_stream
 from target_postgres.target_tools import TargetError
@@ -711,3 +711,32 @@ def test_deduplication_existing_new_rows(db_prep):
 
     assert len(sequences) == 1
     assert sequences[0][0] == original_sequence
+
+
+def test_truncate_columns(db_prep):
+    stream = LongCatStream(100, version=1, nested_count=2)
+
+    # this is what we're testing for
+    CONFIG['truncate_columns'] = True
+    CONFIG['default_column_length'] = 1000
+
+    main(CONFIG, input_stream=stream)
+
+    with psycopg2.connect(**TEST_DB) as conn:
+        with conn.cursor() as cur:
+            cur.execute(get_count_sql('cats'))
+            table_count = cur.fetchone()[0]
+
+            cur.execute(sql.SQL('SELECT {} FROM {}.{}').format(
+                sql.SQL('MAX(LEN(description))'),
+                sql.Identifier(CONFIG['redshift_schema']),
+                sql.Identifier('cats')
+            ))
+
+            max_length = cur.fetchone()[0]
+
+    # check if all records were inserted
+    assert table_count == 100
+
+    # check if they were truncated properly. LongCats description is definitely longer
+    assert max_length == CONFIG['default_column_length']
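On the "Redshift bails" failure mode the test exercises: when a value exceeds the column width and TRUNCATECOLUMNS is off, the COPY aborts and Redshift records the reason in its standard `stl_load_errors` system table. A hypothetical debugging snippet (not part of the test suite), reusing the tests' TEST_DB fixture:

```python
# Hypothetical helper for inspecting recent load errors, e.g. after a COPY
# without TRUNCATECOLUMNS rejects over-long rows; not part of the test suite.
import psycopg2

from fixtures import TEST_DB

with psycopg2.connect(**TEST_DB) as conn:
    with conn.cursor() as cur:
        cur.execute(
            'SELECT starttime, colname, err_reason'
            ' FROM stl_load_errors'
            ' ORDER BY starttime DESC LIMIT 10'
        )
        for starttime, colname, err_reason in cur.fetchall():
            # colname and err_reason are fixed-width CHAR columns, hence strip()
            print(starttime, colname.strip(), err_reason.strip())
```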
From 1372bbc2e15d15fdfd769e95f068a41f782e2eed Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Fri, 15 May 2020 11:08:47 +0200
Subject: [PATCH 10/12] Assert minimum length as well

---
 tests/test_target_redshift.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/tests/test_target_redshift.py b/tests/test_target_redshift.py
index 0eed750..7afb0fb 100644
--- a/tests/test_target_redshift.py
+++ b/tests/test_target_redshift.py
@@ -727,16 +727,22 @@ def test_truncate_columns(db_prep):
             cur.execute(get_count_sql('cats'))
             table_count = cur.fetchone()[0]
 
-            cur.execute(sql.SQL('SELECT {} FROM {}.{}').format(
+            cur.execute(sql.SQL('SELECT {}, {} FROM {}.{}').format(
                 sql.SQL('MAX(LEN(description))'),
+                sql.SQL('MIN(LEN(description))'),
                 sql.Identifier(CONFIG['redshift_schema']),
                 sql.Identifier('cats')
             ))
 
-            max_length = cur.fetchone()[0]
+            result = cur.fetchone()
+            max_length = result[0]
+            min_length = result[1]
 
     # check if all records were inserted
     assert table_count == 100
 
-    # check if they were truncated properly. LongCats description is definitely longer
+    # check if they were truncated properly.
+    # LongCats' description is definitely longer than 1000 bytes,
+    # so it should always end up at exactly 1000
     assert max_length == CONFIG['default_column_length']
+    assert min_length == CONFIG['default_column_length']
From 1ba00cfd56b9e2f832d9d11b33316881266aac62 Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Tue, 19 May 2020 09:37:23 +0200
Subject: [PATCH 11/12] Refactor into more generic redshift_copy_options list option

This allows passing a list of options to Redshift's COPY command
instead of just enabling a single option.
---
 target_redshift/__init__.py   |  2 +-
 target_redshift/redshift.py   | 12 ++++++++----
 tests/test_target_redshift.py |  2 +-
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/target_redshift/__init__.py b/target_redshift/__init__.py
index 01dabb2..4142d50 100644
--- a/target_redshift/__init__.py
+++ b/target_redshift/__init__.py
@@ -41,7 +41,7 @@ def main(config, input_stream=None):
         logging_level=config.get('logging_level'),
         default_column_length=config.get('default_column_length', 1000),
         persist_empty_tables=config.get('persist_empty_tables'),
-        truncate_columns=config.get('truncate_columns')
+        redshift_copy_options=config.get('redshift_copy_options')
     )
 
     if input_stream:
diff --git a/target_redshift/redshift.py b/target_redshift/redshift.py
index 0b842f7..830a0db 100644
--- a/target_redshift/redshift.py
+++ b/target_redshift/redshift.py
@@ -44,14 +44,13 @@ class RedshiftTarget(PostgresTarget):
     MAX_VARCHAR = 65535
     CREATE_TABLE_INITIAL_COLUMN = '_sdc_target_redshift_create_table_placeholder'
     CREATE_TABLE_INITIAL_COLUMN_TYPE = 'BOOLEAN'
-    TRUNCATE_COLUMNS_LITERAL = 'TRUNCATECOLUMNS'
 
     def __init__(self, connection, s3, *args,
                  redshift_schema='public',
                  logging_level=None,
                  default_column_length=DEFAULT_COLUMN_LENGTH,
                  persist_empty_tables=False,
-                 truncate_columns=False,
+                 redshift_copy_options=[],
                  **kwargs):
 
         self.LOGGER.info(
@@ -60,7 +59,12 @@ def __init__(self, connection, s3, *args,
 
         self.s3 = s3
         self.default_column_length = default_column_length
-        self.truncate_columns = truncate_columns
+
+        if isinstance(redshift_copy_options, list):
+            self.redshift_copy_options = redshift_copy_options
+        else:
+            self.redshift_copy_options = []
+
 
         PostgresTarget.__init__(self, connection, postgres_schema=redshift_schema, logging_level=logging_level,
                                 persist_empty_tables=persist_empty_tables, add_upsert_indexes=False)
@@ -169,7 +173,7 @@ def persist_csv_rows(self,
                 aws_secret_access_key,
                 ";token={}".format(aws_session_token) if aws_session_token else '',
             )),
             sql.Literal(RESERVED_NULL_DEFAULT),
-            sql.SQL(self.TRUNCATE_COLUMNS_LITERAL if self.truncate_columns else ''))
+            sql.SQL(' '.join(self.redshift_copy_options)))
 
         cur.execute(copy_sql)
diff --git a/tests/test_target_redshift.py b/tests/test_target_redshift.py
index 7afb0fb..01be183 100644
--- a/tests/test_target_redshift.py
+++ b/tests/test_target_redshift.py
@@ -717,7 +717,7 @@ def test_truncate_columns(db_prep):
     stream = LongCatStream(100, version=1, nested_count=2)
 
     # this is what we're testing for
-    CONFIG['truncate_columns'] = True
+    CONFIG['redshift_copy_options'] = ['TRUNCATECOLUMNS']
     CONFIG['default_column_length'] = 1000
 
     main(CONFIG, input_stream=stream)
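After this refactor the option is additive: any of Redshift's COPY data conversion parameters can be listed, and since `' '.join([])` is the empty string, leaving the option unset keeps the COPY statement unchanged. A hypothetical config fragment (`BLANKSASNULL` is another standard conversion parameter, included purely for illustration):

```python
# Hypothetical config fragment for the refactored list-valued option.
config = {
    'redshift_schema': 'public',
    'default_column_length': 1000,
    'redshift_copy_options': ['TRUNCATECOLUMNS', 'BLANKSASNULL'],
}

# No options configured means nothing is appended to the COPY statement:
assert ' '.join([]) == ''
```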
From f7b0b1873a80be56da3f4bd0c6869238a147f897 Mon Sep 17 00:00:00 2001
From: Stefan <1188614+stefankeidel@users.noreply.github.com>
Date: Tue, 19 May 2020 09:42:34 +0200
Subject: [PATCH 12/12] Update docs

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 32f283e..66c8e1e 100644
--- a/README.md
+++ b/README.md
@@ -80,7 +80,7 @@ here.
 | `max_buffer_size` | `["integer", "null"]` | `104857600` (100MB in bytes) | The maximum number of bytes to buffer in memory before writing to the destination table in Redshift
 | `batch_detection_threshold` | `["integer", "null"]` | `5000`, or 1/40th `max_batch_rows` | How often, in rows received, to count the buffered rows and bytes to check if a flush is necessary. There's a slight performance penalty to checking the buffered records count or bytesize, so this controls how often this is polled in order to mitigate the penalty. This value is usually not necessary to set as the default is dynamically adjusted to check reasonably often.
 | `persist_empty_tables` | `["boolean", "null"]` | `False` | Whether the Target should create tables which have no records present in Remote. |
-| `truncate_columns` | `["boolean", "null"]` | `False` | Whether the Target should truncate values longer than `default_column_length` when loading into Redshift. Adds the [`TRUNCATECOLUMNS` data conversion parameter](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-truncatecolumns) to Redshift's [COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html).
+| `redshift_copy_options` | `["list"]` | `[]` | Allows adding additional options to the [COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) statement sent to Redshift. A list of available parameters can be found [here](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html). For example, this could be set to `["TRUNCATECOLUMNS"]` to enable the [`TRUNCATECOLUMNS` data conversion parameter](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-truncatecolumns).
 | `default_column_length` | `["integer", "null"]` | `1000` | All columns with the VARCHAR(CHARACTER VARYING) type will be have this length.Range: 1-65535. |
 | `state_support` | `["boolean", "null"]` | `True` | Whether the Target should emit `STATE` messages to stdout for further consumption. In this mode, which is on by default, STATE messages are buffered in memory until all the records that occurred before them are flushed according to the batch flushing schedule the target is configured with. |
 | `target_s3` | `["object"]` | `N/A` | See `S3` below