Optimize import/export, merge some PRs #1

Merged 1 commit on Mar 22, 2023
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
.idea
24 changes: 11 additions & 13 deletions CHANGELOG.md
@@ -1,17 +1,15 @@
v1.0.4
======
* Added support for aws_s3.query_export_to_s3 #12 [@darthbear]

v1.0.3
======
* Fix for current postgres 12.* and s3 #11 [@phileon]

v1.0.2
======
* Made column_list as optional parameter #6 [@oyarushe]

v1.0.1
v1.0.0
======

* Added support of using custom S3 endpoint URL #4 [@oyarushe]
* Add support for MinIO #3 [@huiser]
* Made column_list as optional parameter #6 [@oyarushe]
* Fix for current postgres 12.* and s3 #11 [@phileon]
* Added support for aws_s3.query_export_to_s3 #12 [@darthbear]
* Fix uploaded record count to ignore the header row #23 (https://github.com/chimpler/postgres-aws-s3/pull/23) [@tzachshabtay]
* Minor README fixes and enhancements #21 (https://github.com/chimpler/postgres-aws-s3/pull/21) [@GRBurst]
* Import supports multiple files and directories #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
* Export no longer overwrites existing files; a _part{n} suffix is appended when the target file already exists #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
* Added read_timeout parameter to import and export #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
* Added override parameter to query_export_to_s3 #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
* Added tempfile_dir parameter to export and import #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
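
The new import parameters listed above combine in a single call. The following is a minimal usage sketch against the updated aws_s3.table_import_from_s3 defined later in this diff; the table, bucket, prefix, endpoint, and credential values are placeholders, and the parameter names follow the signatures visible in the diff (the access_key name is assumed from the upstream definition).

SELECT aws_s3.table_import_from_s3(
    table_name   := 'my_table',
    column_list  := null,                        -- optional; null imports into all columns
    options      := '(FORMAT csv)',              -- appended verbatim to the generated COPY ... FROM
    bucket       := 'my-bucket',
    file_path    := 'incoming/data1.csv, incoming/2023-03/',  -- comma-separated; a trailing "/" imports every object under that prefix
    region       := 'us-east-1',
    access_key   := 'my_access_key',             -- placeholder credentials
    secret_key   := 'my_secret_key',
    endpoint_url := 'http://minio:9000',         -- hypothetical non-AWS endpoint
    read_timeout := 300,                         -- seconds, forwarded to the boto3 client config
    tempfile_dir := '/var/lib/postgresql/data/'  -- where each object is spooled before COPY
);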
2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
EXTENSION = aws_s3 # the extensions name
DATA = aws_s3--0.0.1.sql # script files to install
DATA = aws_s3--1.0.0.sql # script files to install

# postgres build stuff
PG_CONFIG = pg_config
120 changes: 85 additions & 35 deletions aws_s3--0.0.1.sql → aws_s3--1.0.0.sql
@@ -49,7 +49,8 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3 (
secret_key text default null,
session_token text default null,
endpoint_url text default null,
content_encoding text default null
read_timeout integer default 60,
tempfile_dir text default '/var/lib/postgresql/data/'
) RETURNS int
LANGUAGE plpython3u
AS $$
@@ -86,33 +87,51 @@ AS $$
s3 = boto3.resource(
's3',
region_name=region,
config=boto3.session.Config(read_timeout=read_timeout),
**aws_settings
)

obj = s3.Object(bucket, file_path)
response = obj.get()
content_encoding = content_encoding or response.get('ContentEncoding')
user_content_encoding = response.get('x-amz-meta-content-encoding')
body = response['Body']

with tempfile.NamedTemporaryFile() as fd:
if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
with gzip.GzipFile(fileobj=body) as gzipfile:
while fd.write(gzipfile.read(204800)):
pass
else:
while fd.write(body.read(204800)):
pass
fd.flush()
formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
table_name=table_name,
filename=plpy.quote_literal(fd.name),
formatted_column_list=formatted_column_list,
options=options
)
)
return res.nrows()
formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
num_rows = 0

for file_path_item in file_path.split(","):
file_path_item = file_path_item.strip()
if not file_path_item:
continue

s3_objects = []
if file_path_item.endswith("/"): # Directory
bucket_objects = s3.Bucket(bucket).objects.filter(Prefix=file_path_item)
s3_objects = [bucket_object for bucket_object in bucket_objects]
else: # File
s3_object = s3.Object(bucket, file_path_item)
s3_objects = [s3_object]

for s3_object in s3_objects:
response = s3_object.get()
content_encoding = response.get('ContentEncoding')
body = response['Body']
user_content_encoding = response.get('x-amz-meta-content-encoding')

with tempfile.NamedTemporaryFile(dir=tempfile_dir) as fd:
if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
with gzip.GzipFile(fileobj=body) as gzipfile:
while fd.write(gzipfile.read(204800)):
pass
else:
while fd.write(body.read(204800)):
pass
fd.flush()

res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
table_name=table_name,
filename=plpy.quote_literal(fd.name),
formatted_column_list=formatted_column_list,
options=options
)
)
num_rows += res.nrows()
return num_rows
$$;

--
@@ -126,14 +145,15 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3(
s3_info aws_commons._s3_uri_1,
credentials aws_commons._aws_credentials_1,
endpoint_url text default null,
content_encoding text default null
read_timeout integer default 60,
tempfile_dir text default '/var/lib/postgresql/data/'
) RETURNS INT
LANGUAGE plpython3u
AS $$

plan = plpy.prepare(
'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) AS num_rows',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) AS num_rows',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER', 'TEXT']
)
return plan.execute(
[
@@ -146,8 +166,8 @@ AS $$
credentials['access_key'],
credentials['secret_key'],
credentials['session_token'],
endpoint_url,
content_encoding
endpoint_url,
read_timeout
]
)[0]['num_rows']
$$;
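
For callers using the composite-type overload just updated above, the call shape is unchanged apart from the new read_timeout argument (content_encoding is removed). A sketch follows; it assumes the aws_commons.create_s3_uri and aws_commons.create_aws_credentials helpers from the base extension and the upstream ordering of the leading parameters, which are collapsed in this hunk, and every value is a placeholder.

SELECT aws_s3.table_import_from_s3(
    'my_table',       -- table_name
    null,             -- column_list
    '(FORMAT csv)',   -- options
    aws_commons.create_s3_uri('my-bucket', 'incoming/2023-03/', 'us-east-1'),
    aws_commons.create_aws_credentials('my_access_key', 'my_secret_key', null),
    endpoint_url := 'http://minio:9000',
    read_timeout := 300
);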
@@ -162,6 +182,9 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
session_token text default null,
options text default null,
endpoint_url text default null,
read_timeout integer default 60,
override boolean default false,
tempfile_dir text default '/var/lib/postgresql/data/',
OUT rows_uploaded bigint,
OUT files_uploaded bigint,
OUT bytes_uploaded bigint
@@ -180,8 +203,19 @@ AS $$
module_cache[module_name] = _module
return _module

def file_exists(bucket, file_path, s3_client):
try:
s3_client.head_object(Bucket=bucket, Key=file_path)
return True
except:
return False

def get_unique_file_path(base_name, counter, extension):
return f"{base_name}_part{counter}{extension}"

boto3 = cache_import('boto3')
tempfile = cache_import('tempfile')
re = cache_import("re")

plan = plpy.prepare("select name, current_setting('aws_s3.' || name, true) as value from (select unnest(array['access_key_id', 'secret_access_key', 'session_token', 'endpoint_url']) as name) a");
default_aws_settings = {
@@ -199,10 +233,22 @@ AS $$
s3 = boto3.client(
's3',
region_name=region,
config=boto3.session.Config(read_timeout=read_timeout),
**aws_settings
)

with tempfile.NamedTemporaryFile() as fd:
upload_file_path = file_path
if not override:
# generate unique file path
file_path_parts = re.match(r'^(.*?)(\.[^.]*$|$)', upload_file_path)
base_name = file_path_parts.group(1)
extension = file_path_parts.group(2)
counter = 0
while file_exists(bucket, get_unique_file_path(base_name, counter, extension), s3):
counter += 1
upload_file_path = get_unique_file_path(base_name, counter, extension)

with tempfile.NamedTemporaryFile(dir=tempfile_dir) as fd:
plan = plpy.prepare(
"COPY ({query}) TO '{filename}' {options}".format(
query=query,
@@ -221,7 +267,7 @@ AS $$
num_lines += buffer.count(b'\n')
size += len(buffer)
fd.seek(0)
s3.upload_fileobj(fd, bucket, file_path)
s3.upload_fileobj(fd, bucket, upload_file_path)
if 'HEADER TRUE' in options.upper():
num_lines -= 1
yield (num_lines, 1, size)
@@ -233,15 +279,18 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
credentials aws_commons._aws_credentials_1 default null,
options text default null,
endpoint_url text default null,
read_timeout integer default 60,
override boolean default false,
tempfile_dir text default '/var/lib/postgresql/data/',
OUT rows_uploaded bigint,
OUT files_uploaded bigint,
OUT bytes_uploaded bigint
) RETURNS SETOF RECORD
LANGUAGE plpython3u
AS $$
plan = plpy.prepare(
'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9)',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER', 'BOOLEAN', 'TEXT']
)
return plan.execute(
[
@@ -253,7 +302,8 @@ AS $$
credentials.get('secret_key') if credentials else None,
credentials.get('session_token') if credentials else None,
options,
endpoint_url
endpoint_url,
read_timeout
]
)
$$;
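
Putting the export-side changes together, a hypothetical call against the new signature looks like the sketch below. The query, bucket, key, and endpoint are placeholders; the credential arguments are omitted so the function falls back to the aws_s3.* settings it reads via current_setting. With override := false (the default), the object key is rewritten to a _part{n} variant (exports/my_table_part0.csv, _part1, ...) using the first suffix that does not already exist in the bucket, per the file_exists/get_unique_file_path loop above.

SELECT * FROM aws_s3.query_export_to_s3(
    query        := 'SELECT * FROM my_table',
    bucket       := 'my-bucket',
    file_path    := 'exports/my_table.csv',
    region       := 'us-east-1',
    options      := '(FORMAT csv, HEADER TRUE)', -- appended verbatim to COPY ... TO; HEADER TRUE makes the reported row count skip the header line
    endpoint_url := 'http://minio:9000',         -- hypothetical MinIO endpoint
    read_timeout := 300,
    override     := false,                       -- default: upload to exports/my_table_part{n}.csv rather than replacing an existing object
    tempfile_dir := '/tmp/'                      -- local spool directory for the COPY output
);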
2 changes: 1 addition & 1 deletion aws_s3.control
@@ -1,5 +1,5 @@
# aws_s3 extension
comment = 'aws s3 wrapper for non rds postgres'
default_version = '0.0.1'
default_version = '1.0.0'
module_pathname = '$libdir/aws_s3'
relocatable = true
2 changes: 1 addition & 1 deletion mock-servers/postgres/Dockerfile
@@ -6,5 +6,5 @@ WORKDIR /tmp/extension
RUN apt-get update \
&& apt-get -y install python3 python3-pip postgresql-plpython3-13 make
RUN pip3 install boto3
COPY aws_s3--0.0.1.sql aws_s3.control Makefile ./
COPY aws_s3--1.0.0.sql aws_s3.control Makefile ./
RUN make install