diff --git a/.gitignore b/.gitignore index 8d00d32..77b5a44 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ MANIFEST *.egg-info .coverage .tox +.vscode diff --git a/bin/speakeasy b/bin/speakeasy old mode 100644 new mode 100755 index 52db7c7..47d67e1 --- a/bin/speakeasy +++ b/bin/speakeasy @@ -1,12 +1,21 @@ #!/usr/bin/env python +from __future__ import absolute_import +from __future__ import print_function + import argparse import logging -from speakeasy.speakeasy import Speakeasy import socket -import time +import threading +import signal +import itertools +import pdb + +from speakeasy.speakeasy import Speakeasy logger = logging.getLogger() +quit_event = threading.Event() + def init_logger(verbose): if verbose: @@ -18,48 +27,61 @@ def init_logger(verbose): handler.setFormatter(formatter) logger.addHandler(handler) + +def catch_quit_signal(signum, stack): + logger.warn("Caught signal %d", signum) + quit_event.set() + + def main(): - parser = argparse.ArgumentParser(description='Run Speakeasy server') - parser.add_argument('-H', '--host', required=False, default=socket.getfqdn(), - help='Hostname to emit metrics as') - parser.add_argument('-ms', '--metric-socket', required=True, - help='Metric socket to listen on') - parser.add_argument('-cp', '--cmd-port', required=True, - help='Port to receive commands on') - parser.add_argument('-pp', '--pub-port', required=False, - help='Port to publish updates on') - parser.add_argument('-e', '--emitter', required=True, - help='Module to emit data through periodically') - parser.add_argument('-ea', '--emitter-args', required=False, nargs='*', - help='Arguments to be passed to emitter module') - parser.add_argument('-ei', '--emission-interval', default=60, required=False, type=float, - help='Frequency to emit data (seconds)') - parser.add_argument('-l', '--legacy', required=False, type=str, - help='Optional legacy socket to listen for incoming metrics') - parser.add_argument('-sm', '--socket-mod', required=False, default=None, type=str, - help='File mode for metric socket') - parser.add_argument('-v', '--verbose', required=False, default=False, action='store_true', - help='Enable verbose logging') - - args = parser.parse_args() - - init_logger(args.verbose) - - logger.info('Start speakeasy') - socket_mod = int(args.socket_mod, base=8) if args.socket_mod else None - server = Speakeasy(args.host, args.metric_socket, args.cmd_port, - args.pub_port, args.emitter, args.emitter_args, - args.emission_interval, args.legacy, - socket_mod=socket_mod) - server.start() - - while True: - try: - time.sleep(1) - except (KeyboardInterrupt, Exception), e: - logger.warn("Exception... exiting - {0}".format(e)) - server.shutdown() - break - -if __name__=='__main__': - main() + parser = argparse.ArgumentParser(description='Run Speakeasy server') + parser.add_argument('-H', '--host', required=False, default=socket.getfqdn(), + help='Hostname to emit metrics as') + parser.add_argument('-ms', '--metric-socket', required=True, + help='Metric socket to listen on') + parser.add_argument('-cp', '--cmd-port', required=True, + help='Port to receive commands on') + parser.add_argument('-pp', '--pub-port', required=False, + help='Port to publish updates on') + parser.add_argument('-e', '--emitter', required=True, + help='Module to emit data through periodically') + parser.add_argument('-ea', '--emitter-args', required=False, nargs='*', action="append", default=[], + help='Arguments to be passed to emitter module') + parser.add_argument('-ei', '--emission-interval', default=60, required=False, type=float, + help='Frequency to emit data (seconds)') + parser.add_argument('-l', '--legacy', required=False, type=str, + help='Optional legacy socket to listen for incoming metrics') + parser.add_argument('-sm', '--socket-mod', required=False, default=None, type=str, + help='File mode for metric socket') + parser.add_argument('-v', '--verbose', required=False, default=False, action='store_true', + help='Enable verbose logging') + + args = parser.parse_args() + + init_logger(args.verbose) + + logger.info('Start speakeasy') + + socket_mod = int(args.socket_mod, base=8) if args.socket_mod else None + server = Speakeasy(args.host, args.metric_socket, args.cmd_port, + args.pub_port, args.emitter, itertools.chain(*args.emitter_args), + args.emission_interval, args.legacy, + socket_mod=socket_mod) + server.start() + + signal.signal(signal.SIGTERM, catch_quit_signal) + signal.signal(signal.SIGINT, catch_quit_signal) + signal.signal(signal.SIGPIPE, catch_quit_signal) + signal.signal(signal.SIGUSR1, lambda x, y: pdb.set_trace()) + + while not quit_event.is_set(): + try: + quit_event.wait(10) + except Exception as e: + logger.info("Exception (%s)... exiting", e) + + server.shutdown() + + +if __name__ == '__main__': + main() diff --git a/docs/conf.py b/docs/conf.py index 40f3700..352d6cb 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -11,13 +11,14 @@ # # All configuration values have a default; values that are commented out # serve to show the default. - +import sys +import os import sphinx_rtd_theme # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. -#sys.path.insert(0, os.path.abspath('.')) +sys.path.insert(0, os.path.abspath(os.path.dirname(__file__) + "/..")) # -- General configuration ------------------------------------------------ diff --git a/docs/index.rst b/docs/index.rst index 537ff82..da047dc 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -20,7 +20,7 @@ Content: intro quickstart - + speakeasy Indices and tables ================== diff --git a/docs/speakeasy.rst b/docs/speakeasy.rst new file mode 100644 index 0000000..7e18048 --- /dev/null +++ b/docs/speakeasy.rst @@ -0,0 +1,17 @@ +Module documentation +==================== + +.. automodule:: speakeasy.speakeasy + :members: + :undoc-members: + + .. automethod:: __init__ + +Included emitters +================= + +Simple +------ + +.. automodule:: speakeasy.emitter.simple + \ No newline at end of file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..f45c59d --- /dev/null +++ b/setup.cfg @@ -0,0 +1,3 @@ +[flake8] +max-line-length=120 + diff --git a/speakeasy/emitter/simple.py b/speakeasy/emitter/simple.py index 3f93af7..d8ca269 100644 --- a/speakeasy/emitter/simple.py +++ b/speakeasy/emitter/simple.py @@ -1,16 +1,44 @@ +"""The simple emitter writes metrics every interval to a file. + +To use this emitter provide the emitter arg `filename` to write to. If an arg is not provided, +this emitter will attempt to write to `metrics.out` barring any permissions issue. + +.. code-block:: bash + + speakeasy ... --emitter simple --emitter-args filename=/var/tmp/metrics.out + +The special names `stdout` or `stderr` will send outputs to the appropriate stream +""" import logging -import os +import sys logger = logging.getLogger(__name__) +DEFAULT_FILENAME = "metrics.out" +DEFAULT_SEPARATOR = "|" + + class Emitter(object): + def __init__(self, **kwargs): - self.filename = kwargs['filename'] + self.filename = kwargs.get('filename', DEFAULT_FILENAME) + self.separator = kwargs.get('separator', DEFAULT_SEPARATOR) def emit(self, metrics): + fh = None + if self.filename == "stderr": + fh = sys.stderr + elif self.filename == "stdout": + fh = sys.stdout + + if fh: + self._emit(fh, metrics) + else: + with open(self.filename, 'a') as fh: + self._emit(fh, metrics) + + def _emit(self, fh, metrics): """ Ship the metrics off """ - with open(self.filename, 'a') as fh: - for metric in metrics: - mline = '|'.join([str(m) for m in metric]) - logger.debug('Writing metric out to file - {0}'.format(mline)) - fh.write(mline+'\n') + for metric in metrics: + mline = self.separator.join([str(m) for m in metric]) + fh.write(mline + '\n') diff --git a/speakeasy/speakeasy.py b/speakeasy/speakeasy.py index d69c685..1f45ac4 100644 --- a/speakeasy/speakeasy.py +++ b/speakeasy/speakeasy.py @@ -10,11 +10,29 @@ import ujson import bisect import zmq - +import resource import utils +import signal logger = logging.getLogger(__name__) +# Unused +MAX_RECEIVE_HWM = 20000 + +# How often should we poll the socket for metrics +POLLING_TIMEOUT_MS = 500 + +# How long should we wait on metric queue to events. Affects responsiveness to server shutdown +QUEUE_WAIT_SECS = 5 + + +# After these many loops of POLLING_TIMEOUT_MS, metric processed counts +# will be updated in stats +METRIC_CHECKPOINT_INTERVAL = 20 + +# max rss before dying +MAX_RSS = 2000000000 # 2GB + def _gauge_pair(): """ Track gauge as a pair of (sum, count) """ @@ -22,9 +40,11 @@ def _gauge_pair(): class Speakeasy(object): + "A Speakeasy instance" + def __init__(self, host, metric_socket, cmd_port, pub_port, emitter_name, emitter_args=None, emission_interval=60, legacy=None, - hwm=20000, socket_mod=None): + hwm=MAX_RECEIVE_HWM, socket_mod=None): """ Aggregate metrics and emit. Also support live data querying. """ self.metric_socket = metric_socket self.pub_port = pub_port @@ -36,16 +56,22 @@ def __init__(self, host, metric_socket, cmd_port, pub_port, emitter_name, self.percentiles = [0.5, 0.75, 0.95, 0.99] self.metrics_queue = Queue.Queue() self.metrics_lock = threading.RLock() + self.stop = threading.Event() # Setup legacy socket if needed + # If socket file name exists, will delete and recreate it self.legacy_socket = None if self.legacy: if os.path.exists(self.legacy): logger.warn('Remove existing legacy socket "{0}" and recreating'.format(self.legacy)) os.remove(self.legacy) self.legacy_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - self.legacy_socket.bind(self.legacy) + try: + self.legacy_socket.bind(self.legacy) + except OSError as e: + logger.fatal("Error changing perms: %s", e) self.legacy_socket_fno = self.legacy_socket.fileno() + logger.info("Will read legacy input from %s", self.legacy) # Process the args for emitter self.emitter_args = {} @@ -59,24 +85,41 @@ def __init__(self, host, metric_socket, cmd_port, pub_port, emitter_name, if not self.emitter: logger.warn("No emitter found") + logger.info("Using emitter '%s' with args %s", self.emitter_name, self.emitter_args) + self.context = zmq.Context() # Listen for metrics self.recv_socket = self.context.socket(zmq.PULL) + # Increase the HWM - self.recv_socket.set_hwm(hwm) - self.recv_socket.bind('ipc://{0}'.format(self.metric_socket)) - if socket_mod: - os.chmod(self.metric_socket, socket_mod) + self.recv_socket.set_hwm(hwm) # warn, According to http://api.zeromq.org/2-1:zmq-socket, this has no effect on a PULL socket + try: + self.recv_socket.bind('ipc://{0}'.format(self.metric_socket)) + if socket_mod: + os.chmod(self.metric_socket, socket_mod) + logger.info("Will receive client metrics from %s", self.metric_socket) + except zmq.error.ZMQError as e: + logger.fatal("Could not bind receive socket: %s", e) + except OSError as e: + logger.fatal("Error changing perms: %s", e) # Listen for commands self.cmd_socket = self.context.socket(zmq.REP) - self.cmd_socket.bind('tcp://*:{0}'.format(self.cmd_port)) + try: + self.cmd_socket.bind('tcp://*:{0}'.format(self.cmd_port)) + logger.info("Will receive commands from *:%s", self.cmd_port) + except zmq.error.ZMQError as e: + logger.fatal("Could not bind cmd socket: %s", e) # Publish metrics if self.pub_port: self.pub_socket = self.context.socket(zmq.PUB) - self.pub_socket.bind('tcp://*:{0}'.format(self.pub_port)) + try: + self.pub_socket.bind('tcp://*:{0}'.format(self.pub_port)) + logger.info("Will publish events to PUB socket at *:%s", self.pub_port) + except zmq.error.ZMQError as e: + logger.fatal("Could not bind pub socket: %s", e) # Register sockets for polling self.poller = zmq.Poller() @@ -86,9 +129,9 @@ def __init__(self, host, metric_socket, cmd_port, pub_port, emitter_name, self.poller.register(self.legacy_socket, zmq.POLLIN) # Setup poll and emit thread - self.poll_thread = threading.Thread(target=self.poll_sockets, args=()) - self.emit_thread = threading.Thread(target=self.emit_metrics, args=()) - self.process_thread = threading.Thread(target=self.process_metrics_queue, args=()) + self.poll_thread = threading.Thread(target=self.poll_sockets, args=(), name="poll_input") + self.emit_thread = threading.Thread(target=self.emit_metrics, args=(), name="emit_metrics") + self.process_thread = threading.Thread(target=self.process_metrics_queue, args=(), name="process_metrics_queue") # Init metrics # Index metrics by appname @@ -96,40 +139,60 @@ def __init__(self, host, metric_socket, cmd_port, pub_port, emitter_name, def process_metrics_queue(self): logger.info("Start processing metrics queue") - while self.running: + while not self.stop.is_set(): try: - metric, legacy = self.metrics_queue.get(block=False) + metric, legacy = self.metrics_queue.get(timeout=QUEUE_WAIT_SECS) except Queue.Empty: - time.sleep(0.01) continue try: self.process_metric(metric, legacy=legacy) except Exception as e: - logger.warn("Failed to process metric: {0}".format(e)) + logger.warn("Failed to process metric [{0}]: {1}".format(metric, e)) self.metrics_queue.task_done() + logger.info("Stop processing metrics queue") def gauge_append(self, lst, value): + "Add a value to the gauge. This adds to a running sum of the values and a counter for how many values were added" lst[0] += value lst[1] += 1 + def is_shutdown(self): + return self.stop.is_set() + def gauge_sum(self, lst): - return float(lst[0])/lst[1] + "Returns the computed mean for the gauge values added till now" + return float(lst[0]) / lst[1] def process_gauge_metric(self, app_name, metric_name, value): + "Updates the gauge value to the internal counters and returns the mean of the values till now" with self.metrics_lock: dp = self.metrics[app_name]['GAUGE'][metric_name] self.gauge_append(dp, value) - return self.gauge_sum(dp) + return self.gauge_sum(dp) def process_counter_metric(self, app_name, metric_name, value): + "Add to the internal counter and return the current running total" with self.metrics_lock: self.metrics[app_name]['COUNTER'][metric_name] += value return self.metrics[app_name]['COUNTER'][metric_name] + def update_internal_counters(self, app_name, metric_type): + "Track speakeasy stats" + if app_name == "speakeasy": + return + + # common + self.process_metric(["speakeasy", "metrics._type_.{0}.total".format(metric_type), "COUNTER", 1]) + self.process_metric(["speakeasy", "metrics.total", "COUNTER", 1]) + + # app + self.process_metric(["speakeasy", "metrics._app_.{0}.total".format(app_name), "COUNTER", 1]) + self.process_metric(["speakeasy", "metrics._app_.{0}._type_.{1}.total".format(app_name, metric_type), "COUNTER", 1]) + def process_metric(self, metric, legacy=False): - """ Process metrics and store and publish """ + """Add metric value to internal structures, and publish the current values to the publish socket if asked""" if legacy: # Legacy format for metrics is slightly different... # Index them under same "app name" @@ -138,11 +201,16 @@ def process_metric(self, metric, legacy=False): else: app_name, metric_name, metric_type, value = metric + if app_name != "speakeasy": # avoid loop + self.update_internal_counters(app_name, metric_type) + try: value = float(value) except ValueError: - logger.warn("Failed to cast metric value to float - {0}".format(metric)) + logger.warn("Failed to cast metric value to float: app=%s metric_name=%s metric_type=%s value=%s", + app_name, metric_name, metric_type, value) return + if app_name not in self.metrics: self.init_app_metrics(app_name) @@ -151,6 +219,7 @@ def process_metric(self, metric, legacy=False): pub_metrics = [] else: pub_metrics = None + pub_val = None if metric_type == 'GAUGE': pub_val = self.process_gauge_metric(app_name, metric_name, value) @@ -159,6 +228,8 @@ def process_metric(self, metric, legacy=False): elif metric_type == 'PERCENTILE' or metric_type == 'HISTOGRAM': # Kill off the HISTOGRAM type!! metric_type = 'PERCENTILE' + if not metric_name.endswith("."): + metric_name += "." # Track average value separately avg_pub_val = self.process_gauge_metric(app_name, metric_name + 'average', value) with self.metrics_lock: @@ -170,7 +241,7 @@ def process_metric(self, metric, legacy=False): cur_time = time.time() for p in self.percentiles: pub_metrics.append((self.hostname, app_name, - '{0}{1}_percentile'.format(metric_name, int(p*100)), + '{0}{1}_percentile'.format(metric_name, int(p * 100)), 'GAUGE', utils.percentile(dp, p), cur_time)) pub_metrics.append((self.hostname, app_name, metric_name + 'average', 'GAUGE', avg_pub_val, cur_time)) @@ -193,23 +264,68 @@ def process_command(self, cmd): # TODO: Do something here pass + def watch_memory(self): + rss_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + # https://github.com/kmike/scrapy/commit/a83c6f545c440edb901e706beb2cb4e6c24b8381 + if sys.platform != "darwin": + rss_usage *= 1024 + self.process_metric(["speakeasy", "mem.rss", "GAUGE", rss_usage]) + if not self.stop.is_set() and (rss_usage > MAX_RSS): + logger.error("RSS usage (%d) > limit (%d). Killing myself.", rss_usage, MAX_RSS) + os.kill(os.getpid(), signal.SIGTERM) + def poll_sockets(self): """ Poll metrics socket and cmd socket for data """ logger.info("Start polling") - while self.running: - socks = dict(self.poller.poll(1000)) + loop = 0 + received_count = 0 + good_count = 0 + decode_errors = 0 + empty_packets = 0 + while not self.stop.is_set(): + loop += 1 + if not (loop % METRIC_CHECKPOINT_INTERVAL): # 20 * 0.5s + self.watch_memory() + self.process_metric(["speakeasy", "metrics_queue.length", "GAUGE", self.metrics_queue.qsize()]) + + self.process_metric(["speakeasy", "received_metrics.total", "COUNTER", received_count]) + self.process_metric(["speakeasy", "received_metrics.empty.total", "COUNTER", empty_packets]) + self.process_metric(["speakeasy", "received_metrics.good.total", "COUNTER", good_count]) + self.process_metric(["speakeasy", "received_metrics.decode_errors.total", "COUNTER", decode_errors]) + + received_count = good_count = decode_errors = empty_packets = 0 + + socks = dict(self.poller.poll(POLLING_TIMEOUT_MS)) + if socks.get(self.recv_socket) == zmq.POLLIN: - try: - metric = ujson.loads(self.recv_socket.recv()) - # Put metric on metrics queue - self.metrics_queue.put((metric, False)) - except ValueError as e: - logger.warn("Error receving metric: {0}".format(e)) + try: + data = self.recv_socket.recv() + received_count += 1 + if not data: + logger.warn("Got an empty packet (possibly rumrunner ping)") + empty_packets += 1 + continue + metric = ujson.loads(data) + # Put metric on metrics queue + self.metrics_queue.put((metric, False)) + good_count += 1 + except ValueError as e: + logger.warn("Error receiving metric [data=%s]: %s", data, e) + decode_errors += 1 + continue + # logger.debug("Received and processed %d metrics from socket", count) if socks.get(self.cmd_socket) == zmq.POLLIN: - cmd = ujson.loads(self.cmd_socket.recv()) - # Process command - self.process_command(cmd) + while True: + try: + data = self.cmd_socket.recv(zmq.NOBLOCK) + cmd = ujson.loads(data) + # Process command + self.process_command(cmd) + except zmq.ZMQError: + break + except ValueError as e: + logger.warn("Error receiving command [data=%s]: %s", data, e) if self.legacy and socks.get(self.legacy_socket_fno) == zmq.POLLIN: # Process legacy format @@ -223,29 +339,22 @@ def poll_sockets(self): def emit_metrics(self): """ Send snapshot of metrics through emitter """ - while self.running: - logger.info("Emit metrics") + logger.info("Start emitting") + while not self.stop.is_set(): + e_start = time.time() # Grab "this is what the world looks like now" snapshot metrics_ss = self.snapshot() - e_start = time.time() if self.emitter: self.emitter.emit(metrics_ss) e_end = time.time() - # Sleep for 1 second interval until time to emit again - if (e_end - e_start) < self.emission_interval: - sleep_until = time.time() + (self.emission_interval - (e_end - e_start)) - while self.running: - ct = time.time() - if ct > sleep_until: - break - else: - if sleep_until - ct < 1: - time.sleep(sleep_until - ct) - else: - time.sleep(1) + duration = e_end - e_start + sleep_time = self.emission_interval - duration + logger.info("Emitted %d metrics in %.2f seconds. Will sleep for %.2f seconds", + len(metrics_ss), duration, sleep_time) + self.stop.wait(sleep_time) logger.info("Stop emitting") @@ -257,7 +366,6 @@ def snapshot(self): """ metrics = [] with self.metrics_lock: - logger.debug("Inside of metrics lock") ss = copy.deepcopy(self.metrics) # Reset metrics @@ -269,7 +377,7 @@ def snapshot(self): for m, vals in ss[app]['GAUGE'].iteritems(): if vals[1] == 0: - logger.debug("No values for metric: {0}".format(m)) + logger.debug("No values for GAUGE metric: app=%s metric_name=%s", app, m) continue if vals: @@ -278,21 +386,21 @@ def snapshot(self): for m, vals in ss[app]['PERCENTILE'].iteritems(): if len(vals) == 0: - logger.debug("No values for metric: {0}".format(m)) + logger.debug("No values for PERCENTILE metric: app=%s metric_name=%s", app, m) continue # Emit 50%, 75%, 95%, 99% as GAUGE for p in self.percentiles: # Assume the metric name has a trailing separator to append # the percentile to - metrics.append((app, '{0}{1}_percentile'.format(m, int(p*100)), + metrics.append((app, '{0}{1}_percentile'.format(m, int(p * 100)), utils.percentile(vals, p), 'GAUGE', time.time())) return metrics def reset_metrics(self): """ Reset metrics for next interval """ - for app in self.metrics: - with self.metrics_lock: + with self.metrics_lock: + for app in self.metrics: self.metrics[app]['GAUGE'] = collections.defaultdict(_gauge_pair) self.metrics[app]['PERCENTILE'] = collections.defaultdict(list) @@ -313,13 +421,12 @@ def shutdown(self): self.__stop() def __start(self): - self.running = True self.poll_thread.start() self.emit_thread.start() self.process_thread.start() def __stop(self): - self.running = False + self.stop.set() logger.info("Shutting down") if self.poll_thread: logger.info("Waiting for poll thread to stop...") @@ -330,43 +437,64 @@ def __stop(self): self.emit_thread.join() if self.process_thread: - logger.info("Waiting for process thread to stop...") + logger.info("Waiting for metrics queue processing thread to stop...") self.process_thread.join() self.__cleanup() def __cleanup(self): - if self.legacy: - if os.path.exists(self.legacy): - logger.info('Cleaning up legacy socket') - os.remove(self.legacy) - os.remove(self.metric_socket) + try: + if self.legacy: + if os.path.exists(self.legacy): + logger.info('Cleaning up legacy socket') + os.remove(self.legacy) + os.remove(self.metric_socket) + except OSError as e: + logger.error(e) def import_emitter(name, **kwargs): + "Returns an instance of an emitter with a class `Emitter` in the module `speakeasy.emitter.$name`, or `None` if no such module could be found" + namespace = 'speakeasy.emitter.' if namespace not in name: name = namespace + name try: - __import__(name) - except Exception: + __import__(name, level=0) + except Exception as e: # app doesn't exist - return + logger.error("Error loading module %s: %s", name, e) + return None module = sys.modules[name] - return module.Emitter(**kwargs) + try: + return module.Emitter(**kwargs) + except Exception as e: # if the Emitter class is not present, or fails to instantiate + logger.error("Error initializing emitter %s: %s", name, e) + return None + if __name__ == '__main__': + stderr_handler = logging.StreamHandler(sys.stderr) + stderr_handler.setFormatter( + logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')) + logging.getLogger().addHandler(stderr_handler) + logging.getLogger().setLevel(logging.INFO) + server = Speakeasy('0.0.0.0', '/var/tmp/metrics_socket', '55001', '55002', - 'simple', ['filename=/var/tmp/metrics.out'], 60, + 'simple', ['filename=stderr'], 10, '/var/tmp/metric_socket2') server.start() + logger.info("Started") while True: try: time.sleep(1) - except: - print "Exception... exiting" - server.shutdown() + except Exception as e: + logger.info("Exception (%s)... exiting", e) + break + except KeyboardInterrupt: + logger.info("ctrl-c ... quitting") break + server.shutdown() diff --git a/speakeasy/utils.py b/speakeasy/utils.py index 82eaddf..18696d4 100644 --- a/speakeasy/utils.py +++ b/speakeasy/utils.py @@ -1,17 +1,18 @@ import math + def percentile(values, percent): """ Return percentile out of list of values """ if not values: - return + return - k = (len(values)-1) * percent + k = (len(values) - 1) * percent f = math.floor(k) c = math.ceil(k) if f == c: return values[int(k)] - d0 = values[int(f)] * (c-k) - d1 = values[int(c)] * (k-f) - return d0+d1 + d0 = values[int(f)] * (c - k) + d1 = values[int(c)] * (k - f) + return d0 + d1 diff --git a/test/test_speakeasy.py b/test/test_speakeasy.py index b1059cf..d4020eb 100644 --- a/test/test_speakeasy.py +++ b/test/test_speakeasy.py @@ -5,11 +5,19 @@ import zmq from test_util import get_random_free_port from speakeasy.speakeasy import Speakeasy +import speakeasy.speakeasy as speakeasy +import logging + +logging.basicConfig(level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') +logger = logging.getLogger() G_SPEAKEASY_HOST = '0.0.0.0' G_PUB_PORT = str(get_random_free_port()) G_CMD_PORT = str(get_random_free_port()) G_METRIC_SOCKET = '/var/tmp/test_metric_{0}'.format(random.random()) +speakeasy.QUEUE_WAIT_SECS = 1 def gen_speakeasy_server(): @@ -20,8 +28,9 @@ def gen_speakeasy_server(): class TestSpeakeasy(unittest.TestCase): @classmethod def setUpClass(self): + logger.info("**TESTS**: Setup starts") self.srv = gen_speakeasy_server() - + logger.info("**TESTS**: Server generated") self.sub_socket = zmq.Context().socket(zmq.SUB) self.sub_socket.connect('tcp://localhost:{0}'.format(G_PUB_PORT)) self.sub_socket.setsockopt(zmq.SUBSCRIBE, '') @@ -30,10 +39,18 @@ def setUpClass(self): self.poller.register(self.sub_socket, zmq.POLLIN) self.srv.start() + logger.info("**TESTS**: Setup ends") @classmethod def tearDownClass(self): + logger.info("**TESTS**: Class teardown begins") + logger.info("**TESTS**: Beginning server shutdown") self.srv.shutdown() + logger.info("**TESTS**: Server shutdown complete") + logger.info("**TESTS**: Closing socket") + self.sub_socket.close() + logger.info("**TESTS**: Socket closed.") + logger.info("**TESTS**: Class teardown complete") def setUp(self): pass @@ -59,7 +76,7 @@ def test_0_server_init(self): self.assertEqual(self.srv.emitter_args['filename'], '/var/tmp/test_metrics.out') self.assertEqual(len(self.srv.metrics), 0) - self.assertEqual(self.srv.running, True) + self.assertFalse(self.srv.is_shutdown()) def test_process_metric(self): self.srv.process_metric(['test_app', 'test_metric', 'GAUGE', 1]) diff --git a/test/test_speakeasy_legacy.py b/test/test_speakeasy_legacy.py index 5e06618..085b32a 100644 --- a/test/test_speakeasy_legacy.py +++ b/test/test_speakeasy_legacy.py @@ -7,12 +7,20 @@ import mock from test_util import get_random_free_port from speakeasy.speakeasy import Speakeasy +import speakeasy.speakeasy as speakeasy +import logging + +logging.basicConfig(level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') +logger = logging.getLogger() G_SPEAKEASY_HOST = '0.0.0.0' G_PUB_PORT = str(get_random_free_port()) G_CMD_PORT = str(get_random_free_port()) G_METRIC_SOCKET = '/var/tmp/test_metric_{0}'.format(random.random()) G_LEGACY_METRIC_SOCKET = '/var/tmp/legacy_metric_{0}'.format(random.random()) +speakeasy.QUEUE_WAIT_SECS = 1 def gen_speakeasy_server(): @@ -56,7 +64,7 @@ def test_0_server_init(self): self.assertEqual(self.srv.emitter_args['filename'], '/var/tmp/test_metrics.out') self.assertEqual(len(self.srv.metrics), 0) - self.assertEqual(self.srv.running, True) + self.assertFalse(self.srv.is_shutdown()) def clear_sub_socket(self): while True: @@ -72,7 +80,10 @@ def test_legacy_socket(self): legacy_sock.connect(G_LEGACY_METRIC_SOCKET) legacy_sock.send('test_cnt3|10|COUNTER') self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + while True: # skip internal metrics + metrics = json.loads(self.sub_socket.recv()) + if metrics[0][1] != 'speakeasy': + break self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[0][1], u'__LEGACY__') @@ -86,11 +97,11 @@ def test_recreate_legacy_socket(self): sf.write('\n') with mock.patch('os.remove'): try: - s = Speakeasy(G_SPEAKEASY_HOST, G_METRIC_SOCKET, - str(get_random_free_port()), - str(get_random_free_port()), 'simple', - ['filename=/var/tmp/test_metrics.out'], - 60, dummy_legacy_socket) + Speakeasy(G_SPEAKEASY_HOST, G_METRIC_SOCKET, + str(get_random_free_port()), + str(get_random_free_port()), 'simple', + ['filename=/var/tmp/test_metrics.out'], + 60, dummy_legacy_socket) except socket.error: # remove got patched, so we should get a address already init # use socket error diff --git a/test/test_util.py b/test/test_util.py index 22a699b..782ef87 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -1,5 +1,9 @@ #!/usr/bin/env python # -*- coding:utf-8 -*- +import zmq +import json +import logging + def get_random_free_port(): import socket @@ -9,3 +13,14 @@ def get_random_free_port(): ipaddr, port = sock.getsockname() sock.close() return port + + +def filtered_metric_recv(sub_socket): + "Receive a single non-speakeasy metric" + while True: # skip internal metrics + try: + metrics = json.loads(sub_socket.recv(zmq.NOBLOCK)) + if metrics[0][1] != 'speakeasy': + return metrics + except zmq.ZMQError as e: + logging.error("**TESTS**: Got error receiving zmq event: %s", e) diff --git a/test/test_zmq_spec.py b/test/test_zmq_spec.py index f031d36..e85bc08 100644 --- a/test/test_zmq_spec.py +++ b/test/test_zmq_spec.py @@ -8,8 +8,15 @@ import random from mock import Mock from speakeasy.speakeasy import Speakeasy -from test_util import get_random_free_port +from test_util import get_random_free_port, filtered_metric_recv +import speakeasy.speakeasy as speakeasy +import logging +logging.basicConfig(level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') +logger = logging.getLogger() +speakeasy.QUEUE_WAIT_SECS = 1 random.seed(time.time()) G_SPEAKEASY_HOST = '0.0.0.0' @@ -78,10 +85,15 @@ def test_send_gauge_to_emitter(self): self.send_socket.send( json.dumps(['test_app', 'test_gauge', 'GAUGE', '2'])) time.sleep(2) - call_list = self.srv.emitter.emit.call_args_list - filtered_calls = [c for c in call_list - if c[0][0] and len(c[0][0][0]) > 0] - m = filtered_calls[-1][0][0][0] + # call_list = self.srv.emitter.emit.call_args_list + filtered_metrics = [] + for call in self.srv.emitter.emit.call_args_list: + args, kwargs = call + for metric in args[0]: + if metric[0] != "speakeasy": + filtered_metrics.append(metric) + self.assertGreater(len(filtered_metrics), 0, "There should be at least one metric being emitted") + m = filtered_metrics[0] self.assertEqual(len(m), 5) self.assertEqual(m[0], u'test_app') self.assertEqual(m[1], u'test_gauge') @@ -94,7 +106,6 @@ def test_send_counter_to_emitter(self): json.dumps(['test_app', 'test_counter', 'COUNTER', '15'])) time.sleep(2) call_list = self.srv.emitter.emit.call_args_list - print call_list filtered_calls = [c for c in call_list if c[0][0] and len(c[0][0][0]) > 0] m = filtered_calls[-1][0][0][0] @@ -109,7 +120,7 @@ def test_send_gauge_to_pub_socket(self): self.send_socket.send( json.dumps(['test_app2', 'test_gauge', 'GAUGE', '5'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[0][1], u'test_app2') @@ -122,7 +133,7 @@ def test_send_counter_to_pub_socket(self): self.send_socket.send( json.dumps(['test_app2', 'test_counter', 'COUNTER', '1'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[0][1], u'test_app2') @@ -135,93 +146,93 @@ def test_send_percentile_to_pub_socket(self): self.send_socket.send( json.dumps(['test_app2', 'test_metric', 'PERCENTILE', '13'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) self.assertEqual(len(metrics), 5) self.assertEqual(metrics[0][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[0][1], u'test_app2') - self.assertEqual(metrics[0][2], u'test_metric50_percentile') + self.assertEqual(metrics[0][2], u'test_metric.50_percentile') self.assertEqual(metrics[0][3], 'GAUGE') self.assertEqual(metrics[0][4], 13) self.assertEqual(metrics[1][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[1][1], u'test_app2') - self.assertEqual(metrics[1][2], u'test_metric75_percentile') + self.assertEqual(metrics[1][2], u'test_metric.75_percentile') self.assertEqual(metrics[1][3], 'GAUGE') self.assertEqual(metrics[1][4], 13) self.assertEqual(metrics[2][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[2][1], u'test_app2') - self.assertEqual(metrics[2][2], u'test_metric95_percentile') + self.assertEqual(metrics[2][2], u'test_metric.95_percentile') self.assertEqual(metrics[2][3], 'GAUGE') self.assertEqual(metrics[2][4], 13) self.assertEqual(metrics[3][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[3][1], u'test_app2') - self.assertEqual(metrics[3][2], u'test_metric99_percentile') + self.assertEqual(metrics[3][2], u'test_metric.99_percentile') self.assertEqual(metrics[3][3], 'GAUGE') self.assertEqual(metrics[3][4], 13) self.assertEqual(metrics[4][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[4][1], u'test_app2') - self.assertEqual(metrics[4][2], u'test_metricaverage') + self.assertEqual(metrics[4][2], u'test_metric.average') self.assertEqual(metrics[4][3], 'GAUGE') self.assertEqual(metrics[4][4], 13) self.send_socket.send( json.dumps(['test_app2', 'test_metric', 'PERCENTILE', '1'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) self.assertEqual(len(metrics), 5) self.assertEqual(metrics[0][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[0][1], u'test_app2') - self.assertEqual(metrics[0][2], u'test_metric50_percentile') + self.assertEqual(metrics[0][2], u'test_metric.50_percentile') self.assertEqual(metrics[0][3], 'GAUGE') self.assertEqual(metrics[0][4], 7.0) self.assertEqual(metrics[1][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[1][1], u'test_app2') - self.assertEqual(metrics[1][2], u'test_metric75_percentile') + self.assertEqual(metrics[1][2], u'test_metric.75_percentile') self.assertEqual(metrics[1][3], 'GAUGE') self.assertEqual(metrics[1][4], 10.0) self.assertEqual(metrics[2][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[2][1], u'test_app2') - self.assertEqual(metrics[2][2], u'test_metric95_percentile') + self.assertEqual(metrics[2][2], u'test_metric.95_percentile') self.assertEqual(metrics[2][3], 'GAUGE') self.assertEqual(metrics[2][4], 12.4) self.assertEqual(metrics[3][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[3][1], u'test_app2') - self.assertEqual(metrics[3][2], u'test_metric99_percentile') + self.assertEqual(metrics[3][2], u'test_metric.99_percentile') self.assertEqual(metrics[3][3], 'GAUGE') self.assertEqual(metrics[3][4], 12.880000000000001) self.assertEqual(metrics[4][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[4][1], u'test_app2') - self.assertEqual(metrics[4][2], u'test_metricaverage') + self.assertEqual(metrics[4][2], u'test_metric.average') self.assertEqual(metrics[4][3], 'GAUGE') self.assertEqual(metrics[4][4], 7) self.send_socket.send( json.dumps(['test_app2', 'test_metric', 'PERCENTILE', '5'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) self.assertEqual(len(metrics), 5) self.assertEqual(metrics[0][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[0][1], u'test_app2') - self.assertEqual(metrics[0][2], u'test_metric50_percentile') + self.assertEqual(metrics[0][2], u'test_metric.50_percentile') self.assertEqual(metrics[0][3], 'GAUGE') self.assertEqual(metrics[0][4], 5.0) self.assertEqual(metrics[1][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[1][1], u'test_app2') - self.assertEqual(metrics[1][2], u'test_metric75_percentile') + self.assertEqual(metrics[1][2], u'test_metric.75_percentile') self.assertEqual(metrics[1][3], 'GAUGE') self.assertEqual(metrics[1][4], 9.0) self.assertEqual(metrics[2][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[2][1], u'test_app2') - self.assertEqual(metrics[2][2], u'test_metric95_percentile') + self.assertEqual(metrics[2][2], u'test_metric.95_percentile') self.assertEqual(metrics[2][3], 'GAUGE') self.assertEqual(metrics[2][4], 12.199999999999999) self.assertEqual(metrics[3][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[3][1], u'test_app2') - self.assertEqual(metrics[3][2], u'test_metric99_percentile') + self.assertEqual(metrics[3][2], u'test_metric.99_percentile') self.assertEqual(metrics[3][3], 'GAUGE') self.assertEqual(metrics[3][4], 12.84) self.assertEqual(metrics[4][0], G_SPEAKEASY_HOST) self.assertEqual(metrics[4][1], u'test_app2') - self.assertEqual(metrics[4][2], u'test_metricaverage') + self.assertEqual(metrics[4][2], u'test_metric.average') self.assertEqual(metrics[4][3], 'GAUGE') self.assertEqual(metrics[4][4], 6.3333333332999997) @@ -230,7 +241,8 @@ def test_gauge_avg_calculation(self): self.send_socket.send( json.dumps(['test_app', 'test_gauge', 'GAUGE', '5'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) + self.assertIsNotNone(metrics) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0][:-1], [G_SPEAKEASY_HOST, u'test_app', u'test_gauge', @@ -239,7 +251,7 @@ def test_gauge_avg_calculation(self): self.send_socket.send( json.dumps(['test_app', 'test_gauge', 'GAUGE', '3'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0][:-1], [G_SPEAKEASY_HOST, u'test_app', u'test_gauge', @@ -248,7 +260,7 @@ def test_gauge_avg_calculation(self): self.send_socket.send( json.dumps(['test_app', 'test_gauge', 'GAUGE', '13'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0][:-1], [G_SPEAKEASY_HOST, u'test_app', u'test_gauge', @@ -257,7 +269,7 @@ def test_gauge_avg_calculation(self): self.send_socket.send( json.dumps(['test_app', 'test_gauge', 'GAUGE', '3'])) self.assertGreater(len(dict(self.poller.poll(500))), 0) - metrics = json.loads(self.sub_socket.recv()) + metrics = filtered_metric_recv(self.sub_socket) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0][:-1], [G_SPEAKEASY_HOST, u'test_app', u'test_gauge',