diff --git a/CHANGES b/CHANGES
index 2de7816..974ccbd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,14 @@
+blueox (0.12.0)
+    * Move recorders to separate module
+    * Add pycernan recorder
+    * Update the way blueox is configured to allow selecting the
+      desired recorder via an imported constant
+
+-- Aaron Biller Tue Sep 4 12:40:45 2018 -0400
+
+blueox (0.11.6.4)
+    * Fix encoding of unknown types
+
 blueox (0.11.6.3)
     * Fix handling of unicode strings
 
diff --git a/blueox/__init__.py b/blueox/__init__.py
index 8cc8cfc..69d63c3 100644
--- a/blueox/__init__.py
+++ b/blueox/__init__.py
@@ -9,7 +9,7 @@
 """
 
 __title__ = 'blueox'
-__version__ = '0.11.6.3'
+__version__ = '0.12.0'
 __author__ = 'Rhett Garber'
 __author_email__ = 'rhettg@gmail.com'
 __license__ = 'ISC'
@@ -18,10 +18,8 @@ __url__ = 'https://github.com/rhettg/BlueOx'
 
 
 import logging
-import os
 
 from . import utils
-from . import network
 from . import ports
 from .context import (
     Context, set, append, add, context_wrap, current_context, find_context,
@@ -30,36 +28,53 @@ from .errors import Error
 from .logger import LogHandler
 from .timer import timeit
 
+from .recorders import pycernan, zmq
 
 log = logging.getLogger(__name__)
 
+ZMQ_RECORDER = 'zmq'
+PYCERNAN_RECORDER = 'pycernan'
+RECORDERS = {
+    ZMQ_RECORDER: zmq,
+    PYCERNAN_RECORDER: pycernan,
+}
+DEFAULT_RECORDER = ZMQ_RECORDER
+
 
 def configure(host, port, recorder=None):
     """Initialize blueox
 
-    This instructs the blueox system where to send it's logging data. If blueox is not configured, log data will
-    be silently dropped.
+    This instructs the blueox system where to send its logging data.
+    If blueox is not configured, log data will be silently dropped.
 
-    Currently we support logging through the network (and the configured host and port) to a blueoxd instances, or
-    to the specified recorder function
+    Currently we support logging through the network (and the configured host
+    and port) to a blueoxd instance, or to the specified recorder function.
 
     """
-    if recorder:
+    if callable(recorder):
         _context_mod._recorder_function = recorder
-    elif host and port:
-        network.init(host, port)
-        _context_mod._recorder_function = network.send
+
     else:
-        log.info("Empty blueox configuration")
-        _context_mod._recorder_function = None
+        _rec = RECORDERS.get(recorder, None)
+        if _rec is not None:
+            _rec.init(host, port)
+            _context_mod._recorder_function = _rec.send
+        else:
+            log.info("Empty blueox configuration")
+            _context_mod._recorder_function = None
 
-def default_configure(host=None):
+
+def default_configure(host=None, recorder=DEFAULT_RECORDER):
     """Configure BlueOx based on defaults
 
     Accepts a connection string override in the form `localhost:3514`. Respects
     environment variable BLUEOX_HOST
     """
-    host = ports.default_collect_host(host)
+    _rec = RECORDERS.get(recorder, None)
+    if _rec is None:
+        _rec = RECORDERS.get(DEFAULT_RECORDER)
+
+    host = _rec.default_host(host)
     hostname, port = host.split(':')
 
     try:
@@ -67,8 +82,9 @@ def default_configure(host=None):
     except ValueError:
         raise Error("Invalid value for port")
 
-    configure(hostname, int_port)
+    configure(hostname, int_port, recorder=recorder)
 
 
 def shutdown():
-    network.close()
+    zmq.close()
+    pycernan.close()
diff --git a/blueox/client.py b/blueox/client.py
index 8062c64..ec2f420 100644
--- a/blueox/client.py
+++ b/blueox/client.py
@@ -3,7 +3,8 @@
 blueox.client
 ~~~~~~~~
 
-This module provides utilities for writing client applications which connect or use blueox data.
+This module provides utilities for writing client applications
+which connect to or use blueox data.
:copyright: (c) 2012 by Rhett Garber :license: ISC, see LICENSE for more details. @@ -35,7 +36,8 @@ def default_host(host=None): def decode_stream(stream): - """A generator which reads data out of the buffered file stream, unpacks and decodes the blueox events + """A generator which reads data out of the buffered file stream, + unpacks and decodes the blueox events This is useful for parsing on disk log files generated by blueoxd """ @@ -97,8 +99,8 @@ def subscribe_stream(control_host, subscribe): sock.connect("tcp://%s" % (stream_host,)) # Now that we are connected, loop almost forever emiting events. - # If we fail to receive any events within the specified timeout, we'll quit - # and verify that we are connected to a valid stream. + # If we fail to receive any events within the specified timeout, + # we'll quit and verify that we are connected to a valid stream. poller = zmq.Poller() poller.register(sock, zmq.POLLIN) while True: @@ -113,7 +115,7 @@ def subscribe_stream(control_host, subscribe): if not prefix and subscription and channel != subscription: continue - yield msgpack.unpackb(data,encoding='utf8') + yield msgpack.unpackb(data, encoding='utf8') else: break @@ -137,10 +139,10 @@ def stdin_stream(): class Grouper(object): """Utility for grouping events and sub-events together. - + Events fed into a Grouper are joined by their common 'id'. Encountering the parent event type will trigger emitting a list of all events and sub events - for that single id. + for that single id. This assumes that the parent event will be the last encountered. diff --git a/blueox/context.py b/blueox/context.py index c23fcb7..fc117a9 100644 --- a/blueox/context.py +++ b/blueox/context.py @@ -19,7 +19,6 @@ import logging from . import utils -from . import network log = logging.getLogger(__name__) @@ -41,8 +40,10 @@ def __init__(self, type_name, id=None, sample=None): heirarchy of parent requests. Examples: '.foo' - Will generate a name like '.foo' - '.foo.bar' - If the parent ends in '.foo', the final name will be '.bar' - '^.foo' - Will use the top-most context, generating '.foo' + '.foo.bar' - If the parent ends in '.foo', the final name + will be '.bar' + '^.foo' - Will use the top-most context, generating + '.foo' 'top.foo.bar' - The name will be based on the longest matched parent context. If there is a parent context named 'top' and a parent context named 'top.foo', the new context will be named @@ -111,11 +112,13 @@ def __init__(self, type_name, id=None, sample=None): elif parent_ctx: self.id = parent_ctx.id else: - # Generate an id if one wasn't provided and we don't have any parents - # We're going to encode the time as the front 4 bytes so we have some order to the ids - # that could prove useful later on by making sorting a little easier. - self.id = (struct.pack(">L", int(time.time())) + os.urandom(12)).encode( - 'hex') + # Generate an id if one wasn't provided and we don't have any + # parents. We're going to encode the time as the front 4 bytes + # so we have some order to the ids that could prove useful + # later on by making sorting a little easier. 
+ self.id = ( + struct.pack(">L", int(time.time())) + + os.urandom(12)).encode('hex') if parent_ctx and not parent_ctx.enabled: self.enabled = False diff --git a/blueox/contrib/__init__.py b/blueox/contrib/__init__.py index 8b13789..e69de29 100644 --- a/blueox/contrib/__init__.py +++ b/blueox/contrib/__init__.py @@ -1 +0,0 @@ - diff --git a/blueox/contrib/celery/__init__.py b/blueox/contrib/celery/__init__.py index 8b13789..e69de29 100644 --- a/blueox/contrib/celery/__init__.py +++ b/blueox/contrib/celery/__init__.py @@ -1 +0,0 @@ - diff --git a/blueox/contrib/celery/celery_signals.py b/blueox/contrib/celery/celery_signals.py index 3c20092..c37dba6 100644 --- a/blueox/contrib/celery/celery_signals.py +++ b/blueox/contrib/celery/celery_signals.py @@ -1,8 +1,7 @@ """Hooks for gathering celery task data into blueox. -Importing this module will register signal handlers into Celery worker's runtime. - -We also will track creation of tasks on the client side. +Importing this module will register signal handlers into Celery +worker's runtime. We also will track creation of tasks on the client side. """ import traceback @@ -33,9 +32,9 @@ def on_task_sent(sender=None, body=None, **kwargs): @signals.task_sent.connect def on_task_sent(**kwargs): with blueox.Context('.celery.task_sent'): - # Arguments for this signal are different than the worker signals. Sometimes - # they are even different than what the documentation says. See also - # https://github.com/celery/celery/issues/1606 + # Arguments for this signal are different than the worker signals. + # Sometimes they are even different than what the documentation + # says. See also https://github.com/celery/celery/issues/1606 blueox.set('task_id', kwargs.get('task_id', kwargs.get('id'))) blueox.set('task', str(kwargs['task'])) blueox.set('eta', kwargs['eta']) @@ -43,7 +42,14 @@ def on_task_sent(**kwargs): @signals.worker_process_init.connect def on_worker_process_init(**kwargs): - if hasattr(settings, 'BLUEOX_HOST'): + if hasattr(settings, 'BLUEOX_PYCERNAN_HOST'): + if settings.BLUEOX_PYCERNAN_HOST: + rec = blueox.PYCERNAN_RECORDER + blueox.default_configure( + settings.BLUEOX_PYCERNAN_HOST, recorder=rec) + else: + blueox.configure(None, None) + elif hasattr(settings, 'BLUEOX_HOST'): if settings.BLUEOX_HOST: blueox.default_configure(settings.BLUEOX_HOST) else: diff --git a/blueox/contrib/django/__init__.py b/blueox/contrib/django/__init__.py index 8b13789..e69de29 100644 --- a/blueox/contrib/django/__init__.py +++ b/blueox/contrib/django/__init__.py @@ -1 +0,0 @@ - diff --git a/blueox/contrib/django/middleware.py b/blueox/contrib/django/middleware.py index b16f486..1471ac2 100644 --- a/blueox/contrib/django/middleware.py +++ b/blueox/contrib/django/middleware.py @@ -1,6 +1,5 @@ import sys import traceback -import logging import blueox @@ -10,7 +9,14 @@ class Middleware(object): def __init__(self): - if hasattr(settings, 'BLUEOX_HOST'): + if hasattr(settings, 'BLUEOX_PYCERNAN_HOST'): + if settings.BLUEOX_PYCERNAN_HOST: + rec = blueox.PYCERNAN_RECORDER + blueox.default_configure( + settings.BLUEOX_PYCERNAN_HOST, recorder=rec) + else: + blueox.configure(None, None) + elif hasattr(settings, 'BLUEOX_HOST'): if settings.BLUEOX_HOST: blueox.default_configure(settings.BLUEOX_HOST) else: @@ -28,7 +34,9 @@ def process_request(self, request): headers = {} for k, v in request.META.iteritems(): - if k.startswith('HTTP_') or k in ('CONTENT_LENGTH', 'CONTENT_TYPE'): + if ( + k.startswith('HTTP_') or + k in ('CONTENT_LENGTH', 'CONTENT_TYPE')): headers[k] = v 
blueox.set('headers', headers) diff --git a/blueox/contrib/flask/__init__.py b/blueox/contrib/flask/__init__.py index 56fb178..57de85d 100644 --- a/blueox/contrib/flask/__init__.py +++ b/blueox/contrib/flask/__init__.py @@ -23,7 +23,13 @@ class BlueOxMiddleware(object): def __init__(self, app): self.app = app - if 'BLUEOX_HOST' in app.config: + if 'BLUEOX_PYCERNAN_HOST' in app.config: + self.blueox_pycernan_host = app.config['BLUEOX_PYCERNAN_HOST'] + if self.blueox_pycernan_host: + rec = blueox.PYCERNAN_RECORDER + blueox.default_configure( + self.blueox_pycernan_host, recorder=rec) + elif 'BLUEOX_HOST' in app.config: self.blueox_host = app.config['BLUEOX_HOST'] if self.blueox_host: blueox.default_configure(self.blueox_host) @@ -45,8 +51,8 @@ def before_request(self, *args, **kwargs): headers = {} for k, v in request.environ.iteritems(): if ( - k.startswith('HTTP_') or k in - ('CONTENT_LENGTH', 'CONTENT_TYPE')): + k.startswith('HTTP_') or + k in ('CONTENT_LENGTH', 'CONTENT_TYPE')): headers[k] = v blueox.set('headers', headers) diff --git a/blueox/logger.py b/blueox/logger.py index 37d2180..6a7c025 100644 --- a/blueox/logger.py +++ b/blueox/logger.py @@ -3,7 +3,9 @@ blueox.logger ~~~~~~~~ -This module provides integration with blueox and standard python logging module. +This module provides integration with blueox and standard +python logging module. + :copyright: (c) 2012 by Rhett Garber :license: ISC, see LICENSE for more details. @@ -20,7 +22,8 @@ class LogHandler(logging.Handler): Records standard fields such as logger name, level the message and if an exception was provided, the string formatted exception. - The type name, if not specified will be something like '.log' + The type name, if not specified will be something like + '.log' """ def __init__(self, type_name=None): diff --git a/blueox/ports.py b/blueox/ports.py index 5b1ca2f..d977847 100644 --- a/blueox/ports.py +++ b/blueox/ports.py @@ -28,7 +28,7 @@ def _default_host(host, default_host, default_port): if not host: host = default_host if ':' not in host: - host = "{}:{}".format(host, default_port) + host = '{}:{}'.format(host, default_port) return host @@ -41,3 +41,13 @@ def default_control_host(host=None): def default_collect_host(host=None): default_host = os.environ.get(ENV_VAR_COLLECT_HOST, DEFAULT_HOST) return _default_host(host, default_host, DEFAULT_COLLECT_PORT) + + +# For consistency, we'll abstract pycernan connections in the same way +ENV_VAR_PYCERNAN_HOST = 'BLUEOX_PYCERNAN_HOST' +DEFAULT_PYCERNAN_PORT = 2003 + + +def default_pycernan_host(host=None): + default_host = os.environ.get(ENV_VAR_PYCERNAN_HOST, DEFAULT_HOST) + return _default_host(host, default_host, DEFAULT_PYCERNAN_PORT) diff --git a/blueox/recorders/__init__.py b/blueox/recorders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/blueox/recorders/pycernan.py b/blueox/recorders/pycernan.py new file mode 100644 index 0000000..1edfd5b --- /dev/null +++ b/blueox/recorders/pycernan.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +""" +blueox.recorders.pycernan +~~~~~~~~ + +This module provides the interface into pycernan + +:copyright: (c) 2018 by Aaron Biller?? +:license: ISC, see LICENSE for more details. 
+ +""" +from __future__ import absolute_import + +import atexit +import datetime +import decimal +import json +import logging +import os +import threading + +from pycernan.avro import Client + +from blueox import ports + +log = logging.getLogger(__name__) + +_uname = os.uname()[1] + +# Global blueox avro schema definition +BLUEOX_AVRO_RECORD = { + "doc": "A BlueOx event", + "name": "blueox_event", + "namespace": "blueox.{}".format(_uname), + "type": "record", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "type", "type": "string"}, + {"name": "host", "type": "string"}, + {"name": "pid", "type": "long"}, + {"name": "start", "type": "double"}, + {"name": "end", "type": "double"}, + {"name": "body", "type": ["null", "string"], "default": "null"} + ] +} + + +def default_host(host=None): + """Build a default host string for pycernan + """ + return ports.default_pycernan_host(host) + + +def _serializer(obj): + """Serialize native python objects + """ + if isinstance(obj, (datetime.datetime, datetime.date)): + return obj.isoformat() + elif isinstance(obj, decimal.Decimal): + return float(obj) + try: + obj = str(obj) + except Exception: + raise TypeError(repr(obj) + ' is not JSON serializable') + return obj + + +threadLocal = threading.local() + +# Context can be shared between threads +_client = None + + +def init(host, port): + global _client + + _client = Client(host=host, port=port) + + +def _thread_connect(): + if _client and not getattr(threadLocal, 'client', None): + threadLocal.client = _client + + +def _serialize_context(context): + context_dict = context.to_dict() + for key in ('host', 'type'): + if len(context_dict.get(key, '')) > 64: + raise ValueError('Value too long: %r' % key) + + context_dict['id'] = str(context_dict['id']) + + body = context_dict.get('body', None) + if body is not None: + try: + context_dict['body'] = json.dumps(body, default=_serializer) + except (TypeError, ValueError): + try: + context_dict['body'] = unicode(body) + except Exception: + log.exception( + 'Serialization failure (not fatal, dropping data)') + context_dict['body'] = None + + context_dict = { + k: v.encode('utf-8') if isinstance(v, unicode) + else v for k, v in context_dict.items() + } + + return context_dict + + +def send(context): + _thread_connect() + + try: + context_data = [_serialize_context(context)] + except Exception: + log.exception('Failed to serialize context') + return + + if _client and threadLocal.client is not None: + try: + log.debug('Sending msg') + threadLocal.client.publish( + BLUEOX_AVRO_RECORD, context_data, sync=False) + except Exception: + log.exception('Failed during publish to pycernan.') + else: + log.info('Skipping sending event %s', context.name) + + +def close(): + if getattr(threadLocal, 'client', None): + threadLocal.client.close() + threadLocal.client = None + + +atexit.register(close) diff --git a/blueox/network.py b/blueox/recorders/zmq.py similarity index 94% rename from blueox/network.py rename to blueox/recorders/zmq.py index ede1a1e..fa02207 100644 --- a/blueox/network.py +++ b/blueox/recorders/zmq.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -blueox.network +blueox.recorders.zmq ~~~~~~~~ This module provides our interface into ZeroMQ @@ -9,6 +9,8 @@ :license: ISC, see LICENSE for more details. """ +from __future__ import absolute_import + import atexit import logging import msgpack @@ -16,7 +18,8 @@ import threading import zmq -from . 
import utils +from blueox import ports +from blueox import utils log = logging.getLogger(__name__) @@ -44,6 +47,12 @@ def check_meta_version(meta): raise ValueError(value) +def default_host(host=None): + """Build a default host string for the blueox collector + """ + return ports.default_collect_host(host) + + threadLocal = threading.local() # Context can be shared between threads diff --git a/blueox/store.py b/blueox/store.py index 66f4f19..afeaa2f 100644 --- a/blueox/store.py +++ b/blueox/store.py @@ -226,7 +226,8 @@ def list_log_files(log_path): def filter_log_files_for_active(log_files): - """Filter our list of log files to remove those we expect might be active.""" + """Filter our list of log files to remove those we expect might be active. + """ out_log_files = [] files_by_type = collections.defaultdict(list) @@ -242,11 +243,11 @@ def filter_log_files_for_active(log_files): out_log_files += type_files - # If that last log file is old, then it's probably not being used either. - # We add a buffer of an hour just to make sure everything has rotated - # away safely when this is run close to midnight. - cutoff_date = (datetime.datetime.utcnow() - datetime.timedelta(hours=1) - ).date() + # If that last log file is old, then it's probably not being used + # either. We add a buffer of an hour just to make sure everything has + # rotated away safely when this is run close to midnight. + cutoff_date = ( + datetime.datetime.utcnow() - datetime.timedelta(hours=1)).date() if last_lf.date < cutoff_date: out_log_files.append(last_lf) diff --git a/blueox/timer.py b/blueox/timer.py index dc0cf18..77a22ff 100644 --- a/blueox/timer.py +++ b/blueox/timer.py @@ -3,7 +3,9 @@ blueox.timer ~~~~~~~~ -This module has a timer context manager for easily tracking wall-clock time for some execution +This module has a timer context manager for easily tracking wall-clock +time for some execution + :copyright: (c) 2012 by Rhett Garber :license: ISC, see LICENSE for more details. diff --git a/blueox/tornado_utils.py b/blueox/tornado_utils.py index ffbfa9b..af3e2c4 100644 --- a/blueox/tornado_utils.py +++ b/blueox/tornado_utils.py @@ -5,8 +5,8 @@ This module provides hooks for using blueox with the Tornado async web server. Making blueox useful inside tornado is a challenge since you'll likely want a -blueox context per request, but multiple requests can be going on at once inside -tornado. +blueox context per request, but multiple requests can be going on at once +inside tornado. :copyright: (c) 2012 by Rhett Garber :license: ISC, see LICENSE for more details. @@ -19,8 +19,6 @@ import sys import time -log = logging.getLogger(__name__) - import tornado.web import tornado.gen import tornado.httpclient @@ -29,6 +27,8 @@ import blueox +log = logging.getLogger(__name__) + def _gen_wrapper(ctx, generator): """Generator Wrapper that starts/stops our context @@ -112,7 +112,8 @@ def on_finish(self): class SampleRequestHandler(BlueOxRequestHandlerMixin, tornado.web.RequestHandler): - """Sample base request handler that provides basic information about the request. + """Sample base request handler that provides basic + information about the request. 
""" def prepare(self): @@ -123,8 +124,8 @@ def prepare(self): def write_error(self, status_code, **kwargs): if 'exc_info' in kwargs: - blueox.set('exception', - ''.join(traceback.format_exception(*kwargs["exc_info"]))) + blueox.set('exception', ''.join( + traceback.format_exception(*kwargs["exc_info"]))) return super(SampleRequestHandler, self).write_error(status_code, **kwargs) @@ -159,15 +160,16 @@ def fetch(self, request, callback=None, **kwargs): ctx.stop() # I'd love to use the future to handle the completion step, BUT, we - # need this to happen first. If the caller has provided a callback, we don't want them - # to get called before we do. Rather than poke into the internal datastructures, we'll just - # handle the callback explicitly + # need this to happen first. If the caller has provided a callback, we + # don't want them to get called before we do. Rather than poke into the + # internal datastructures, we'll just handle the callback explicitly def complete_context(response): ctx.start() ctx.set('response.code', response.code) - ctx.set('response.size', len(response.body) if response.body else 0) + ctx.set('response.size', + len(response.body) if response.body else 0) ctx.done() @@ -175,12 +177,12 @@ def complete_context(response): def fetch_complete(future): # This error handling is just copied from tornado.httpclient as - # we need to record a real HTTPError. httpclient might do the same thing - # again if needs to deal with the caller's callbacks. + # we need to record a real HTTPError. httpclient might do the + # same thing again if needs to deal with the caller's callbacks exc = future.exception() - if isinstance( - exc, - tornado.httpclient.HTTPError) and exc.response is not None: + if ( + isinstance(exc, tornado.httpclient.HTTPError) and + exc.response is not None): response = exc.response elif exc is not None: response = tornado.httpclient.HTTPResponse( diff --git a/blueox/utils.py b/blueox/utils.py index 28d7a93..5a15b29 100644 --- a/blueox/utils.py +++ b/blueox/utils.py @@ -82,5 +82,4 @@ def msgpack_encode_default(obj): return time.mktime(obj.utctimetuple()) if isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") - - raise TypeError("Unknown type: %r" % (obj,)) + return None diff --git a/requirements.txt b/requirements.txt index dfdd0e7..4e7e345 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ pyflakes tornado==3.2 boto yapf +./vendor/pycernan-0.0.10.zip diff --git a/tests/ports_test.py b/tests/ports_test.py index c7d278c..0c5aeb5 100644 --- a/tests/ports_test.py +++ b/tests/ports_test.py @@ -1,5 +1,8 @@ import os -from testify import * +from testify import ( + TestCase, + assert_equal, + teardown) from blueox import ports @@ -71,3 +74,31 @@ def test_env_port(self): os.environ['BLUEOX_HOST'] = 'master:123' host = ports.default_collect_host() assert_equal(host, "master:123") + + +class DefaultPycernanHost(TestCase): + @teardown + def clear_env(self): + try: + del os.environ['BLUEOX_PYCERNAN_HOST'] + except KeyError: + pass + + def test_emtpy(self): + host = ports.default_pycernan_host() + assert_equal(host, '127.0.0.1:2003') + + def test_env(self): + os.environ['BLUEOX_PYCERNAN_HOST'] = 'local.svc.team-me.aws.jk8s' + host = ports.default_pycernan_host() + assert_equal(host, 'local.svc.team-me.aws.jk8s:2003') + + def test_env_port(self): + os.environ['BLUEOX_PYCERNAN_HOST'] = 'local.svc.team-me.aws.jk8s:2003' + host = ports.default_pycernan_host() + assert_equal(host, 'local.svc.team-me.aws.jk8s:2003') + + def test_passed(self): + _host = 
'my.wish.is.your.command' + host = ports.default_pycernan_host(_host) + assert_equal(host, 'my.wish.is.your.command:2003') diff --git a/tests/recorders/__init__.py b/tests/recorders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/recorders/pycernan_test.py b/tests/recorders/pycernan_test.py new file mode 100644 index 0000000..56ef550 --- /dev/null +++ b/tests/recorders/pycernan_test.py @@ -0,0 +1,164 @@ +import datetime +import decimal +import json +import random + +from testify import ( + TestCase, + setup, + teardown, + assert_equal, + assert_raises) + +from pycernan.avro.serde import serialize +from pycernan.avro.exceptions import DatumTypeException + +from blueox import default_configure, PYCERNAN_RECORDER +from blueox import utils +from blueox import context +from blueox.recorders import pycernan as pycernan_rec +from blueox.recorders import zmq + + +class MockPycernanClient(object): + last_schema = None + last_batch = None + last_sync = None + + def __call__(self, host=None, port=None): + self.host = host + self.port = port + return self + + def publish(self, schema, batch, sync=None): + self.last_schema = schema + self.last_batch = batch + self.last_sync = sync + + def close(self): + pass + + +class CantSerializeMe(object): + def __repr__(self): + return chr(167) + + +class PycernanOverrideTestCase(TestCase): + def test_configure_no_override(self): + default_configure() + assert_equal(context._recorder_function, zmq.send) + + def test_configure_override(self): + pycernan_rec.Client = MockPycernanClient() + default_configure(recorder=PYCERNAN_RECORDER) + assert_equal(context._recorder_function, pycernan_rec.send) + + +class PycernanSendTestCase(TestCase): + @setup + def build_context(self): + self.context = context.Context('test', 1) + + @setup + def init_pycernan(self): + self.port = random.randint(30000, 40000) + self.client = MockPycernanClient() + pycernan_rec.Client = self.client + pycernan_rec.init('127.0.0.1', self.port) + + @setup + def configure_pycernan(self): + context._recorder_function = pycernan_rec.send + + @teardown + def unconfigure_pycernan(self): + context._recorder_function = None + + @teardown + def destroy_recorder(self): + pycernan_rec.close() + + def test(self): + with self.context: + self.context.set('foo', True) + self.context.set('bar.baz', 10.0) + + data = self.client.last_batch[0] + data['body'] = json.loads(data['body']) + assert_equal(self.client.last_schema, pycernan_rec.BLUEOX_AVRO_RECORD) + assert_equal(self.client.last_sync, False) + assert_equal(data['id'], '1') + assert_equal(data['type'], 'test') + assert_equal(utils.get_deep(data['body'], 'bar.baz'), 10.0) + + assert_equal(self.client.host, '127.0.0.1') + assert_equal(self.client.port, self.port) + + +class SerializeContextTestCase(TestCase): + @setup + def build_context(self): + self.context = context.Context('test', 1) + + def test_types(self): + with self.context: + self.context.set('decimal_value', decimal.Decimal('6.66')) + self.context.set('date_value', datetime.date(2013, 12, 10)) + self.context.set( + 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) + + data = pycernan_rec._serialize_context(self.context) + data['body'] = json.loads(data['body']) + assert_equal(data['body']['decimal_value'], 6.66) + assert_equal(data['body']['date_value'], '2013-12-10') + assert_equal( + datetime.datetime.strptime( + data['body']['datetime_value'], '%Y-%m-%dT%H:%M:%S'), + datetime.datetime(2013, 12, 10, 12, 12, 12)) + + def test_exception(self): + with 
self.context: + self.context.set('value', CantSerializeMe()) + + data = pycernan_rec._serialize_context(self.context) + + # The serialization should fail, but that just + # means we don't have any data. + assert_equal(data['body'], None) + + +class EncodeAvroTestCase(TestCase): + @setup + def build_context(self): + self.context = context.Context('test', 1) + + def test_success(self): + with self.context: + self.context.set('foo', True) + self.context.set('bar.baz', 10.0) + + data = pycernan_rec._serialize_context(self.context) + serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) + + def test_failure(self): + with self.context: + self.context.set('foo', True) + self.context.set('bar.baz', 10.0) + self.context.set('decimal_value', decimal.Decimal('6.66')) + self.context.set('date_value', datetime.date(2013, 12, 10)) + self.context.set( + 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) + + data = pycernan_rec._serialize_context(self.context) + data['host'] = None + with assert_raises(DatumTypeException): + serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) + + def test_none_body(self): + with self.context: + self.context.set('bad_char', CantSerializeMe()) + + data = pycernan_rec._serialize_context(self.context) + assert_equal(data['body'], None) + serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) diff --git a/tests/network_test.py b/tests/recorders/zmq_test.py similarity index 72% rename from tests/network_test.py rename to tests/recorders/zmq_test.py index dbfa4c2..c6ae28a 100644 --- a/tests/network_test.py +++ b/tests/recorders/zmq_test.py @@ -3,18 +3,24 @@ import decimal import datetime -from testify import * +from testify import ( + TestCase, + setup, + teardown, + assert_equal) import zmq import msgpack from blueox import utils -from blueox import network from blueox import context +from blueox.recorders import zmq as zmq_rec + class NoNetworkSendTestCase(TestCase): def test(self): """Verify that if network isn't setup, send just does nothing""" - network.send(context.Context('test', 1)) + zmq_rec.send(context.Context('test', 1)) + class NetworkSendTestCase(TestCase): @setup @@ -24,11 +30,11 @@ def build_context(self): @setup def init_network(self): self.port = random.randint(30000, 40000) - network.init("127.0.0.1", self.port) + zmq_rec.init("127.0.0.1", self.port) @setup def configure_network(self): - context._recorder_function = network.send + context._recorder_function = zmq_rec.send @teardown def unconfigure_network(self): @@ -36,7 +42,7 @@ def unconfigure_network(self): @setup def build_server_socket(self): - self.server = network._zmq_context.socket(zmq.PULL) + self.server = zmq_rec._zmq_context.socket(zmq.PULL) self.server.bind("tcp://127.0.0.1:%d" % self.port) @teardown @@ -45,7 +51,7 @@ def destroy_server(self): @teardown def destory_network(self): - network.close() + zmq_rec.close() def test(self): with self.context: @@ -53,8 +59,9 @@ def test(self): self.context.set('bar.baz', 10.0) event_meta, raw_data = self.server.recv_multipart() - network.check_meta_version(event_meta) - _, event_time, event_host, event_type = struct.unpack(network.META_STRUCT_FMT, event_meta) + zmq_rec.check_meta_version(event_meta) + _, event_time, event_host, event_type = struct.unpack( + zmq_rec.META_STRUCT_FMT, event_meta) assert_equal(event_type, 'test') data = msgpack.unpackb(raw_data) @@ -72,26 +79,25 @@ def test_types(self): with self.context: self.context.set('decimal_value', decimal.Decimal("6.66")) self.context.set('date_value', datetime.date(2013, 12, 10)) - 
self.context.set('datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) + self.context.set( + 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) - meta_data, context_data = network._serialize_context(self.context) + meta_data, context_data = zmq_rec._serialize_context(self.context) data = msgpack.unpackb(context_data) assert_equal(data['body']['decimal_value'], "6.66") assert_equal(data['body']['date_value'], "2013-12-10") assert_equal( - datetime.datetime.fromtimestamp(float(data['body']['datetime_value'])), + datetime.datetime.fromtimestamp( + float(data['body']['datetime_value'])), datetime.datetime(2013, 12, 10, 12, 12, 12)) def test_exception(self): with self.context: self.context.set('value', Exception('hello')) - meta_data, context_data = network._serialize_context(self.context) + meta_data, context_data = zmq_rec._serialize_context(self.context) data = msgpack.unpackb(context_data) # The serialization should fail, but that just means we don't have any # data. assert_equal(data['body'], None) - - - diff --git a/tests/tornado_utils_test.py b/tests/tornado_utils_test.py index e6aedd4..085da93 100644 --- a/tests/tornado_utils_test.py +++ b/tests/tornado_utils_test.py @@ -1,9 +1,8 @@ import time -import pprint import random import collections import traceback -from testify import * +from testify import assert_equal, setup import tornado.ioloop import tornado.gen @@ -14,7 +13,10 @@ # vendor module. Tornado testing in Testify import tornado_test -class AsyncHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): + +class AsyncHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): @blueox.tornado_utils.coroutine def get(self): loop = self.request.connection.stream.io_loop @@ -22,7 +24,8 @@ def get(self): req_id = self.blueox_ctx.id blueox.set('async', True) - result = yield blueox.tornado_utils.AsyncHTTPClient(loop).fetch(self.application.test_url) + result = yield blueox.tornado_utils.AsyncHTTPClient(loop).fetch( + self.application.test_url) assert result.code == 200 with blueox.Context('.extra'): @@ -32,31 +35,40 @@ def get(self): self.finish() -class AsyncErrorHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): +class AsyncErrorHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): @blueox.tornado_utils.coroutine def get(self): loop = self.request.connection.stream.io_loop - called = yield tornado.gen.Task(loop.add_timeout, time.time() + random.randint(1, 2)) + _ = yield tornado.gen.Task(loop.add_timeout, time.time() + + random.randint(1, 2)) raise Exception('hi') def write_error(self, status_code, **kwargs): if 'exc_info' in kwargs: - blueox.set('exception', ''.join(traceback.format_exception(*kwargs["exc_info"]))) + blueox.set('exception', ''.join( + traceback.format_exception(*kwargs["exc_info"]))) - return super(AsyncErrorHandler, self).write_error(status_code, **kwargs) + return super(AsyncErrorHandler, self).write_error(status_code, + **kwargs) -class AsyncTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): +class AsyncTimeoutHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): @blueox.tornado_utils.coroutine def get(self): loop = self.request.connection.stream.io_loop - called = yield tornado.gen.Task(loop.add_timeout, time.time() + 1.0) + _ = yield tornado.gen.Task(loop.add_timeout, time.time() + 1.0) -class 
AsyncRecurseTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): +class AsyncRecurseTimeoutHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): @blueox.tornado_utils.coroutine def post(self): loop = self.request.connection.stream.io_loop @@ -64,8 +76,8 @@ def post(self): blueox.set("start", True) try: - f = yield http_client.fetch(self.request.body, request_timeout=0.5) - except tornado.httpclient.HTTPError, e: + _ = yield http_client.fetch(self.request.body, request_timeout=0.5) + except tornado.httpclient.HTTPError: self.write("got it") else: self.write("nope") @@ -73,13 +85,14 @@ def post(self): blueox.set("end", True) -class MainHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): +class MainHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): def get(self): blueox.set('async', False) self.write("Hello, world") - class SimpleTestCase(tornado_test.AsyncHTTPTestCase): @setup def setup_bluox(self): @@ -112,11 +125,6 @@ def test_error(self): f = self.http_client.fetch(self.get_url("/error"), self.stop) resp = self.wait() - #for ctx_id in self.log_ctx: - #print ctx_id - #for ctx in self.log_ctx[ctx_id]: - #pprint.pprint(ctx.to_dict()) - assert_equal(len(self.log_ctx), 2) found_exception = False @@ -128,31 +136,22 @@ def test_error(self): assert found_exception def test_timeout_error(self): - f = self.http_client.fetch(self.get_url("/timeout"), self.stop, request_timeout=0.5) + f = self.http_client.fetch( + self.get_url("/timeout"), self.stop, request_timeout=0.5) resp = self.wait() - #for ctx_id in self.log_ctx: - #print ctx_id - #for ctx in self.log_ctx[ctx_id]: - #pprint.pprint(ctx.to_dict()) - assert_equal(len(self.log_ctx), 1) ctx = self.log_ctx[self.log_ctx.keys()[0]][0] assert_equal(get_deep(ctx.to_dict(), 'body.response.code'), 599) def test_recurse_timeout_error(self): url = self.get_url("/timeout") - f = self.http_client.fetch(self.get_url("/recurse_timeout"), self.stop, + _ = self.http_client.fetch(self.get_url("/recurse_timeout"), self.stop, body=url, method="POST", request_timeout=1.5) resp = self.wait() - #for ctx_id in self.log_ctx: - #print ctx_id - #for ctx in self.log_ctx[ctx_id]: - #pprint.pprint(ctx.to_dict()) - assert_equal(resp.code, 200) assert_equal(resp.body, "got it") @@ -161,7 +160,9 @@ def test_recurse_timeout_error(self): for ctx_list in self.log_ctx.values(): for ctx in ctx_list: c = ctx.to_dict() - if c['type'] == 'request.httpclient' and c['body']['response']['code'] == 599: + if ( + c['type'] == 'request.httpclient' and + c['body']['response']['code'] == 599): found_timeout = True if c['type'] == 'request' and get_deep(c, 'body.start'): @@ -175,13 +176,8 @@ def test_context(self): self.http_client.fetch(self.get_url("/async"), self.stop) resp = self.wait() - #for ctx_id in self.log_ctx: - #print - #print ctx_id - #for ctx in self.log_ctx[ctx_id]: - #pprint.pprint(ctx.to_dict()) - - # If everything worked properly, we should have two separate ids, one will have two contexts associated with it. + # If everything worked properly, we should have two separate ids, + # one will have two contexts associated with it. # Hopefully it's the right one. 
found_sync = None found_async = None @@ -191,7 +187,9 @@ def test_context(self): if ctx.name == "request" and ctx.to_dict()['body']['async']: assert_equal(len(ctx_list), 3) found_async = ctx - if ctx.name == "request" and not ctx.to_dict()['body']['async']: + if ( + ctx.name == "request" and + not ctx.to_dict()['body']['async']): assert_equal(len(ctx_list), 1) found_sync = ctx if ctx.name.endswith("httpclient"): diff --git a/vendor/pycernan-0.0.10.zip b/vendor/pycernan-0.0.10.zip new file mode 100644 index 0000000..c47a78e Binary files /dev/null and b/vendor/pycernan-0.0.10.zip differ
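
For reference, a minimal usage sketch of the configuration API introduced in
this change (host values are illustrative; the constants and functions are as
defined in blueox/__init__.py and blueox/ports.py above):

    import blueox

    # Route events to a pycernan agent. With no host argument,
    # default_configure() falls back to BLUEOX_PYCERNAN_HOST or 127.0.0.1:2003.
    blueox.default_configure('pycernan.example.com:2003',
                             recorder=blueox.PYCERNAN_RECORDER)

    # Or keep the historical ZeroMQ collector, which remains the default.
    blueox.default_configure('localhost:3514')

    # A callable can still be passed directly as a custom recorder; it is
    # invoked with each completed blueox Context.
    def print_recorder(context):
        print(context.to_dict())

    blueox.configure(None, None, recorder=print_recorder)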