diff --git a/doc/source/index.rst b/doc/source/index.rst index b700ea7be..f75295f7d 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -24,6 +24,7 @@ Gnocchi's main features are: - Nagios/Icinga support - Statsd protocol support - Collectd plugin support +- InfluxDB line protocol ingestion support Community --------- @@ -123,6 +124,7 @@ Documentation grafana nagios collectd + influxdb glossary releasenotes/index.rst contributing diff --git a/doc/source/influxdb.rst b/doc/source/influxdb.rst new file mode 100644 index 000000000..10cb13d80 --- /dev/null +++ b/doc/source/influxdb.rst @@ -0,0 +1,43 @@ +============================ + InfluxDB ingestion support +============================ + +Gnocchi implements some part of the InfluxDB REST API. That allows tool that +are used to write to InfluxDB to write directly to Gnocchi instead, such as +`Telegraf`_. + +The endpoint is available at `/v1/influxdb`. It supports: + +* `GET /v1/influxdb/ping` +* `POST /v1/influxdb/query` where the only query that is handled is `CREATE + DATABASE `. That will create a new resource type named after the database + handle. +* `POST /v1/influxdb/write?db=`. The `db` parameter should be an existing + resource type that does not require any attributes to be set. The body should + follow the `InfluxDB line protocol`_. + +In order to map InfluxDB data to Gnocchi data model, the following +transformation happen when writing metrics: + +* For each measure sent, one of the tag value is used as the original resource + id. By default the `host` tag is used. This can be overriden by passing the + `X-Gnocchi-InfluxDB-Tag-Resource-ID` HTTP header. + +* The metric names associated to the resource have the format: + `.[@=,…]`. The tag are sorted + by keys. + + +Telegraf configuration +====================== + +In order to use `Telegraf`_ with Gnocchi, you can use the following +configuration example:: + + [[outputs.influxdb]] + urls = ["http://admin:localhost:8041/v1/influxdb"] + http_headers = {"X-Gnocchi-InfluxDB-Tag-Resource-ID" = "host"} + + +.. _`Telegraf`: https://github.com/influxdata/telegraf +.. _`InfluxDB line protocol`: https://docs.influxdata.com/influxdb/v1.3/write_protocols/line_protocol_reference/ diff --git a/gnocchi/incoming/__init__.py b/gnocchi/incoming/__init__.py index 0e76a2b15..f3c66c586 100644 --- a/gnocchi/incoming/__init__.py +++ b/gnocchi/incoming/__init__.py @@ -118,8 +118,9 @@ def add_measures(self, metric, measures): def add_measures_batch(self, metrics_and_measures): """Add a batch of measures for some metrics. - :param metrics_and_measures: A dict where keys - are metrics and value are measure. + :param metrics_and_measures: A dict where keys are metric objects + and values are a list of + :py:class:`gnocchi.incoming.Measure`. """ utils.parallel_map( self._store_new_measures, diff --git a/gnocchi/indexer/__init__.py b/gnocchi/indexer/__init__.py index 9163bf135..f4af30e71 100644 --- a/gnocchi/indexer/__init__.py +++ b/gnocchi/indexer/__init__.py @@ -115,6 +115,12 @@ def __init__(self, type): "Resource type %s does not exist" % type) self.type = type + def jsonify(self): + return { + "cause": "Resource type does not exist", + "detail": self.type, + } + class NoSuchMetric(IndexerException): """Error raised when a metric does not exist.""" @@ -224,6 +230,10 @@ def __init__(self, resource): "Resource %s already exists" % resource) self.resource = resource + def jsonify(self): + return {"cause": "Resource already exists", + "detail": self.resource} + class ResourceTypeAlreadyExists(IndexerException): """Error raised when a resource type already exists.""" diff --git a/gnocchi/rest/api.py b/gnocchi/rest/api.py index 92c62d90d..01b087167 100644 --- a/gnocchi/rest/api.py +++ b/gnocchi/rest/api.py @@ -1895,6 +1895,7 @@ class V1Controller(object): def __init__(self): # FIXME(sileht): split controllers to avoid lazy loading from gnocchi.rest.aggregates import api as agg_api + from gnocchi.rest import influxdb self.sub_controllers = { "search": SearchController(), @@ -1908,6 +1909,7 @@ def __init__(self): "capabilities": CapabilityController(), "status": StatusController(), "aggregates": agg_api.AggregatesController(), + "influxdb": influxdb.InfluxDBController(), } for name, ctrl in self.sub_controllers.items(): setattr(self, name, ctrl) diff --git a/gnocchi/rest/influxdb.py b/gnocchi/rest/influxdb.py new file mode 100644 index 000000000..fe3f15efb --- /dev/null +++ b/gnocchi/rest/influxdb.py @@ -0,0 +1,315 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import collections +import time + +from gnocchi import incoming +from gnocchi import indexer +from gnocchi.rest import api +from gnocchi import utils + +import daiquiri +import numpy +import pbr.version +import pecan +from pecan import rest +import pyparsing +import six +import tenacity +try: + import uwsgi +except ImportError: + uwsgi = None + + +LOG = daiquiri.getLogger(__name__) + + +boolean = "False|True|false|true|FALSE|TRUE|F|T|f|t" +boolean = pyparsing.Regex(boolean).setParseAction( + lambda t: t[0].lower()[0] == "t") + +quoted_string = pyparsing.QuotedString('"', escChar="\\") +unquoted_string = pyparsing.OneOrMore( + pyparsing.CharsNotIn(" ,=\\") + + pyparsing.Optional( + pyparsing.OneOrMore( + (pyparsing.Literal("\\ ") | + pyparsing.Literal("\\,") | + pyparsing.Literal("\\=") | + pyparsing.Literal("\\")).setParseAction( + lambda s, loc, tok: tok[0][-1])))).setParseAction( + lambda s, loc, tok: "".join(list(tok))) +measurement = tag_key = tag_value = field_key = quoted_string | unquoted_string +number = r"[+-]?\d+(:?\.\d*)?(:?[eE][+-]?\d+)?" +number = pyparsing.Regex(number).setParseAction( + lambda s, loc, tok: float(tok[0])) +integer = ( + pyparsing.Word(pyparsing.nums).setParseAction( + lambda s, loc, tok: int(tok[0])) + + pyparsing.Suppress("i") + ) +field_value = integer | number | quoted_string +timestamp = pyparsing.Word(pyparsing.nums).setParseAction( + lambda s, loc, tok: numpy.datetime64(int(tok[0]), 'ns')) + +line_protocol = ( + measurement + + # Tags + pyparsing.Optional(pyparsing.Suppress(",") + + pyparsing.delimitedList( + pyparsing.OneOrMore( + pyparsing.Group( + tag_key + + pyparsing.Suppress("=") + + tag_value), ",")).setParseAction( + lambda s, loc, tok: dict(list(tok))), + default={}) + + pyparsing.Suppress(" ") + + # Fields + pyparsing.delimitedList( + pyparsing.OneOrMore( + pyparsing.Group(field_key + + pyparsing.Suppress("=") + + field_value), ",")).setParseAction( + lambda s, loc, tok: dict(list(tok))) + + # Timestamp + pyparsing.Optional(pyparsing.Suppress(" ") + timestamp, default=None) +).leaveWhitespace() + + +query_parser = ( + pyparsing.Suppress(pyparsing.CaselessLiteral("create")) + + pyparsing.Suppress(pyparsing.CaselessLiteral("database")) + + pyparsing.Suppress(pyparsing.White()) + + (pyparsing.QuotedString('"', escChar="\\") | + pyparsing.Word(pyparsing.alphas + "_", + pyparsing.alphanums + "_")) + + pyparsing.Suppress( + pyparsing.Optional(pyparsing.Optional(pyparsing.White()) + + pyparsing.Optional(pyparsing.Literal(";")))) +) + + +class InfluxDBController(rest.RestController): + _custom_actions = { + 'ping': ['HEAD', 'GET'], + 'query': ['POST'], + 'write': ['POST'], + } + + DEFAULT_TAG_RESOURCE_ID = "host" + + @pecan.expose() + def ping(self): + pecan.response.headers['X-Influxdb-Version'] = ( + "Gnocchi " + pbr.version.VersionInfo('gnocchi').version_string() + ) + + @pecan.expose('json') + def post_query(self, q=None): + if q is not None: + try: + query = query_parser.parseString(q) + except pyparsing.ParseException: + api.abort(501, {"cause": "Not implemented error", + "detail": "q", + "reason": "Querying is not implemented"}) + resource_type = query[0] + api.enforce("create resource type", {"name": resource_type}) + schema = pecan.request.indexer.get_resource_type_schema() + rt = schema.resource_type_from_dict(resource_type, {}, 'creating') + try: + pecan.request.indexer.create_resource_type(rt) + except indexer.ResourceTypeAlreadyExists: + pass + pecan.response.status = 204 + + @staticmethod + def _write_get_lines(): + encoding = pecan.request.headers.get('Transfer-Encoding', "").lower() + if encoding == "chunked": + if uwsgi is None: + api.abort( + 501, {"cause": "Not implemented error", + "reason": "This server is not running with uwsgi"}) + return encoding, uwsgi.chunked_read() + return None, pecan.request.body + + @pecan.expose('json') + def post_write(self, db="influxdb"): + + creator = pecan.request.auth_helper.get_current_user(pecan.request) + tag_to_rid = pecan.request.headers.get( + "X-Gnocchi-InfluxDB-Tag-Resource-ID", + self.DEFAULT_TAG_RESOURCE_ID) + + while True: + encoding, chunk = self._write_get_lines() + + # If chunk is empty then this is over. + if not chunk: + break + + # Compute now on a per-chunk basis + now = numpy.datetime64(int(time.time() * 10e8), 'ns') + + # resources = { resource_id: { + # metric_name: [ incoming.Measure(t, v), …], … + # }, … + # } + resources = collections.defaultdict( + lambda: collections.defaultdict(list)) + for line_number, line in enumerate(chunk.split(b"\n")): + # Ignore empty lines + if not line: + continue + + try: + measurement, tags, fields, timestamp = ( + line_protocol.parseString(line) + ) + except pyparsing.ParseException: + api.abort(400, { + "cause": "Value error", + "detail": "line", + "reason": "Unable to parse line %d" % ( + line_number + 1), + }) + + if timestamp is None: + timestamp = now + + try: + resource_id = tags.pop(tag_to_rid) + except KeyError: + api.abort(400, { + "cause": "Value error", + "detail": "key", + "reason": "Unable to find key `%s' in tags" % ( + tag_to_rid), + }) + + tags_str = (("@" if tags else "") + + ",".join(("%s=%s" % (k, tags[k])) + for k in sorted(tags))) + + for field_name, field_value in six.iteritems(fields): + if isinstance(field_value, str): + # We do not support field value that are not numerical + continue + + # Metric name is the: + # .@=,… + # with tag ordered + # Replace "/" with "_" because Gnocchi does not support / + # in metric names + metric_name = ( + measurement + "." + field_name + tags_str + ).replace("/", "_") + + resources[resource_id][metric_name].append( + incoming.Measure(timestamp, field_value)) + + measures_to_batch = {} + for resource_name, metrics_and_measures in six.iteritems( + resources): + resource_name = resource_name + resource_id = utils.ResourceUUID( + resource_name, creator=creator) + LOG.debug("Getting metrics from resource `%s'", resource_name) + timeout = pecan.request.conf.api.operation_timeout + try: + metrics = ( + self.get_or_create_resource_and_metrics.retry_with( + stop=tenacity.stop_after_delay(timeout))( + db, creator, resource_id, resource_name, + metrics_and_measures.keys()) + ) + except indexer.ResourceAlreadyExists as e: + # If this function raises ResourceAlreadyExists it means + # the resource might already exist as another type, we + # can't continue. + LOG.error("Unable to get resource `%s' for InfluxDB, " + "it might already exists as another " + "resource type than `%s'", resource_name, db) + api.abort(400, e) + + for metric in metrics: + api.enforce("post measures", metric) + + measures_to_batch.update( + dict((metric, metrics_and_measures[metric.name]) + for metric in metrics + if metric.name in metrics_and_measures)) + + LOG.debug("Add measures batch for %d metrics", + len(measures_to_batch)) + pecan.request.incoming.add_measures_batch(measures_to_batch) + pecan.response.status = 204 + + if encoding != "chunked": + return + + @staticmethod + @tenacity.retry(retry=tenacity.retry_if_exception_type( + (indexer.NoSuchResource, indexer.NamedMetricAlreadyExists))) + def get_or_create_resource_and_metrics(resource_type, creator, rid, + original_resource_id, + metric_names): + try: + r = pecan.request.indexer.get_resource(resource_type, rid, + with_metrics=True) + except indexer.NoSuchResourceType as e: + pecan.abort(400, e) + + if r: + api.enforce("update resource", r) + exists_metric_names = [m.name for m in r.metrics] + new_metrics = api.MetricsSchema(dict( + (m.decode(), {}) for m in metric_names + if m not in exists_metric_names + )) + if new_metrics: + try: + return pecan.request.indexer.update_resource( + resource_type, rid, + metrics=new_metrics, + append_metrics=True, + ).metrics + except indexer.NoSuchResourceType as e: + pecan.abort(400, e) + return r.metrics + + api.enforce("create resource", {"resource_type": resource_type}) + metrics = api.MetricsSchema(dict((m.decode(), {}) + for m in metric_names)) + try: + return pecan.request.indexer.create_resource( + resource_type, rid, creator, + original_resource_id=original_resource_id, + metrics=metrics, + ).metrics + except indexer.NoSuchResourceType as e: + pecan.abort(400, e) + except indexer.ResourceAlreadyExists: + # NOTE(jd) This error might be raised while the resource does not + # exist with `resource_type' as type. Check if it really exists + # using our `resource_type`: if that's the case, we can retry, + # otherwise it is a fatal error to avoid an infinite loop + if pecan.request.indexer.get_resource(resource_type, rid): + raise tenacity.TryAgain + raise diff --git a/gnocchi/tests/functional/gabbits/influxdb.yaml b/gnocchi/tests/functional/gabbits/influxdb.yaml new file mode 100644 index 000000000..34ea7aedd --- /dev/null +++ b/gnocchi/tests/functional/gabbits/influxdb.yaml @@ -0,0 +1,111 @@ +# Tests for the InfluxDB compatibility layer + +fixtures: + - ConfigFixture + +defaults: + request_headers: + # User admin + authorization: "basic YWRtaW46" + content-type: application/json + +tests: + - name: ping influxdb status with head + desc: test HEAD on ping – xfails because Pecan does not honor HEAD correctly yet + xfail: true + HEAD: /v1/influxdb/ping + status: 204 + + - name: ping influxdb status with get + GET: /v1/influxdb/ping + status: 204 + + - name: create a database + POST: /v1/influxdb/query?q=create+database+influxdbtest + status: 204 + + - name: check the resource type now exists + GET: /v1/resource_type/influxdbtest + status: 200 + response_json_paths: + $: + name: influxdbtest + attributes: {} + state: active + + - name: do an unrecognized query + POST: /v1/influxdb/query?q=select+metrics+plz + request_headers: + # This is useful to get the error in JSON format + accept: application/json + status: 501 + response_json_paths: + $.description.cause: Not implemented error + $.description.detail: q + $.description.reason: Querying is not implemented + + - name: create archive policy + POST: /v1/archive_policy + data: + name: low + definition: + - granularity: 1 hour + status: 201 + + - name: create archive policy for influxdb + POST: /v1/archive_policy_rule + data: + name: influxdb + metric_pattern: "*" + archive_policy_name: low + status: 201 + + - name: write a line + POST: /v1/influxdb/write?db=influxdbtest + request_headers: + content-type: text/plain + data: + "mymetric,host=foobar,mytag=myvalue field=123 1510581804179554816" + status: 204 + + - name: check resource created + GET: /v1/resource/influxdbtest/foobar + status: 200 + response_json_paths: + $.original_resource_id: foobar + $.id: b4d568e4-7af1-5aec-ac3f-9c09fa3685a9 + $.type: influxdbtest + $.creator: admin + + - name: check metric created + GET: /v1/resource/influxdbtest/foobar/metric/mymetric.field@mytag=myvalue + + - name: check measures processed + GET: /v1/resource/influxdbtest/foobar/metric/mymetric.field@mytag=myvalue/measures?refresh=true + response_json_paths: + $: + - ["2017-11-13T14:00:00+00:00", 3600.0, 123.0] + + - name: write lines with different tag resource id + POST: /v1/influxdb/write?db=influxdbtest + request_headers: + content-type: text/plain + X-Gnocchi-InfluxDB-Tag-Resource-ID: mytag + data: + "mymetric,host=foobar,mytag=myvalue field=123 1510581804179554816\ncpu,path=/foobar,mytag=myvalue field=43i 1510581804179554816" + status: 204 + + - name: check resource created with different resource id + GET: /v1/resource/influxdbtest/myvalue + status: 200 + response_json_paths: + $.original_resource_id: myvalue + $.id: 6b9e2039-98d0-5d8d-9153-2d7491cf13e5 + $.type: influxdbtest + $.creator: admin + + - name: check metric created different tag resource id + GET: /v1/resource/influxdbtest/myvalue/metric/mymetric.field@host=foobar + + - name: check metric created different tag resource id and slash replaced + GET: /v1/resource/influxdbtest/myvalue/metric/cpu.field@path=_foobar diff --git a/gnocchi/tests/functional/gabbits/resource.yaml b/gnocchi/tests/functional/gabbits/resource.yaml index d8267f1a3..d3d01231d 100644 --- a/gnocchi/tests/functional/gabbits/resource.yaml +++ b/gnocchi/tests/functional/gabbits/resource.yaml @@ -58,9 +58,9 @@ tests: redirects: true response_json_paths: $.version: "1.0" - $.links.`len`: 12 + $.links.`len`: 13 $.links[0].href: $SCHEME://$NETLOC/v1 - $.links[8].href: $SCHEME://$NETLOC/v1/resource + $.links[9].href: $SCHEME://$NETLOC/v1/resource - name: root of resource GET: /v1/resource diff --git a/gnocchi/tests/indexer/sqlalchemy/test_migrations.py b/gnocchi/tests/indexer/sqlalchemy/test_migrations.py index c29fcc907..500d7f5fa 100644 --- a/gnocchi/tests/indexer/sqlalchemy/test_migrations.py +++ b/gnocchi/tests/indexer/sqlalchemy/test_migrations.py @@ -19,7 +19,7 @@ import oslo_db.exception from oslo_db.sqlalchemy import test_migrations import six -import sqlalchemy as sa +import sqlalchemy.schema import sqlalchemy_utils from gnocchi import indexer @@ -49,6 +49,20 @@ def setUp(self): self.index = indexer.get_driver(self.conf) self.index.upgrade(nocreate=True) self.addCleanup(self._drop_database) + # NOTE(sileht): remove tables dynamically created by other tests + valid_resource_type_tables = [] + for rt in self.index.list_resource_types(): + valid_resource_type_tables.append(rt.tablename) + valid_resource_type_tables.append("%s_history" % rt.tablename) + # NOTE(sileht): load it in sqlalchemy metadata + self.index._RESOURCE_TYPE_MANAGER.get_classes(rt) + + for table in sqlalchemy_base.Base.metadata.sorted_tables: + if (table.name.startswith("rt_") and + table.name not in valid_resource_type_tables): + sqlalchemy_base.Base.metadata.remove(table) + self.index._RESOURCE_TYPE_MANAGER._cache.pop( + table.name.replace('_history', ''), None) def _drop_database(self): try: @@ -63,27 +77,3 @@ def get_metadata(): def get_engine(self): return self.index.get_engine() - - def db_sync(self, engine): - # NOTE(sileht): We ensure all resource type sqlalchemy model are loaded - # in this process - for rt in self.index.list_resource_types(): - if rt.state == "active": - self.index._RESOURCE_TYPE_MANAGER.get_classes(rt) - - def filter_metadata_diff(self, diff): - tables_to_keep = [] - for rt in self.index.list_resource_types(): - if rt.name.startswith("indexer_test"): - tables_to_keep.extend([rt.tablename, - "%s_history" % rt.tablename]) - new_diff = [] - for line in diff: - if len(line) >= 2: - item = line[1] - # NOTE(sileht): skip resource types created for tests - if (isinstance(item, sa.Table) - and item.name in tables_to_keep): - continue - new_diff.append(line) - return new_diff diff --git a/gnocchi/tests/test_influxdb.py b/gnocchi/tests/test_influxdb.py new file mode 100644 index 000000000..5bc801bb4 --- /dev/null +++ b/gnocchi/tests/test_influxdb.py @@ -0,0 +1,198 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import numpy +import pyparsing + +from gnocchi.rest import influxdb +from gnocchi.tests import base + + +class TestInfluxDBLineProtocol(base.BaseTestCase): + def test_line_protocol_parser_ok(self): + lines = ( + ('cpu,cpu=cpu2,host=abydos usage_system=11.1,usage_idle=73.2,usage_nice=0,usage_irq=0,usage_user=15.7,usage_softirq=0,usage_steal=0,usage_guest=0,usage_guest_nice=0,usage_iowait=0 1510150170000000000', # noqa + ['cpu', + {'host': 'abydos', + 'cpu': 'cpu2'}, + {'usage_guest': 0.0, + 'usage_nice': 0.0, + 'usage_steal': 0.0, + 'usage_iowait': 0.0, + 'usage_user': 15.7, + 'usage_idle': 73.2, + 'usage_softirq': 0.0, + 'usage_guest_nice': 0.0, + 'usage_irq': 0.0, + 'usage_system': 11.1}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('cpu,cpu=cpu-total,host=abydos usage_idle=79.2198049512378,usage_nice=0,usage_iowait=0,usage_steal=0,usage_guest=0,usage_guest_nice=0,usage_system=9.202300575143786,usage_irq=0,usage_softirq=0,usage_user=11.577894473618404 1510150170000000000', # noqa + ['cpu', + {'cpu': 'cpu-total', + 'host': 'abydos'}, + {'usage_guest': 0.0, + 'usage_guest_nice': 0.0, + 'usage_idle': 79.2198049512378, + 'usage_iowait': 0.0, + 'usage_irq': 0.0, + 'usage_nice': 0.0, + 'usage_softirq': 0.0, + 'usage_steal': 0.0, + 'usage_system': 9.202300575143786, + 'usage_user': 11.577894473618404}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('diskio,name=disk0,host=abydos io_time=11020501i,iops_in_progress=0i,read_bytes=413847966208i,read_time=9816308i,write_time=1204193i,weighted_io_time=0i,reads=33523907i,writes=7321123i,write_bytes=141510539264i 1510150170000000000', # noqa + ['diskio', + {'host': 'abydos', + 'name': 'disk0'}, + {'io_time': 11020501, + 'iops_in_progress': 0, + 'read_bytes': 413847966208, + 'read_time': 9816308, + 'reads': 33523907, + 'weighted_io_time': 0, + 'write_bytes': 141510539264, + 'write_time': 1204193, + 'writes': 7321123}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('disk,path=/,device=disk1s1,fstype=apfs,host=abydos total=250140434432i,free=28950695936i,used=216213557248i,used_percent=88.19130621205531,inodes_total=9223372036854775807i,inodes_free=9223372036850748963i,inodes_used=4026844i 1510150170000000000', # noqa + ['disk', + {'device': 'disk1s1', 'fstype': 'apfs', + 'host': 'abydos', 'path': '/'}, + {'free': 28950695936, + 'inodes_free': 9223372036850748963, + 'inodes_total': 9223372036854775807, + 'inodes_used': 4026844, + 'total': 250140434432, + 'used': 216213557248, + 'used_percent': 88.19130621205531}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('mem,host=abydos free=16195584i,available_percent=24.886322021484375,used=6452215808i,cached=0i,buffered=0i,active=2122153984i,inactive=2121523200i,used_percent=75.11367797851562,total=8589934592i,available=2137718784i 1510150170000000000', # noqa + ['mem', + {'host': 'abydos'}, + {'active': 2122153984, + 'available': 2137718784, + 'available_percent': 24.886322021484375, + 'buffered': 0, + 'cached': 0, + 'free': 16195584, + 'inactive': 2121523200, + 'total': 8589934592, + 'used': 6452215808, + 'used_percent': 75.11367797851562}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('disk,path=/private/var/vm,device=disk1s4,fstype=apfs,host=abydos inodes_total=9223372036854775807i,inodes_free=9223372036854775803i,inodes_used=4i,total=250140434432i,free=28950695936i,used=4296265728i,used_percent=12.922280752806417 1510150170000000000', # noqa + ['disk', + {'device': 'disk1s4', + 'fstype': 'apfs', + 'host': 'abydos', + 'path': '/private/var/vm'}, + {'free': 28950695936, + 'inodes_free': 9223372036854775803, + 'inodes_total': 9223372036854775807, + 'inodes_used': 4, + 'total': 250140434432, + 'used': 4296265728, + 'used_percent': 12.922280752806417}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('swap,host=abydos used=2689073152i,free=532152320i,used_percent=83.47981770833334,total=3221225472i 1510150170000000000', # noqa + ['swap', + {'host': 'abydos'}, + {'free': 532152320, + 'total': 3221225472, + 'used': 2689073152, + 'used_percent': 83.47981770833334}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('swap,host=abydos in=0i,out=0i 1510150170000000000', + ['swap', + {'host': 'abydos'}, + {'in': 0, 'out': 0}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('processes,host=abydos stopped=0i,running=2i,sleeping=379i,total=382i,unknown=0i,idle=0i,blocked=1i,zombies=0i 1510150170000000000', # noqa + ['processes', + {'host': 'abydos'}, + {'blocked': 1, + 'idle': 0, + 'running': 2, + 'sleeping': 379, + 'stopped': 0, + 'total': 382, + 'unknown': 0, + 'zombies': 0}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('system,host=abydos load5=3.02,load15=3.31,n_users=1i,n_cpus=4i,load1=2.18 1510150170000000000', # noqa + ['system', + {'host': 'abydos'}, + {'load1': 2.18, + 'load15': 3.31, + 'load5': 3.02, + 'n_cpus': 4, + 'n_users': 1}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('system,host=abydos uptime=337369i,uptime_format="3 days, 21:42" 1510150170000000000', # noqa + ['system', + {'host': 'abydos'}, + {'uptime': 337369, 'uptime_format': '3 days, 21:42'}, + numpy.datetime64('2017-11-08T14:09:30.000000000')]), + ('notag up=1 123234', + ['notag', + {}, + {'up': 1.0}, + numpy.datetime64('1970-01-01T00:00:00.000123234')]), + ('notag up=3 ', ['notag', {}, {'up': 3.0}, None]), + ) + for line, result in lines: + parsed = list(influxdb.line_protocol.parseString(line)) + self.assertEqual(result, parsed) + + def test_line_protocol_parser_fail(self): + lines = ( + "measurement, field=1", + "measurement, field=1 123", + "measurement,tag=value 123", + "measurement,tag=value , 123", + "measurement,tag=value 123", + ",tag=value 123", + "foobar,tag=value field=string 123", + ) + for line in lines: + self.assertRaises(pyparsing.ParseException, + influxdb.line_protocol.parseString, + line) + + def test_query_parser_ok(self): + lines = ( + "CREATE DATABASE foobar;", + "CREATE DATABASE foobar ;", + "CREATE DATABASE foobar ;;;", + "CrEaTe Database foobar", + "create Database foobar", + ) + for line in lines: + parsed = list(influxdb.query_parser.parseString(line))[0] + self.assertEqual("foobar", parsed) + + def test_query_parser_fail(self): + lines = ( + "SELECT", + "hey yo foobar;", + "help database foobar;", + "something weird", + "create stuff foobar", + ) + for line in lines: + self.assertRaises(pyparsing.ParseException, + influxdb.query_parser.parseString, + line) diff --git a/releasenotes/notes/influxdb-endpoint-13cbd82cf287d91c.yaml b/releasenotes/notes/influxdb-endpoint-13cbd82cf287d91c.yaml new file mode 100644 index 000000000..75bb6d2fe --- /dev/null +++ b/releasenotes/notes/influxdb-endpoint-13cbd82cf287d91c.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Gnocchi now provides a new `/v1/influxdb` endpoint that allows to ingest + data from InfluxDB clients. Only write is implemented. This should ease + transition of users coming from InfluxDB tools such as Telegraf. diff --git a/requirements.txt b/requirements.txt index 1a9360994..233f14e49 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,7 +16,7 @@ ujson voluptuous werkzeug trollius; python_version < '3.4' -tenacity>=4.2.0 # Apache-2.0 +tenacity>=4.6.0 WebOb>=1.4.1 Paste PasteDeploy