From f69e058031445d9ab1013db26e4ef337a21f346c Mon Sep 17 00:00:00 2001 From: rdwebster Date: Tue, 8 Oct 2024 09:20:04 -0700 Subject: [PATCH] fix: Prevent truncated Parquet files in S3 after failed CreateMultipartUpload During a call to s3.to_parquet(), if the size of the data exceeds 5MB a multi-part upload operation will be initiated. If the S3 call to CreateMultipartUpload fails (such as with a 503 SlowDown error) then the incomplete Parquet file data was being written to S3 using 'put_object' during close(). This resulted in broken Parquet files in S3, causing errors when queried by services like Athena. Now, the data buffer is cleared at the end of the call to flush() -- even when an exception occurs. --- awswrangler/s3/_fs.py | 67 +++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/awswrangler/s3/_fs.py b/awswrangler/s3/_fs.py index 72f1a7ae3..4abcb31f4 100644 --- a/awswrangler/s3/_fs.py +++ b/awswrangler/s3/_fs.py @@ -406,39 +406,44 @@ def flush(self, force: bool = False) -> None: return None if total_size == 0: return None - _logger.debug("Flushing: %s bytes", total_size) - self._mpu = self._mpu or _utils.try_it( - f=self._client.create_multipart_upload, # type: ignore[arg-type] - ex=_S3_RETRYABLE_ERRORS, - base=0.5, - max_num_tries=6, - Bucket=self._bucket, - Key=self._key, - **get_botocore_valid_kwargs( - function_name="create_multipart_upload", s3_additional_kwargs=self._s3_additional_kwargs - ), - ) - self._buffer.seek(0) - for chunk_size in _utils.get_even_chunks_sizes( - total_size=total_size, chunk_size=_MIN_WRITE_BLOCK, upper_bound=False - ): - _logger.debug("chunk_size: %s bytes", chunk_size) - self._parts_count += 1 - self._upload_proxy.upload( - bucket=self._bucket, - key=self._key, - part=self._parts_count, - upload_id=self._mpu["UploadId"], - data=self._buffer.read(chunk_size), - s3_client=self._client, - boto3_kwargs=get_botocore_valid_kwargs( - function_name="upload_part", s3_additional_kwargs=self._s3_additional_kwargs + + try: + _logger.debug("Flushing: %s bytes", total_size) + self._mpu = self._mpu or _utils.try_it( + f=self._client.create_multipart_upload, # type: ignore[arg-type] + ex=_S3_RETRYABLE_ERRORS, + base=0.5, + max_num_tries=6, + Bucket=self._bucket, + Key=self._key, + **get_botocore_valid_kwargs( + function_name="create_multipart_upload", s3_additional_kwargs=self._s3_additional_kwargs ), ) - self._buffer.seek(0) - self._buffer.truncate(0) - self._buffer.close() - self._buffer = io.BytesIO() + self._buffer.seek(0) + for chunk_size in _utils.get_even_chunks_sizes( + total_size=total_size, chunk_size=_MIN_WRITE_BLOCK, upper_bound=False + ): + _logger.debug("chunk_size: %s bytes", chunk_size) + self._parts_count += 1 + self._upload_proxy.upload( + bucket=self._bucket, + key=self._key, + part=self._parts_count, + upload_id=self._mpu["UploadId"], + data=self._buffer.read(chunk_size), + s3_client=self._client, + boto3_kwargs=get_botocore_valid_kwargs( + function_name="upload_part", s3_additional_kwargs=self._s3_additional_kwargs + ), + ) + finally: + # Ensure that the buffer is cleared (even in the event of an exception) so that + # any partial data doesn't get written when close() is called. + self._buffer.seek(0) + self._buffer.truncate(0) + self._buffer.close() + self._buffer = io.BytesIO() return None def readable(self) -> bool: