Merge pull request #30 from GtheSheep/main
Add support for files in ftp servers
menzenski authored Jan 19, 2023
2 parents ca8e7a5 + 2febc25 commit 34f135b
Showing 1 changed file with 34 additions and 2 deletions.
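
With this change, a table spec's path can point at an ftp:// (or ftps://) server the same way it already can for sftp://. A minimal sketch of a config entry, with illustrative host, credentials, and values (key names follow the tap's usual table spec):

{
  "tables": [
    {
      "path": "ftp://user:password@ftp.example.com/exports",
      "name": "daily_report",
      "pattern": ".*\\.csv",
      "start_date": "2023-01-01T00:00:00Z",
      "key_properties": [],
      "format": "csv"
    }
  ]
}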
36 changes: 34 additions & 2 deletions tap_spreadsheets_anywhere/file_utils.py
@@ -13,6 +13,7 @@
import tap_spreadsheets_anywhere.format_handler
import tap_spreadsheets_anywhere.conversion as conversion
import smart_open.ssh as ssh_transport
import smart_open.ftp as ftp_transport

LOGGER = logging.getLogger(__name__)

@@ -27,6 +28,14 @@ def resolve_target_uri(table_spec, target_filename):
    return table_spec['path'] + "/" + target_filename


def _hide_credentials(path):
    import re
    if path.startswith('sftp'):
        return re.sub(r'(sftp://).*?@', r'\1********@', path, flags=re.DOTALL)
    elif path.startswith('ftp'):
        return re.sub(r'(ftps?://).*?@', r'\1********@', path, flags=re.DOTALL)
    return path
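# Illustrative usage, not part of the commit: with the capture group above,
# the scheme and host survive while the credentials are masked, e.g.
#   _hide_credentials('ftp://user:secret@ftp.example.com/data')
#   -> 'ftp://********@ftp.example.com/data'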


def write_file(target_filename, table_spec, schema, max_records=-1):
    LOGGER.info('Syncing file "{}".'.format(target_filename))
@@ -36,7 +45,7 @@ def write_file(target_filename, table_spec, schema, max_records=-1):
        iterator = tap_spreadsheets_anywhere.format_handler.get_row_iterator(table_spec, target_uri)
        for row in iterator:
            metadata = {
-               '_smart_source_bucket': table_spec['path'],
+               '_smart_source_bucket': _hide_credentials(table_spec['path']),
                '_smart_source_file': target_filename,
                # index zero, +1 for header row
                '_smart_source_lineno': records_synced + 2
@@ -122,6 +131,8 @@ def get_matching_objects(table_spec, modified_since=None):
        target_objects = list_files_in_local_bucket(bucket, table_spec.get('search_prefix'))
    elif protocol in ["sftp"]:
        target_objects = list_files_in_SSH_bucket(table_spec['path'], table_spec.get('search_prefix'))
    elif protocol in ["ftp"]:
        target_objects = list_files_in_ftp_server(table_spec['path'], table_spec.get('search_prefix'))
    elif protocol in ["gs"]:
        target_objects = list_files_in_gs_bucket(bucket, table_spec.get('search_prefix'))
    elif protocol in ["http", "https"]:
@@ -210,6 +221,27 @@ def convert_URL_to_file_list(table_spec):
        raise ValueError(f"Configured URL {url} could not be read.")


def list_files_in_ftp_server(uri, search_prefix=None):
    import fnmatch
    parsed_uri = ftp_transport.parse_uri(uri)
    uri_path = parsed_uri.pop('uri_path')
    secure_conn = parsed_uri["scheme"] == "ftps"
    ftp = ftp_transport._connect(parsed_uri['host'], parsed_uri['user'], parsed_uri['port'],
                                 parsed_uri['password'], secure_conn, transport_params={})
    entries = []
    max_results = 10000
    for row in ftp.mlsd(uri_path):
        if search_prefix is None or fnmatch.fnmatch(row[0], search_prefix):
            if row[1]['type'] == 'file':
                entries.append({'Key': row[0],
                                'LastModified': datetime.strptime(row[1]['modify'], '%Y%m%d%H%M%S').replace(tzinfo=timezone.utc)})
            if len(entries) > max_results:
                raise ValueError(f"Read more than {max_results} records from the path {uri_path}. Use a more specific "
                                 f"search_prefix")

    LOGGER.info("Found {} files.".format(len(entries)))
    return entries
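# For context, not part of the commit: ftplib's FTP.mlsd() yields (name, facts)
# tuples per RFC 3659, e.g. ('report.csv', {'type': 'file', 'modify': '20230119093000'}),
# so row[0] is the file name and row[1]['type'] / row[1]['modify'] are its listing facts.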


def list_files_in_local_bucket(bucket, search_prefix=None):
    local_filenames = []
    path = bucket
@@ -344,4 +376,4 @@ def config_by_crawl(crawl_config):
            else:
                LOGGER.debug(f"Skipping config for {file['key']} because it looks like a folder not a file")
        config['tables'] += entries.values()
-   return config
\ No newline at end of file
+   return config
