Skip to content

Commit

Permalink
api: add an InfluxDB API endpoint
Browse files Browse the repository at this point in the history
This provides a new InfluxDB-compatible endpoint to write data to Gnocchi at
`/v1/influxdb'.
  • Loading branch information
jd committed Nov 27, 2017
1 parent 2ba006a commit 17ff947
Show file tree
Hide file tree
Showing 11 changed files with 650 additions and 9 deletions.
2 changes: 2 additions & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Gnocchi's main features are:
- Nagios/Icinga support
- Statsd protocol support
- Collectd plugin support
- InfluxDB line protocol ingestion support

Community
---------
Expand All @@ -48,6 +49,7 @@ Documentation
statsd
grafana
prometheus
influxdb
nagios
collectd
alternatives
Expand Down
43 changes: 43 additions & 0 deletions doc/source/influxdb.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
============================
InfluxDB ingestion support
============================

Gnocchi implements some parts of the InfluxDB REST API. This allows tools that
usually write to InfluxDB, such as `Telegraf`_, to write directly to Gnocchi
instead.

The endpoint is available at `/v1/influxdb`. It supports:

* `GET /v1/influxdb/ping`
* `POST /v1/influxdb/query` where the only query that is handled is `CREATE
DATABASE <db>`. That will create a new resource type named after the database
handle.
* `POST /v1/influxdb/write?db=<db>`. The `db` parameter should be an existing
resource type that does not require any attributes to be set. The body should
follow the `InfluxDB line protocol`_.

In order to map InfluxDB data to the Gnocchi data model, the following
transformations happen when writing metrics:

* For each measure sent, one of the tag values is used as the original resource
  id. By default the `host` tag is used. This can be overridden by passing the
  `X-Gnocchi-InfluxDB-Tag-Resource-ID` HTTP header.

* The metric names associated with the resource have the format:
  `<measurement>.<field_key>[@<tag_key>=<tag_value>,…]`. The tags are sorted
  by key.


Telegraf configuration
======================

In order to use `Telegraf`_ with Gnocchi, you can use the following
configuration example::

[[outputs.influxdb]]
urls = ["http://localhost:8041/v1/influxdb"]
http_headers = {"X-Gnocchi-InfluxDB-Tag-Resource-ID" = "host"}


.. _`Telegraf`: https://github.com/influxdata/telegraf
.. _`InfluxDB line protocol`: https://docs.influxdata.com/influxdb/v1.3/write_protocols/line_protocol_reference/
5 changes: 3 additions & 2 deletions gnocchi/incoming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ def add_measures(self, metric, measures):
def add_measures_batch(self, metrics_and_measures):
"""Add a batch of measures for some metrics.
:param metrics_and_measures: A dict where keys
are metrics and value are measure.
:param metrics_and_measures: A dict where keys are metric objects
and values are a list of
:py:class:`gnocchi.incoming.Measure`.
"""
utils.parallel_map(
self._store_new_measures,
Expand Down
10 changes: 10 additions & 0 deletions gnocchi/indexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ def __init__(self, type):
"Resource type %s does not exist" % type)
self.type = type

def jsonify(self):
    """Return a JSON-serializable payload describing this error."""
    cause = "Resource type does not exist"
    return {"cause": cause, "detail": self.type}


class NoSuchMetric(IndexerException):
"""Error raised when a metric does not exist."""
Expand Down Expand Up @@ -224,6 +230,10 @@ def __init__(self, resource):
"Resource %s already exists" % resource)
self.resource = resource

def jsonify(self):
    """Return a JSON-serializable payload describing this error."""
    return dict(cause="Resource already exists",
                detail=self.resource)


class ResourceTypeAlreadyExists(IndexerException):
"""Error raised when a resource type already exists."""
Expand Down
11 changes: 7 additions & 4 deletions gnocchi/rest/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1965,12 +1965,13 @@ def get_or_create_resource_and_metrics(
target.update(resource_attributes)
enforce("create resource", target)

kwargs = resource_attributes # no copy used since not used after
kwargs['metrics'] = metrics
kwargs['original_resource_id'] = original_resource_id

try:
return pecan.request.indexer.create_resource(
resource_type, rid, creator,
original_resource_id=original_resource_id,
metrics=metrics,
**resource_attributes,
resource_type, rid, creator, **kwargs
).metrics
except indexer.ResourceAlreadyExists:
# NOTE(sileht): ensure the rid is not registered whitin another
Expand Down Expand Up @@ -2042,6 +2043,7 @@ class V1Controller(object):
def __init__(self):
# FIXME(sileht): split controllers to avoid lazy loading
from gnocchi.rest.aggregates import api as agg_api
from gnocchi.rest import influxdb

self.sub_controllers = {
"search": SearchController(),
Expand All @@ -2055,6 +2057,7 @@ def __init__(self):
"capabilities": CapabilityController(),
"status": StatusController(),
"aggregates": agg_api.AggregatesController(),
"influxdb": influxdb.InfluxDBController(),
}
for name, ctrl in self.sub_controllers.items():
setattr(self, name, ctrl)
Expand Down
267 changes: 267 additions & 0 deletions gnocchi/rest/influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import time

from gnocchi import incoming
from gnocchi import indexer
from gnocchi.rest import api
from gnocchi import utils

import daiquiri
import numpy
import pbr.version
import pecan
from pecan import rest
import pyparsing
import six
import tenacity
try:
import uwsgi
except ImportError:
uwsgi = None


LOG = daiquiri.getLogger(__name__)


# --- InfluxDB line-protocol grammar -----------------------------------------
# See the "InfluxDB line protocol" reference for the syntax being parsed:
# <measurement>[,<tag>=<value>...] <field>=<value>[,...] [<timestamp>]

# Boolean literals accepted by InfluxDB, parsed to a Python bool.
# NOTE(review): this token is defined but not part of `field_value` below,
# so boolean fields fail to parse — confirm whether that is intentional.
boolean = "False|True|false|true|FALSE|TRUE|F|T|f|t"
boolean = pyparsing.Regex(boolean).setParseAction(
    lambda t: t[0].lower()[0] == "t")

# Double-quoted string, backslash as escape character.
quoted_string = pyparsing.QuotedString('"', escChar="\\")
# Bare token; "\ ", "\," and "\=" escape the corresponding character.
unquoted_string = pyparsing.OneOrMore(
    pyparsing.CharsNotIn(" ,=\\") +
    pyparsing.Optional(
        pyparsing.OneOrMore(
            (pyparsing.Literal("\\ ") |
             pyparsing.Literal("\\,") |
             pyparsing.Literal("\\=") |
             pyparsing.Literal("\\")).setParseAction(
                 lambda s, loc, tok: tok[0][-1])))).setParseAction(
                     lambda s, loc, tok: "".join(list(tok)))
measurement = tag_key = tag_value = field_key = quoted_string | unquoted_string
# Floating-point number, parsed to float. The groups are non-capturing
# "(?:...)"; the previous "(:?" typo made the colon an optional literal.
number = r"[+-]?\d+(?:\.\d*)?(?:[eE][+-]?\d+)?"
number = pyparsing.Regex(number).setParseAction(
    lambda s, loc, tok: float(tok[0]))
# Integer field values carry a trailing "i" in the line protocol (e.g. 42i).
integer = (
    pyparsing.Word(pyparsing.nums).setParseAction(
        lambda s, loc, tok: int(tok[0])) +
    pyparsing.Suppress("i")
)
field_value = integer | number | quoted_string
# Timestamps are nanoseconds since Epoch.
timestamp = pyparsing.Word(pyparsing.nums).setParseAction(
    lambda s, loc, tok: numpy.datetime64(int(tok[0]), 'ns'))

# One full protocol line. Parsing yields:
#   [measurement, {tag: value}, {field: value}, timestamp-or-None]
line_protocol = (
    measurement +
    # Tags
    pyparsing.Optional(pyparsing.Suppress(",") +
                       pyparsing.delimitedList(
                           pyparsing.OneOrMore(
                               pyparsing.Group(
                                   tag_key +
                                   pyparsing.Suppress("=") +
                                   tag_value), ",")).setParseAction(
                                       lambda s, loc, tok: dict(list(tok))),
                       default={}) +
    pyparsing.Suppress(" ") +
    # Fields
    pyparsing.delimitedList(
        pyparsing.OneOrMore(
            pyparsing.Group(field_key +
                            pyparsing.Suppress("=") +
                            field_value), ",")).setParseAction(
                                lambda s, loc, tok: dict(list(tok))) +
    # Timestamp
    pyparsing.Optional(pyparsing.Suppress(" ") + timestamp, default=None)
).leaveWhitespace()


# Minimal query support: `CREATE DATABASE <name>[;]`, case-insensitive,
# with the database name optionally double-quoted.
query_parser = (
    pyparsing.Suppress(pyparsing.CaselessLiteral("create")) +
    pyparsing.Suppress(pyparsing.CaselessLiteral("database")) +
    pyparsing.Suppress(pyparsing.White()) +
    (pyparsing.QuotedString('"', escChar="\\") |
     pyparsing.Word(pyparsing.alphas + "_",
                    pyparsing.alphanums + "_")) +
    pyparsing.Suppress(
        pyparsing.Optional(pyparsing.Optional(pyparsing.White()) +
                           pyparsing.Optional(pyparsing.Literal(";"))))
)


class InfluxDBController(rest.RestController):
    """InfluxDB-compatible endpoints: `ping`, `query` and `write`.

    Measures written through the InfluxDB line protocol are mapped onto
    Gnocchi resources and metrics: one tag value (`host` by default)
    becomes the original resource id, and each field of a measurement
    becomes a metric named
    `<measurement>.<field_key>[@<tag_key>=<tag_value>,...]`.
    """

    _custom_actions = {
        'ping': ['HEAD', 'GET'],
        'query': ['POST'],
        'write': ['POST'],
    }

    # Tag used as the original resource id, unless overridden by the
    # `X-Gnocchi-InfluxDB-Tag-Resource-ID` HTTP header.
    DEFAULT_TAG_RESOURCE_ID = "host"

    @pecan.expose()
    def ping(self):
        """Answer client version probes with the Gnocchi version."""
        pecan.response.headers['X-Influxdb-Version'] = (
            "Gnocchi " + pbr.version.VersionInfo('gnocchi').version_string()
        )

    @pecan.expose('json')
    def post_query(self, q=None):
        """Handle a query; only `CREATE DATABASE <db>` is implemented.

        The database name is mapped to a Gnocchi resource type with no
        extra attribute, created if it does not already exist.

        :param q: The query string, or None for a no-op.
        """
        if q is not None:
            try:
                query = query_parser.parseString(q)
            except pyparsing.ParseException:
                api.abort(501, {"cause": "Not implemented error",
                                "detail": "q",
                                "reason": "Query not implemented"})
            resource_type = query[0]
            api.enforce("create resource type", {"name": resource_type})
            schema = pecan.request.indexer.get_resource_type_schema()
            rt = schema.resource_type_from_dict(resource_type, {}, 'creating')
            try:
                pecan.request.indexer.create_resource_type(rt)
            except indexer.ResourceTypeAlreadyExists:
                # InfluxDB accepts creating the same database twice;
                # mimic that by ignoring the duplicate.
                pass
        pecan.response.status = 204

    @staticmethod
    def _write_get_lines():
        """Return a (transfer-encoding, payload) pair for the request body.

        Chunked transfer encoding can only be read through the uwsgi API;
        abort with 501 when not running under uwsgi.
        """
        encoding = pecan.request.headers.get('Transfer-Encoding', "").lower()
        if encoding == "chunked":
            if uwsgi is None:
                api.abort(
                    501, {"cause": "Not implemented error",
                          "reason": "This server is not running with uwsgi"})
            return encoding, uwsgi.chunked_read()
        return None, pecan.request.body

    @pecan.expose('json')
    def post_write(self, db="influxdb"):
        """Ingest measures expressed in the InfluxDB line protocol.

        :param db: Name of an existing resource type that requires no
                   attribute; resources are created with this type.
        """
        creator = pecan.request.auth_helper.get_current_user(pecan.request)
        tag_to_rid = pecan.request.headers.get(
            "X-Gnocchi-InfluxDB-Tag-Resource-ID",
            self.DEFAULT_TAG_RESOURCE_ID)

        while True:
            encoding, chunk = self._write_get_lines()

            # If chunk is empty then this is over.
            if not chunk:
                break

            # Compute now on a per-chunk basis
            now = numpy.datetime64(int(time.time() * 10e8), 'ns')

            # resources = { resource_id: {
            #     metric_name: [ incoming.Measure(t, v), …], …
            #   }, …
            # }
            resources = collections.defaultdict(
                lambda: collections.defaultdict(list))
            for line_number, line in enumerate(chunk.split(b"\n")):
                # Ignore empty lines
                if not line:
                    continue

                try:
                    measurement, tags, fields, timestamp = (
                        line_protocol.parseString(line.decode())
                    )
                except (UnicodeDecodeError, SyntaxError,
                        pyparsing.ParseException):
                    api.abort(400, {
                        "cause": "Value error",
                        "detail": "line",
                        "reason": "Unable to parse line %d" % (
                            line_number + 1),
                    })

                if timestamp is None:
                    timestamp = now

                try:
                    resource_id = tags.pop(tag_to_rid)
                except KeyError:
                    api.abort(400, {
                        "cause": "Value error",
                        "detail": "key",
                        "reason": "Unable to find key `%s' in tags" % (
                            tag_to_rid),
                    })

                tags_str = (("@" if tags else "") +
                            ",".join(("%s=%s" % (k, tags[k]))
                                     for k in sorted(tags)))

                for field_name, field_value in six.iteritems(fields):
                    # NOTE: six.string_types, not str — on Python 2 the
                    # parser yields `unicode`, which a bare `str` check
                    # would fail to skip.
                    if isinstance(field_value, six.string_types):
                        # We do not support field value that are not numerical
                        continue

                    # Metric name is the:
                    # <measurement>.<field_key>@<tag_key>=<tag_value>,…
                    # with tag ordered
                    # Replace "/" with "_" because Gnocchi does not support /
                    # in metric names
                    metric_name = (
                        measurement + "." + field_name + tags_str
                    ).replace("/", "_")

                    resources[resource_id][metric_name].append(
                        incoming.Measure(timestamp, field_value))

            measures_to_batch = {}
            for resource_name, metrics_and_measures in six.iteritems(
                    resources):
                resource_id = utils.ResourceUUID(
                    resource_name, creator=creator)
                LOG.debug("Getting metrics from resource `%s'", resource_name)
                timeout = pecan.request.conf.api.operation_timeout
                try:
                    metrics = (
                        api.get_or_create_resource_and_metrics.retry_with(
                            stop=tenacity.stop_after_delay(timeout))(
                                creator, resource_id, resource_name,
                                metrics_and_measures.keys(),
                                {}, db)
                    )
                except indexer.ResourceAlreadyExists as e:
                    # If this function raises ResourceAlreadyExists it means
                    # the resource might already exist as another type, we
                    # can't continue.
                    LOG.error("Unable to create resource `%s' for InfluxDB, "
                              "it might already exists as another "
                              "resource type than `%s'", resource_name, db)
                    api.abort(400, e)

                for metric in metrics:
                    api.enforce("post measures", metric)

                measures_to_batch.update(
                    dict((metric, metrics_and_measures[metric.name])
                         for metric in metrics
                         if metric.name in metrics_and_measures))

            LOG.debug("Add measures batch for %d metrics",
                      len(measures_to_batch))
            pecan.request.incoming.add_measures_batch(measures_to_batch)
            pecan.response.status = 204

            # A non-chunked body is read in one go: stop after the first
            # pass instead of re-reading the same payload forever.
            if encoding != "chunked":
                return
Loading

0 comments on commit 17ff947

Please sign in to comment.