diff --git a/Community-Supported/clouddb-extractor/README.md b/Community-Supported/clouddb-extractor/README.md
index eb3f143..3d8c4fa 100644
--- a/Community-Supported/clouddb-extractor/README.md
+++ b/Community-Supported/clouddb-extractor/README.md
@@ -7,6 +7,8 @@ __Current Version__: 1.0
 
 Cloud Database Extractor Utility - This sample shows how to extract data from a cloud database to a published hyper extract and append/update/delete rows to keep it up to date.
 
+A detailed article about this utility is available at: https://www.tableau.com/developer/learning/how-synchronize-your-cloud-data-tableau-extracts-scale
+
 # Overview
 This package defines a standard Extractor Interface which is extended by specific implementations
 to support specific cloud databases. For most use cases you will probably only ever call the
@@ -42,6 +44,7 @@ $ python3 extractor_cli.py --help
      {load_sample,export_load,append,update,delete}
      [--extractor {bigquery}]
      [--source_table_id SOURCE_TABLE_ID]
+     [--overwrite]
      [--tableau_project TABLEAU_PROJECT]
      --tableau_datasource TABLEAU_DATASOURCE
      [--tableau_hostname TABLEAU_HOSTNAME]
@@ -63,16 +66,62 @@ $ python3 extractor_cli.py --help
 ```
 
 ### Sample Usage
+Before use, you should modify the file config.yml with your Tableau and database settings.
 
+__Load Sample:__ Load a sample (default=1000 lines) from test_table to sample_extract in test_project:
 ```console
-# Load a sample (default=1000 lines) from test_table to sample_extract in test_project
-python3 extractor_cli.py load_sample --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --source_table_id test_table --tableau_project test_project --tableau_datasource sample_extract
+python3 extractor_cli.py load_sample --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --source_table_id test_table --tableau_project test_project --tableau_datasource sample_extract
+```
 
-# Load a full extract from test_table to full_extract in test_project
-python3 extractor_cli.py export_load --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --source_table_id test_table --tableau_project test_project --tableau_datasource full_extract
+__Full Export:__ Load a full extract from test_table to test_datasource in test_project:
+```console
+python3 extractor_cli.py export_load --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --source_table_id "test_table" --tableau_project "test_project" --tableau_datasource "test_datasource"
+```
 
-# Execute updated_rows.sql to retrieve a changeset and update full_extract where ROW_ID in changeset matches
-python3 extractor_cli.py update --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --sqlfile updated_rows.sql --tableau_project test_project --tableau_datasource full_extract --match_columns ROW_ID ROW_ID
+__Append:__ Execute new_rows.sql to retrieve a changeset and append it to test_datasource:
+```console
+# new_rows.sql:
+SELECT * FROM staging_table
+
+python3 extractor_cli.py append --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --sqlfile new_rows.sql --tableau_project "test_project" --tableau_datasource "test_datasource"
+```
+
+__Update:__ Execute updated_rows.sql to retrieve a changeset and update test_datasource where the primary key columns in the changeset (METRIC_ID and METRIC_DATE) match the corresponding columns in the target datasource:
+```console
+# updated_rows.sql:
+SELECT * FROM source_table WHERE LOAD_TIMESTAMP > ...
+
+python3 extractor_cli.py update --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --sqlfile updated_rows.sql --tableau_project "test_project" --tableau_datasource "test_datasource" \
+  --match_columns METRIC_ID METRIC_ID --match_columns METRIC_DATE METRIC_DATE
+```
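The help synopsis above also lists a `delete` action that the Sample Usage examples do not cover. A hedged sketch of its use, assuming the same `--sqlfile` and `--match_columns` conventions as `update`; the file name deleted_rows.sql, the DELETED_FLAG predicate, and the column names are illustrative, not part of the utility:

```console
# deleted_rows.sql (illustrative): select only the key columns that identify rows to remove
SELECT METRIC_ID, METRIC_DATE FROM source_table WHERE DELETED_FLAG = 1

python3 extractor_cli.py delete --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
  --sqlfile deleted_rows.sql --tableau_project "test_project" --tableau_datasource "test_datasource" \
  --match_columns METRIC_ID METRIC_ID --match_columns METRIC_DATE METRIC_DATE
```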
diff --git a/Community-Supported/clouddb-extractor/base_extractor.py b/Community-Supported/clouddb-extractor/base_extractor.py
--- a/Community-Supported/clouddb-extractor/base_extractor.py
+++ b/Community-Supported/clouddb-extractor/base_extractor.py
@@ ... @@
 def tmp_file(prefix: str = "", suffix: str = "") -> str:
     """Return a unique temporary file name."""
-    return "{}/tableau_extractor_{}{}{}".format(TEMP_DIR, prefix, uuid.uuid4().hex, suffix)
+    return os.path.join(TEMP_DIR, "{}_tableau_extractor_{}{}".format(prefix, uuid.uuid4().hex, suffix))
 
 
 class BaseExtractor(ABC):
@@ -271,8 +272,7 @@ def quoted_sql_identifier(self, sql_identifier: str) -> str:
         if len(sql_identifier) > maxlength:
             raise Exception("Invalid SQL identifier: {} - exceeded max allowed length: {}".format(sql_identifier, maxlength))
 
-        # char_whitelist = re.compile("^[A-Za-z0-9_-.]*$")
-        char_whitelist = re.compile(r"\A[\w\.\-]*\Z")
+        char_whitelist = re.compile(r"\A[\[\w\.\-\]]*\Z")
 
         if char_whitelist.match(sql_identifier) is None:
             raise Exception("Invalid SQL identifier: {} - found invalid characters".format(sql_identifier))
@@ -317,7 +317,7 @@ def _datasource_lock(self, tab_ds_name: str) -> FileLock:
             #exclusive lock active for datasource here
         #exclusive lock released for datasource here
         """
-        lock_path = "{}.{}.{}.lock".format(DATASOURCE_LOCKFILE_PREFIX, self.tableau_project_id, tab_ds_name)
+        lock_path = os.path.join(TEMP_DIR, "{}.{}.{}.lock".format(DATASOURCE_LOCKFILE_PREFIX, self.tableau_project_id, tab_ds_name))
         return FileLock(lock_path, timeout=DATASOURCE_LOCK_TIMEOUT)
 
     def _get_project_id(self, tab_project: str) -> str:
@@ -400,13 +400,19 @@ def query_result_to_hyper_file(
                 inserter.execute()
             else:
                 assert cursor is not None
+                logger.info(f"Spooling cursor to hyper file, DBAPI_BATCHSIZE={self.dbapi_batchsize}")
+                batches = 0
                 if rows:
                     # We have rows in the buffer from where we determined the cursor.description for server side cursor
                     inserter.add_rows(rows)
+                    batches += 1
                 while True:
                     rows = cursor.fetchmany(self.dbapi_batchsize)
                     if rows:
                         inserter.add_rows(rows)
+                        batches += 1
+                        if batches % 10 == 0:
+                            logger.info(f"Completed Batch {batches}")
                     else:
                         break
                 inserter.execute()
diff --git a/Community-Supported/clouddb-extractor/requirements.txt b/Community-Supported/clouddb-extractor/requirements.txt
index e0a7bc5..1a86ac3 100644
--- a/Community-Supported/clouddb-extractor/requirements.txt
+++ b/Community-Supported/clouddb-extractor/requirements.txt
@@ -1,7 +1,6 @@
 filelock==3.0.12
 PyYAML==5.4.1
 toml==0.10.2
-typed-ast==1.4.3
 types-filelock==0.1.3
 types-futures==0.1.3
 types-protobuf==0.1.11
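The batching change to `query_result_to_hyper_file` above is the standard DB-API `fetchmany` spooling pattern. Below is a minimal self-contained sketch of the same loop, assuming a DB-API v2 cursor and the `tableauhyperapi` package; the function name, parameters, and batch size are illustrative rather than the utility's actual interface:

```python
import logging

from tableauhyperapi import (
    Connection,
    CreateMode,
    HyperProcess,
    Inserter,
    TableDefinition,
    Telemetry,
)

logger = logging.getLogger(__name__)

# Illustrative batch size; the utility takes this from self.dbapi_batchsize.
BATCHSIZE = 10000


def spool_cursor_to_hyper(cursor, hyper_path: str, table_def: TableDefinition) -> None:
    """Spool every row from a DB-API v2 cursor into a .hyper file in batches."""
    with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
        with Connection(hyper.endpoint, hyper_path, CreateMode.CREATE_AND_REPLACE) as connection:
            connection.catalog.create_table(table_def)
            with Inserter(connection, table_def) as inserter:
                batches = 0
                while True:
                    # fetchmany bounds memory use regardless of result set size
                    rows = cursor.fetchmany(BATCHSIZE)
                    if not rows:
                        break
                    inserter.add_rows(rows)
                    batches += 1
                    if batches % 10 == 0:
                        logger.info(f"Completed Batch {batches}")
                # commit all buffered rows to the .hyper file in one operation
                inserter.execute()
```

Logging only every tenth batch keeps progress visible on long-running exports without flooding the log, and deferring `inserter.execute()` until the loop ends commits the rows as a single insert.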