
Commit

1. Merge PRs:
  a. Fix uploaded records to ignore header (chimpler#23)
  b. Minor README fixes and enhancements (chimpler#21)
2. Import supports multiple files and directories
3. Import and export add a read_timeout parameter
4. Export adds an override parameter (when override is false, output is written with a _part{n} suffix instead of overwriting an existing file)
5. Export and import add a tempfile_dir parameter
yanboer committed Mar 22, 2023
1 parent b84140e commit 963d3c7
Showing 6 changed files with 100 additions and 51 deletions.
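A minimal usage sketch of the import features listed in the commit message, assuming the upstream chimpler parameter order for the leading arguments (table_name, column_list, options, bucket, file_path, region, credentials); the bucket, object keys, credentials, and endpoint below are placeholders, not values taken from this commit:

-- Hypothetical call: file_path may mix comma-separated object keys and
-- trailing-slash directory prefixes; read_timeout and tempfile_dir are the new parameters.
SELECT aws_s3.table_import_from_s3(
    'animals',                                  -- target table
    '',                                         -- column_list ('' = all columns)
    '(FORMAT csv, HEADER true)',                -- COPY options
    'my-bucket',                                -- bucket (placeholder)
    'imports/animals.csv,imports/2023-03-22/',  -- files and/or directory prefixes
    'us-east-1',
    'my_access_key',
    'my_secret_key',
    NULL,                                       -- session_token
    endpoint_url := 'http://minio:9000',
    read_timeout := 120,                        -- boto3 read timeout in seconds
    tempfile_dir := '/var/lib/postgresql/data/' -- where the temporary copy is staged
);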
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
.idea
24 changes: 11 additions & 13 deletions CHANGELOG.md
@@ -1,17 +1,15 @@
v1.0.4
======
* Added support for aws_s3.query_export_to_s3 #12 [@darthbear]

v1.0.3
======
* Fix for current postgres 12.* and s3 #11 [@phileon]

v1.0.2
======
* Made column_list as optional parameter #6 [@oyarushe]

v1.0.1
v1.0.0
======

* Added support of using custom S3 endpoint URL #4 [@oyarushe]
* Add support for MinIO #3 [@huiser]
* Made column_list as optional parameter #6 [@oyarushe]
* Fix for current postgres 12.* and s3 #11 [@phileon]
* Added support for aws_s3.query_export_to_s3 #12 [@darthbear]
* Fix uploaded records to ignore header #23 (https://github.com/chimpler/postgres-aws-s3/pull/23) [@tzachshabtay]
* Minor Readme fixes + enhancements #21 (https://github.com/chimpler/postgres-aws-s3/pull/21) [@GRBurst]
* Import supports multiple files and directories #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
* Export no longer overwrites an existing file; a _part{n} suffix is added instead #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
* Import and export add a read_timeout parameter #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
* query_export_to_s3 adds an override parameter #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
* Export and import add a tempfile_dir parameter #1 (https://github.com/radondb/postgres-aws-s3/pull/1) [@yanboer] [@lianzhuangzhang]
2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
EXTENSION = aws_s3 # the extensions name
DATA = aws_s3--0.0.1.sql # script files to install
DATA = aws_s3--1.0.0.sql # script files to install

# postgres build stuff
PG_CONFIG = pg_config
120 changes: 85 additions & 35 deletions aws_s3--0.0.1.sql → aws_s3--1.0.0.sql
@@ -49,7 +49,8 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3 (
secret_key text default null,
session_token text default null,
endpoint_url text default null,
content_encoding text default null
read_timeout integer default 60,
tempfile_dir text default '/var/lib/postgresql/data/'
) RETURNS int
LANGUAGE plpython3u
AS $$
@@ -86,33 +87,51 @@ AS $$
s3 = boto3.resource(
's3',
region_name=region,
config=boto3.session.Config(read_timeout=read_timeout),
**aws_settings
)

obj = s3.Object(bucket, file_path)
response = obj.get()
content_encoding = content_encoding or response.get('ContentEncoding')
user_content_encoding = response.get('x-amz-meta-content-encoding')
body = response['Body']

with tempfile.NamedTemporaryFile() as fd:
if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
with gzip.GzipFile(fileobj=body) as gzipfile:
while fd.write(gzipfile.read(204800)):
pass
else:
while fd.write(body.read(204800)):
pass
fd.flush()
formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
table_name=table_name,
filename=plpy.quote_literal(fd.name),
formatted_column_list=formatted_column_list,
options=options
)
)
return res.nrows()
formatted_column_list = "({column_list})".format(column_list=column_list) if column_list else ''
num_rows = 0

for file_path_item in file_path.split(","):
file_path_item = file_path_item.strip()
if not file_path_item:
continue

s3_objects = []
if file_path_item.endswith("/"): # Directory
bucket_objects = s3.Bucket(bucket).objects.filter(Prefix=file_path_item)
s3_objects = [bucket_object for bucket_object in bucket_objects]
else: # File
s3_object = s3.Object(bucket, file_path_item)
s3_objects = [s3_object]

for s3_object in s3_objects:
response = s3_object.get()
content_encoding = response.get('ContentEncoding')
body = response['Body']
user_content_encoding = response.get('x-amz-meta-content-encoding')

with tempfile.NamedTemporaryFile(dir=tempfile_dir) as fd:
if (content_encoding and content_encoding.lower() == 'gzip') or (user_content_encoding and user_content_encoding.lower() == 'gzip'):
with gzip.GzipFile(fileobj=body) as gzipfile:
while fd.write(gzipfile.read(204800)):
pass
else:
while fd.write(body.read(204800)):
pass
fd.flush()

res = plpy.execute("COPY {table_name} {formatted_column_list} FROM {filename} {options};".format(
table_name=table_name,
filename=plpy.quote_literal(fd.name),
formatted_column_list=formatted_column_list,
options=options
)
)
num_rows += res.nrows()
return num_rows
$$;

--
@@ -126,14 +145,15 @@ CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3(
s3_info aws_commons._s3_uri_1,
credentials aws_commons._aws_credentials_1,
endpoint_url text default null,
content_encoding text default null
read_timeout integer default 60,
tempfile_dir text default '/var/lib/postgresql/data/'
) RETURNS INT
LANGUAGE plpython3u
AS $$

plan = plpy.prepare(
'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) AS num_rows',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) AS num_rows',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER', 'TEXT']
)
return plan.execute(
[
@@ -146,8 +166,8 @@
credentials['access_key'],
credentials['secret_key'],
credentials['session_token'],
endpoint_url,
content_encoding
endpoint_url,
read_timeout
]
)[0]['num_rows']
$$;
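The same import through the aws_commons overload might look as follows; aws_commons.create_s3_uri and aws_commons.create_aws_credentials are assumed to be available as in the upstream extension, and all values remain placeholders:

-- Hypothetical call through the aws_commons-based wrapper.
SELECT aws_s3.table_import_from_s3(
    'animals',
    '',
    '(FORMAT csv, HEADER true)',
    aws_commons.create_s3_uri('my-bucket', 'imports/2023-03-22/', 'us-east-1'),
    aws_commons.create_aws_credentials('my_access_key', 'my_secret_key', ''),
    endpoint_url := 'http://minio:9000',
    read_timeout := 120
);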
@@ -162,6 +182,9 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
session_token text default null,
options text default null,
endpoint_url text default null,
read_timeout integer default 60,
override boolean default false,
tempfile_dir text default '/var/lib/postgresql/data/',
OUT rows_uploaded bigint,
OUT files_uploaded bigint,
OUT bytes_uploaded bigint
@@ -180,8 +203,19 @@ AS $$
module_cache[module_name] = _module
return _module

def file_exists(bucket, file_path, s3_client):
try:
s3_client.head_object(Bucket=bucket, Key=file_path)
return True
except:
return False

def get_unique_file_path(base_name, counter, extension):
return f"{base_name}_part{counter}{extension}"

boto3 = cache_import('boto3')
tempfile = cache_import('tempfile')
re = cache_import("re")

plan = plpy.prepare("select name, current_setting('aws_s3.' || name, true) as value from (select unnest(array['access_key_id', 'secret_access_key', 'session_token', 'endpoint_url']) as name) a");
default_aws_settings = {
@@ -199,10 +233,22 @@ AS $$
s3 = boto3.client(
's3',
region_name=region,
config=boto3.session.Config(read_timeout=read_timeout),
**aws_settings
)

with tempfile.NamedTemporaryFile() as fd:
upload_file_path = file_path
if not override:
# generate unique file path
file_path_parts = re.match(r'^(.*?)(\.[^.]*$|$)', upload_file_path)
base_name = file_path_parts.group(1)
extension = file_path_parts.group(2)
counter = 0
while file_exists(bucket, get_unique_file_path(base_name, counter, extension), s3):
counter += 1
upload_file_path = get_unique_file_path(base_name, counter, extension)

with tempfile.NamedTemporaryFile(dir=tempfile_dir) as fd:
plan = plpy.prepare(
"COPY ({query}) TO '{filename}' {options}".format(
query=query,
@@ -221,7 +267,7 @@ AS $$
num_lines += buffer.count(b'\n')
size += len(buffer)
fd.seek(0)
s3.upload_fileobj(fd, bucket, file_path)
s3.upload_fileobj(fd, bucket, upload_file_path)
if 'HEADER TRUE' in options.upper():
num_lines -= 1
yield (num_lines, 1, size)
@@ -233,15 +279,18 @@ CREATE OR REPLACE FUNCTION aws_s3.query_export_to_s3(
credentials aws_commons._aws_credentials_1 default null,
options text default null,
endpoint_url text default null,
read_timeout integer default 60,
override boolean default false,
tempfile_dir text default '/var/lib/postgresql/data/',
OUT rows_uploaded bigint,
OUT files_uploaded bigint,
OUT bytes_uploaded bigint
) RETURNS SETOF RECORD
LANGUAGE plpython3u
AS $$
plan = plpy.prepare(
'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9)',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
'SELECT * FROM aws_s3.query_export_to_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)',
['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'INTEGER', 'BOOLEAN', 'TEXT']
)
return plan.execute(
[
@@ -253,7 +302,8 @@
credentials.get('secret_key') if credentials else None,
credentials.get('session_token') if credentials else None,
options,
endpoint_url
endpoint_url,
read_timeout
]
)
$$;
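A corresponding export sketch, again assuming the upstream parameter order for query, bucket, file_path, region, and the credential arguments; with override left at its default of false, the result is uploaded as exports/animals_part{n}.csv (the first free suffix) rather than to exports/animals.csv itself:

-- Hypothetical export call; bucket, keys, and credentials are placeholders.
SELECT * FROM aws_s3.query_export_to_s3(
    'SELECT * FROM animals',
    'my-bucket',
    'exports/animals.csv',
    'us-east-1',
    'my_access_key',
    'my_secret_key',
    NULL,                                        -- session_token
    options      := '(FORMAT csv, HEADER true)',
    endpoint_url := 'http://minio:9000',
    read_timeout := 120,
    override     := false                        -- default: write a _part{n} sibling, never replace
);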
2 changes: 1 addition & 1 deletion aws_s3.control
@@ -1,5 +1,5 @@
# aws_s3 extension
comment = 'aws s3 wrapper for non rds postgres'
default_version = '0.0.1'
default_version = '1.0.0'
module_pathname = '$libdir/aws_s3'
relocatable = true
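Because the install script is renamed rather than accompanied by an aws_s3--0.0.1--1.0.0.sql upgrade script, an existing 0.0.1 installation presumably has to be dropped and recreated after make install; a minimal sketch, assuming plpython3u is already installed:

-- Sketch: no 0.0.1 -> 1.0.0 upgrade path ships with this commit, so
-- ALTER EXTENSION aws_s3 UPDATE has nothing to follow; reinstall instead.
DROP EXTENSION IF EXISTS aws_s3;
CREATE EXTENSION aws_s3;  -- picks up default_version = '1.0.0'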
2 changes: 1 addition & 1 deletion mock-servers/postgres/Dockerfile
@@ -6,5 +6,5 @@ WORKDIR /tmp/extension
RUN apt-get update \
&& apt-get -y install python3 python3-pip postgresql-plpython3-13 make
RUN pip3 install boto3
COPY aws_s3--0.0.1.sql aws_s3.control Makefile ./
COPY aws_s3--1.0.0.sql aws_s3.control Makefile ./
RUN make install
