From f4a89c8f7080cbeb4b430b0c7688884ec0943f24 Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 5 Jan 2023 21:41:59 +0100 Subject: [PATCH 1/7] added azure discovery --- setup.py | 3 ++- tap_spreadsheets_anywhere/file_utils.py | 11 +++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index d07a440..3dc9109 100755 --- a/setup.py +++ b/setup.py @@ -18,7 +18,8 @@ 'protobuf>=4.21.12', 'openpyxl', 'xlrd', - 'paramiko' + 'paramiko', + 'azure-storage-blob>=12.14.0' ], entry_points=""" [console_scripts] diff --git a/tap_spreadsheets_anywhere/file_utils.py b/tap_spreadsheets_anywhere/file_utils.py index e4c5290..fc759b9 100644 --- a/tap_spreadsheets_anywhere/file_utils.py +++ b/tap_spreadsheets_anywhere/file_utils.py @@ -13,6 +13,7 @@ import tap_spreadsheets_anywhere.format_handler import tap_spreadsheets_anywhere.conversion as conversion import smart_open.ssh as ssh_transport +from azure.storage.blob import BlobServiceClient LOGGER = logging.getLogger(__name__) @@ -126,6 +127,8 @@ def get_matching_objects(table_spec, modified_since=None): target_objects = list_files_in_gs_bucket(bucket,table_spec.get('search_prefix')) elif protocol in ["http", "https"]: target_objects = convert_URL_to_file_list(table_spec) + elif protocol in ["azure"]: + target_objects = list_files_in_azure_bucket(bucket,table_spec.get('search_prefix')) else: raise ValueError("Protocol {} not yet supported. Pull Requests are welcome!") @@ -245,6 +248,14 @@ def list_files_in_gs_bucket(bucket, search_prefix=None): return target_objects +def list_files_in_azure_bucket(container_name, search_prefix=None): + sas_key = os.environ['AZURE_STORAGE_CONNECTION_STRING'] + blob_service_client = BlobServiceClient.from_connection_string(sas_key) + container_client = blob_service_client.get_container_client(container_name) + blob_iterator = container_client.list_blobs(name_starts_with=search_prefix) + return [{'Key': blob.name, 'LastModified': blob.last_modified} for blob in blob_iterator if blob.size > 0] + + def list_files_in_s3_bucket(bucket, search_prefix=None): s3_client = boto3.client('s3') From 3c4fb339135d2efcb1c81c4403e394872d9f123b Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 5 Jan 2023 21:48:30 +0100 Subject: [PATCH 2/7] debugging --- tap_spreadsheets_anywhere/file_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tap_spreadsheets_anywhere/file_utils.py b/tap_spreadsheets_anywhere/file_utils.py index fc759b9..78d97df 100644 --- a/tap_spreadsheets_anywhere/file_utils.py +++ b/tap_spreadsheets_anywhere/file_utils.py @@ -115,6 +115,7 @@ def parse_path(path): def get_matching_objects(table_spec, modified_since=None): protocol, bucket = parse_path(table_spec['path']) + LOGGER.info(f"Getting protocol {protocol} and bucket {bucket} from {table_spec['path']}") # TODO Breakout the transport schemes here similar to the registry/loading pattern used by smart_open if protocol == 's3': From cd7103f7df5a41a910ebf1e24af2a5ff68456889 Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 5 Jan 2023 22:15:01 +0100 Subject: [PATCH 3/7] debugging --- tap_spreadsheets_anywhere/file_utils.py | 2 +- tap_spreadsheets_anywhere/format_handler.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/tap_spreadsheets_anywhere/file_utils.py b/tap_spreadsheets_anywhere/file_utils.py index 78d97df..d7a9855 100644 --- a/tap_spreadsheets_anywhere/file_utils.py +++ b/tap_spreadsheets_anywhere/file_utils.py @@ -131,7 +131,7 @@ def get_matching_objects(table_spec, modified_since=None): elif protocol in ["azure"]: target_objects = list_files_in_azure_bucket(bucket,table_spec.get('search_prefix')) else: - raise ValueError("Protocol {} not yet supported. Pull Requests are welcome!") + raise ValueError("Protocol {} (possibly azure) not yet supported. Pull Requests are welcome!") pattern = table_spec['pattern'] matcher = re.compile(pattern) diff --git a/tap_spreadsheets_anywhere/format_handler.py b/tap_spreadsheets_anywhere/format_handler.py index f566886..123caae 100644 --- a/tap_spreadsheets_anywhere/format_handler.py +++ b/tap_spreadsheets_anywhere/format_handler.py @@ -5,7 +5,7 @@ import tap_spreadsheets_anywhere.excel_handler import tap_spreadsheets_anywhere.json_handler import tap_spreadsheets_anywhere.jsonl_handler - +from azure.storage.blob import BlobServiceClient class InvalidFormatError(Exception): def __init__(self, fname, message="The file was not in the expected format"): @@ -18,6 +18,15 @@ def __str__(self): def get_streamreader(uri, universal_newlines=True,newline='',open_mode='r'): + if uri.startswith('azure://'): + connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING'] + transport_params = { + 'client': BlobServiceClient.from_connection_string(connect_str), + } + streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape', + transport_params=transport_params) + return streamreader + streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape') if not universal_newlines and isinstance(streamreader, StreamReader): return monkey_patch_streamreader(streamreader) From 53d8429657be1cca363a7c2a2a23eb0f7d4a97bd Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 5 Jan 2023 22:42:01 +0100 Subject: [PATCH 4/7] debugging --- tap_spreadsheets_anywhere/format_handler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tap_spreadsheets_anywhere/format_handler.py b/tap_spreadsheets_anywhere/format_handler.py index 123caae..cbcbf97 100644 --- a/tap_spreadsheets_anywhere/format_handler.py +++ b/tap_spreadsheets_anywhere/format_handler.py @@ -6,6 +6,7 @@ import tap_spreadsheets_anywhere.json_handler import tap_spreadsheets_anywhere.jsonl_handler from azure.storage.blob import BlobServiceClient +import os class InvalidFormatError(Exception): def __init__(self, fname, message="The file was not in the expected format"): From 26b0fd478fd9af7611267b6ac103bd6a3fb913d8 Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 12 Jan 2023 19:29:06 +0100 Subject: [PATCH 5/7] kwarg transport params --- tap_spreadsheets_anywhere/format_handler.py | 39 +++++++++++++++------ 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/tap_spreadsheets_anywhere/format_handler.py b/tap_spreadsheets_anywhere/format_handler.py index cbcbf97..9977ed8 100644 --- a/tap_spreadsheets_anywhere/format_handler.py +++ b/tap_spreadsheets_anywhere/format_handler.py @@ -19,20 +19,39 @@ def __str__(self): def get_streamreader(uri, universal_newlines=True,newline='',open_mode='r'): - if uri.startswith('azure://'): - connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING'] - transport_params = { - 'client': BlobServiceClient.from_connection_string(connect_str), - } - streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape', - transport_params=transport_params) - return streamreader - - streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape') + kwarg_dispatch = { + "azure": lambda: { + "transport_params": { + "client": BlobServiceClient.from_connection_string( + os.environ['AZURE_STORAGE_CONNECTION_STRING'], + ) + } + }, + } + + SCHEME_SEP = "://" + kwargs = kwarg_dispatch.get(uri.split(SCHEME_SEP, 1)[0], lambda: {})() + + streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape', **kwargs) + if not universal_newlines and isinstance(streamreader, StreamReader): return monkey_patch_streamreader(streamreader) return streamreader + # if uri.startswith('azure://'): + # connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING'] + # transport_params = { + # 'client': BlobServiceClient.from_connection_string(connect_str), + # } + # streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape', + # transport_params=transport_params) + # return streamreader + + # streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape') + # if not universal_newlines and isinstance(streamreader, StreamReader): + # return monkey_patch_streamreader(streamreader) + # return streamreader + def monkey_patch_streamreader(streamreader): streamreader.mp_newline = '\n' From 8c6790c8ee405af8f463945f774a176ad7b33fed Mon Sep 17 00:00:00 2001 From: Henning Date: Fri, 13 Jan 2023 21:49:21 +0100 Subject: [PATCH 6/7] remove commented-out code --- tap_spreadsheets_anywhere/format_handler.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tap_spreadsheets_anywhere/format_handler.py b/tap_spreadsheets_anywhere/format_handler.py index 9977ed8..2d1fc68 100644 --- a/tap_spreadsheets_anywhere/format_handler.py +++ b/tap_spreadsheets_anywhere/format_handler.py @@ -38,20 +38,6 @@ def get_streamreader(uri, universal_newlines=True,newline='',open_mode='r'): return monkey_patch_streamreader(streamreader) return streamreader - # if uri.startswith('azure://'): - # connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING'] - # transport_params = { - # 'client': BlobServiceClient.from_connection_string(connect_str), - # } - # streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape', - # transport_params=transport_params) - # return streamreader - - # streamreader = smart_open.open(uri, open_mode, newline=newline, errors='surrogateescape') - # if not universal_newlines and isinstance(streamreader, StreamReader): - # return monkey_patch_streamreader(streamreader) - # return streamreader - def monkey_patch_streamreader(streamreader): streamreader.mp_newline = '\n' From 630d34b173702f40a4e071c37ca9526827d873da Mon Sep 17 00:00:00 2001 From: Henning Date: Fri, 13 Jan 2023 21:50:44 +0100 Subject: [PATCH 7/7] remove logging comment --- tap_spreadsheets_anywhere/file_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tap_spreadsheets_anywhere/file_utils.py b/tap_spreadsheets_anywhere/file_utils.py index d7a9855..fc759b9 100644 --- a/tap_spreadsheets_anywhere/file_utils.py +++ b/tap_spreadsheets_anywhere/file_utils.py @@ -115,7 +115,6 @@ def parse_path(path): def get_matching_objects(table_spec, modified_since=None): protocol, bucket = parse_path(table_spec['path']) - LOGGER.info(f"Getting protocol {protocol} and bucket {bucket} from {table_spec['path']}") # TODO Breakout the transport schemes here similar to the registry/loading pattern used by smart_open if protocol == 's3': @@ -131,7 +130,7 @@ def get_matching_objects(table_spec, modified_since=None): elif protocol in ["azure"]: target_objects = list_files_in_azure_bucket(bucket,table_spec.get('search_prefix')) else: - raise ValueError("Protocol {} (possibly azure) not yet supported. Pull Requests are welcome!") + raise ValueError("Protocol {} not yet supported. Pull Requests are welcome!") pattern = table_spec['pattern'] matcher = re.compile(pattern)