Add support for files in ftp servers #30

Merged Jan 19, 2023 · 5 commits (diff shown from 4 commits)
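For context, this change lets a table's configured "path" point at an FTP (or FTPS) server, alongside the existing local, S3, SFTP, GS, and HTTP(S) sources. A minimal sketch of a config entry exercising the new protocol (host, credentials, and file pattern are hypothetical):

    {
        "tables": [
            {
                "path": "ftp://user:secret@ftp.example.com/exports",
                "name": "daily_orders",
                "pattern": "orders_.*\\.csv",
                "key_properties": [],
                "format": "csv",
                "start_date": "2023-01-01T00:00:00Z"
            }
        ]
    }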
34 changes: 33 additions & 1 deletion tap_spreadsheets_anywhere/file_utils.py
@@ -13,6 +13,7 @@
 import tap_spreadsheets_anywhere.format_handler
 import tap_spreadsheets_anywhere.conversion as conversion
 import smart_open.ssh as ssh_transport
+import smart_open.ftp as ftp_transport
 
 LOGGER = logging.getLogger(__name__)
 
@@ -27,6 +28,14 @@ def resolve_target_uri(table_spec, target_filename):
     return table_spec['path'] + "/" + target_filename
 
 
+def _hide_credentials(path):
+    import re
+    if path.startswith('sftp'):
+        return re.sub('sftp://.*?@', "", path, flags=re.DOTALL)
+    elif path.startswith('ftp'):
+        return re.sub('ftp://.*?@', "", path, flags=re.DOTALL)
+    return path
+
+
 def write_file(target_filename, table_spec, schema, max_records=-1):
     LOGGER.info('Syncing file "{}".'.format(target_filename))
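A quick sketch of how the new helper behaves; the paths are hypothetical:

    >>> _hide_credentials('ftp://user:secret@ftp.example.com/exports')
    'ftp.example.com/exports'
    >>> _hide_credentials('s3://my-bucket/exports')
    's3://my-bucket/exports'

Note that the scheme prefix is stripped along with the credentials, since the substitution removes everything up to and including the '@'.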
@@ -36,7 +45,7 @@ def write_file(target_filename, table_spec, schema, max_records=-1):
     iterator = tap_spreadsheets_anywhere.format_handler.get_row_iterator(table_spec, target_uri)
     for row in iterator:
         metadata = {
-            '_smart_source_bucket': table_spec['path'],
+            '_smart_source_bucket': _hide_credentials(table_spec['path']),
             '_smart_source_file': target_filename,
             # index zero, +1 for header row
             '_smart_source_lineno': records_synced + 2
@@ -122,6 +131,8 @@ def get_matching_objects(table_spec, modified_since=None):
         target_objects = list_files_in_local_bucket(bucket, table_spec.get('search_prefix'))
     elif protocol in ["sftp"]:
         target_objects = list_files_in_SSH_bucket(table_spec['path'], table_spec.get('search_prefix'))
+    elif protocol in ["ftp"]:
+        target_objects = list_files_in_ftp_server(table_spec['path'], table_spec.get('search_prefix'))
     elif protocol in ["gs"]:
         target_objects = list_files_in_gs_bucket(bucket, table_spec.get('search_prefix'))
     elif protocol in ["http", "https"]:
@@ -210,6 +221,27 @@ def convert_URL_to_file_list(table_spec):
raise ValueError(f"Configured URL {url} could not be read.")


def list_files_in_ftp_server(uri, search_prefix=None):
parsed_uri = ftp_transport.parse_uri(uri)
uri_path = parsed_uri.pop('uri_path')
secure_conn = True if parsed_uri["scheme"] == "ftps" else False
ftp = ftp_transport._connect(parsed_uri['host'], parsed_uri['user'], parsed_uri['port'], parsed_uri['password'], secure_conn, transport_params={})
entries = []
max_results = 10000
from stat import S_ISREG
import fnmatch
for row in ftp.mlsd(uri_path):
if search_prefix is None or fnmatch.fnmatch(entry[0],search_prefix):
if row[1]['type'] == 'file':
entries.append({'Key':row[0],'LastModified':datetime.strptime(row[1]['modify'], '%Y%m%d%H%M%S').replace(tzinfo=timezone.utc)})
if len(entries) > max_results:
raise print(f"Read more than {max_results} records from the path {uri_path}. Use a more specific "
f"search_prefix")

LOGGER.info("Found {} files.".format(entries))
return entries


def list_files_in_local_bucket(bucket, search_prefix=None):
local_filenames = []
path = bucket
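As a usage sketch, the new helper returns entries in the same {'Key': ..., 'LastModified': ...} shape as the other list_files_in_* helpers, which the caller then filters against modified_since; the host and file name here are hypothetical:

    >>> list_files_in_ftp_server('ftp://user:secret@ftp.example.com/exports')
    [{'Key': 'orders_2023-01-15.csv',
      'LastModified': datetime.datetime(2023, 1, 15, 4, 30, tzinfo=datetime.timezone.utc)}]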