diff --git a/.gitignore b/.gitignore
index c7fd4480..f29664cf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,4 +6,5 @@ dist
 MANIFEST
 build
 .eggs
-.env
\ No newline at end of file
+.env
+.pytest_cache
diff --git a/.travis.yml b/.travis.yml
index 38146fa6..2a978d30 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,8 +1,10 @@
 language: python
+sudo: required
+dist: xenial
 python:
-  - "3.3"
-  - "3.4"
   - "3.5"
+  - "3.6"
+  - "3.7"
 install:
   - "pip install ."
 script: make test
diff --git a/HISTORY.md b/HISTORY.md
index e38dff9a..f192d979 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,3 +1,8 @@
+1.5.0 / 2018-12-14
+==================
+
+  * Add S3 transport to upload files directly to S3.
+
 1.3.1 / 2018-01-06
 ==================
 
diff --git a/README.md b/README.md
index 239328c8..eb8c0514 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,4 @@
-analytics-python
-==============
+# analytics-python
 
 [![Build Status](https://travis-ci.org/FindHotel/analytics-python.svg?branch=master)](https://travis-ci.org/FindHotel/analytics-python)
 
@@ -9,6 +8,12 @@ analytics-python is a python client is a slightly modified version of [Segment's
 
 ## Usage
 
+The documentation for Segment's Python SDK that this repository is based on
+is available at [https://segment.com/libraries/python](https://segment.com/libraries/python).
+Check Segment's docs to get familiar with the API.
+
+You can use the package directly; in this case the default `http` transport will be used:
+
 ```python
 import analytics
 
@@ -16,20 +21,74 @@ import analytics
 analytics.write_key='AWS_API_GATEWAY_KEY'
 
 # The custom endpoint to where the events will be delivered to
-analytics.endpoint='https://polku.fih.io/dev/[hookname]'
+analytics.endpoint='https://segment.fih.io/v1/[endpoint-key]'
 
 analytics.track('kljsdgs99', 'SignedUp', {'plan': 'Enterprise'})
+analytics.flush()
 ```
 
+You can also use the client with a custom error handling function:
-## More information
+```python
-The documentation for Segment's Python SDK that this repository is based on is available at [https://segment.com/libraries/python](https://segment.com/libraries/python). You can use Segment's docs to get familiar with the API.
+import analytics
-## License
+
+ANALYTICS_WRITE_KEY='AWS_API_GATEWAY_KEY'
+ANALYTICS_ENDPOINT='https://segment.fih.io/v1/[endpoint-key]'
+
+def log_error(e, batch):
+    print("exception: {}, batch: {}".format(e, batch), flush=True)
+
+client = analytics.Client(
+    endpoint=ANALYTICS_ENDPOINT,
+    write_key=ANALYTICS_WRITE_KEY,
+    debug=analytics.debug,
+    on_error=log_error,
+    send=analytics.send,
+    max_queue_size=analytics.max_queue_size,
+    upload_size=analytics.upload_size
+)
+client.track(...)
+client.flush()
 ```
+
+### Using S3 transport
+
+When using the `s3` transport, the SDK uploads data directly to AWS S3, bypassing the HTTP interface.
+
+```python
+import uuid
+
+from analytics.client import Client
+
+MB = 1024*1024
+
+c = Client(
+    write_key="write-key",
+    endpoint="https://segment.fih.io/v1/[endpoint-key]",
+    upload_size=1*MB,
+    transport='s3',
+    max_queue_size=1000000,
+)
+
+for i in range(30000):
+    c.track(
+        user_id='pavel',
+        event='UUIDGenerated',
+        properties=dict(id=str(uuid.uuid4()), counter=i)
+    )
+    if i % 10000 == 0:
+        c.flush()
+
+c.flush()
+```
+
+## More information
+
+The documentation for Segment's Python SDK that this repository is based on is available at [https://segment.com/libraries/python](https://segment.com/libraries/python). You can use Segment's docs to get familiar with the API.
+
+## License
+
+```txt
 WWWWWW||WWWWWW
  W W W||W W W
      ||
diff --git a/analytics/__init__.py b/analytics/__init__.py
index 788c1841..64d08164 100644
--- a/analytics/__init__.py
+++ b/analytics/__init__.py
@@ -7,6 +7,7 @@
 """Settings."""
 write_key = None
 endpoint = 'https://api.segment.io/v1/batch'
+transport = 'http'
 max_queue_size = 10000
 upload_size = 100
 on_error = None
@@ -55,7 +56,7 @@ def _proxy(method, *args, **kwargs):
         default_client = Client(write_key, debug=debug, on_error=on_error,
                                 send=send, endpoint=endpoint,
                                 max_queue_size=max_queue_size,
-                                upload_size=upload_size)
+                                upload_size=upload_size, transport=transport)
 
         fn = getattr(default_client, method)
         fn(*args, **kwargs)
diff --git a/analytics/client.py b/analytics/client.py
index cbcbea5e..dce31f58 100644
--- a/analytics/client.py
+++ b/analytics/client.py
@@ -10,6 +10,7 @@
 
 from analytics.utils import guess_timezone, clean
 from analytics.consumer import Consumer
+from analytics.s3_consumer import S3Consumer
 from analytics.version import VERSION
 
 try:
@@ -22,22 +23,37 @@ class Client(object):
-    """Create a new Segment client."""
+    """Create a new Segment client.
+
+    The meaning of upload_size depends on the chosen transport.
+    For the http transport, upload_size is the number of items batched
+    into a single POST request to the backend.
+    For the s3 transport, upload_size is the size in bytes of an
+    _uncompressed_ partition of the data. A sane value is between 10 and
+    100 MB, depending on how compressible the underlying data is.
+    """
     log = logging.getLogger('segment')
 
     def __init__(self, write_key=None, debug=False, max_queue_size=10000,
-                 send=True, on_error=None, endpoint=None, upload_size=100):
+                 send=True, on_error=None, endpoint=None, upload_size=100,
+                 transport='http'):
         require('write_key', write_key, string_types)
 
-        self.queue = queue.Queue(max_queue_size)
-        self.consumer = Consumer(self.queue, write_key, endpoint=endpoint,
-                                 on_error=on_error, upload_size=upload_size)
         self.write_key = write_key
         self.endpoint = endpoint
         self.on_error = on_error
         self.debug = debug
         self.send = send
+        self.queue = queue.Queue(max_queue_size)
+        if transport == 'http':
+            self.consumer = Consumer(self.queue, write_key, endpoint=endpoint,
+                                     on_error=on_error, upload_size=upload_size)
+        elif transport == 's3':
+            self.consumer = S3Consumer(self.queue, write_key, endpoint=endpoint,
+                                       on_error=on_error, upload_size=upload_size)
+        else:
+            raise ValueError("transport should be either http or s3")
+
         if debug:
             self.log.setLevel(logging.DEBUG)
diff --git a/analytics/consumer.py b/analytics/consumer.py
index 1771130c..483f6976 100644
--- a/analytics/consumer.py
+++ b/analytics/consumer.py
@@ -1,14 +1,10 @@
 import logging
 from threading import Thread
+from queue import Empty
 
 import analytics
-from analytics.version import VERSION
 from analytics.request import post
 
-try:
-    from queue import Empty
-except:
-    from Queue import Empty
 
 class Consumer(Thread):
     """Consumes the messages from the client's queue."""
@@ -59,7 +55,7 @@ def upload(self):
            if self.on_error:
                 self.on_error(e, batch)
         finally:
             # mark items as acknowledged from queue
-            for item in batch:
+            for _ in batch:
                 self.queue.task_done()
             return success
diff --git a/analytics/request.py b/analytics/request.py
index f2be1567..6dbb9ce8 100644
--- a/analytics/request.py
+++ b/analytics/request.py
@@ -10,6 +10,30 @@
 
 _session = sessions.Session()
 
+
+@retry(wait_exponential_multiplier=500, wait_exponential_max=5000,
+       stop_max_delay=20000)
+def get(write_key, endpoint):
+    log = logging.getLogger('segment')
+    headers = {
+        'content-type': 'application/json',
+        'x-api-key': write_key,
+    }
+    res = _session.get(endpoint, headers=headers, timeout=15)
+
+    if res.status_code == 200:
+        log.debug('get request is successful')
+        return res.json()
+
+    try:
+        payload = res.json()
+        log.debug('received response: %s', payload)
+        raise APIError(
+            res.status_code,
+            payload.get('code', '???'),
+            payload.get('message', '???'))
+    except ValueError:
+        raise APIError(res.status_code, 'unknown', res.text)
+
 
 @retry(wait_exponential_multiplier=500, wait_exponential_max=5000,
        stop_max_delay=20000)
@@ -21,7 +45,10 @@ def post(write_key, endpoint, **kwargs):
     body["sentAt"] = int(time.time()*1000)
     auth = HTTPBasicAuth(write_key, '')
     data = json.dumps(body, cls=DatetimeSerializer)
-    headers = { 'content-type': 'application/json', 'x-api-key': write_key }
+    headers = {
+        'content-type': 'application/json',
+        'x-api-key': write_key,
+    }
     log.debug('making request: %s', data)
     res = _session.post(endpoint, data=data, auth=auth, headers=headers,
                         timeout=15)
diff --git a/analytics/s3_consumer.py b/analytics/s3_consumer.py
new file mode 100644
index 00000000..8384ef11
--- /dev/null
+++ b/analytics/s3_consumer.py
@@ -0,0 +1,175 @@
+import logging
+import io
+import json
+import gzip
+import uuid
+from functools import reduce
+from threading import Thread
+from datetime import datetime
+from queue import Empty
+
+import boto3
+
+import analytics
+from analytics.request import get
+
+
+MB = 1024*1024
+
+class S3Consumer(Thread):
+    """Consumes the messages from the client's queue and pushes them to S3."""
+    log = logging.getLogger('segment')
+
+    _layouts = (
+        ('YYYY', '%Y'),
+        ('MM', '%m'),
+        ('DD', '%d'),
+        ('HH', '%H'),
+    )
+
+    def __init__(self, queue, write_key, upload_size=10*MB, on_error=None,
+                 endpoint=None, dt=None):
+        """Create a consumer thread.
+
+        upload_size is the size of a chunk in bytes.
+        """
+        Thread.__init__(self)
+        # Make consumer a daemon thread so that it doesn't block program exit
+        self.daemon = True
+        if upload_size < 1*MB:
+            raise ValueError("upload_size should be >= {} (1 MB), got {}".format(1*MB, upload_size))
+        self.upload_size = upload_size
+        self.on_error = on_error
+        self.queue = queue
+
+        if dt is None:
+            dt = datetime.now()
+
+        self._reset_buffer()
+        self.s3 = boto3.client('s3')
+
+        s3_details = S3Consumer._s3_details(write_key, endpoint or analytics.endpoint)
+
+        layouts = [(k, dt.strftime(v)) for (k, v) in self._layouts]
+        prefix = reduce(lambda a, kv: a.replace(*kv), layouts, s3_details['prefix'])
+
+        self.s3_details = dict(
+            bucket=s3_details['bucket'],
+            key_template='{prefix}/{job_id}-part-%d.json.gz'.format(
+                prefix=prefix,
+                job_id=str(uuid.uuid4())
+            ),
+            part=0,
+            tags=s3_details.get('tags', None),
+        )
+        self.log.debug("s3 details: {}".format(self.s3_details))
+
+        self.encoder = json.JSONEncoder()
+
+        # It's important to set running in the constructor: if we are asked to
+        # pause immediately after construction, we might set running to True in
+        # run() *after* we set it to False in pause... and keep running forever.
+        self.running = True
+
+    def _reset_buffer(self):
+        self.buf = io.BytesIO()
+
+    def _writer(self):
+        return gzip.GzipFile(fileobj=self.buf, mode='w')
+
+    def _reader(self):
+        return gzip.GzipFile(fileobj=self.buf, mode='r')
+
+    @staticmethod
+    def _s3_details(write_key, endpoint):
+        """
+        Queries the endpoint for details of the S3 location the data should be uploaded to.
+        """
+        res = get(write_key, endpoint)
+        if not ('s3_bucket' in res and 's3_prefix' in res):
+            raise ValueError("Response should contain s3_bucket and s3_prefix keys, got {}".format(res))
+        return {
+            'bucket': res['s3_bucket'],
+            'prefix': res['s3_prefix'],
+            'tags': res.get('tags', None),
+        }
+
+    def run(self):
+        """Runs the consumer."""
+        self.log.debug('s3 consumer is running...')
+        while self.running:
+            self.upload()
+
+        self.log.debug('s3 consumer exited.')
+
+    def pause(self):
+        """Pause the consumer."""
+        self.running = False
+
+    def upload(self):
+        """Upload the next batch of items, return whether successful."""
+        success = False
+        total_items_to_upload = self.next()
+        if total_items_to_upload == 0:
+            return False
+
+        try:
+            self._upload_request()
+            self.s3_details['part'] += 1
+            self._reset_buffer()
+            success = True
+        except Exception as e:
+            self.log.error('error uploading: %s', e)
+            success = False
+            if self.on_error:
+                self.on_error(e, self._reader().read())
+        finally:
+            # mark items as acknowledged from queue
+            for _ in range(total_items_to_upload):
+                self.queue.task_done()
+
+        return success
+
+    def next(self):
+        """Writes the next batch of items from the queue to the buffer."""
+        queue = self.queue
+        written_bytes = 0
+        written_items = 0
+
+        writer = self._writer()
+
+        while written_bytes < self.upload_size or queue.empty():
+            try:
+                item = queue.get(block=True, timeout=0.5)
+                s = self.encoder.encode(item) + '\n'
+                written_bytes += writer.write(bytes(s, 'UTF-8'))
+                written_items += 1
+            except Empty:
+                break
+
+        # close the gzip member so buffered data is flushed into self.buf
+        writer.close()
+
+        self.log.debug("written {} bytes".format(written_bytes))
+        return written_items
+
+    def _upload_request(self, attempt=0):
+        """Attempt to upload the data present in the buffer. Raises an Exception on failure."""
+
+        bucket = self.s3_details['bucket']
+        key = self.s3_details['key_template'] % (self.s3_details['part'])
+        tags = self.s3_details.get('tags', None)
+
+        kwargs = dict(
+            ACL='bucket-owner-full-control',
+            Bucket=bucket,
+            Key=key,
+        )
+        if tags is not None:
+            kwargs['Tagging'] = tags
+
+        self.log.info("Uploading to s3 with args {}".format(kwargs))
+        result = self.s3.put_object(
+            Body=self.buf.getvalue(),
+            **kwargs
+        )
+        self.log.info("Upload to s3 finished with result: {}".format(result))
+
+        if result['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise Exception("S3 upload failed: {}".format(result))
diff --git a/analytics/version.py b/analytics/version.py
index b2e81777..84262d36 100644
--- a/analytics/version.py
+++ b/analytics/version.py
@@ -1 +1 @@
-VERSION = '1.4.0'
+VERSION = '1.5.0'
diff --git a/setup.py b/setup.py
index 3ae322f7..db5a1416 100644
--- a/setup.py
+++ b/setup.py
@@ -26,7 +26,8 @@
     "requests>=2.7,<3.0",
     "six>=1.7",
     "python-dateutil>2.1",
-    "retrying>=1.3.3"
+    "retrying>=1.3.3",
+    "boto3>=1.9.57"
 ]
 
 setup(
@@ -41,7 +42,7 @@
     packages=['analytics', 'analytics.test'],
     license='MIT License',
     install_requires=install_requires,
-    description='FindHotel\'s fork of Segment\'s Python SDK.',
+    description="FindHotel's fork of Segment's Python SDK.",
     long_description=long_description,
     classifiers=[
         "Development Status :: 5 - Production/Stable",
@@ -50,10 +51,8 @@
         "Operating System :: OS Independent",
         "Programming Language :: Python",
         "Programming Language :: Python :: 3",
-        "Programming Language :: Python :: 3.2",
-        "Programming Language :: Python :: 3.3",
-        "Programming Language :: Python :: 3.4",
         "Programming Language :: Python :: 3.5",
         "Programming Language :: Python :: 3.6",
+        "Programming Language :: Python :: 3.7"
     ],
 )
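A minimal usage sketch of the new module-level `transport` setting wired through `analytics/__init__.py` above; the write key and endpoint values are the placeholders from the README, not real credentials:

```python
import analytics

analytics.write_key = 'AWS_API_GATEWAY_KEY'                       # placeholder key
analytics.endpoint = 'https://segment.fih.io/v1/[endpoint-key]'   # placeholder endpoint
analytics.transport = 's3'  # new setting in this change; defaults to 'http'

# Events are queued and the S3Consumer uploads them as gzipped JSON parts.
analytics.track('kljsdgs99', 'SignedUp', {'plan': 'Enterprise'})
analytics.flush()
```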