drop boto2 from client
Note that this work relies heavily on work by @lod in
#78

Note that the ingester and api both need this treatment too. And
both the ingester and the api rely on the datalake client for
shared code (e.g., Record, Metadata, various test fixtures). But
migrating just the client was a heavy enough lift. To preserve
the ingester and api functionality, I made the docker container
install datalake<1. We will migrate these back to the main line
code in subsequent PRs.
bcavagnolo committed May 30, 2023
1 parent 4215d7f commit b8a67d2
Showing 14 changed files with 251 additions and 198 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/actions.yml
@@ -3,10 +3,9 @@ on: [push, pull_request]

jobs:
test-client:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
strategy:
matrix:
# 4 jobs total with all permutations
python: [3.6, 3.8, 3.9]
extras: ["test", "test,queable,sentry"]
steps:
@@ -30,4 +29,4 @@ jobs:
fetch-depth: 0
- name: Test
run: |
make test
make test-ingester test-api
7 changes: 4 additions & 3 deletions Dockerfile
@@ -8,7 +8,6 @@ ENV LC_ALL C.UTF-8
# TODO: keep requirements in one place
RUN pip install \
blinker>=1.4 \
boto>=2.38.0 \
boto3>=1.1.3 \
click>=5.1 \
Flask>=0.10.1 \
@@ -28,7 +27,9 @@ RUN pip install \
'pytest<8' \
'responses<0.22.0' \
pyinotify>=0.9.4, \
raven>=5.0.0
raven>=5.0.0 \
'tox>4,<5' \
'datalake<2'

RUN mkdir -p /opt/
COPY . /opt/
@@ -37,7 +38,7 @@ COPY . /opt/
# the container and used for development. That is, the python paths and paths
# to console scripts Just Work (TM)
ENV PYTHONPATH=/opt/client:/opt/ingester:/opt/api
RUN for d in client ingester api; do \
RUN for d in ingester api; do \
cd /opt/$d && \
python setup.py develop -s /usr/local/bin \
--egg-path ../../../../../opt/$d/ \
2 changes: 1 addition & 1 deletion Makefile
@@ -12,7 +12,7 @@ devshell: docker
docker run --rm -it -v $$PWD:/opt --entrypoint /bin/bash $(IMAGE)

test-client: docker
docker run --rm --entrypoint py.test $(IMAGE) client
docker run --rm --entrypoint tox $(IMAGE) -c /opt/client/tox.ini

test-ingester: docker
docker run --rm --entrypoint py.test $(IMAGE) ingester
149 changes: 75 additions & 74 deletions client/datalake/archive.py
@@ -29,9 +29,7 @@
from io import BytesIO
import errno

from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.s3.connection import NoHostProvided
import boto3
import math
from logging import getLogger
log = getLogger('datalake-archive')
@@ -210,7 +208,26 @@ def push(self, f):
return self.url_from_file(f)

def _upload_file(self, f):
key = self._s3_key_from_metadata(f)

# Implementation inspired by https://stackoverflow.com/a/60892027
obj = self._s3_object_from_metadata(f)

# NB: we have an opportunity to turn on threading here, which may
# improve performance. However, in some cases (i.e., queue-based
# uploader) we already use threads. So let's add it later as a
# configuration if/when we want to experiment.
config = boto3.s3.transfer.TransferConfig(
# All sizes are bytes
multipart_threshold=CHUNK_SIZE(),
use_threads=False,
multipart_chunksize=CHUNK_SIZE(),
)

extra = {
'Metadata': {
METADATA_NAME: json.dumps(f.metadata)
}
}

spos = f.tell()
f.seek(0, os.SEEK_END)
@@ -220,38 +237,22 @@ def _upload_file(self, f):

num_chunks = int(math.ceil(f_size / float(CHUNK_SIZE())))
log.info("Uploading {} ({} B / {} chunks)".format(
key.name, f_size, num_chunks))
if num_chunks <= 1:
key.set_metadata(METADATA_NAME, json.dumps(f.metadata))
completed_size = key.set_contents_from_file(f)
log.info("Upload of {} complete (1 part / {} B).".format(
key.name, completed_size))
return
completed_size = 0
obj.key, f_size, num_chunks))

chunk = 0
mp = key.bucket.initiate_multipart_upload(
key.name, metadata={
METADATA_NAME: json.dumps(f.metadata)
})
try:
for chunk in range(1, num_chunks + 1):
part = mp.upload_part_from_file(
f, chunk, size=CHUNK_SIZE())
completed_size += part.size
log.debug("Uploaded chunk {}/{} ({}B)".format(
chunk, num_chunks, part.size))
except: # NOQA
# Any exception we want to attempt to cancel_upload, otherwise
# AWS will bill us every month indefinitely for storing the
# partial-uploaded chunks.
log.exception("Upload of {} failed on chunk {}".format(
key.name, chunk))
mp.cancel_upload()
raise
else:
completed = mp.complete_upload()
log.info("Upload of {} complete ({} parts / {} B).".format(
completed.key_name, chunk, completed_size))

def _progress(number_of_bytes):
nonlocal chunk
log.info("Uploaded chunk {}/{} ({}B)".format(
chunk, num_chunks, CHUNK_SIZE()))
chunk += 1

# NB: deep under the hood, upload_fileobj creates a
# CreateMultipartUploadTask. And that object cleans up after itself:
# https://github.com/boto/s3transfer/blob/develop/s3transfer/tasks.py#L353-L360 # noqa
obj.upload_fileobj(f, ExtraArgs=extra, Config=config,
Callback=_progress)
obj.wait_until_exists()

def url_from_file(self, f):
return self._get_s3_url(f)
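
For reference, the managed-transfer pattern used in the new _upload_file looks roughly like this as a standalone sketch; the bucket name, key, metadata, chunk size, and file name below are hypothetical placeholders, and note that boto3 invokes the progress callback with a byte count rather than a part number:

import boto3
from boto3.s3.transfer import TransferConfig

CHUNK = 100 * 1024 * 1024  # hypothetical 100 MB chunk size

s3 = boto3.resource('s3')
obj = s3.Object('example-bucket', 'abc123/data')  # hypothetical bucket/key

config = TransferConfig(
    multipart_threshold=CHUNK,   # switch to multipart above this size
    multipart_chunksize=CHUNK,
    use_threads=False,
)

def progress(bytes_transferred):
    # boto3 calls this with the number of bytes sent since the last call,
    # not with a part number.
    print('sent {} more bytes'.format(bytes_transferred))

with open('some-local-file', 'rb') as f:
    obj.upload_fileobj(
        f,
        ExtraArgs={'Metadata': {'datalake': '{"what": "example"}'}},
        Config=config,
        Callback=progress,
    )
obj.wait_until_exists()

Unlike the removed boto2 path, there is no explicit single-part branch: TransferConfig decides between a plain upload and a multipart upload based on multipart_threshold, and, per the comment above, s3transfer cleans up failed multipart uploads itself.
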
@@ -279,12 +280,11 @@ def _is_valid_http_url(self, url):
return url.startswith('http') and url.endswith('/data')

def _fetch_s3_url(self, url, stream=False):
k = self._get_key_from_url(url)
m = self._get_metadata_from_key(k)
obj, m = self._get_object_from_url(url)
if stream:
return StreamingFile(k, **m)
return StreamingFile(obj._datalake_details['Body'], **m)
fd = BytesIO()
k.get_contents_to_file(fd)
self._s3_bucket.download_fileobj(obj.key, fd)
fd.seek(0)
return File(fd, **m)

@@ -331,8 +331,7 @@ def fetch_to_filename(self, url, filename_template=None):
'''
k = None
if url.startswith('s3://'):
k = self._get_key_from_url(url)
m = self._get_metadata_from_key(k)
obj, m = self._get_object_from_url(url)
else:
m = self._get_metadata_from_http_url(url)
fname = self._get_filename_from_template(filename_template, m)
@@ -357,18 +356,19 @@ def _mkdirs(self, path):
else:
raise

def _get_key_from_url(self, url):
def _get_object_from_url(self, url):
self._validate_fetch_url(url)
key_name = self._get_key_name_from_url(url)
k = self._s3_bucket.get_key(key_name)
if k is None:
obj = self._s3.Object(self._s3_bucket_name, key_name)
try:
# cache the results of the get on the obj to avoid superfluous
# network calls.
obj._datalake_details = obj.get()
m = obj._datalake_details['Metadata'].get(METADATA_NAME)
except self._s3.meta.client.exceptions.NoSuchKey:
msg = 'Failed to find {} in the datalake.'.format(url)
raise InvalidDatalakePath(msg)
return k

def _get_metadata_from_key(self, key):
m = key.get_metadata(METADATA_NAME)
return Metadata.from_json(m)
return obj, Metadata.from_json(m)

def _get_filename_from_template(self, template, metadata):
if template is None:
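
A minimal sketch of the lookup pattern used in _get_object_from_url, with hypothetical bucket, key, and metadata names: a single get() call returns both the body and the user-defined metadata, and a missing key surfaces as the client's NoSuchKey error.

import json
import boto3

s3 = boto3.resource('s3')
obj = s3.Object('example-bucket', 'abc123/data')   # hypothetical bucket/key

try:
    details = obj.get()                        # one GET: body, metadata, etc.
    raw = details['Metadata'].get('datalake')  # user-defined metadata header
    metadata = json.loads(raw) if raw else None
except s3.meta.client.exceptions.NoSuchKey:
    metadata = None                            # key does not exist in the bucket
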
@@ -388,7 +388,12 @@ def _get_key_name_from_url(self, url):
msg = '{} is not a valid datalake url'.format(url)
raise InvalidDatalakePath(msg)

return parts.path
# NB: under boto2 we did not need the lstrip. It seems
# that boto2 explicitly stripped these leading slashes for us:
# https://groups.google.com/g/boto-users/c/mv--NMPUXoU ...but boto3
# does not. So we must take care to strip it whenever we parse a URL to
# get a key.
return parts.path.lstrip('/')

def _validate_fetch_url(self, url):
valid_base_urls = (self.storage_url, self.http_url)
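
The lstrip('/') note above is easy to see with urlparse directly (the URL is hypothetical): the path component keeps its leading slash, which is not part of the S3 key name.

from urllib.parse import urlparse

parts = urlparse('s3://example-bucket/abc123/data')
print(parts.netloc)             # 'example-bucket'
print(parts.path)               # '/abc123/data'  (leading slash)
print(parts.path.lstrip('/'))   # 'abc123/data'   (the actual key name)
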
@@ -398,51 +403,47 @@ def _validate_fetch_url(self, url):
raise InvalidDatalakePath(msg)

def _get_s3_url(self, f):
key = self._s3_key_from_metadata(f)
obj = self._s3_object_from_metadata(f)
return self._URL_FORMAT.format(bucket=self._s3_bucket_name,
key=key.name)
key=obj.key)

@property
def _s3_bucket_name(self):
return self._parsed_storage_url.netloc

@memoized_property
def _s3_bucket(self):
# Note: we pass validate=False because we may just have push
# permissions. If validate is not False, boto tries to list the
# bucket. And this will 403.
return self._s3_conn.get_bucket(self._s3_bucket_name, validate=False)
return self._s3.Bucket(self._s3_bucket_name)

_KEY_FORMAT = '{id}/data'

def _s3_key_from_metadata(self, f):
# For performance reasons, s3 keys should start with a short random
# sequence:
# https://aws.amazon.com/blogs/aws/amazon-s3-performance-tips-tricks-seattle-hiring-event/
# http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html
def _s3_object_from_metadata(self, f):
key_name = self._KEY_FORMAT.format(**f.metadata)
return Key(self._s3_bucket, name=key_name)
return self._s3_bucket.Object(key_name)

@property
def _s3_host(self):
h = environ.get('AWS_S3_HOST')
if h is not None:
return h
return 'https://' + h
r = environ.get('AWS_REGION') or environ.get('AWS_DEFAULT_REGION')
if r is not None:
return 's3-' + r + '.amazonaws.com'
return 'https://s3-' + r + '.amazonaws.com'
else:
return NoHostProvided
return None

@property
def _s3_conn(self):
if not hasattr(self, '_conn'):
k = environ.get('AWS_ACCESS_KEY_ID')
s = environ.get('AWS_SECRET_ACCESS_KEY')
self._conn = S3Connection(aws_access_key_id=k,
aws_secret_access_key=s,
host=self._s3_host)
return self._conn
@memoized_property
def _s3(self):
# boto3 uses AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# boto3 will use AWS_DEFAULT_REGION if AWS_REGION is not set
return boto3.resource('s3',
region_name=environ.get('AWS_REGION'),
endpoint_url=self._s3_host)

@memoized_property
def _s3_client(self):
boto_session = boto3.Session()
return boto_session.client('s3')

def _requests_get(self, url, **kwargs):
return self._session.get(url, timeout=TIMEOUT(), **kwargs)
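
For completeness, a simplified sketch of constructing the boto3 S3 resource from environment settings as the new _s3 property does (the environment variable values are hypothetical). Unlike the removed S3Connection, boto3 picks up AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment on its own, and endpoint_url=None simply falls back to the default AWS endpoint.

import os
import boto3

# Hypothetical override, e.g. a local S3 emulator.
host = os.environ.get('AWS_S3_HOST')           # e.g. 'localhost:9000'
endpoint = 'https://' + host if host else None

s3 = boto3.resource(
    's3',
    region_name=os.environ.get('AWS_REGION'),  # None -> AWS_DEFAULT_REGION/config
    endpoint_url=endpoint,                     # None -> default AWS endpoint
)
obj = s3.Bucket('example-bucket').Object('abc123/data')
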