Skip to content

Commit

Permalink
feat: add ext_default_encoder with prometheus encoder (#1704)
Browse files Browse the repository at this point in the history
* feat: add encoder extension interface in pkg/pipeline/extensions

* feat: add prometheus encoder whitch implements extension.Encoder interface

* feat: add ext_default_encoder extension plugin

* refactor: rename package name, var name of encoder

* docs: add docs for ext_default_encoder & update docs of other extensions

* docs: add ext_default_encoder doc in docs/cn/plugins/overview.md

* fix: fix build failed for package referenece

* chore: update pkg/go.mod to solve lint failed

* fix: fix ci-lint error
  • Loading branch information
aiops1900 authored Aug 22, 2024
1 parent 66be629 commit e32c782
Show file tree
Hide file tree
Showing 17 changed files with 1,243 additions and 3 deletions.
6 changes: 6 additions & 0 deletions docs/cn/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@
* [Pulsar](plugins/flusher/flusher-pulsar.md)
* [HTTP](plugins/flusher/flusher-http.md)
* [Loki](plugins/flusher/loki.md)
* [扩展](plugins/extension/README.md)
* [BasicAuth鉴权](plugins/extension/ext-basicauth.md)
* [协议解码/反序列化](plugins/extension/ext-default-decoder.md)
* [协议编码/序列化](plugins/extension/ext-default-encoder.md)
* [数据筛选](plugins/extension/ext-groupinfo-filter.md)
* [请求熔断](plugins/extension/ext-request-breaker.md)

## 工作原理 <a href="#principle" id="principle"></a>

Expand Down
3 changes: 3 additions & 0 deletions docs/cn/plugins/extension/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# 扩展

扩展插件用于对其它插件能力的补充(e.g. 鉴权、编解码、熔断、限流……)
37 changes: 37 additions & 0 deletions docs/cn/plugins/extension/ext-default-encoder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# DefaultEncoder Encoder扩展

## 简介

[ext_default_encoder](https://github.com/alibaba/ilogtail/blob/main/plugins/extension/default_encoder/default_encoder.go)
扩展,实现了 [Encoder](https://github.com/alibaba/ilogtail/blob/main/pkg/pipeline/extensions/encoder.go) 接口,可以用在
`flusher_http` 等插件中用于序列化不同的协议数据。

## 版本

[Alpha](../stability-level.md)

## 配置参数

| 参数 | 类型 | 是否必选 | 说明 |
|-------------|--------|------|-----------------------------------------------------------------------------------------------------------|
| Format | String || 具体的协议,[查看支持的具体协议列表](https://github.com/alibaba/ilogtail/blob/master/pkg/protocol/encoder/common/comon.go) |
| SeriesLimit | Int || 触发序列化时序切片的最大长度,默认 1000,仅针对 Format=prometheus 时有效 |

## 样例

使用 `flusher_http` flusher 插件,配置发送 `prometheus` 协议数据。

```yaml
enable: true
flushers:
- Type: flusher_http
...
Encoder:
Type: ext_default_encoder/prometheus
...
...
extensions:
- Type: ext_default_encoder/prometheus
Format: 'prometheus'
SeriesLimit: 1024
```
6 changes: 6 additions & 0 deletions docs/cn/plugins/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,9 @@
| 名称 | 提供方 | 简介 |
|----------------------------------------------------------------------------|-------------------------------------------------|-----------------------------|
| [`ext_default_decoder`](extension/ext-default-decoder.md)<br> 默认的decoder扩展 | 社区<br>[`snakorse`](https://github.com/snakorse) | 将内置支持的Format以Decoder扩展的形式封装 |

### Encoder

| 名称 | 提供方 | 简介 |
|----------------------------------------------------------------------------|--------------------------------------------------------|-----------------------------|
| [`ext_default_encoder`](extension/ext-default-encoder.md)<br> 默认的encoder扩展 | 社区<br>[`yuanshuai.1900`](https://github.com/aiops1900) | 将内置支持的Format以Encoder扩展的形式封装 |
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/ClickHouse/clickhouse-go/v2 v2.6.0
github.com/IBM/sarama v1.42.2
github.com/VictoriaMetrics/VictoriaMetrics v1.83.1
github.com/VictoriaMetrics/VictoriaMetrics v1.83.0
github.com/alibaba/ilogtail/pkg v0.0.0
github.com/apache/pulsar-client-go v0.10.0
github.com/buger/jsonparser v1.1.1
Expand Down Expand Up @@ -44,7 +44,7 @@ require (
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.42.0
github.com/prometheus/procfs v0.8.0
github.com/pyroscope-io/pyroscope v0.37.2
github.com/pyroscope-io/pyroscope v1.5.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/goconvey v1.7.2
Expand Down
4 changes: 3 additions & 1 deletion pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/Microsoft/go-winio v0.5.2
github.com/VictoriaMetrics/VictoriaMetrics v1.83.0
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.2.0
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575
Expand All @@ -18,12 +19,13 @@ require (
github.com/influxdata/telegraf v1.20.0
github.com/json-iterator/go v1.1.12
github.com/mailru/easyjson v0.7.7
github.com/mitchellh/mapstructure v1.4.2
github.com/narqo/go-dogstatsd-parser v0.2.0
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/prometheus/common v0.42.0
github.com/prometheus/prometheus v1.8.2-0.20210430082741-2a4b8e12bbf2
github.com/pyroscope-io/jfr-parser v0.6.0
github.com/pyroscope-io/pyroscope v0.0.0-00010101000000-000000000000
github.com/pyroscope-io/pyroscope v1.5.0
github.com/richardartoul/molecule v1.0.0
github.com/smartystreets/goconvey v1.7.2
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 4 additions & 0 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,8 @@ github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKEN
github.com/hetznercloud/hcloud-go v1.24.0/go.mod h1:3YmyK8yaZZ48syie6xpm3dt26rtB6s65AisBHylXYFA=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/iLogtail/VictoriaMetrics v1.83.4-ilogtail h1:LRDJt9eUKKhHdwPJRbC6tgtiMs/0XTjlCz1dl2pzRt0=
github.com/iLogtail/VictoriaMetrics v1.83.4-ilogtail/go.mod h1:JagjwAO58g1WNpyr6x/lrQqMTf99d/WU/yxjADxBz8E=
github.com/iLogtail/handy v0.0.0-20230327021402-6a47ec586270/go.mod h1:6ai2R0qBm3xL13e10jwvyIf91Spxvo/yREZE9KOz7so=
github.com/iLogtail/jfr-parser v0.6.0 h1:dNaQ0Ng2BLE5uxrhUQwtx1q7O9LIQFpMthl3SV326AU=
github.com/iLogtail/jfr-parser v0.6.0/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8=
Expand Down Expand Up @@ -944,6 +946,8 @@ github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.2 h1:6h7AQ0yhTcIsmFmnAwQls75jp2Gzs4iB8W7pjMO+rqo=
github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A=
github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg=
github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc=
Expand Down
53 changes: 53 additions & 0 deletions pkg/pipeline/extensions/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package extensions

import (
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
)

// Encoder encodes data of iLogtail data models into bytes.
// Different drivers with different encoding protocols implement Encoder interface.
//
// drivers: raw, influxdb, prometheus, sls, ...
type Encoder interface {
EncoderV1
EncoderV2
}

// EncoderV1 supports v1 pipeline plugin interface,
// encodes data of v1 model into bytes.
//
// drivers: sls, influxdb, ...
type EncoderV1 interface {
EncodeV1(*protocol.LogGroup) ([][]byte, error)
EncodeBatchV1([]*protocol.LogGroup) ([][]byte, error)
}

// EncoderV2 supports v2 pipeline plugin interface,
// encodes data of v2 model into bytes.
//
// drivers: raw, influxdb, prometheus, ...
type EncoderV2 interface {
EncodeV2(*models.PipelineGroupEvents) ([][]byte, error)
EncodeBatchV2([]*models.PipelineGroupEvents) ([][]byte, error)
}

type EncoderExtension interface {
Encoder
pipeline.Extension
}
19 changes: 19 additions & 0 deletions pkg/protocol/encoder/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

const (
ProtocolPrometheus = "prometheus"
)
40 changes: 40 additions & 0 deletions pkg/protocol/encoder/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoder

import (
"fmt"
"strings"

"github.com/mitchellh/mapstructure"

"github.com/alibaba/ilogtail/pkg/pipeline/extensions"
"github.com/alibaba/ilogtail/pkg/protocol/encoder/common"
"github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus"
)

func NewEncoder(format string, options map[string]any) (extensions.Encoder, error) {
switch strings.TrimSpace(strings.ToLower(format)) {
case common.ProtocolPrometheus:
var opt prometheus.Option
if err := mapstructure.Decode(options, &opt); err != nil {
return nil, err
}
return prometheus.NewPromEncoder(opt.SeriesLimit), nil

default:
return nil, fmt.Errorf("not supported encode format: %s", format)
}
}
126 changes: 126 additions & 0 deletions pkg/protocol/encoder/prometheus/encoder_prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheus

import (
"context"
"errors"

"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline/extensions"
"github.com/alibaba/ilogtail/pkg/protocol"
)

const defaultSeriesLimit = 1000

var errNilOrZeroGroupEvents = errors.New("nil or zero group events")

type Option struct {
SeriesLimit int // config for prometheus encoder
}

func NewPromEncoder(seriesLimit int) extensions.Encoder {
return newPromEncoder(seriesLimit)
}

type Encoder struct {
SeriesLimit int
}

func newPromEncoder(seriesLimit int) *Encoder {
if seriesLimit <= 0 {
seriesLimit = defaultSeriesLimit
}

return &Encoder{
SeriesLimit: seriesLimit,
}
}

func (p *Encoder) EncodeV1(logGroups *protocol.LogGroup) ([][]byte, error) {
// TODO implement me
return nil, nil
}

func (p *Encoder) EncodeBatchV1(logGroups []*protocol.LogGroup) ([][]byte, error) {
// TODO implement me
return nil, nil
}

func (p *Encoder) EncodeV2(groupEvents *models.PipelineGroupEvents) ([][]byte, error) {
if groupEvents == nil || len(groupEvents.Events) == 0 {
return nil, errNilOrZeroGroupEvents
}

var res [][]byte

wr := getWriteRequest(p.SeriesLimit)
defer putWriteRequest(wr)

for _, event := range groupEvents.Events {
if event == nil {
logger.Debugf(context.Background(), "nil event")
continue
}

if event.GetType() != models.EventTypeMetric {
logger.Debugf(context.Background(), "event type (%s) not metric", event.GetName())
continue
}

metricEvent, ok := event.(*models.Metric)
if !ok {
logger.Debugf(context.Background(), "assert metric event type (%s) failed", event.GetName())
continue
}

wr.Timeseries = append(wr.Timeseries, genPromRemoteWriteTimeseries(metricEvent))
if len(wr.Timeseries) >= p.SeriesLimit {
res = append(res, marshalBatchTimeseriesData(wr))
wr.Timeseries = wr.Timeseries[:0]
}
}

if len(wr.Timeseries) > 0 {
res = append(res, marshalBatchTimeseriesData(wr))
wr.Timeseries = wr.Timeseries[:0]
}

return res, nil
}

func (p *Encoder) EncodeBatchV2(groupEventsSlice []*models.PipelineGroupEvents) ([][]byte, error) {
if len(groupEventsSlice) == 0 {
return nil, errNilOrZeroGroupEvents
}

var res [][]byte

for _, groupEvents := range groupEventsSlice {
bytes, err := p.EncodeV2(groupEvents)
if err != nil {
continue
}

res = append(res, bytes...)
}

if res == nil {
return nil, errNilOrZeroGroupEvents
}

return res, nil
}
Loading

0 comments on commit e32c782

Please sign in to comment.