Cloud-DB extractor: Fix Windows compatibility (#71)
Updated path references for Windows compatibility
Added support for table names wrapped in [ ] (Transact-SQL)
Improved logging during cursor reads
Improved documentation
FrancoisZim authored Jul 29, 2022
1 parent 0f9062b commit bf2484c
Showing 3 changed files with 73 additions and 19 deletions.
61 changes: 55 additions & 6 deletions Community-Supported/clouddb-extractor/README.md
@@ -7,6 +7,8 @@ __Current Version__: 1.0

Cloud Database Extractor Utility - This sample shows how to extract data from a cloud database to a published Hyper extract and append/update/delete rows to keep it up to date.

A detailed article about this utility is available at: https://www.tableau.com/developer/learning/how-synchronize-your-cloud-data-tableau-extracts-scale

# Overview
This package defines a standard Extractor Interface, which is extended by implementations
for specific cloud databases. For most use cases you will probably only ever call the
@@ -42,6 +44,7 @@ $ python3 extractor_cli.py --help
{load_sample,export_load,append,update,delete}
[--extractor {bigquery}]
[--source_table_id SOURCE_TABLE_ID]
[--overwrite]
[--tableau_project TABLEAU_PROJECT]
--tableau_datasource TABLEAU_DATASOURCE
[--tableau_hostname TABLEAU_HOSTNAME]
@@ -63,16 +66,62 @@ $ python3 extractor_cli.py --help
```
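
The command-line utility wraps a Python Extractor Interface that can also be called directly. A minimal hypothetical sketch follows; the module, class, constructor arguments, and method names are illustrative assumptions, not the package's verbatim API:
```python
# Hypothetical sketch only: names and signatures below are assumptions --
# check the extractor implementation module you actually use.
from bigquery_extractor import BigQueryExtractor  # assumed module/class name

extractor = BigQueryExtractor(                    # assumed constructor arguments
    tableau_hostname="https://your-tableau-server",
    tableau_project="test_project",
    tableau_token_name="hyperapitest",
    tableau_token_secret="<token secret>",
)
# Load a sample from test_table into the sample_extract datasource
extractor.load_sample(source_table="test_table", tab_ds_name="sample_extract")
```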

### Sample Usage
Before use, modify the file config.yml with your Tableau and database settings.
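A hypothetical sketch of the kind of settings involved (key names are illustrative assumptions; the config.yml shipped with the utility is the authoritative template):
```console
# config.yml (illustrative sketch -- key names are assumptions)
tableau_env:
  server_address: https://your-tableau-server
  site_id: ""
  project: test_project
bigquery:
  # database-specific connection settings would go here
```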

__Load Sample:__ Load a sample (default: 1000 rows) from test_table to sample_extract in test_project:
```console
python3 extractor_cli.py load_sample --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
--source_table_id test_table --tableau_project test_project --tableau_datasource sample_extract
```

__Full Export:__ Load a full extract from test_table to full_extract in test_project:
```console
python extractor_cli.py export_load --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
--source_table_id "test_table" --tableau_project "test_project" --tableau_datasource "test_datasource"
```


__Append:__ Execute new_rows.sql to retrieve a changeset and append to test_datasource:
```console
# new_rows.sql:
SELECT * FROM staging_table

python extractor_cli.py append --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
--sqlfile new_rows.sql --tableau_project "test_project" --tableau_datasource "test_datasource"
```

__Update:__ Execute updated_rows.sql to retrieve a changeset and update test_datasource where the primary key columns in the changeset (METRIC_ID and METRIC_DATE) match the corresponding columns in the target datasource. Each --match_columns flag takes a pair of names: the column in the changeset followed by the matching column in the target datasource:
```console
# updated_rows.sql:
SELECT * FROM source_table WHERE LOAD_TIMESTAMP<UPDATE_TIMESTAMP

python extractor_cli.py update --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
--sqlfile updated_rows.sql --tableau_project "test_project" --tableau_datasource "test_datasource" \
--match_columns METRIC_ID METRIC_ID --match_columns METRIC_DATE METRIC_DATE
```

__Delete:__ Execute deleted_rows.sql to retrieve a changeset containing the primary key columns that identify which rows have been deleted. Rows in full_extract are deleted where METRIC_ID and METRIC_DATE in the changeset match the corresponding columns in the target datasource:
```console
# deleted_rows.sql:
SELECT METRIC_ID, METRIC_DATE FROM source_table_deleted_rows

python extractor_cli.py delete --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
--sqlfile deleted_rows.sql --tableau_project "test_project" --tableau_datasource "full_extract" \
--match_columns METRIC_ID METRIC_ID --match_columns METRIC_DATE METRIC_DATE
```

__Conditional Delete:__ In this example no changeset is provided; the records to delete are identified by the conditions specified in delete_conditions.json:
```console
# delete_conditions.json
{
  "op": "lt",
  "target-col": "ORDER_DATE",
  "const": {"type": "datetime", "v": "2018-02-01T00:00:00Z"}
}

python extractor_cli.py delete --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
--tableau_project "test_project" --tableau_datasource "full_extract" \
--match_conditions_json=delete_conditions.json
```
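
The condition document appears to follow the JSON condition format used by the Tableau REST API for updating data in published data sources, which also allows boolean operators. A hypothetical compound condition (illustrative, not taken from this repository) might look like:
```console
# delete_conditions.json (hypothetical compound example)
{
  "op": "and",
  "items": [
    {"op": "lt", "target-col": "ORDER_DATE", "const": {"type": "datetime", "v": "2018-02-01T00:00:00Z"}},
    {"op": "eq", "target-col": "REGION", "const": {"type": "string", "v": "EMEA"}}
  ]
}
```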

# Installation
30 changes: 18 additions & 12 deletions Community-Supported/clouddb-extractor/base_extractor.py
@@ -65,11 +65,6 @@
Set to Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU to disable
"""

TEMP_DIR: str = "/tmp"
"""
TEMP_DIR (str): Local staging directory for hyper files, database exports etc.
"""

SAMPLE_ROWS: int = 1000
"""
SAMPLE_ROWS (int): Default number of rows for LIMIT when using load_sample
@@ -88,11 +83,17 @@
multiple hosts
"""

# DATASOURCE_LOCKFILE_PREFIX: str = "/var/lock/tableau_extractor"
DATASOURCE_LOCKFILE_PREFIX: str = "/tmp/lock.tableau_extractor"
DATASOURCE_LOCKFILE_PREFIX: str = "tableau_extractor"
"""
DATASOURCE_LOCKFILE_PREFIX (str): Defines the location of lockfiles
DATASOURCE_LOCKFILE_PREFIX (str): Defines the naming convention for lockfiles
"""

TEMP_DIR: str = "/tmp"
"""
TEMP_DIR (str): Local staging directory for hyper files, database exports etc.
"""
if os.name == 'nt':
    TEMP_DIR = os.environ.get('TEMP')
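
# Note: os.environ.get('TEMP') may return None if TEMP is unset; a portable
# alternative (an editor suggestion, not what this commit uses) would be:
#   import tempfile
#   TEMP_DIR = tempfile.gettempdir()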

DEFAULT_SITE_ID: str = ""
"""
@@ -190,7 +191,7 @@ def wrapper_debug(*args, **kwargs):

def tempfile_name(prefix: str = "", suffix: str = "") -> str:
"""Return a unique temporary file name."""
return "{}/tableau_extractor_{}{}{}".format(TEMP_DIR, prefix, uuid.uuid4().hex, suffix)
return os.path.join(TEMP_DIR, "{}_tableau_extractor_{}{}".format(prefix, uuid.uuid4().hex, suffix))


class BaseExtractor(ABC):
@@ -271,8 +272,7 @@ def quoted_sql_identifier(self, sql_identifier: str) -> str:
        if len(sql_identifier) > maxlength:
            raise Exception("Invalid SQL identifier: {} - exceeded max allowed length: {}".format(sql_identifier, maxlength))

        # char_whitelist = re.compile("^[A-Za-z0-9_-.]*$")
        char_whitelist = re.compile(r"\A[\w\.\-]*\Z")
        char_whitelist = re.compile(r"\A[\[\w\.\-\]]*\Z")
        if char_whitelist.match(sql_identifier) is None:
            raise Exception("Invalid SQL identifier: {} - found invalid characters".format(sql_identifier))

@@ -317,7 +317,7 @@ def _datasource_lock(self, tab_ds_name: str) -> FileLock:
        # exclusive lock active for datasource here
        # exclusive lock released for datasource here
        """
        lock_path = "{}.{}.{}.lock".format(DATASOURCE_LOCKFILE_PREFIX, self.tableau_project_id, tab_ds_name)
        lock_path = os.path.join(TEMP_DIR, "{}.{}.{}.lock".format(DATASOURCE_LOCKFILE_PREFIX, self.tableau_project_id, tab_ds_name))
        return FileLock(lock_path, timeout=DATASOURCE_LOCK_TIMEOUT)

def _get_project_id(self, tab_project: str) -> str:
@@ -400,13 +400,19 @@ def query_result_to_hyper_file(
            inserter.execute()
        else:
            assert cursor is not None
            logger.info(f"Spooling cursor to hyper file, DBAPI_BATCHSIZE={self.dbapi_batchsize}")
            batches = 0
            if rows:
                # Rows may already be buffered from reading cursor.description on a server-side cursor
                inserter.add_rows(rows)
                batches += 1
            while True:
                rows = cursor.fetchmany(self.dbapi_batchsize)
                if rows:
                    inserter.add_rows(rows)
                    batches += 1
                    if batches % 10 == 0:
                        logger.info(f"Completed Batch {batches}")
                else:
                    break
            inserter.execute()
Expand Down
1 change: 0 additions & 1 deletion Community-Supported/clouddb-extractor/requirements.txt
@@ -1,7 +1,6 @@
filelock==3.0.12
PyYAML==5.4.1
toml==0.10.2
typed-ast==1.4.3
types-filelock==0.1.3
types-futures==0.1.3
types-protobuf==0.1.11
