diff --git a/tap_spreadsheets_anywhere/file_utils.py b/tap_spreadsheets_anywhere/file_utils.py
index e4c5290..29bbe8a 100644
--- a/tap_spreadsheets_anywhere/file_utils.py
+++ b/tap_spreadsheets_anywhere/file_utils.py
@@ -13,6 +13,7 @@ import tap_spreadsheets_anywhere.format_handler
 import tap_spreadsheets_anywhere.conversion as conversion
 import smart_open.ssh as ssh_transport
+import smart_open.ftp as ftp_transport
 
 LOGGER = logging.getLogger(__name__)
 
 
@@ -27,6 +28,14 @@ def resolve_target_uri(table_spec, target_filename):
     return table_spec['path'] + "/" + target_filename
 
 
+def _hide_credentials(path):
+    import re
+    # Mask everything up to and including the '@' so user:password never leaks into emitted records
+    if path.startswith(('sftp', 'ftp')):
+        return re.sub(r's?ftps?://.*?@', '********', path)
+    return path
+
+
 def write_file(target_filename, table_spec, schema, max_records=-1):
     LOGGER.info('Syncing file "{}".'.format(target_filename))
 
@@ -36,7 +45,7 @@ def write_file(target_filename, table_spec, schema, max_records=-1):
     iterator = tap_spreadsheets_anywhere.format_handler.get_row_iterator(table_spec, target_uri)
     for row in iterator:
         metadata = {
-            '_smart_source_bucket': table_spec['path'],
+            '_smart_source_bucket': _hide_credentials(table_spec['path']),
             '_smart_source_file': target_filename,
             # index zero, +1 for header row
             '_smart_source_lineno': records_synced + 2
@@ -122,6 +131,8 @@ def get_matching_objects(table_spec, modified_since=None):
         target_objects = list_files_in_local_bucket(bucket, table_spec.get('search_prefix'))
     elif protocol in ["sftp"]:
         target_objects = list_files_in_SSH_bucket(table_spec['path'],table_spec.get('search_prefix'))
+    elif protocol in ["ftp", "ftps"]:
+        target_objects = list_files_in_ftp_server(table_spec['path'],table_spec.get('search_prefix'))
     elif protocol in ["gs"]:
         target_objects = list_files_in_gs_bucket(bucket,table_spec.get('search_prefix'))
     elif protocol in ["http", "https"]:
@@ -210,6 +221,27 @@ def convert_URL_to_file_list(table_spec):
         raise ValueError(f"Configured URL {url} could not be read.")
 
 
+def list_files_in_ftp_server(uri, search_prefix=None):
+    import fnmatch
+    parsed_uri = ftp_transport.parse_uri(uri)
+    uri_path = parsed_uri.pop('uri_path')
+    secure_conn = parsed_uri['scheme'] == 'ftps'
+    ftp = ftp_transport._connect(parsed_uri['host'], parsed_uri['user'], parsed_uri['port'], parsed_uri['password'], secure_conn, transport_params={})
+    entries = []
+    max_results = 10000
+    # mlsd() yields (name, facts) pairs; the 'modify' fact is a UTC timestamp such as 20230401120000
+    for name, facts in ftp.mlsd(uri_path):
+        if search_prefix is None or fnmatch.fnmatch(name, search_prefix):
+            if facts['type'] == 'file':
+                entries.append({'Key': name, 'LastModified': datetime.strptime(facts['modify'], '%Y%m%d%H%M%S').replace(tzinfo=timezone.utc)})
+            if len(entries) > max_results:
+                raise ValueError(f"Read more than {max_results} records from the path {uri_path}. Use a more specific "
+                                 f"search_prefix")
+
+    LOGGER.info("Found {} files.".format(len(entries)))
+    return entries
+
+
 def list_files_in_local_bucket(bucket, search_prefix=None):
     local_filenames = []
     path = bucket
@@ -344,4 +376,4 @@ def config_by_crawl(crawl_config):
         else:
             LOGGER.debug(f"Skipping config for {file['key']} because it looks like a folder not a file")
     config['tables'] += entries.values()
-    return config
\ No newline at end of file
+    return config
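
A standalone sketch of the credential masking introduced above, runnable outside the tap (the URI, user, and password are made-up placeholders; the helper mirrors the diff's `_hide_credentials`):

    import re

    def _hide_credentials(path):
        # Mirror of the diff's helper: mask scheme://user:password@ before the path is emitted
        if path.startswith(('sftp', 'ftp')):
            return re.sub(r's?ftps?://.*?@', '********', path)
        return path

    print(_hide_credentials('ftp://alice:s3cret@ftp.example.com/exports/a.csv'))
    # -> ********ftp.example.com/exports/a.csv
    print(_hide_credentials('s3://my-bucket/exports'))
    # -> s3://my-bucket/exports (non-FTP paths pass through unchanged)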
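
The new listing helper relies on FTP's MLSD command (RFC 3659); smart_open's ftp transport hands back a standard `ftplib` connection, so the same parsing can be sketched with the stdlib alone. A minimal example, assuming a placeholder host, login, and directory:

    from datetime import datetime, timezone
    from ftplib import FTP

    ftp = FTP('ftp.example.com')   # placeholder host
    ftp.login('alice', 's3cret')   # placeholder credentials

    for name, facts in ftp.mlsd('/exports'):
        # 'facts' is a dict of MLSD facts: 'type' separates files from directories,
        # 'modify' is a UTC timestamp in YYYYMMDDHHMMSS form, parsed the same way in the diff
        if facts.get('type') == 'file':
            modified = datetime.strptime(facts['modify'], '%Y%m%d%H%M%S').replace(tzinfo=timezone.utc)
            print(name, modified.isoformat())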