diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..723ef36
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 901ae58..c4792cb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,17 +1,15 @@
-v1.0.4
-======
-* Added support for aws_s3.query_export_to_s3 #12 [@darthbear]
-
-v1.0.3
-======
-* Fix for current postgres 12.* and s3 #11 [@phileon]
-
-v1.0.2
-======
-* Made column_list as optional parameter #6 [@oyarushe]
-
-v1.0.1
+v1.0.0
 ======
 * Added support of using custom S3 endpoint URL #4 [@oyarushe]
 * Add support for MinIO #3 [@huiser]
+* Made column_list as optional parameter #6 [@oyarushe]
+* Fix for current postgres 12.* and s3 #11 [@phileon]
+* Added support for aws_s3.query_export_to_s3 #12 [@darthbear]
+* Fix uploaded records to ignore header #23(https://github.com/chimpler/postgres-aws-s3/pull/23) [@tzachshabtay]
+* Minor Readme fixes + enhancements #21(https://github.com/chimpler/postgres-aws-s3/pull/21) [@GRBurst]
+* Import supports multiple files and directories #1(https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
+* Export no longer overwrites existing files; if the file exists, a _part{n} suffix is added #1(https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
+* Import and export add a read_timeout param #1(https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
+* Export adds an override param #1(https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
+* Export/import add a tempfile_dir param #1(https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
 
diff --git a/Makefile b/Makefile
index ca355fc..0bffbae 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 EXTENSION = aws_s3 # the extensions name
-DATA = aws_s3--0.0.1.sql # script files to install
+DATA = aws_s3--1.0.0.sql # script files to install
 
 # postgres build stuff
 PG_CONFIG = pg_config
diff --git a/aws_s3--0.0.1.sql b/aws_s3--1.0.0.sql
similarity index 66%
rename from aws_s3--0.0.1.sql
rename to aws_s3--1.0.0.sql
index 9041d35..5ebb1fa 100644
--- a/aws_s3--0.0.1.sql
+++ b/aws_s3--1.0.0.sql
@@ -49,7 +49,8 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3 (
     secret_key text default null,
     session_token text default null,
     endpoint_url text default null,
-    content_encoding text default null
+    read_timeout integer default 60,
+    tempfile_dir text default '/var/lib/postgresql/data/'
 ) RETURNS int
 LANGUAGE plpython3u
 AS $$
@@ -86,33 +87,51 @@ AS $$
     s3 = boto3.resource(
         's3',
         region_name=region,
+        config=boto3.session.Config(read_timeout=read_timeout),
         **aws_settings
     )
 
-    obj = s3.Object(bucket, file_path)
-    response = obj.get()
-    content_encoding = content_encoding or response.get('ContentEncoding')
-    user_content_encoding = response.get('x-amz-meta-content-encoding')
-    body = response['Body']
-
-    with tempfile.NamedTemporaryFile() as fd:
-        if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
-            with gzip.GzipFile(fileobj=body) as gzipfile:
-                while fd.write(gzipfile.read(204800)):
-                    pass
-        else:
-            while fd.write(body.read(204800)):
-                pass
-        fd.flush()
-        formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
-        res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
-                table_name=table_name,
-                filename=plpy.quote_literal(fd.name),
-                formatted_column_list=formatted_column_list,
-                options=options
-            )
-        )
-        return res.nrows()
+    formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
+    num_rows = 0
+
+    for file_path_item in file_path.split(","):
+        file_path_item = file_path_item.strip()
+        if not file_path_item:
+            continue
+
+        s3_objects = []
+        if file_path_item.endswith("/"):  # Directory
+            bucket_objects = s3.Bucket(bucket).objects.filter(Prefix=file_path_item)
+            s3_objects = [bucket_object for bucket_object in bucket_objects]
+        else:  # File
+            s3_object = s3.Object(bucket, file_path_item)
+            s3_objects = [s3_object]
+
+        for s3_object in s3_objects:
+            response = s3_object.get()
+            content_encoding = response.get('ContentEncoding')
+            body = response['Body']
+            user_content_encoding = response.get('x-amz-meta-content-encoding')
+
+            with tempfile.NamedTemporaryFile(dir=tempfile_dir) as fd:
+                if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
+                    with gzip.GzipFile(fileobj=body) as gzipfile:
+                        while fd.write(gzipfile.read(204800)):
+                            pass
+                else:
+                    while fd.write(body.read(204800)):
+                        pass
+                fd.flush()
+
+                res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
+                    table_name=table_name,
+                    filename=plpy.quote_literal(fd.name),
+                    formatted_column_list=formatted_column_list,
+                    options=options
+                    )
+                )
+                num_rows += res.nrows()
+    return num_rows
 $$;
 
 --
@@ -126,14 +145,15 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3(
     s3_info aws_commons._s3_uri_1,
     credentials aws_commons._aws_credentials_1,
     endpoint_url text default null,
-    content_encoding text default null
+    read_timeout integer default 60,
+    tempfile_dir text default '/var/lib/postgresql/data/'
 ) RETURNS INT
 LANGUAGE plpython3u
 AS $$
 
     plan = plpy.prepare(
-        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) AS num_rows',
-        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
+        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) AS num_rows',
+        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER', 'TEXT']
     )
     return plan.execute(
         [
@@ -146,8 +166,9 @@ AS $$
             credentials['access_key'],
             credentials['secret_key'],
             credentials['session_token'],
-            endpoint_url,
-            content_encoding
+            endpoint_url,
+            read_timeout,
+            tempfile_dir
         ]
     )[0]['num_rows']
 $$;
@@ -162,6 +183,9 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
     session_token text default null,
     options text default null,
     endpoint_url text default null,
+    read_timeout integer default 60,
+    override boolean default false,
+    tempfile_dir text default '/var/lib/postgresql/data/',
     OUT rows_uploaded bigint,
     OUT files_uploaded bigint,
     OUT bytes_uploaded bigint
@@ -180,8 +204,19 @@ AS $$
         module_cache[module_name] = _module
         return _module
 
+    def file_exists(bucket, file_path, s3_client):
+        try:
+            s3_client.head_object(Bucket=bucket, Key=file_path)
+            return True
+        except:
+            return False
+
+    def get_unique_file_path(base_name, counter, extension):
+        return f"{base_name}_part{counter}{extension}"
+
     boto3 = cache_import('boto3')
     tempfile = cache_import('tempfile')
+    re = cache_import("re")
 
     plan = plpy.prepare("select name, current_setting('aws_s3.' || name, true) as value from (select unnest(array['access_key_id', 'secret_access_key', 'session_token', 'endpoint_url']) as name) a");
     default_aws_settings = {
@@ -199,10 +234,22 @@ AS $$
     s3 = boto3.client(
         's3',
         region_name=region,
+        config=boto3.session.Config(read_timeout=read_timeout),
         **aws_settings
     )
 
-    with tempfile.NamedTemporaryFile() as fd:
+    upload_file_path = file_path
+    if not override:
+        # generate unique file path
+        file_path_parts = re.match(r'^(.*?)(\.[^.]*$|$)', upload_file_path)
+        base_name = file_path_parts.group(1)
+        extension = file_path_parts.group(2)
+        counter = 0
+        while file_exists(bucket, get_unique_file_path(base_name, counter, extension), s3):
+            counter += 1
+        upload_file_path = get_unique_file_path(base_name, counter, extension)
+
+    with tempfile.NamedTemporaryFile(dir=tempfile_dir) as fd:
         plan = plpy.prepare(
             "COPY ({query}) TO '{filename}' {options}".format(
                 query=query,
@@ -221,7 +268,7 @@ AS $$
             num_lines += buffer.count(b'\n')
             size += len(buffer)
         fd.seek(0)
-        s3.upload_fileobj(fd, bucket, file_path)
+        s3.upload_fileobj(fd, bucket, upload_file_path)
         if 'HEADER TRUE' in options.upper():
             num_lines -= 1
         yield (num_lines, 1, size)
@@ -233,6 +280,9 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
     credentials aws_commons._aws_credentials_1 default null,
     options text default null,
     endpoint_url text default null,
+    read_timeout integer default 60,
+    override boolean default false,
+    tempfile_dir text default '/var/lib/postgresql/data/',
     OUT rows_uploaded bigint,
     OUT files_uploaded bigint,
     OUT bytes_uploaded bigint
@@ -240,8 +290,8 @@ LANGUAGE plpython3u
 AS $$
 
     plan = plpy.prepare(
-        'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9)',
-        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
+        'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)',
+        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER', 'BOOLEAN', 'TEXT']
     )
     return plan.execute(
         [
@@ -253,7 +303,10 @@ AS $$
             credentials.get('secret_key') if credentials else None,
             credentials.get('session_token') if credentials else None,
             options,
-            endpoint_url
+            endpoint_url,
+            read_timeout,
+            override,
+            tempfile_dir
         ]
     )
 $$;
diff --git a/aws_s3.control b/aws_s3.control
index 5a111ce..5431451 100644
--- a/aws_s3.control
+++ b/aws_s3.control
@@ -1,5 +1,5 @@
 # aws_s3 extension
 comment = 'aws s3 wrapper for non rds postgres'
-default_version = '0.0.1'
+default_version = '1.0.0'
 module_pathname = '$libdir/aws_s3'
 relocatable = true
diff --git a/mock-servers/postgres/Dockerfile b/mock-servers/postgres/Dockerfile
index 43420de..3bb9391 100644
--- a/mock-servers/postgres/Dockerfile
+++ b/mock-servers/postgres/Dockerfile
@@ -6,5 +6,5 @@ WORKDIR /tmp/extension
 RUN apt-get update \
     && apt-get -y install python3 python3-pip postgresql-plpython3-13 make
 RUN pip3 install boto3
-COPY aws_s3--0.0.1.sql aws_s3.control Makefile ./
+COPY aws_s3--1.0.0.sql aws_s3.control Makefile ./
 RUN make install
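
Illustrative usage (not part of the patch): the calls below sketch how the new read_timeout, override and tempfile_dir parameters and the comma-separated multi-file/directory import might be invoked. Table, bucket and credential values are placeholders, and the leading positional arguments assume the upstream chimpler argument order (table_name, column_list, options, bucket, file_path, region, credentials for the import; query, bucket, file_path, region, credentials for the export).

-- Import one CSV object plus everything under a prefix, with a longer S3 read
-- timeout and a custom scratch directory for the temporary copy.
SELECT aws_s3.table_import_from_s3(
    'animals',                     -- target table (placeholder)
    '',                            -- column list (empty = all columns)
    '(FORMAT csv)',                -- COPY options
    'my-bucket',                   -- bucket (placeholder)
    'animals.csv,daily-exports/',  -- file plus trailing-slash directory prefix
    'us-east-1',
    'my-access-key',
    'my-secret-key',
    NULL,                          -- session token
    read_timeout := 600,
    tempfile_dir := '/tmp/'
);

-- Export; with override := false (the default) an existing animals.csv is kept
-- and the upload lands in animals_part{n}.csv instead.
SELECT * FROM aws_s3.query_export_to_s3(
    'SELECT * FROM animals',
    'my-bucket',
    'animals.csv',
    'us-east-1',
    'my-access-key',
    'my-secret-key',
    options := '(FORMAT csv, HEADER true)',
    read_timeout := 600,
    override := false,
    tempfile_dir := '/tmp/'
);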