diff --git a/apps/filebrowser/src/filebrowser/api.py b/apps/filebrowser/src/filebrowser/api.py index 65a6bb9d932..c8fc8e2c0f7 100644 --- a/apps/filebrowser/src/filebrowser/api.py +++ b/apps/filebrowser/src/filebrowser/api.py @@ -21,6 +21,7 @@ import operator import mimetypes import posixpath +from io import BytesIO as string_io from django.core.paginator import EmptyPage, Paginator from django.http import HttpResponse, HttpResponseNotModified, HttpResponseRedirect, StreamingHttpResponse @@ -46,6 +47,7 @@ FILE_DOWNLOAD_CACHE_CONTROL, MAX_FILE_SIZE_UPLOAD_LIMIT, REDIRECT_DOWNLOAD, + RESTRICT_FILE_EXTENSIONS, SHOW_DOWNLOAD_BUTTON, ) from filebrowser.lib import xxd @@ -394,81 +396,42 @@ def upload_complete(request): @api_error_handler def upload_file(request): - """ - A wrapper around the actual upload view function to clean up the temporary file afterwards if it fails. - - Returns JSON. - """ - pass - # response = {} - - # try: - # response = _upload_file(request) - # except Exception as e: - # LOG.exception('Upload operation failed.') - - # file = request.FILES.get('file') - # if file and hasattr(file, 'remove'): # TODO: Call from proxyFS -- Check feasibility of this old comment - # file.remove() - - # return HttpResponse(str(e).split('\n', 1)[0], status=500) # TODO: Check error message and status code - - # return JsonResponse(response) + # Read request body first to prevent RawPostDataException later on which occurs when trying to access body after it has already been read + body_data_bytes = string_io(request.body) - -def _upload_file(request): - """ - Handles file uploaded by HDFSfileUploadHandler. - - The uploaded file is stored in HDFS at its destination with a .tmp suffix. - We just need to rename it to the destination path. - """ uploaded_file = request.FILES['file'] - dest_path = request.GET.get('dest') - response = {} - - if MAX_FILE_SIZE_UPLOAD_LIMIT.get() >= 0 and uploaded_file.size > MAX_FILE_SIZE_UPLOAD_LIMIT.get(): - return HttpResponse(f'File exceeds maximum allowed size of {MAX_FILE_SIZE_UPLOAD_LIMIT.get()} bytes.', status=500) + dest_path = request.POST.get('destination_path') - # Use form for now to triger the upload handler process by Django. - # Might be a better solution now to try directly using handler in request.fs.upload() for all FS. - # form = UploadAPIFileForm(request.POST, request.FILES) + # Check if the file type is restricted + _, file_type = os.path.splitext(uploaded_file.name) + if RESTRICT_FILE_EXTENSIONS.get() and file_type.lower() in [ext.lower() for ext in RESTRICT_FILE_EXTENSIONS.get()]: + return HttpResponse(f'File type "{file_type}" is not allowed. Please choose a file with a different extetypension.', status=400) - if request.META.get('upload_failed'): - raise Exception(request.META.get('upload_failed')) # TODO: Check error message and status code + # Check if the file size exceeds the maximum allowed size + max_size = MAX_FILE_SIZE_UPLOAD_LIMIT.get() + if max_size >= 0 and uploaded_file.size >= max_size: + return HttpResponse(f'File exceeds maximum allowed size of {max_size} bytes. Please upload a smaller file.', status=413) - # if not form.is_valid(): - # raise Exception(f"Error in upload form: {form.errors}") + # Check if the destination path is a directory and the file name contains a path separator + # This prevents directory traversal attacks + if request.fs.isdir(dest_path) and posixpath.sep in uploaded_file.name: + return HttpResponse(f'Invalid filename. 
Path separators are not allowed.', status=400) + # Check if the file already exists at the destination path filepath = request.fs.join(dest_path, uploaded_file.name) - - if request.fs.isdir(dest_path) and posixpath.sep in uploaded_file.name: - raise Exception(f'Upload failed: {posixpath.sep} is not allowed in the filename {uploaded_file.name}.') # TODO: status code + if request.fs.exists(filepath): + return HttpResponse(f'The file path {filepath} already exists.', status=409) try: - request.fs.upload(file=uploaded_file, path=dest_path, username=request.user.username) - except IOError as ex: - already_exists = False - try: - already_exists = request.fs.exists(dest_path) - except Exception: - pass - - if already_exists: - messsage = f'Upload failed: Destination {filepath} already exists.' - else: - messsage = f'Upload error: Copy to {filepath} failed: {str(ex)}' - raise Exception(messsage) # TODO: Check error messages above and status code + request.fs.upload_v1(request.META, input_data=body_data_bytes, destination=dest_path, username=request.user.username) + except Exception as ex: + return HttpResponse(f'Upload to {filepath} failed: {str(ex)}', status=500) - # TODO: Check response fields below - response.update( - { - 'path': filepath, - 'result': _massage_stats(request, stat_absolute_path(filepath, request.fs.stats(filepath))), - } - ) + response = { + 'uploaded_file_stats': _massage_stats(request, stat_absolute_path(filepath, request.fs.stats(filepath))), + } - return response + return JsonResponse(response) @api_error_handler @@ -766,7 +729,6 @@ def bulk_op(request, op): error_dict = {} for p in path_list: - tmp_dict = bulk_dict if op in (copy, move): tmp_dict['source_path'] = p @@ -795,10 +757,12 @@ def _massage_stats(request, stats): stats_dict = stats.to_json_dict() normalized_path = request.fs.normpath(stats_dict.get('path')) - stats_dict.update({ - 'path': normalized_path, - 'type': filetype(stats.mode), - 'rwx': rwx(stats.mode, stats.aclBit), - }) + stats_dict.update( + { + 'path': normalized_path, + 'type': filetype(stats.mode), + 'rwx': rwx(stats.mode, stats.aclBit), + } + ) return stats_dict diff --git a/apps/filebrowser/src/filebrowser/conf.py b/apps/filebrowser/src/filebrowser/conf.py index 80d3895dbf7..45ec47f807c 100644 --- a/apps/filebrowser/src/filebrowser/conf.py +++ b/apps/filebrowser/src/filebrowser/conf.py @@ -18,32 +18,32 @@ from django.utils.translation import gettext_lazy as _ from desktop.conf import ENABLE_DOWNLOAD, is_oozie_enabled -from desktop.lib.conf import Config, coerce_bool +from desktop.lib.conf import Config, coerce_bool, coerce_csv MAX_SNAPPY_DECOMPRESSION_SIZE = Config( - key="max_snappy_decompression_size", - help=_("Max snappy decompression size in bytes."), - private=True, - default=1024 * 1024 * 25, - type=int) + key="max_snappy_decompression_size", help=_("Max snappy decompression size in bytes."), private=True, default=1024 * 1024 * 25, type=int +) ARCHIVE_UPLOAD_TEMPDIR = Config( key="archive_upload_tempdir", help=_("Location on local filesystem where the uploaded archives are temporary stored."), default="/tmp/hue_uploads", - type=str) + type=str, +) FILE_UPLOAD_CHUNK_SIZE = Config( key="file_upload_chunk_size", default=5242880, type=int, - help=_('Configure chunk size of the chunked file uploader. Default chunk size is set to 5MB.')) + help=_('Configure chunk size of the chunked file uploader. 
Default chunk size is set to 5MB.'), +) CONCURRENT_MAX_CONNECTIONS = Config( key="concurrent_max_connections", default=5, type=int, - help=_('Configure the maximum number of concurrent connections(chunks) for file uploads using the chunked file uploader.')) + help=_('Configure the maximum number of concurrent connections(chunks) for file uploads using the chunked file uploader.'), +) def get_desktop_enable_download(): @@ -55,42 +55,44 @@ def get_desktop_enable_download(): key="show_download_button", help=_("whether to show the download button in hdfs file browser."), type=coerce_bool, - dynamic_default=get_desktop_enable_download) + dynamic_default=get_desktop_enable_download, +) SHOW_UPLOAD_BUTTON = Config( - key="show_upload_button", - help=_("whether to show the upload button in hdfs file browser."), - type=coerce_bool, - default=True) - + key="show_upload_button", help=_("whether to show the upload button in hdfs file browser."), type=coerce_bool, default=True +) ENABLE_EXTRACT_UPLOADED_ARCHIVE = Config( key="enable_extract_uploaded_archive", help=_("Flag to enable the extraction of a uploaded archive in HDFS."), type=coerce_bool, - dynamic_default=is_oozie_enabled + dynamic_default=is_oozie_enabled, ) REDIRECT_DOWNLOAD = Config( key="redirect_download", - help=_("Redirect client to WebHdfs or S3 for file download. Note: Turning this on will " - "override notebook/redirect_whitelist for user selected file downloads on WebHdfs & S3."), + help=_( + "Redirect client to WebHdfs or S3 for file download. Note: Turning this on will " + "override notebook/redirect_whitelist for user selected file downloads on WebHdfs & S3." + ), type=coerce_bool, - default=False) + default=False, +) # DEPRECATED in favor of DEFAULT_HOME_PATH per FS config level. REMOTE_STORAGE_HOME = Config( key="remote_storage_home", type=str, default=None, - help="Optionally set this if you want a different home directory path. e.g. s3a://gethue.") + help="Optionally set this if you want a different home directory path. e.g. s3a://gethue.", +) MAX_FILE_SIZE_UPLOAD_LIMIT = Config( key="max_file_size_upload_limit", default=-1, type=int, - help=_('A limit on a file size (bytes) that can be uploaded to a filesystem. ' - 'A value of -1 means there will be no limit.')) + help=_('A limit on a file size (bytes) that can be uploaded to a filesystem. ' 'A value of -1 means there will be no limit.'), +) def max_file_size_upload_limit(): @@ -98,7 +100,14 @@ def max_file_size_upload_limit(): FILE_DOWNLOAD_CACHE_CONTROL = Config( - key="file_download_cache_control", - type=str, - default=None, - help="Optionally set this to control the caching strategy for files download") + key="file_download_cache_control", type=str, default=None, help="Optionally set this to control the caching strategy for files download" +) + +RESTRICT_FILE_EXTENSIONS = Config( + key='restrict_file_extensions', + default='', + type=coerce_csv, + help=_( + 'Specify file extensions that are not allowed, separated by commas. For example: .exe, .zip, .rar, .tar, .gz' + ), +) diff --git a/desktop/conf.dist/hue.ini b/desktop/conf.dist/hue.ini index 1f94ed02be9..3929db69ae7 100644 --- a/desktop/conf.dist/hue.ini +++ b/desktop/conf.dist/hue.ini @@ -1736,6 +1736,9 @@ submit_to=True # A value of -1 means there will be no limit. ## max_file_size_upload_limit=-1 +# Specify file extensions that are not allowed, separated by commas. 
+## restrict_file_extensions=.exe, .zip, .rar, .tar, .gz + ########################################################################### # Settings to configure Pig ########################################################################### diff --git a/desktop/conf/pseudo-distributed.ini.tmpl b/desktop/conf/pseudo-distributed.ini.tmpl index 0f4ed6d04b9..c9ceffdb672 100644 --- a/desktop/conf/pseudo-distributed.ini.tmpl +++ b/desktop/conf/pseudo-distributed.ini.tmpl @@ -1719,6 +1719,9 @@ # A value of -1 means there will be no limit. ## max_file_size_upload_limit=-1 + # Specify file extensions that are not allowed, separated by commas. + ## restrict_file_extensions=.exe, .zip, .rar, .tar, .gz + ########################################################################### # Settings to configure Pig diff --git a/desktop/core/src/desktop/api_public.py b/desktop/core/src/desktop/api_public.py index 6747397c4af..7a43073344a 100644 --- a/desktop/core/src/desktop/api_public.py +++ b/desktop/core/src/desktop/api_public.py @@ -254,7 +254,7 @@ def storage_save_file(request): @api_view(["POST"]) def storage_upload_file(request): django_request = get_django_request(request) - return filebrowser_views.upload_file(django_request) # TODO: Fix new api method and switch here + return filebrowser_api.upload_file(django_request) @api_view(["POST"]) diff --git a/desktop/core/src/desktop/lib/fs/gc/gs.py b/desktop/core/src/desktop/lib/fs/gc/gs.py index da66fe1546f..06328241732 100644 --- a/desktop/core/src/desktop/lib/fs/gc/gs.py +++ b/desktop/core/src/desktop/lib/fs/gc/gs.py @@ -24,6 +24,7 @@ from boto.gs.connection import Location from boto.gs.key import Key from boto.s3.prefix import Prefix +from django.http.multipartparser import MultiPartParser from django.utils.translation import gettext as _ from aws.s3.s3fs import S3FileSystem @@ -477,3 +478,13 @@ def _check_key_parent_path(self, src, dst): return True else: return False + + @translate_gs_error + @auth_error_handler + def upload_v1(self, META, input_data, destination, username): + from desktop.lib.fs.gc.upload import GSNewFileUploadHandler # Circular dependency + + gs_upload_handler = GSNewFileUploadHandler(destination, username) + + parser = MultiPartParser(META, input_data, [gs_upload_handler]) + return parser.parse() diff --git a/desktop/core/src/desktop/lib/fs/gc/upload.py b/desktop/core/src/desktop/lib/fs/gc/upload.py index 288e34484ab..94271dcc0c3 100644 --- a/desktop/core/src/desktop/lib/fs/gc/upload.py +++ b/desktop/core/src/desktop/lib/fs/gc/upload.py @@ -21,26 +21,29 @@ See http://docs.djangoproject.com/en/1.9/topics/http/file-uploads/ """ -from io import BytesIO as stream_io import logging +from io import BytesIO as stream_io from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files.uploadhandler import FileUploadHandler, StopFutureHandlers, StopUpload, UploadFileException -from desktop.lib.fsmanager import get_client from desktop.lib.fs.gc import parse_uri from desktop.lib.fs.gc.gs import GSFileSystemException - +from desktop.lib.fsmanager import get_client LOG = logging.getLogger() DEFAULT_WRITE_SIZE = 1024 * 1024 * 50 # TODO: set in configuration (currently 50 MiB) + class GSFileUploadError(UploadFileException): pass + +# Deprecated and core logic to be replaced with GSNewFileUploadHandler class GSFileUploadHandler(FileUploadHandler): - """This handler is triggered by any upload field whose destination path starts with "GS" (case insensitive). 
+ """ + This handler is triggered by any upload field whose destination path starts with "GS" (case insensitive). Streams data chunks directly to Google Cloud Storage (GS). """ @@ -63,7 +66,6 @@ def __init__(self, request): self._fs._stats(self.destination) self._bucket = self._fs._get_bucket(self.bucket_name) - def new_file(self, field_name, file_name, *args, **kwargs): """Handle the start of a new file upload. @@ -90,7 +92,6 @@ def new_file(self, field_name, file_name, *args, **kwargs): self.request.META['upload_failed'] = e raise StopUpload() - def receive_data_chunk(self, raw_data, start): """Receive and process a data chunk from the uploaded file. @@ -110,7 +111,6 @@ def receive_data_chunk(self, raw_data, start): else: return raw_data - def file_complete(self, file_size): """Finalize the file upload process. @@ -124,7 +124,6 @@ def file_complete(self, file_size): else: return None - def _is_gs_upload(self): """Check if the upload destination is Google Cloud Storage (GS). @@ -133,7 +132,6 @@ def _is_gs_upload(self): """ return self._get_scheme() and self._get_scheme().startswith('gs') - def _check_access(self): """Check if the user has write access to the GS destination path. @@ -143,7 +141,6 @@ def _check_access(self): if not self._fs.check_access(self.destination, permission='WRITE'): raise GSFileSystemException('Insufficient permissions to write to GS path "%s".' % self.destination) - def _get_scheme(self): """Get the scheme (protocol) of the destination. @@ -159,7 +156,6 @@ def _get_scheme(self): else: return None - def _get_file_part(self, raw_data): """Create a file-like object from raw data. @@ -173,3 +169,52 @@ def _get_file_part(self, raw_data): fp.write(raw_data) fp.seek(0) return fp + + +class GSNewFileUploadHandler(GSFileUploadHandler): + """This handler uploads the file to Google Storage if the destination path starts with "GS" (case insensitive). + Streams data chunks directly to Google Cloud Storage (GS). + """ + + def __init__(self, dest_path, username): + self.chunk_size = DEFAULT_WRITE_SIZE + self.destination = dest_path + self.username = username + self.target_path = None + self.file = None + self._mp = None + self._part_num = 1 + + # TODO: _is_gs_upload really required? + if self._is_gs_upload(): + self._fs = get_client(fs='gs', user=self.username) + self.bucket_name, self.key_name = parse_uri(self.destination)[:2] + + # Verify that the path exists + self._fs._stats(self.destination) + self._bucket = self._fs._get_bucket(self.bucket_name) + + def new_file(self, field_name, file_name, *args, **kwargs): + """Handle the start of a new file upload. + + This method is called when a new file is encountered during the upload process. 
+ """ + if self._is_gs_upload(): + super().new_file(field_name, file_name, *args, **kwargs) + + LOG.info('Using GSFileUploadHandler to handle file upload.') + self.target_path = self._fs.join(self.key_name, file_name) + + try: + # Check access permissions before attempting upload + self._check_access() + + # Create a multipart upload request + LOG.debug("Initiating GS multipart upload to target path: %s" % self.target_path) + self._mp = self._bucket.initiate_multipart_upload(self.target_path) + self.file = SimpleUploadedFile(name=file_name, content='') + + raise StopFutureHandlers() + except (GSFileUploadError, GSFileSystemException) as e: + LOG.error("Encountered error in GSUploadHandler check_access: %s" % e) + raise StopUpload() diff --git a/desktop/core/src/desktop/lib/fs/ozone/ofs.py b/desktop/core/src/desktop/lib/fs/ozone/ofs.py index abfd19d4baa..09d37ae621d 100644 --- a/desktop/core/src/desktop/lib/fs/ozone/ofs.py +++ b/desktop/core/src/desktop/lib/fs/ozone/ofs.py @@ -18,12 +18,14 @@ """ Interfaces for Hadoop filesystem access via HttpFs/WebHDFS """ + import stat import errno import logging import posixpath from urllib.parse import urlparse as lib_urlparse +from django.http.multipartparser import MultiPartParser from django.utils.encoding import smart_str from django.utils.translation import gettext as _ @@ -45,6 +47,7 @@ class OzoneFS(WebHdfs): """ OzoneFS implements the filesystem interface via the WebHDFS/HttpFS REST protocol. """ + def __init__(self, url, fs_defaultfs, logical_name=None, security_enabled=False, ssl_cert_ca_verify=True, temp_dir="/tmp", umask=0o1022): super(OzoneFS, self).__init__( url, @@ -53,7 +56,7 @@ def __init__(self, url, fs_defaultfs, logical_name=None, security_enabled=False, security_enabled=security_enabled, ssl_cert_ca_verify=ssl_cert_ca_verify, temp_dir=temp_dir, - umask=umask + umask=umask, ) split = lib_urlparse(fs_defaultfs) @@ -69,13 +72,13 @@ def __init__(self, url, fs_defaultfs, logical_name=None, security_enabled=False, @classmethod def from_config(cls, ofs_config): return cls( - url=ofs_config.WEBHDFS_URL.get(), - fs_defaultfs=ofs_config.FS_DEFAULTFS.get(), - logical_name=ofs_config.LOGICAL_NAME.get(), - security_enabled=ofs_config.SECURITY_ENABLED.get(), - ssl_cert_ca_verify=ofs_config.SSL_CERT_CA_VERIFY.get(), - temp_dir=ofs_config.TEMP_DIR.get(), - umask=get_umask_mode(), + url=ofs_config.WEBHDFS_URL.get(), + fs_defaultfs=ofs_config.FS_DEFAULTFS.get(), + logical_name=ofs_config.LOGICAL_NAME.get(), + security_enabled=ofs_config.SECURITY_ENABLED.get(), + ssl_cert_ca_verify=ofs_config.SSL_CERT_CA_VERIFY.get(), + temp_dir=ofs_config.TEMP_DIR.get(), + umask=get_umask_mode(), ) def strip_normpath(self, path): @@ -145,12 +148,22 @@ def _stats(self, path): def _handle_serviceid_path_status(self): json = { 'FileStatuses': { - 'FileStatus': [{ - 'pathSuffix': self._netloc, 'type': 'DIRECTORY', 'length': 0, 'owner': '', 'group': '', - 'permission': '777', 'accessTime': 0, 'modificationTime': 0, 'blockSize': 0, 'replication': 0 - }] - } + 'FileStatus': [ + { + 'pathSuffix': self._netloc, + 'type': 'DIRECTORY', + 'length': 0, + 'owner': '', + 'group': '', + 'permission': '777', + 'accessTime': 0, + 'modificationTime': 0, + 'blockSize': 0, + 'replication': 0, + } + ] } + } return json def stats(self, path): @@ -165,12 +178,21 @@ def stats(self, path): def filebrowser_action(self): return self._filebrowser_action + # Deprecated def upload(self, file, path, *args, **kwargs): """ Upload is done by the OFSFileUploadHandler """ pass + def upload_v1(self, 
META, input_data, destination, username): + from desktop.lib.fs.ozone.upload import OFSNewFileUploadHandler # Circular dependency + + ofs_upload_handler = OFSNewFileUploadHandler(destination, username) + + parser = MultiPartParser(META, input_data, [ofs_upload_handler]) + return parser.parse() + def rename(self, old, new): """rename(old, new)""" old = self.strip_normpath(old) diff --git a/desktop/core/src/desktop/lib/fs/ozone/upload.py b/desktop/core/src/desktop/lib/fs/ozone/upload.py index f6126ba13bf..1d56ca599b9 100644 --- a/desktop/core/src/desktop/lib/fs/ozone/upload.py +++ b/desktop/core/src/desktop/lib/fs/ozone/upload.py @@ -66,8 +66,10 @@ def check_access(self): self.filepath = self.target_path if self.totalfilesize != calculate_total_size(self.qquuid, self.qqtotalparts): - raise PopupException(_('OFSFineUploaderChunkedUpload: Sorry, the file size is not correct. %(name)s %(qquuid)s %(size)s') % - {'name': self.file_name, 'qquuid': self.qquuid, 'size': self.totalfilesize}) + raise PopupException( + _('OFSFineUploaderChunkedUpload: Sorry, the file size is not correct. %(name)s %(qquuid)s %(size)s') + % {'name': self.file_name, 'qquuid': self.qquuid, 'size': self.totalfilesize} + ) def upload_chunks(self): LOG.debug("OFSFineUploaderChunkedUpload: upload_chunks") @@ -89,15 +91,17 @@ def upload_chunks(self): if self.totalfilesize != calculate_total_size(self.qquuid, self.qqtotalparts): raise PopupException( - _('OFSFineUploaderChunkedUpload: Sorry, the file size is not correct. %(name)s %(qquuid)s %(size)s') % - {'name': self.file_name, 'qquuid': self.qquuid, 'size': self.totalfilesize}) + _('OFSFineUploaderChunkedUpload: Sorry, the file size is not correct. %(name)s %(qquuid)s %(size)s') + % {'name': self.file_name, 'qquuid': self.qquuid, 'size': self.totalfilesize} + ) try: LOG.debug("OFSFineUploaderChunkedUpload: uploading file part with size: %s" % self._part_size) fp = io.BytesIO() for i, (chunk, total) in enumerate(generate_chunks(self.qquuid, self.qqtotalparts, default_write_size=self.chunk_size), 1): - logging.debug("OFSFineUploaderChunkedUpload: uploading file %s, part %d, size %d, dest: %s" % - (self.file_name, i, total, self.destination)) + logging.debug( + "OFSFineUploaderChunkedUpload: uploading file %s, part %d, size %d, dest: %s" % (self.file_name, i, total, self.destination) + ) fp.write(chunk.getvalue()) fp.seek(0) self._fs.create(self.target_path, data=fp.getvalue()) @@ -168,12 +172,14 @@ class OFSFileUploadError(UploadFileException): pass +# Deprecated and core logic to be replaced with OFSNewFileUploadHandler class OFSFileUploadHandler(FileUploadHandler): """ This handler is triggered by any upload field whose destination path starts with "OFS" (case insensitive). Streams data chunk directly to OFS. """ + def __init__(self, request): super(OFSFileUploadHandler, self).__init__(request) self.chunk_size = UPLOAD_CHUNK_SIZE.get() @@ -253,3 +259,54 @@ def _get_scheme(self): raise OFSFileUploadError('Destination does not start with a valid scheme.') else: return None + + +class OFSNewFileUploadHandler(OFSFileUploadHandler): + """ + This handler uploads the file to Apache Ozone if the destination path starts with "OFS" (case insensitive). + Streams data chunks directly to OFS. + """ + + def __init__(self, dest_path, username): + self.chunk_size = UPLOAD_CHUNK_SIZE.get() + self.destination = dest_path + self.username = username + self.target_path = None + self.file = None + self._part_size = UPLOAD_CHUNK_SIZE.get() + + # TODO: _is_ofs_upload really required? 
+ if self._is_ofs_upload(): + self._fs = self._get_ofs(self.username) + + # Verify that the path exists + try: + self._fs.stats(self.destination) + except Exception as e: + raise OFSFileUploadError(_('Destination path does not exist: %s' % self.destination)) + + LOG.debug("Chunk size = %d" % UPLOAD_CHUNK_SIZE.get()) + + def new_file(self, field_name, file_name, *args, **kwargs): + if self._is_ofs_upload(): + super(OFSFileUploadHandler, self).new_file(field_name, file_name, *args, **kwargs) + + LOG.info('Using OFSFileUploadHandler to handle file upload.') + self.target_path = self._fs.join(self.destination, file_name) + + try: + # Check access permissions before attempting upload + # self._check_access() # Not implemented + LOG.debug("Initiating OFS upload to target path: %s" % self.target_path) + self.file = SimpleUploadedFile(name=file_name, content='') + raise StopFutureHandlers() + except (OFSFileUploadError, WebHdfsException) as e: + LOG.error("Encountered error in OFSUploadHandler check_access: %s" % e) + raise StopUpload() + + def _get_ofs(self, username): + fs = get_client(fs='ofs', user=username) + if not fs: + raise OFSFileUploadError(_("No OFS filesystem found.")) + + return fs diff --git a/desktop/core/src/desktop/lib/fs/proxyfs.py b/desktop/core/src/desktop/lib/fs/proxyfs.py index 467617d412d..a4765cd363a 100644 --- a/desktop/core/src/desktop/lib/fs/proxyfs.py +++ b/desktop/core/src/desktop/lib/fs/proxyfs.py @@ -297,9 +297,13 @@ def rename_star(self, old_dir, new_dir): def _rename_star_between_filesystems(self, old, new): raise NotImplementedError("Will be addressed in HUE-2934") + # Deprecated def upload(self, file, path, *args, **kwargs): self._get_fs(path).upload(file, path, *args, **kwargs) + def upload_v1(self, META, input_data, destination, username): + self._get_fs(destination).upload_v1(META, input_data, destination, username) + def check_access(self, path, *args, **kwargs): self._get_fs(path).check_access(path, *args, **kwargs) diff --git a/desktop/core/src/desktop/settings.py b/desktop/core/src/desktop/settings.py index a3d153b9576..22a8ad0e76e 100644 --- a/desktop/core/src/desktop/settings.py +++ b/desktop/core/src/desktop/settings.py @@ -112,7 +112,7 @@ # Examples: "http://media.lawrence.com/media/", "http://example.com/media/" MEDIA_URL = '' -DATA_UPLOAD_MAX_MEMORY_SIZE = 5242880 # Setting this variable to 5MB as sometime request size > 2.5MB (default value) +DATA_UPLOAD_MAX_MEMORY_SIZE = None ############################################################ # Part 3: Django configuration @@ -645,14 +645,18 @@ def is_oidc_configured(): # This section must go after the desktop lib modules are loaded ################################################################ +# Import after configs are set +from desktop.conf import ENABLE_NEW_STORAGE_BROWSER # noqa: E402 + # Insert our custom upload handlers +file_upload_handlers = [] if is_chunked_fileuploader_enabled(): file_upload_handlers = [ 'hadoop.fs.upload.FineUploaderChunkedUploadHandler', 'django.core.files.uploadhandler.MemoryFileUploadHandler', 'django.core.files.uploadhandler.TemporaryFileUploadHandler', ] -else: +elif not ENABLE_NEW_STORAGE_BROWSER.get(): file_upload_handlers = [ 'hadoop.fs.upload.HDFSfileUploadHandler', 'django.core.files.uploadhandler.MemoryFileUploadHandler', @@ -671,7 +675,8 @@ def is_oidc_configured(): if is_ofs_enabled(): file_upload_handlers.insert(0, 'desktop.lib.fs.ozone.upload.OFSFileUploadHandler') -FILE_UPLOAD_HANDLERS = tuple(file_upload_handlers) +if file_upload_handlers: + 
FILE_UPLOAD_HANDLERS = tuple(file_upload_handlers) ############################################################ diff --git a/desktop/libs/aws/src/aws/s3/s3fs.py b/desktop/libs/aws/src/aws/s3/s3fs.py index 32c550f09cb..99d16d7682c 100644 --- a/desktop/libs/aws/src/aws/s3/s3fs.py +++ b/desktop/libs/aws/src/aws/s3/s3fs.py @@ -571,12 +571,21 @@ def _copy_file(src, dst): remote_file = remote_dst _copy_file(local_src, remote_file) + # Deprecated @translate_s3_error def upload(self, file, path, *args, **kwargs): - # parser = MultiPartParser(META, post_data, self.upload_handlers, self.encoding) - # return parser.parse() pass # upload is handled by S3FileUploadHandler + @translate_s3_error + @auth_error_handler + def upload_v1(self, META, input_data, destination, username): + from aws.s3.upload import S3NewFileUploadHandler # Circular dependency + + s3_upload_handler = S3NewFileUploadHandler(destination, username) + + parser = MultiPartParser(META, input_data, [s3_upload_handler]) + return parser.parse() + @translate_s3_error @auth_error_handler def append(self, path, data): diff --git a/desktop/libs/aws/src/aws/s3/upload.py b/desktop/libs/aws/src/aws/s3/upload.py index e3df72d38a7..861e4a7b78c 100644 --- a/desktop/libs/aws/src/aws/s3/upload.py +++ b/desktop/libs/aws/src/aws/s3/upload.py @@ -130,6 +130,7 @@ class S3FileUploadError(UploadFileException): pass +# Deprecated and core logic to be replaced with S3NewFileUploadHandler class S3FileUploadHandler(FileUploadHandler): """ This handler is triggered by any upload field whose destination path starts with "S3" (case insensitive). @@ -198,15 +199,6 @@ def file_complete(self, file_size): else: return None - def _get_s3fs(self, request): - # Pre 6.0 request.fs did not exist, now it does. The logic for assigning request.fs is not correct for FileUploadHandler. - fs = get_client(user=request.user.username) - - if not fs: - raise S3FileUploadError(_("No S3 filesystem found.")) - - return fs - def _is_s3_upload(self): return self._get_scheme() and self._get_scheme().startswith('S3') @@ -229,3 +221,48 @@ def _get_file_part(self, raw_data): fp.write(raw_data) fp.seek(0) return fp + + +class S3NewFileUploadHandler(S3FileUploadHandler): + """ + This handler uploads the file to AWS S3 if the destination path starts with "S3" (case insensitive). + Streams data chunks directly to S3. + """ + def __init__(self, dest_path, username): + self.chunk_size = DEFAULT_WRITE_SIZE + self.destination = dest_path + self.username = username + self.target_path = None + self.file = None + self._mp = None + self._part_num = 1 + + # TODO: _is_s3_upload really required? 
+ if self._is_s3_upload(): + self._fs = get_client(fs='s3a', user=self.username) + self.bucket_name, self.key_name = parse_uri(self.destination)[:2] + + # Verify that the path exists + self._fs._stats(self.destination) + self._bucket = self._fs._get_bucket(self.bucket_name) + + def new_file(self, field_name, file_name, *args, **kwargs): + if self._is_s3_upload(): + super(S3FileUploadHandler, self).new_file(field_name, file_name, *args, **kwargs) + + LOG.info('Using S3FileUploadHandler to handle file upload.') + self.target_path = self._fs.join(self.key_name, file_name) + + try: + # Check access permissions before attempting upload + self._check_access() + + # Create a multipart upload request + LOG.debug("Initiating S3 multipart upload to target path: %s" % self.target_path) + self._mp = self._bucket.initiate_multipart_upload(self.target_path) + self.file = SimpleUploadedFile(name=file_name, content='') + + raise StopFutureHandlers() + except (S3FileUploadError, S3FileSystemException) as e: + LOG.error("Encountered error in S3UploadHandler check_access: %s" % e) + raise StopUpload() diff --git a/desktop/libs/azure/src/azure/abfs/abfs.py b/desktop/libs/azure/src/azure/abfs/abfs.py index 6a22f640a67..45de7143099 100644 --- a/desktop/libs/azure/src/azure/abfs/abfs.py +++ b/desktop/libs/azure/src/azure/abfs/abfs.py @@ -29,6 +29,8 @@ from posixpath import join from urllib.parse import quote as urllib_quote, urlparse as lib_urlparse +from django.http.multipartparser import MultiPartParser + import azure.abfs.__init__ as Init_ABFS from azure.abfs.abfsfile import ABFSFile from azure.abfs.abfsstats import ABFSStat @@ -46,29 +48,27 @@ class ABFSFileSystemException(IOError): - def __init__(self, *args, **kwargs): super(ABFSFileSystemException, self).__init__(*args, **kwargs) class ABFS(object): - def __init__( - self, - url, - fs_defaultfs, - logical_name=None, - hdfs_superuser=None, - security_enabled=False, - ssl_cert_ca_verify=True, - temp_dir="/tmp", - umask=0o1022, - hdfs_supergroup=None, - access_token=None, - token_type=None, - expiration=None, - username=None - ): + self, + url, + fs_defaultfs, + logical_name=None, + hdfs_superuser=None, + security_enabled=False, + ssl_cert_ca_verify=True, + temp_dir="/tmp", + umask=0o1022, + hdfs_supergroup=None, + access_token=None, + token_type=None, + expiration=None, + username=None, + ): self._url = url self._superuser = hdfs_superuser self._security_enabled = security_enabled @@ -100,18 +100,18 @@ def __init__( def from_config(cls, hdfs_config, auth_provider): credentials = auth_provider.get_credentials() return cls( - url=hdfs_config.WEBHDFS_URL.get(), - fs_defaultfs=hdfs_config.FS_DEFAULTFS.get(), - logical_name=None, - security_enabled=False, - ssl_cert_ca_verify=False, - temp_dir=None, - umask=get_umask_mode(), - hdfs_supergroup=None, - access_token=credentials.get('access_token'), - token_type=credentials.get('token_type'), - expiration=int(credentials.get('expires_on')) * 1000 if credentials.get('expires_on') is not None else None, - username=credentials.get('username') + url=hdfs_config.WEBHDFS_URL.get(), + fs_defaultfs=hdfs_config.FS_DEFAULTFS.get(), + logical_name=None, + security_enabled=False, + ssl_cert_ca_verify=False, + temp_dir=None, + umask=get_umask_mode(), + hdfs_supergroup=None, + access_token=credentials.get('access_token'), + token_type=credentials.get('token_type'), + expiration=int(credentials.get('expires_on')) * 1000 if credentials.get('expires_on') is not None else None, + username=credentials.get('username'), ) def 
get_client(self, url): @@ -600,12 +600,21 @@ def rename_star(self, old_dir, new_dir): """ self.rename(old_dir, new_dir) + # Deprecated def upload(self, file, path, *args, **kwargs): """ Upload is done by the client """ pass + def upload_v1(self, META, input_data, destination, username): + from azure.abfs.upload import ABFSNewFileUploadHandler # Circular dependency + + abfs_upload_handler = ABFSNewFileUploadHandler(destination, username) + + parser = MultiPartParser(META, input_data, [abfs_upload_handler]) + return parser.parse() + def copyFromLocal(self, local_src, remote_dst, *args, **kwargs): """ Copy a directory or file from Local (Testing) @@ -711,7 +720,7 @@ def _writedata(self, path, data, size): length = chunk_size else: length = chunk - self._append(path, data[i * chunk_size:i * chunk_size + length], length) + self._append(path, data[i * chunk_size : i * chunk_size + length], length) self.flush(path, {'position': int(size)}) # Use Patch HTTP request diff --git a/desktop/libs/azure/src/azure/abfs/upload.py b/desktop/libs/azure/src/azure/abfs/upload.py index f5c76390f5e..62da4eebdc3 100644 --- a/desktop/libs/azure/src/azure/abfs/upload.py +++ b/desktop/libs/azure/src/azure/abfs/upload.py @@ -16,7 +16,7 @@ import logging import unicodedata -from io import StringIO as string_io +from io import BytesIO from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files.uploadhandler import FileUploadHandler, SkipFile, StopFutureHandlers, StopUpload, UploadFileException @@ -75,8 +75,10 @@ def check_access(self): raise PopupException("ABFSFineUploaderChunkedUpload: Initiating ABFS upload to target path: %s failed %s" % (self.target_path, e)) if self.totalfilesize != calculate_total_size(self.qquuid, self.qqtotalparts): - raise PopupException(_('ABFSFineUploaderChunkedUpload: Sorry, the file size is not correct. %(name)s %(qquuid)s %(size)s') % - {'name': self.file_name, 'qquuid': self.qquuid, 'size': self.totalfilesize}) + raise PopupException( + _('ABFSFineUploaderChunkedUpload: Sorry, the file size is not correct. %(name)s %(qquuid)s %(size)s') + % {'name': self.file_name, 'qquuid': self.qquuid, 'size': self.totalfilesize} + ) def upload_chunks(self): if TASK_SERVER_V2.ENABLED.get(): @@ -93,8 +95,10 @@ def upload_chunks(self): current_position = 0 # keeps track of position and uploaded_size for i, (chunk, total) in enumerate(generate_chunks(self.qquuid, self.qqtotalparts, default_write_size=DEFAULT_WRITE_SIZE), 1): chunk_size = len(chunk.getvalue()) - LOG.debug("ABFSFineUploaderChunkedUpload: uploading file %s, part %d, size %d, dest: %s, current_position: %d" % - (self.file_name, i, chunk_size, self.destination, current_position)) + LOG.debug( + "ABFSFineUploaderChunkedUpload: uploading file %s, part %d, size %d, dest: %s, current_position: %d" + % (self.file_name, i, chunk_size, self.destination, current_position) + ) params = {'position': current_position} self._fs._append(self.target_path, chunk, size=chunk_size, offset=0, params=params) current_position += chunk_size @@ -140,12 +144,14 @@ class ABFSFileUploadError(UploadFileException): pass +# Deprecated and core logic to be replaced with ABFSNewFileUploadHandler class ABFSFileUploadHandler(FileUploadHandler): """ This handler is triggered by any upload field whose destination path starts with "ABFS" (case insensitive). 
Streams data chunks directly to ABFS """ + def __init__(self, request): super(ABFSFileUploadHandler, self).__init__(request) self.chunk_size = DEFAULT_WRITE_SIZE @@ -227,3 +233,54 @@ def _get_scheme(self): raise ABFSFileSystemException('Destination does not start with a valid scheme.') else: return None + + +class ABFSNewFileUploadHandler(ABFSFileUploadHandler): + """ + This handler uploads the file to ABFS if the destination path starts with "ABFS" (case insensitive). + Streams data chunks directly to ABFS. + """ + + def __init__(self, dest_path, username): + self.chunk_size = DEFAULT_WRITE_SIZE + self.target_path = None + self.file = None + self._part_size = DEFAULT_WRITE_SIZE + + self.destination = dest_path + self.username = username + + # TODO: _is_abfs_upload really required? + if self._is_abfs_upload(): + self._fs = self._get_abfs(self.username) + self.filesystem, self.directory = parse_uri(self.destination)[:2] + + # Verify that the path exists + self._fs.stats(self.destination) + + LOG.debug("Chunk size = %d" % DEFAULT_WRITE_SIZE) + + def new_file(self, field_name, file_name, *args, **kwargs): + if self._is_abfs_upload(): + super(ABFSFileUploadHandler, self).new_file(field_name, file_name, *args, **kwargs) + + LOG.info('Using ABFSFileUploadHandler to handle file upload wit temp file%s.' % file_name) + self.target_path = self._fs.join(self.destination, file_name) + + try: + # Check access permissions before attempting upload + # self._check_access() #implement later + LOG.debug("Initiating ABFS upload to target path: %s" % self.target_path) + self._fs.create(self.target_path) + self.file = SimpleUploadedFile(name=file_name, content='') + raise StopFutureHandlers() + except (ABFSFileUploadError, ABFSFileSystemException) as e: + LOG.error("Encountered error in ABFSUploadHandler check_access: %s" % e) + raise StopUpload() + + def _get_abfs(self, username): + fs = get_client(fs='abfs', user=username) + if not fs: + raise ABFSFileUploadError(_("No ABFS filesystem found")) + + return fs diff --git a/desktop/libs/hadoop/src/hadoop/fs/upload.py b/desktop/libs/hadoop/src/hadoop/fs/upload.py index 32c0627fa7e..7c8df89b5bc 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/upload.py +++ b/desktop/libs/hadoop/src/hadoop/fs/upload.py @@ -17,16 +17,9 @@ """ Classes for a custom upload handler to stream into HDFS. - -Note that since our middlewares inspect request.POST, we cannot inject a custom -handler into a specific view. Therefore we always use the HDFSfileUploadHandler, -which is triggered by a magic prefix ("HDFS") in the field name. - -See http://docs.djangoproject.com/en/1.2/topics/http/file-uploads/ """ import os -import sys import time import errno import logging @@ -40,6 +33,7 @@ import hadoop.cluster from desktop.lib import fsmanager from desktop.lib.exceptions_renderable import PopupException +from desktop.lib.fsmanager import get_client from filebrowser.conf import ARCHIVE_UPLOAD_TEMPDIR from filebrowser.utils import calculate_total_size, generate_chunks from hadoop.conf import UPLOAD_CHUNK_SIZE @@ -194,6 +188,8 @@ def __del__(self): if self._do_cleanup: # Do not do cleanup here. It's hopeless. The self._fs threadlocal states # are going to be all wrong. 
+ + # TODO: Check if this is required with new upload handler flow LOG.debug(f"Check for left-over upload file for cleanup if the upload op was unsuccessful: {self._path}") def get_temp_path(self): @@ -272,6 +268,7 @@ def file_complete(self, file_size): LOG.debug('Uploaded %s bytes %s to in %s seconds' % (file_size, self.chunk_file_path, elapsed)) +# Deprecated and core logic to be replaced with HDFSNewFileUploadHandler class HDFSfileUploadHandler(FileUploadHandler): """ Handle file upload by storing data in a temp HDFS file. @@ -349,3 +346,142 @@ def file_complete(self, file_size): elapsed = time.time() - self._starttime LOG.info('Uploaded %s bytes to HDFS in %s seconds' % (file_size, elapsed)) return self._file + + +class HDFSNewFileUploadHandler(FileUploadHandler): + """ + Handle file upload by storing data in a temp HDFS file. + """ + def __init__(self, dest_path, username): + self.chunk_size = UPLOAD_CHUNK_SIZE.get() + self._file = None + self._starttime = 0 + self._destination = dest_path + self.username = username + + self._fs = self._get_hdfs(self.username) + + # Verify that the path exists + try: + self._fs.stats(self._destination) + except Exception as e: + raise HDFSerror(_('Destination path does not exist: %s' % self._destination)) + + LOG.debug("Chunk size = %d" % self.chunk_size) + + def new_file(self, field_name, file_name, *args, **kwargs): + super(HDFSNewFileUploadHandler, self).new_file(field_name, file_name, *args, **kwargs) + + LOG.info('Using HDFSfileUploadHandler to handle file upload.') + try: + self._file = HDFSNewTemporaryUploadedFile(self._fs, file_name, self._destination, self.username) + LOG.debug('Upload attempt to %s' % (self._file.get_temp_path())) + + self._starttime = time.time() + except Exception as ex: + LOG.error("Not using HDFS upload handler: %s" % (ex)) + raise ex + + raise StopFutureHandlers() + + def receive_data_chunk(self, raw_data, start): + LOG.debug("HDFSfileUploadHandler receive_data_chunk") + + try: + self._file.write(raw_data) + self._file.flush() + return None + except IOError: + LOG.exception('Error storing upload data in temporary file "%s"' % (self._file.get_temp_path())) + raise StopUpload() + + def file_complete(self, file_size): + try: + self._file.finish_upload(file_size) + except IOError: + LOG.exception('Error closing uploaded temporary file "%s"' % (self._file.get_temp_path())) + raise + + elapsed = time.time() - self._starttime + LOG.info('Uploaded %s bytes to HDFS in %s seconds' % (file_size, elapsed)) + return self._file + + def upload_complete(self): + LOG.debug("HDFSFileUploadHandler: Running after upload complete task") + original_file_path = self._fs.join(self._destination, self._file.name) + tmp_file = self._file.get_temp_path() + + self._fs.do_as_user(self.username, self._fs.rename, tmp_file, original_file_path) + + def upload_interrupted(self): + LOG.debug("HDFSFileUploadHandler: Attempting cleanup after upload interruption") + if self._file and hasattr(self._file, 'remove'): + self._file.remove() + + def _get_hdfs(self, username): + fs = get_client(fs='hdfs', user=username) + if not fs: + raise HDFSerror(_("No HDFS found for upload operation.")) + + return fs + + +class HDFSNewTemporaryUploadedFile(object): + """ + A temporary HDFS file to store upload data. + This class does not have any file read methods. 
+ """ + def __init__(self, fs, name, destination, username): + self.name = name + self.size = None + self._do_cleanup = False + self._fs = fs + + self._path = self._fs.mkswap(name, suffix='tmp', basedir=destination) + + # Check access permissions before attempting upload + try: + self._fs.check_access(destination, 'rw-') + except WebHdfsException as e: + raise HDFSerror(_('User %s does not have permissions to write to path "%s".') % (username, destination)) + + if self._fs.exists(self._path): + self._fs._delete(self._path) + + self._file = self._fs.open(self._path, 'w') + + self._do_cleanup = True + + def __del__(self): + if self._do_cleanup: + # Do not do cleanup here. It's hopeless. The self._fs threadlocal states + # are going to be all wrong. + LOG.debug(f"Check for left-over upload file for cleanup if the upload op was unsuccessful: {self._path}") + + def get_temp_path(self): + return self._path + + def finish_upload(self, size): + try: + self.size = size + self.close() + except Exception as ex: + LOG.exception('Error uploading file to %s' % (self._path)) + raise + + def remove(self): + try: + self._fs.remove(self._path, skip_trash=True) + self._do_cleanup = False + except IOError as ex: + if ex.errno != errno.ENOENT: + LOG.exception('Failed to remove temporary upload file "%s". Please cleanup manually: %s' % (self._path, ex)) + + def write(self, data): + self._file.write(data) + + def flush(self): + self._file.flush() + + def close(self): + self._file.close() diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index cb5f2d698c9..c644535e612 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -30,6 +30,7 @@ from builtins import object, oct from urllib.parse import unquote as urllib_unquote, urlparse +from django.http.multipartparser import MultiPartParser from django.utils.encoding import smart_str from django.utils.translation import gettext as _ from past.builtins import long @@ -896,6 +897,7 @@ def do_recursively(self, fn, path, *args, **kwargs): except Exception: pass + # Deprecated def upload(self, file, path, *args, **kwargs): username = kwargs.get('username') if not username: @@ -906,6 +908,14 @@ def upload(self, file, path, *args, **kwargs): self.do_as_user(username, self.rename, tmp_file, dst) + def upload_v1(self, META, input_data, destination, username): + from hadoop.fs.upload import HDFSNewFileUploadHandler # Circular dependency + + hdfs_upload_handler = HDFSNewFileUploadHandler(destination, username) + + parser = MultiPartParser(META, input_data, [hdfs_upload_handler]) + return parser.parse() + def filebrowser_action(self): return None
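
For reviewers who want to exercise the new single-shot upload path end to end, below is a minimal client-side sketch. It is an illustration only: the base URL, endpoint route, and bearer token are assumptions (the exact public route depends on how desktop.api_public wires storage_upload_file in a given deployment). The multipart field name 'file', the 'destination_path' form key, and the 'uploaded_file_stats' response key come from the upload_file() view in this diff.

    # Hypothetical client sketch for the new upload flow (not part of this change).
    # Assumed: HUE_BASE_URL, the endpoint path, and the auth token. Grounded in the diff:
    # request.FILES['file'], request.POST['destination_path'], JSON key 'uploaded_file_stats'.
    import requests

    HUE_BASE_URL = 'https://hue.example.com'                    # assumption
    ENDPOINT = f'{HUE_BASE_URL}/api/v1/storage/upload/file'     # assumed route to storage_upload_file

    with open('data.csv', 'rb') as fh:
        response = requests.post(
            ENDPOINT,
            headers={'Authorization': 'Bearer <token>'},        # auth mechanism depends on deployment
            files={'file': ('data.csv', fh)},                   # read server-side via request.FILES['file']
            data={'destination_path': 's3a://demo-bucket/landing/'},  # picks the filesystem via ProxyFS.upload_v1()
        )

    # Expected error statuses per the view: 400 (restricted extension or path separator in name),
    # 409 (destination already exists), 413 (over max_file_size_upload_limit), 500 (upload failure).
    response.raise_for_status()
    print(response.json()['uploaded_file_stats'])               # normalized stats from _massage_stats()

Under this flow the view buffers request.body into a BytesIO and hands it to Django's MultiPartParser with a single filesystem-specific handler (HDFSNewFileUploadHandler, S3NewFileUploadHandler, ABFSNewFileUploadHandler, GSNewFileUploadHandler, or OFSNewFileUploadHandler), so no global FILE_UPLOAD_HANDLERS entry is needed when the new storage browser is enabled.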