From 4eededf1eb0429f2934c9a94a14bcc783ad8448a Mon Sep 17 00:00:00 2001 From: "yuanshuai.1900" Date: Wed, 14 Aug 2024 16:38:09 +0800 Subject: [PATCH 1/9] feat: add encoder extension interface in pkg/pipeline/extensions --- pkg/pipeline/extensions/encoder.go | 53 ++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 pkg/pipeline/extensions/encoder.go diff --git a/pkg/pipeline/extensions/encoder.go b/pkg/pipeline/extensions/encoder.go new file mode 100644 index 0000000000..85e5fcb6a5 --- /dev/null +++ b/pkg/pipeline/extensions/encoder.go @@ -0,0 +1,53 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensions + +import ( + "github.com/alibaba/ilogtail/pkg/models" + "github.com/alibaba/ilogtail/pkg/pipeline" + "github.com/alibaba/ilogtail/pkg/protocol" +) + +// Encoder encodes data of iLogtail data models into bytes. +// Different drivers with different encoding protocols implement Encoder interface. +// +// drivers: raw, influxdb, prometheus, sls, ... +type Encoder interface { + EncoderV1 + EncoderV2 +} + +// EncoderV1 supports v1 pipeline plugin interface, +// encodes data of v1 model into bytes. +// +// drivers: sls, influxdb, ... +type EncoderV1 interface { + EncodeV1(*protocol.LogGroup) ([][]byte, error) + EncodeBatchV1([]*protocol.LogGroup) ([][]byte, error) +} + +// EncoderV2 supports v2 pipeline plugin interface, +// encodes data of v2 model into bytes. +// +// drivers: raw, influxdb, prometheus, ... 
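+//
+// Each element of the returned [][]byte is one self-contained encoded payload,
+// so a caller (for example an HTTP flusher) may send the elements as separate
+// requests rather than concatenating them.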
+type EncoderV2 interface {
+	EncodeV2(*models.PipelineGroupEvents) ([][]byte, error)
+	EncodeBatchV2([]*models.PipelineGroupEvents) ([][]byte, error)
+}
+
+type EncoderExtension interface {
+	Encoder
+	pipeline.Extension
+}

From 76a3d4b65aadf89936095e6b527bd4eeab100e21 Mon Sep 17 00:00:00 2001
From: "yuanshuai.1900"
Date: Mon, 19 Aug 2024 16:07:25 +0800
Subject: [PATCH 2/9] feat: add prometheus encoder which implements extensions.Encoder interface

---
 pkg/go.mod                                    |   3 +-
 pkg/go.sum                                    |   2 +
 pkg/protocol/encoder/encoder.go               |  39 ++
 .../encoder/prometheus/encoder_prometheus.go  | 126 +++++
 .../prometheus/encoder_prometheus_test.go     | 505 ++++++++++++++++++
 pkg/protocol/encoder/prometheus/utils.go      | 118 ++++
 pkg/protocol/encoder/prometheus/utils_test.go |  96 ++++
 pkg/protocol/encoder/protocol.go              |  28 +
 8 files changed, 916 insertions(+), 1 deletion(-)
 create mode 100644 pkg/protocol/encoder/encoder.go
 create mode 100644 pkg/protocol/encoder/prometheus/encoder_prometheus.go
 create mode 100644 pkg/protocol/encoder/prometheus/encoder_prometheus_test.go
 create mode 100644 pkg/protocol/encoder/prometheus/utils.go
 create mode 100644 pkg/protocol/encoder/prometheus/utils_test.go
 create mode 100644 pkg/protocol/encoder/protocol.go

diff --git a/pkg/go.mod b/pkg/go.mod
index 0de999b532..35ff26af66 100644
--- a/pkg/go.mod
+++ b/pkg/go.mod
@@ -4,6 +4,7 @@ go 1.19
 
 require (
 	github.com/Microsoft/go-winio v0.5.2
+	github.com/VictoriaMetrics/VictoriaMetrics v1.83.4
 	github.com/cespare/xxhash v1.1.0
 	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575
@@ -23,7 +24,7 @@ require (
 	github.com/prometheus/common v0.42.0
 	github.com/prometheus/prometheus v1.8.2-0.20210430082741-2a4b8e12bbf2
 	github.com/pyroscope-io/jfr-parser v0.6.0
-	github.com/pyroscope-io/pyroscope v0.0.0-00010101000000-000000000000
+	github.com/pyroscope-io/pyroscope v1.5.0
 	github.com/richardartoul/molecule v1.0.0
 	github.com/smartystreets/goconvey v1.7.2
 	github.com/stretchr/testify v1.8.2
diff --git a/pkg/go.sum b/pkg/go.sum
index 2b45645951..48a04d58bf 100644
--- a/pkg/go.sum
+++ b/pkg/go.sum
@@ -783,6 +783,8 @@ github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKEN
 github.com/hetznercloud/hcloud-go v1.24.0/go.mod h1:3YmyK8yaZZ48syie6xpm3dt26rtB6s65AisBHylXYFA=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
+github.com/iLogtail/VictoriaMetrics v1.83.4-ilogtail h1:LRDJt9eUKKhHdwPJRbC6tgtiMs/0XTjlCz1dl2pzRt0=
+github.com/iLogtail/VictoriaMetrics v1.83.4-ilogtail/go.mod h1:JagjwAO58g1WNpyr6x/lrQqMTf99d/WU/yxjADxBz8E=
 github.com/iLogtail/handy v0.0.0-20230327021402-6a47ec586270/go.mod h1:6ai2R0qBm3xL13e10jwvyIf91Spxvo/yREZE9KOz7so=
 github.com/iLogtail/jfr-parser v0.6.0 h1:dNaQ0Ng2BLE5uxrhUQwtx1q7O9LIQFpMthl3SV326AU=
 github.com/iLogtail/jfr-parser v0.6.0/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8=
diff --git a/pkg/protocol/encoder/encoder.go b/pkg/protocol/encoder/encoder.go
new file mode 100644
index 0000000000..00c90cba9e
--- /dev/null
+++ b/pkg/protocol/encoder/encoder.go
@@ -0,0 +1,39 @@
+// Copyright 2024 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoder + +import ( + "fmt" + "strings" + + "github.com/mitchellh/mapstructure" + + "github.com/alibaba/ilogtail/pkg/pipeline/extensions" + "github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus" +) + +func NewEncoder(format string, options map[string]any) (extensions.Encoder, error) { + switch strings.TrimSpace(strings.ToLower(format)) { + case protocolPrometheus: + var opt prometheus.Option + if err := mapstructure.Decode(options, &opt); err != nil { + return nil, err + } + return prometheus.NewPromEncoder(opt.SeriesLimit), nil + + default: + return nil, fmt.Errorf("not supported encode format: %s", format) + } +} diff --git a/pkg/protocol/encoder/prometheus/encoder_prometheus.go b/pkg/protocol/encoder/prometheus/encoder_prometheus.go new file mode 100644 index 0000000000..5b4ec2d8c5 --- /dev/null +++ b/pkg/protocol/encoder/prometheus/encoder_prometheus.go @@ -0,0 +1,126 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
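+
+// The prometheus encoder below converts metric events into Prometheus
+// remote-write payloads: every returned []byte is a marshaled
+// prompbmarshal.WriteRequest carrying at most SeriesLimit time series.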
+ +package prometheus + +import ( + "context" + "errors" + + "github.com/alibaba/ilogtail/pkg/logger" + "github.com/alibaba/ilogtail/pkg/models" + "github.com/alibaba/ilogtail/pkg/pipeline/extensions" + "github.com/alibaba/ilogtail/pkg/protocol" +) + +const defaultSeriesLimit = 1000 + +var errNilOrZeroGroupEvents = errors.New("nil or zero group events") + +type Option struct { + SeriesLimit int // config for prometheus encoder +} + +func NewPromEncoder(seriesLimit int) extensions.Encoder { + return newPromEncoder(seriesLimit) +} + +type Encoder struct { + SeriesLimit int +} + +func newPromEncoder(seriesLimit int) *Encoder { + if seriesLimit <= 0 { + seriesLimit = defaultSeriesLimit + } + + return &Encoder{ + SeriesLimit: seriesLimit, + } +} + +func (p *Encoder) EncodeV1(logGroups *protocol.LogGroup) ([][]byte, error) { + // TODO implement me + return nil, nil +} + +func (p *Encoder) EncodeBatchV1(logGroups []*protocol.LogGroup) ([][]byte, error) { + // TODO implement me + return nil, nil +} + +func (p *Encoder) EncodeV2(groupEvents *models.PipelineGroupEvents) ([][]byte, error) { + if groupEvents == nil || len(groupEvents.Events) == 0 { + return nil, errNilOrZeroGroupEvents + } + + var res [][]byte + + wr := getWriteRequest(p.SeriesLimit) + defer putWriteRequest(wr) + + for _, event := range groupEvents.Events { + if event == nil { + logger.Debugf(context.Background(), "nil event") + continue + } + + if event.GetType() != models.EventTypeMetric { + logger.Debugf(context.Background(), "event type (%s) not metric", event.GetName()) + continue + } + + metricEvent, ok := event.(*models.Metric) + if !ok { + logger.Debugf(context.Background(), "assert metric event type (%s) failed", event.GetName()) + continue + } + + wr.Timeseries = append(wr.Timeseries, genPromRemoteWriteTimeseries(metricEvent)) + if len(wr.Timeseries) >= p.SeriesLimit { + res = append(res, marshalBatchTimeseriesData(wr)) + wr.Timeseries = wr.Timeseries[:0] + } + } + + if len(wr.Timeseries) > 0 { + res = append(res, marshalBatchTimeseriesData(wr)) + wr.Timeseries = wr.Timeseries[:0] + } + + return res, nil +} + +func (p *Encoder) EncodeBatchV2(groupEventsSlice []*models.PipelineGroupEvents) ([][]byte, error) { + if groupEventsSlice == nil || len(groupEventsSlice) == 0 { + return nil, errNilOrZeroGroupEvents + } + + var res [][]byte + + for _, groupEvents := range groupEventsSlice { + bytes, err := p.EncodeV2(groupEvents) + if err != nil { + continue + } + + res = append(res, bytes...) + } + + if res == nil { + return nil, errNilOrZeroGroupEvents + } + + return res, nil +} diff --git a/pkg/protocol/encoder/prometheus/encoder_prometheus_test.go b/pkg/protocol/encoder/prometheus/encoder_prometheus_test.go new file mode 100644 index 0000000000..2091cf5d1d --- /dev/null +++ b/pkg/protocol/encoder/prometheus/encoder_prometheus_test.go @@ -0,0 +1,505 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
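+
+// A minimal usage sketch of the encoder defined above (the series limit and
+// variable names are illustrative, error handling elided):
+//
+//	enc := NewPromEncoder(1000)                // at most 1000 series per payload
+//	payloads, err := enc.EncodeV2(groupEvents) // groupEvents: *models.PipelineGroupEvents
+//	// each payload is one marshaled prompbmarshal.WriteRequest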
+ +package prometheus + +import ( + "errors" + "strconv" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + + "github.com/alibaba/ilogtail/pkg/models" +) + +// 场景:性能测试,确定性能基线(UT粒度) +// 因子:所有 Event type 均为 models.EventTypeMetric +// 因子:所有 PipelineEvent interface 的实现均为 *models.Metric +// 预期:EncodeV2 和 EncodeBatchV2 性能相当(实现上 EncodeBatchV2 会循环调用 EncodeV2) +// Benchmark结果(每次在具体数值上可能会有差异,但数量级相同): +// goos: darwin +// goarch: arm64 +// pkg: github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus +// BenchmarkV2Encode +// BenchmarkV2Encode/EncodeV2 +// BenchmarkV2Encode/EncodeV2-12 685 1655657 ns/op +// BenchmarkV2Encode/BatchEncodeV2 +// BenchmarkV2Encode/BatchEncodeV2-12 716 1639491 ns/op +// PASS +func BenchmarkV2Encode(b *testing.B) { + // given + p := NewPromEncoder(19) + groupEventsSlice := genNormalPipelineGroupEventsSlice(100) + want := append([]*models.PipelineGroupEvents(nil), groupEventsSlice...) + + b.Run("EncodeV2", func(b *testing.B) { + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for _, groupEvents := range groupEventsSlice { + p.EncodeV2(groupEvents) + } + } + }) + assert.Equal(b, want, groupEventsSlice) + + b.Run("BatchEncodeV2", func(b *testing.B) { + b.ResetTimer() + + for i := 0; i < b.N; i++ { + p.EncodeBatchV2(groupEventsSlice) + } + }) + assert.Equal(b, want, groupEventsSlice) +} + +// 场景:V2 Encode接口功能测试 +// 说明:EncodeBatchV2 内部会调用 EncodeV2,所以也同时测试了 EncodeV2 的正常逻辑的功能 +// 因子:所有 Event type 均为 models.EventTypeMetric +// 因子:所有 PipelineEvent interface 的实现均为「正常的*models.Metric」(而不是new(models.Metric)), +// 具体区别在于正常的*models.Metric,其中的Tags、Value等都不是nil(如果为nil,会触发序列化的异常逻辑) +// 预期:V2 Encode逻辑正常(正常流程都能正确处理),返回的error类型为nil,[][]byte不为nil +func TestV2Encode_ShouldReturnNoError_GivenNormalDataOfPipelineGroupEvents(t *testing.T) { + // given + groupEventsSlice1 := genNormalPipelineGroupEventsSlice(100) + groupEventsSlice2 := genNormalPipelineGroupEventsSlice(100) + p := NewPromEncoder(19) + + // when + // then + data1, err1 := p.EncodeBatchV2(groupEventsSlice1) + assert.NoError(t, err1) + data2, err2 := p.EncodeBatchV2(groupEventsSlice2) + assert.NoError(t, err2) + + assert.Equal(t, len(data2), len(data1)) +} + +// 场景:V2 Encode接口功能测试(异常数据,非全nil或0值) +// 说明:尽管 EncodeBatchV2 内部会调用 EncodeV2,但异常情况可能是 EncodeBatchV2 侧的防御, +// 所以还需要测试异常情况下 EncodeV2 的功能 +// 因子:并非所有 Event type 均为 models.EventTypeMetric(e.g. 还可能是 models.EventTypeLogging 等) +// 因子:PipelineEvent interface 的实现,部分是「正常的*models.Metric」,部分为 nil,部分为new(models.Metric), +// 部分为其它(e.g. 
*models.Log 等) +// 预期:Encode逻辑正常(异常流程也能正确处理),返回的error类型不为nil,[][]byte为nil +func TestV2Encode_ShouldReturnError_GivenAbNormalDataOfPipelineGroupEvents(t *testing.T) { + // given + groupEventsSlice1 := genPipelineGroupEventsSliceIncludingAbnormalData(100) + groupEventsSlice2 := genPipelineGroupEventsSliceIncludingAbnormalData(100) + assert.Equal(t, len(groupEventsSlice1), len(groupEventsSlice2)) + p := NewPromEncoder(19) + + // when + // then + t.Run("Test EncodeV2 with abnormal data input", func(t *testing.T) { + for i, groupEvents := range groupEventsSlice1 { + data1, err1 := p.EncodeV2(groupEvents) + data2, err2 := p.EncodeV2(groupEventsSlice2[i]) + if err1 != nil { + assert.Error(t, err2) + assert.Equal(t, err1, err2) + } else { + assert.NoError(t, err2) + assert.Equal(t, len(data2), len(data1)) + } + } + }) + + t.Run("Test EncodeBatchV2 with abnormal data input", func(t *testing.T) { + data1, err1 := p.EncodeBatchV2(groupEventsSlice1) + assert.NoError(t, err1) + data2, err2 := p.EncodeBatchV2(groupEventsSlice2) + assert.NoError(t, err2) + + assert.Equal(t, len(data2), len(data1)) + }) +} + +// 场景:V2 Encode接口功能测试(异常数据,全nil或0值) +// 说明:尽管 EncodeBatchV2 内部会调用 EncodeV2,但异常情况可能是 EncodeBatchV2 侧的防御, +// 所以还需要测试异常情况下 EncodeV2 的功能 +// 因子:所有 *models.PipelineGroupEvents 及 []*models.PipelineGroupEvents 底层为 nil 或者 长度为0的切片 +// 预期:Encode逻辑正常(异常流程也能正确处理),返回的error类型不为nil,[][]byte为nil +func TestV2Encode_ShouldReturnError_GivenNilOrZeroDataOfPipelineGroupEvents(t *testing.T) { + // given + p := NewPromEncoder(19) + nilOrZeroGroupEventsSlices := []*models.PipelineGroupEvents{ + nil, + {}, // same as {Events: nil}, + {Events: make([]models.PipelineEvent, 0)}, + } + nilOrZeroGroupEventsSlicesEx := [][]*models.PipelineGroupEvents{ + nil, + {}, // same as {nil} + {{Events: nil}}, + nilOrZeroGroupEventsSlices, + } + + // when + // then + t.Run("Test EncodeV2 with nil or zero data input", func(t *testing.T) { + for _, input := range nilOrZeroGroupEventsSlices { + data, err := p.EncodeV2(input) + assert.Error(t, err) + assert.Nil(t, data) + } + }) + + t.Run("Test EncodeBatchV2 with nil or zero data input", func(t *testing.T) { + for _, input := range nilOrZeroGroupEventsSlicesEx { + data, err := p.EncodeBatchV2(input) + assert.Error(t, err) + assert.Nil(t, data) + } + }) +} + +// 场景:V2 Encode接口功能测试 +// 说明:EncoderBatchV2 内部会调用 EncoderV2,所以也同时测试了 EncoderV2 的功能 +// 因子:所有 Event type 均为 models.EventTypeMetric +// 因子:所有 PipelineEvent interface 的实现均为 *models.Metric +// 因子:每个 metric event 生成的 *models.Metric.Tags 中的 tag 仅一个 +// (确保 Encode 时 range map不用考虑 range go原生map每次顺序随机,从而导致2次Encode相同数据后得到的结果不同) +// PS:如果不这么做,就要对map做转化,先变成 range 保序的 slice,再 Encode; +// 但测试的功能点是Encode,所以采用上述方法绕过range go原生map每次顺序随机的特点,完成功能测试 +// 预期:Encode逻辑正常(正常流程都能正确处理),返回的error类型为nil,[][]byte不为nil,且两次Encode后返回的数据相同 +func TestEncoderBatchV2_ShouldReturnNoErrorAndEqualData_GivenNormalDataOfDataOfPipelineGroupEventsWithSingleTag(t *testing.T) { + // given + groupEventsSlice1 := genPipelineGroupEventsSliceSingleTag(100) + groupEventsSlice2 := genPipelineGroupEventsSliceSingleTag(100) + p := NewPromEncoder(19) + + // when + // then + data1, err1 := p.EncodeBatchV2(groupEventsSlice1) + assert.NoError(t, err1) + data2, err2 := p.EncodeBatchV2(groupEventsSlice2) + assert.NoError(t, err2) + + assert.Equal(t, data2, data1) +} + +// 场景:V2 Encode接口功能测试 +// 说明:EncoderBatchV2 内部会调用 EncoderV2,所以也同时测试了 EncoderV2 的功能 +// 因子:所有 Event type 均为 models.EventTypeMetric +// 因子:所有 PipelineEvent interface 的实现均为 *models.Metric +// 因子:每个 metric event 生成的 *models.Metric.Tags 中的 
tag 仅一个 +// (确保 Encode 时 range map不用考虑 range go原生map每次顺序随机,从而导致2次Encode相同数据后得到的结果不同) +// PS:如果不这么做,就要对map做转化,先变成 range 保序的 slice,再 Encode; +// 但测试的功能点是Encode,所以采用上述方法绕过range go原生map每次顺序随机的特点,完成功能测试 +// 因子:对Encode后的数据进行Decode +// 因子:「构造的用例数据」的长度(len([]*models.PipelineGroupEvents))未超过 series limit +// 预期:「构造的用例数据」和「对用例数据先Encode再Decode后的数据」相等 +func TestEncoderBatchV2_ShouldDecodeSuccess_GivenNormalDataOfDataOfPipelineGroupEventsWithSingleTagNotExceedingSeriesLimit(t *testing.T) { + // given + seriesLimit := 19 + n := seriesLimit + wantGroupEventsSlice := genPipelineGroupEventsSliceSingleTag(n) + p := NewPromEncoder(seriesLimit) + data, err := p.EncodeBatchV2(wantGroupEventsSlice) + assert.NoError(t, err) + + // when + // then + gotGroupEventsSlice, err := DecodeBatchV2(data) + assert.NoError(t, err) + assert.Equal(t, wantGroupEventsSlice, gotGroupEventsSlice) +} + +// 场景:V2 Encode接口功能测试 +// 说明:EncoderBatchV2 内部会调用 EncoderV2,所以也同时测试了 EncoderV2 的功能 +// 因子:所有 Event type 均为 models.EventTypeMetric +// 因子:所有 PipelineEvent interface 的实现均为 *models.Metric +// 因子:每个 metric event 生成的 *models.Metric.Tags 中的 tag 仅一个 +// (确保 Encode 时 range map不用考虑 range go原生map每次顺序随机,从而导致2次Encode相同数据后得到的结果不同) +// PS:如果不这么做,就要对map做转化,先变成 range 保序的 slice,再 Encode; +// 但测试的功能点是Encode,所以采用上述方法绕过range go原生map每次顺序随机的特点,完成功能测试 +// 因子:对Encode后的数据进行Decode +// 因子:「构造的用例数据」的长度(len([]*models.PipelineGroupEvents))超过 series limit +// 预期:「构造的用例数据」的长度小于「对用例数据先Encode再Decode后的数据」的长度,且用 expectedLen 计算后的长度相等 +// PS:expectedLen 的计算方法,其实是和 genPipelineGroupEventsSlice 生成用例及根据series limit确定encode批次 +// 的逻辑相关,和 Encode 本身的逻辑无关 +func TestEncoderBatchV2_ShouldDecodeSuccess_GivenNormalDataOfDataOfPipelineGroupEventsWithSingleTagExceedingSeriesLimit(t *testing.T) { + // given + seriesLimit := 19 + n := 100 + wantGroupEventsSlice := genPipelineGroupEventsSliceSingleTag(n) + assert.Equal(t, n, len(wantGroupEventsSlice)) + p := NewPromEncoder(seriesLimit) + data, err := p.EncodeBatchV2(wantGroupEventsSlice) + assert.NoError(t, err) + expectedLen := func(limit, length int) int { + // make sure limit > 0 && length > 0 + if limit <= 0 || length <= 0 { + return -1 + } + + mod := length % limit + mul := length / limit + + res := 0 + for i := 0; i <= mul; i++ { + res += i * limit + } + res += (mul + 1) * mod + + return res + } + + // when + gotGroupEventsSlice, err := DecodeBatchV2(data) + assert.NoError(t, err) + + // then + assert.Equal(t, expectedLen(seriesLimit, n), len(gotGroupEventsSlice)) +} + +func genNormalPipelineGroupEventsSlice(n int) []*models.PipelineGroupEvents { + return genPipelineGroupEventsSlice(n, genPipelineEvent) +} + +func genPipelineGroupEventsSliceIncludingAbnormalData(n int) []*models.PipelineGroupEvents { + return genPipelineGroupEventsSlice(n, genPipelineEventIncludingAbnormalData) +} + +func genPipelineGroupEventsSliceSingleTag(n int) []*models.PipelineGroupEvents { + return genPipelineGroupEventsSlice(n, genPipelineEventSingleTag) +} + +func genPipelineGroupEventsSlice(n int, genPipelineEventFn func(int) []models.PipelineEvent) []*models.PipelineGroupEvents { + res := make([]*models.PipelineGroupEvents, 0, n) + for i := 1; i <= n; i++ { + res = append(res, &models.PipelineGroupEvents{ + Group: models.NewGroup(models.NewMetadata(), models.NewTags()), + Events: genPipelineEventFn(i), + }) + } + + return res +} + +func genPipelineEvent(n int) []models.PipelineEvent { + res := make([]models.PipelineEvent, 0, n) + for i := 1; i <= n; i++ { + res = append(res, genMetric(i)) + } + + return res +} + +func genMetric(n int) *models.Metric { 
+ i := strconv.Itoa(n) + tags := models.NewKeyValues[string]() + tags.AddAll(map[string]string{ + // range map will out of order + "a" + i: "A" + i, + "b" + i: "B" + i, + "c" + i: "C" + i, + "d" + i: "D" + i, + }) + + return &models.Metric{ + Timestamp: 11111111 * uint64(n), + Tags: tags, + Value: &models.MetricSingleValue{Value: 1.1 * float64(n)}, + } +} + +func genPipelineEventIncludingAbnormalData(n int) []models.PipelineEvent { + res := make([]models.PipelineEvent, 0, n) + for i := 1; i <= n; i++ { + if i&1 == 0 { // i is even number + // normal data + res = append(res, genMetric(i)) + continue + } + + // i is odd number + // abnormal data + if i%3 == 0 { + // abnormal data: nil data + res = append(res, nil) + continue + } + + if i%5 == 0 { + // abnormal data: zero data + // PS: + // 1. 这里只是从边界情况考虑,构造了这种异常值 + // 但实际场景中,不会直接 new(models.Metric) 或者 &models.Metric{} 这样创建 zero data, + // 一般都是用 models.NewMetric|NewSingleValueMetric|NewMultiValuesMetric 等 构造函数(工厂模式)来创建, + // 上述构造函数位置:ilogtail/pkg/models/factory.go + // 2. 此外,也可以给 *models.Metric 的 GetTag 方法增加下 *models.Metric.Tag 为 nil 时的保护 + // (参考其 GetValue 方法的实现),文件位置:ilogtail/pkg/models/metric.go + res = append(res, new(models.Metric)) + continue + } + + // abnormal data: other event type not models.EventTypeMetric + res = append(res, new(models.Log)) + } + + return res +} + +func genPipelineEventSingleTag(n int) []models.PipelineEvent { + res := make([]models.PipelineEvent, 0, n) + for i := 1; i <= n; i++ { + res = append(res, genMetricSingleTag(i)) + } + + return res +} + +func genMetricSingleTag(n int) *models.Metric { + metricName := "test_metric" + i := strconv.Itoa(n) + tags := models.NewTagsWithMap(map[string]string{ + // only single tag + // keep range in order + "x" + i: "X" + i, + }) + + dataPoint := pb.Sample{Timestamp: 11111111 * int64(n), Value: 1.1 * float64(n)} + + return models.NewSingleValueMetric( + metricName, // value of key "__name__" in prometheus + models.MetricTypeGauge, + tags, + model.Time(dataPoint.Timestamp).Time().UnixNano(), + dataPoint.Value, + ) +} + +func DecodeBatchV2(data [][]byte) ([]*models.PipelineGroupEvents, error) { + if len(data) == 0 { + return nil, errors.New("no data to decode") + } + + var res []*models.PipelineGroupEvents + + meta, commonTags := models.NewMetadata(), models.NewTags() + for _, d := range data { + groupEvents, err := convertPromRequestToPipelineGroupEvents(d, meta, commonTags) + if err != nil { + continue + } + + res = append(res, groupEvents) + } + + return res, nil +} + +func convertPromRequestToPipelineGroupEvents(data []byte, metaInfo models.Metadata, commonTags models.Tags) (*models.PipelineGroupEvents, error) { + wr, err := unmarshalBatchTimeseriesData(data) + if err != nil { + return nil, err + } + + groupEvent := &models.PipelineGroupEvents{ + Group: models.NewGroup(metaInfo, commonTags), + } + + for _, ts := range wr.Timeseries { + var metricName string + tags := models.NewTags() + for _, label := range ts.Labels { + if label.Name == metricNameKey { + metricName = label.Value + continue + } + tags.Add(label.Name, label.Value) + } + + for _, dataPoint := range ts.Samples { + metric := models.NewSingleValueMetric( + metricName, + models.MetricTypeGauge, + tags, + // Decode (during input_prometheus stage) makes timestamp + // with unix milliseconds into unix nanoseconds, + // e.g. "model.Time(milliseconds).Time().UnixNano()". 
+ model.Time(dataPoint.Timestamp).Time().UnixNano(), + dataPoint.Value, + ) + groupEvent.Events = append(groupEvent.Events, metric) + } + } + + return groupEvent, nil +} + +func unmarshalBatchTimeseriesData(data []byte) (*pb.WriteRequest, error) { + wr := new(prompb.WriteRequest) + if err := wr.Unmarshal(data); err != nil { + return nil, err + } + + return convertPrompbToVictoriaMetricspb(wr) +} + +func convertPrompbToVictoriaMetricspb(wr *prompb.WriteRequest) (*pb.WriteRequest, error) { + if wr == nil || len(wr.Timeseries) == 0 { + return nil, errors.New("nil *prompb.WriteRequest") + } + + res := &pb.WriteRequest{ + Timeseries: make([]pb.TimeSeries, 0, len(wr.Timeseries)), + } + for _, tss := range wr.Timeseries { + res.Timeseries = append(res.Timeseries, pb.TimeSeries{ + Labels: convertToVMLabels(tss.Labels), + Samples: convertToVMSamples(tss.Samples), + }) + } + + return res, nil +} + +func convertToVMLabels(labels []prompb.Label) []pb.Label { + if len(labels) == 0 { + return nil + } + + res := make([]pb.Label, 0, len(labels)) + for _, label := range labels { + res = append(res, pb.Label{ + Name: string(label.Name), + Value: string(label.Value), + }) + } + + return res +} + +func convertToVMSamples(samples []prompb.Sample) []pb.Sample { + if len(samples) == 0 { + return nil + } + + res := make([]pb.Sample, 0, len(samples)) + for _, sample := range samples { + res = append(res, pb.Sample{ + Value: sample.Value, + Timestamp: sample.Timestamp, + }) + } + + return res +} diff --git a/pkg/protocol/encoder/prometheus/utils.go b/pkg/protocol/encoder/prometheus/utils.go new file mode 100644 index 0000000000..622b869e63 --- /dev/null +++ b/pkg/protocol/encoder/prometheus/utils.go @@ -0,0 +1,118 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "context" + "sort" + "sync" + + pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + + "github.com/alibaba/ilogtail/pkg/logger" + "github.com/alibaba/ilogtail/pkg/models" +) + +const metricNameKey = "__name__" + +func marshalBatchTimeseriesData(wr *pb.WriteRequest) []byte { + if len(wr.Timeseries) == 0 { + return nil + } + + data, err := wr.Marshal() + if err != nil { + // logger.Error(context.Background(), alarmType, "pb marshal err", err) + return nil + } + + return data +} + +func genPromRemoteWriteTimeseries(event *models.Metric) pb.TimeSeries { + return pb.TimeSeries{ + Labels: lexicographicalSort(append(convTagsToLabels(event.GetTags()), pb.Label{Name: metricNameKey, Value: event.GetName()})), + Samples: []pb.Sample{{ + Value: event.GetValue().GetSingleValue(), + + // Decode (during input_prometheus stage) makes timestamp + // with unix milliseconds into unix nanoseconds, + // e.g. "model.Time(milliseconds).Time().UnixNano()". + // + // Encode (during flusher_prometheus stage) conversely makes timestamp + // with unix nanoseconds into unix milliseconds, + // e.g. "int64(nanoseconds)/10^6". 
+ Timestamp: int64(event.GetTimestamp()) / 1e6, + }}, + } +} + +func convTagsToLabels(tags models.Tags) []pb.Label { + if tags == nil { + logger.Debugf(context.Background(), "get nil models.Tags") + return nil + } + + labels := make([]pb.Label, 0, tags.Len()) + for k, v := range tags.Iterator() { + // MUST NOT contain any empty label names or values. + // reference: https://prometheus.io/docs/specs/remote_write_spec/#labels + if k != "" && v != "" { + labels = append(labels, pb.Label{Name: k, Value: v}) + } + } + + return labels +} + +// MUST have label names sorted in lexicographical order. +// reference: https://prometheus.io/docs/specs/remote_write_spec/#labels +func lexicographicalSort(labels []pb.Label) []pb.Label { + sort.Sort(promLabels(labels)) + + return labels +} + +type promLabels []pb.Label + +func (p promLabels) Len() int { + return len(p) +} + +func (p promLabels) Less(i, j int) bool { + return p[i].Name < p[j].Name +} + +func (p promLabels) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} + +var wrPool sync.Pool + +func getWriteRequest(seriesLimit int) *pb.WriteRequest { + wr := wrPool.Get() + if wr == nil { + return &pb.WriteRequest{ + Timeseries: make([]pb.TimeSeries, 0, seriesLimit), + } + } + + return wr.(*pb.WriteRequest) +} + +func putWriteRequest(wr *pb.WriteRequest) { + wr.Timeseries = wr.Timeseries[:0] + wrPool.Put(wr) +} diff --git a/pkg/protocol/encoder/prometheus/utils_test.go b/pkg/protocol/encoder/prometheus/utils_test.go new file mode 100644 index 0000000000..c84e4ab68f --- /dev/null +++ b/pkg/protocol/encoder/prometheus/utils_test.go @@ -0,0 +1,96 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
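+
+// Note on the pool above: putWriteRequest only truncates wr.Timeseries, so the
+// backing array is retained and reused by later getWriteRequest calls. Callers
+// must finish marshaling (which copies the data) before a request goes back to
+// the pool, as EncodeV2 does via its deferred putWriteRequest.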
+ +package prometheus + +import ( + "sort" + "testing" + + pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/stretchr/testify/assert" +) + +// 场景:Prometheus label names 字典序排序 +// 因子:乱序的 Prometheus label names +// 预期:Prometheus label names 按字典序排序 +func TestLexicographicalSort_ShouldSortedInLexicographicalOrder(t *testing.T) { + // given + labels := []pb.Label{ + {Name: "Tutorial", Value: "tutorial"}, + {Name: "Point", Value: "point"}, + {Name: "Java", Value: "java"}, + {Name: "C++", Value: "c++"}, + {Name: "Golang", Value: "golang"}, + {Name: metricNameKey, Value: "test_metric_name"}, + } + ans := []pb.Label{ + {Name: "C++", Value: "c++"}, + {Name: "Golang", Value: "golang"}, + {Name: "Java", Value: "java"}, + {Name: "Point", Value: "point"}, + {Name: "Tutorial", Value: "tutorial"}, + {Name: metricNameKey, Value: "test_metric_name"}, + } + assert.Equal(t, len(ans), len(labels)) + + // when + got := lexicographicalSort(labels) + + // then + assert.Equal(t, ans, got) +} + +// 场景:性能测试,确定 lexicographicalSort 字典序排序方法的性能 +// 因子:利用 lexicographicalSort(底层基于sort.Sort())对 Prometheus label names 进行字典序排序 +// 预期:lexicographicalSort 和 sort.Strings 对 Prometheus label names 的字典序排序性能相当(数量级相同) +// goos: darwin +// goarch: arm64 +// pkg: github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus +// BenchmarkLexicographicalSort +// BenchmarkLexicographicalSort/lexicographicalSort +// BenchmarkLexicographicalSort/lexicographicalSort-12 23059904 47.51 ns/op +// BenchmarkLexicographicalSort/sort.Strings +// BenchmarkLexicographicalSort/sort.Strings-12 25321753 47.30 ns/op +// PASS +func BenchmarkLexicographicalSort(b *testing.B) { + prometheusLabels := []pb.Label{ + {Name: "Tutorial", Value: "tutorial"}, + {Name: "Point", Value: "point"}, + {Name: "Java", Value: "java"}, + {Name: "C++", Value: "c++"}, + {Name: "Golang", Value: "golang"}, + {Name: metricNameKey, Value: "test_metric_name"}, + } + stringLabels := []string{ + "Tutorial", + "Point", + "Java", + "C++", + "Golang", + metricNameKey, + } + + b.Run("lexicographicalSort", func(b *testing.B) { + for i := 0; i < b.N; i++ { + lexicographicalSort(prometheusLabels) + } + }) + + b.Run("sort.Strings", func(b *testing.B) { + for i := 0; i < b.N; i++ { + sort.Strings(stringLabels) + } + }) +} diff --git a/pkg/protocol/encoder/protocol.go b/pkg/protocol/encoder/protocol.go new file mode 100644 index 0000000000..a5ab263740 --- /dev/null +++ b/pkg/protocol/encoder/protocol.go @@ -0,0 +1,28 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
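+
+// Of the protocol identifiers below, only protocolPrometheus is currently
+// wired into NewEncoder; the remaining constants are reserved for encoders
+// that may be added later.
+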
+ +package encoder + +const ( + protocolSLS = "sls" + protocolPrometheus = "prometheus" + protocolInflux = "influx" + protocolInfluxdb = "influxdb" + protocolStatsd = "statsd" + protocolOTLPLogV1 = "otlp_logv1" + protocolOTLPMetricV1 = "otlp_metricv1" + protocolOTLPTraceV1 = "otlp_tracev1" + protocolRaw = "raw" + protocolPyroscope = "pyroscope" +) From 8172be1659e4806947ec5b00c6cc71ab2c647d8e Mon Sep 17 00:00:00 2001 From: "yuanshuai.1900" Date: Mon, 19 Aug 2024 16:10:53 +0800 Subject: [PATCH 3/9] feat: add ext_default_encoder extension plugin --- plugins.yml | 1 + plugins/extension/encoder/encoder.go | 83 +++++++++++++ plugins/extension/encoder/encoder_test.go | 141 ++++++++++++++++++++++ 3 files changed, 225 insertions(+) create mode 100644 plugins/extension/encoder/encoder.go create mode 100644 plugins/extension/encoder/encoder_test.go diff --git a/plugins.yml b/plugins.yml index 167c7e1482..f997c790bf 100644 --- a/plugins.yml +++ b/plugins.yml @@ -25,6 +25,7 @@ plugins: - import: "github.com/alibaba/ilogtail/plugins/aggregator/skywalking" - import: "github.com/alibaba/ilogtail/plugins/extension/basicauth" - import: "github.com/alibaba/ilogtail/plugins/extension/default_decoder" + - import: "github.com/alibaba/ilogtail/plugins/extension/encoder" - import: "github.com/alibaba/ilogtail/plugins/extension/group_info_filter" - import: "github.com/alibaba/ilogtail/plugins/extension/request_breaker" - import: "github.com/alibaba/ilogtail/plugins/flusher/checker" diff --git a/plugins/extension/encoder/encoder.go b/plugins/extension/encoder/encoder.go new file mode 100644 index 0000000000..f6298d9dec --- /dev/null +++ b/plugins/extension/encoder/encoder.go @@ -0,0 +1,83 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
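+
+// A typical configuration for this extension (the values are illustrative):
+//
+//	{
+//	    "Format": "prometheus",
+//	    "SeriesLimit": 1024
+//	}
+//
+// "Format" selects the underlying encoder implementation; any other fields are
+// passed through to that encoder as options.
+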
+ +package encoder + +import ( + "encoding/json" + "errors" + + "github.com/alibaba/ilogtail/pkg/logger" + "github.com/alibaba/ilogtail/pkg/pipeline" + "github.com/alibaba/ilogtail/pkg/pipeline/extensions" + "github.com/alibaba/ilogtail/pkg/protocol/encoder" +) + +// ensure ExtensionDefaultEncoder implements the extensions.EncoderExtension interface +var _ extensions.EncoderExtension = (*ExtensionDefaultEncoder)(nil) + +type ExtensionDefaultEncoder struct { + extensions.Encoder + + Format string + options map[string]any // additional properties map to here +} + +func NewExtensionDefaultEncoder() *ExtensionDefaultEncoder { + return &ExtensionDefaultEncoder{} +} + +func (e *ExtensionDefaultEncoder) UnmarshalJSON(bytes []byte) error { + err := json.Unmarshal(bytes, &e.options) + if err != nil { + return err + } + + format, ok := e.options["Format"].(string) + if !ok { + return errors.New("field Format should be type of string") + } + + delete(e.options, "Format") + e.Format = format + + return nil +} + +func (e *ExtensionDefaultEncoder) Description() string { + return "default encoder that support builtin formats" +} + +func (e *ExtensionDefaultEncoder) Init(context pipeline.Context) error { + enc, err := encoder.NewEncoder(e.Format, e.options) + if err != nil { + return err + } + + e.Encoder = enc + + logger.Infof(context.GetRuntimeContext(), "%s init success, encoder: %s", e.Description(), e.Format) + + return nil +} + +func (e *ExtensionDefaultEncoder) Stop() error { + return nil +} + +func init() { + pipeline.AddExtensionCreator("ext_default_encoder", func() pipeline.Extension { + return NewExtensionDefaultEncoder() + }) +} diff --git a/plugins/extension/encoder/encoder_test.go b/plugins/extension/encoder/encoder_test.go new file mode 100644 index 0000000000..e5089b6f35 --- /dev/null +++ b/plugins/extension/encoder/encoder_test.go @@ -0,0 +1,141 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoder + +import ( + "encoding/json" + "fmt" + "testing" + + . 
"github.com/smartystreets/goconvey/convey" + + "github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus" + "github.com/alibaba/ilogtail/plugins/test/mock" +) + +// 场景:插件初始化 +// 因子:Format 字段存在 +// 因子:Prometheus Protocol +// 预期:初始化成功,且 Encoder 为 Prometheus Encoder +func TestEncoder_ShouldPassConfigToRealEncoder_GivenCorrectConfigInput(t *testing.T) { + Convey("Given correct config json string", t, func() { + e := NewExtensionDefaultEncoder() + So(e, ShouldNotBeNil) + So(e.Encoder, ShouldBeNil) + So(e.Format, ShouldBeEmpty) + So(e.options, ShouldBeNil) + + encodeProtocol := "prometheus" + optField, optValue := "SeriesLimit", 1024 + configJsonStr := fmt.Sprintf(`{"Format":"%s","%s":%d}`, encodeProtocol, optField, optValue) + // must using float64(optValue), not optValue + // https://github.com/smartystreets/goconvey/issues/437 + wantOpts := map[string]any{optField: float64(optValue)} + + Convey("Then should json unmarshal success", func() { + err := json.Unmarshal([]byte(configJsonStr), e) + So(err, ShouldBeNil) + So(e.Encoder, ShouldBeNil) + So(e.options, ShouldResemble, wantOpts) + So(e.Format, ShouldEqual, encodeProtocol) + + Convey("Then should init success", func() { + err = e.Init(mock.NewEmptyContext("p", "l", "c")) + So(err, ShouldBeNil) + So(e.Encoder, ShouldNotBeNil) + + Convey("Then encoder implement should be *prometheus.Encoder", func() { + promEncoder, ok := e.Encoder.(*prometheus.Encoder) + So(ok, ShouldBeTrue) + So(promEncoder, ShouldNotBeNil) + So(promEncoder.SeriesLimit, ShouldEqual, optValue) + }) + }) + + Convey("Then should stop success", func() { + err := e.Stop() + So(err, ShouldBeNil) + }) + }) + }) +} + +// 场景:插件初始化 +// 因子:Format 字段存在 +// 因子:Unsupported Protocol +// 预期:初始化失败 +func TestEncoder_ShouldNotPassConfigToRealEncoder_GivenIncorrectConfigInput(t *testing.T) { + Convey("Given incorrect config json string but with Format field", t, func() { + e := NewExtensionDefaultEncoder() + So(e, ShouldNotBeNil) + So(e.Encoder, ShouldBeNil) + So(e.Format, ShouldBeEmpty) + So(e.options, ShouldBeNil) + + encodeProtocol := "unknown" + configJsonStr := fmt.Sprintf(`{"Format":"%s"}`, encodeProtocol) + + Convey("Then should json unmarshal success", func() { + err := json.Unmarshal([]byte(configJsonStr), e) + So(err, ShouldBeNil) + So(e.Encoder, ShouldBeNil) + So(e.Format, ShouldEqual, encodeProtocol) + + Convey("Then should init failed", func() { + err = e.Init(mock.NewEmptyContext("p", "l", "c")) + So(err, ShouldNotBeNil) + So(e.Encoder, ShouldBeNil) + }) + + Convey("Then should stop success", func() { + err := e.Stop() + So(err, ShouldBeNil) + }) + }) + }) +} + +// 场景:插件初始化 +// 因子:Format 字段缺失 +// 预期:json unmarshal 失败,初始化失败 +func TestEncoder_ShouldUnmarshalFailed_GivenConfigWithoutFormat(t *testing.T) { + Convey("Given incorrect config json string and without Format field", t, func() { + e := NewExtensionDefaultEncoder() + So(e, ShouldNotBeNil) + So(e.Encoder, ShouldBeNil) + So(e.Format, ShouldBeEmpty) + So(e.options, ShouldBeNil) + + configJsonStr := `{"Unknown":"unknown"}` + + Convey("Then should json unmarshal failed", func() { + err := json.Unmarshal([]byte(configJsonStr), e) + So(err, ShouldNotBeNil) + So(e.Encoder, ShouldBeNil) + So(e.Format, ShouldBeEmpty) + + Convey("Then should init failed", func() { + err = e.Init(mock.NewEmptyContext("p", "l", "c")) + So(err, ShouldNotBeNil) + So(e.Encoder, ShouldBeNil) + }) + + Convey("Then should stop success", func() { + err := e.Stop() + So(err, ShouldBeNil) + }) + }) + }) +} From b3c11f41efcbca71a1dbe2e4ca2b2c074c9ec9a2 Mon Sep 
17 00:00:00 2001 From: "yuanshuai.1900" Date: Tue, 20 Aug 2024 16:47:07 +0800 Subject: [PATCH 4/9] refactor: rename package name, var name of encoder --- go.mod | 4 ++-- pkg/go.mod | 2 +- .../encoder/{protocol.go => common/common.go} | 13 ++----------- pkg/protocol/encoder/encoder.go | 2 +- plugins.yml | 2 +- .../default_encoder.go} | 2 +- .../default_encoder_test.go} | 14 +++++++------- 7 files changed, 15 insertions(+), 24 deletions(-) rename pkg/protocol/encoder/{protocol.go => common/common.go} (62%) rename plugins/extension/{encoder/encoder.go => default_encoder/default_encoder.go} (98%) rename plugins/extension/{encoder/encoder_test.go => default_encoder/default_encoder_test.go} (92%) diff --git a/go.mod b/go.mod index e6f183cc2a..483e82614b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/ClickHouse/clickhouse-go/v2 v2.6.0 github.com/IBM/sarama v1.42.2 - github.com/VictoriaMetrics/VictoriaMetrics v1.83.1 + github.com/VictoriaMetrics/VictoriaMetrics v1.83.0 github.com/alibaba/ilogtail/pkg v0.0.0 github.com/apache/pulsar-client-go v0.10.0 github.com/buger/jsonparser v1.1.1 @@ -44,7 +44,7 @@ require ( github.com/prometheus/client_golang v1.14.0 github.com/prometheus/common v0.42.0 github.com/prometheus/procfs v0.8.0 - github.com/pyroscope-io/pyroscope v0.37.2 + github.com/pyroscope-io/pyroscope v1.5.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/sirupsen/logrus v1.8.1 github.com/smartystreets/goconvey v1.7.2 diff --git a/pkg/go.mod b/pkg/go.mod index 35ff26af66..e62336a119 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/Microsoft/go-winio v0.5.2 - github.com/VictoriaMetrics/VictoriaMetrics v1.83.4 + github.com/VictoriaMetrics/VictoriaMetrics v1.83.0 github.com/cespare/xxhash v1.1.0 github.com/cespare/xxhash/v2 v2.2.0 github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 diff --git a/pkg/protocol/encoder/protocol.go b/pkg/protocol/encoder/common/common.go similarity index 62% rename from pkg/protocol/encoder/protocol.go rename to pkg/protocol/encoder/common/common.go index a5ab263740..4caf8109df 100644 --- a/pkg/protocol/encoder/protocol.go +++ b/pkg/protocol/encoder/common/common.go @@ -12,17 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package encoder +package common const ( - protocolSLS = "sls" - protocolPrometheus = "prometheus" - protocolInflux = "influx" - protocolInfluxdb = "influxdb" - protocolStatsd = "statsd" - protocolOTLPLogV1 = "otlp_logv1" - protocolOTLPMetricV1 = "otlp_metricv1" - protocolOTLPTraceV1 = "otlp_tracev1" - protocolRaw = "raw" - protocolPyroscope = "pyroscope" + ProtocolPrometheus = "prometheus" ) diff --git a/pkg/protocol/encoder/encoder.go b/pkg/protocol/encoder/encoder.go index 00c90cba9e..2adf0f8001 100644 --- a/pkg/protocol/encoder/encoder.go +++ b/pkg/protocol/encoder/encoder.go @@ -26,7 +26,7 @@ import ( func NewEncoder(format string, options map[string]any) (extensions.Encoder, error) { switch strings.TrimSpace(strings.ToLower(format)) { - case protocolPrometheus: + case ProtocolPrometheus: var opt prometheus.Option if err := mapstructure.Decode(options, &opt); err != nil { return nil, err diff --git a/plugins.yml b/plugins.yml index f997c790bf..f09d74d16d 100644 --- a/plugins.yml +++ b/plugins.yml @@ -25,7 +25,7 @@ plugins: - import: "github.com/alibaba/ilogtail/plugins/aggregator/skywalking" - import: "github.com/alibaba/ilogtail/plugins/extension/basicauth" - import: "github.com/alibaba/ilogtail/plugins/extension/default_decoder" - - import: "github.com/alibaba/ilogtail/plugins/extension/encoder" + - import: "github.com/alibaba/ilogtail/plugins/extension/default_encoder" - import: "github.com/alibaba/ilogtail/plugins/extension/group_info_filter" - import: "github.com/alibaba/ilogtail/plugins/extension/request_breaker" - import: "github.com/alibaba/ilogtail/plugins/flusher/checker" diff --git a/plugins/extension/encoder/encoder.go b/plugins/extension/default_encoder/default_encoder.go similarity index 98% rename from plugins/extension/encoder/encoder.go rename to plugins/extension/default_encoder/default_encoder.go index f6298d9dec..646d4b2f16 100644 --- a/plugins/extension/encoder/encoder.go +++ b/plugins/extension/default_encoder/default_encoder.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package encoder +package defaultencoder import ( "encoding/json" diff --git a/plugins/extension/encoder/encoder_test.go b/plugins/extension/default_encoder/default_encoder_test.go similarity index 92% rename from plugins/extension/encoder/encoder_test.go rename to plugins/extension/default_encoder/default_encoder_test.go index e5089b6f35..0e7a2d1dc6 100644 --- a/plugins/extension/encoder/encoder_test.go +++ b/plugins/extension/default_encoder/default_encoder_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package encoder +package defaultencoder import ( "encoding/json" @@ -39,13 +39,13 @@ func TestEncoder_ShouldPassConfigToRealEncoder_GivenCorrectConfigInput(t *testin encodeProtocol := "prometheus" optField, optValue := "SeriesLimit", 1024 - configJsonStr := fmt.Sprintf(`{"Format":"%s","%s":%d}`, encodeProtocol, optField, optValue) + configJSONStr := fmt.Sprintf(`{"Format":"%s","%s":%d}`, encodeProtocol, optField, optValue) // must using float64(optValue), not optValue // https://github.com/smartystreets/goconvey/issues/437 wantOpts := map[string]any{optField: float64(optValue)} Convey("Then should json unmarshal success", func() { - err := json.Unmarshal([]byte(configJsonStr), e) + err := json.Unmarshal([]byte(configJSONStr), e) So(err, ShouldBeNil) So(e.Encoder, ShouldBeNil) So(e.options, ShouldResemble, wantOpts) @@ -85,10 +85,10 @@ func TestEncoder_ShouldNotPassConfigToRealEncoder_GivenIncorrectConfigInput(t *t So(e.options, ShouldBeNil) encodeProtocol := "unknown" - configJsonStr := fmt.Sprintf(`{"Format":"%s"}`, encodeProtocol) + configJSONStr := fmt.Sprintf(`{"Format":"%s"}`, encodeProtocol) Convey("Then should json unmarshal success", func() { - err := json.Unmarshal([]byte(configJsonStr), e) + err := json.Unmarshal([]byte(configJSONStr), e) So(err, ShouldBeNil) So(e.Encoder, ShouldBeNil) So(e.Format, ShouldEqual, encodeProtocol) @@ -118,10 +118,10 @@ func TestEncoder_ShouldUnmarshalFailed_GivenConfigWithoutFormat(t *testing.T) { So(e.Format, ShouldBeEmpty) So(e.options, ShouldBeNil) - configJsonStr := `{"Unknown":"unknown"}` + configJSONStr := `{"Unknown":"unknown"}` Convey("Then should json unmarshal failed", func() { - err := json.Unmarshal([]byte(configJsonStr), e) + err := json.Unmarshal([]byte(configJSONStr), e) So(err, ShouldNotBeNil) So(e.Encoder, ShouldBeNil) So(e.Format, ShouldBeEmpty) From 9841a1286a6095b4d29077510f09fe2bd8b8daeb Mon Sep 17 00:00:00 2001 From: "yuanshuai.1900" Date: Tue, 20 Aug 2024 17:35:44 +0800 Subject: [PATCH 5/9] docs: add docs for ext_default_encoder & update docs of other extensions --- docs/cn/SUMMARY.md | 6 +++ docs/cn/plugins/extension/README.md | 3 ++ .../plugins/extension/ext-default-encoder.md | 37 +++++++++++++++++++ 3 files changed, 46 insertions(+) create mode 100644 docs/cn/plugins/extension/README.md create mode 100644 docs/cn/plugins/extension/ext-default-encoder.md diff --git a/docs/cn/SUMMARY.md b/docs/cn/SUMMARY.md index 2db8b9343e..d897d68602 100644 --- a/docs/cn/SUMMARY.md +++ b/docs/cn/SUMMARY.md @@ -115,6 +115,12 @@ * [Pulsar](plugins/flusher/flusher-pulsar.md) * [HTTP](plugins/flusher/flusher-http.md) * [Loki](plugins/flusher/loki.md) +* [扩展](plugins/extension/README.md) + * [BasicAuth鉴权](plugins/extension/ext-basicauth.md) + * [协议解码/反序列化](plugins/extension/ext-default-decoder.md) + * [协议编码/序列化](plugins/extension/ext-default-encoder.md) + * [数据筛选](plugins/extension/ext-groupinfo-filter.md) + * [请求熔断](plugins/extension/ext-request-breaker.md) ## 工作原理 diff --git a/docs/cn/plugins/extension/README.md b/docs/cn/plugins/extension/README.md new file mode 100644 index 0000000000..bbf31e717b --- /dev/null +++ b/docs/cn/plugins/extension/README.md @@ -0,0 +1,3 @@ +# 扩展 + +扩展插件用于对其它插件能力的补充(e.g. 
鉴权、编解码、熔断、限流……)
\ No newline at end of file
diff --git a/docs/cn/plugins/extension/ext-default-encoder.md b/docs/cn/plugins/extension/ext-default-encoder.md
new file mode 100644
index 0000000000..b5f6df9078
--- /dev/null
+++ b/docs/cn/plugins/extension/ext-default-encoder.md
@@ -0,0 +1,37 @@
+# DefaultEncoder Encoder扩展
+
+## 简介
+
+[ext_default_encoder](https://github.com/alibaba/ilogtail/blob/main/plugins/extension/default_encoder/default_encoder.go)
+扩展,实现了 [Encoder](https://github.com/alibaba/ilogtail/blob/main/pkg/pipeline/extensions/encoder.go) 接口,可以用在
+`flusher_http` 等插件中用于序列化不同的协议数据。
+
+## 版本
+
+[Alpha](../stability-level.md)
+
+## 配置参数
+
+| 参数 | 类型 | 是否必选 | 说明 |
+|-------------|--------|------|-----------------------------------------------------------------------------------------------------------|
+| Format | String | 是 | 具体的协议,[查看支持的具体协议列表](https://github.com/alibaba/ilogtail/blob/master/pkg/protocol/encoder/common/common.go) |
+| SeriesLimit | Int | 否 | 触发序列化时序切片的最大长度,默认 1000,仅针对 Format=prometheus 时有效 |
+
+## 样例
+
+使用 `flusher_http` flusher 插件,配置发送 `prometheus` 协议数据。
+
+```yaml
+enable: true
+flushers:
+- Type: flusher_http
+  ...
+  Encoder:
+    Type: ext_default_encoder/prometheus
+  ...
+...
+extensions:
+- Type: ext_default_encoder/prometheus
+  Format: 'prometheus'
+  SeriesLimit: 1024
+```

From d2f18cfbdc869c267d872fa536bde62d461a3ec4 Mon Sep 17 00:00:00 2001
From: "yuanshuai.1900"
Date: Tue, 20 Aug 2024 19:07:19 +0800
Subject: [PATCH 6/9] docs: add ext_default_encoder doc in docs/cn/plugins/overview.md

---
 docs/cn/plugins/overview.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/cn/plugins/overview.md b/docs/cn/plugins/overview.md
index f462a05692..c0ccf6a220 100644
--- a/docs/cn/plugins/overview.md
+++ b/docs/cn/plugins/overview.md
@@ -115,3 +115,9 @@
 | 名称 | 提供方 | 简介 |
 |----------------------------------------------------------------------------|-------------------------------------------------|-----------------------------|
 | [`ext_default_decoder`](extension/ext-default-decoder.md)<br>默认的decoder扩展 | 社区<br>[`snakorse`](https://github.com/snakorse) | 将内置支持的Format以Decoder扩展的形式封装 |
+
+### Encoder
+
+| 名称 | 提供方 | 简介 |
+|----------------------------------------------------------------------------|--------------------------------------------------------|-----------------------------|
+| [`ext_default_encoder`](extension/ext-default-encoder.md)<br>默认的encoder扩展 | 社区<br>
[`yuanshuai.1900`](https://github.com/aiops1900) | 将内置支持的Format以Encoder扩展的形式封装 | From 93edbb35225982b555e833d1b17c9638898e05e9 Mon Sep 17 00:00:00 2001 From: "yuanshuai.1900" Date: Wed, 21 Aug 2024 14:23:09 +0800 Subject: [PATCH 7/9] fix: fix build failed for package referenece --- pkg/protocol/encoder/encoder.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/protocol/encoder/encoder.go b/pkg/protocol/encoder/encoder.go index 2adf0f8001..81fc0667ce 100644 --- a/pkg/protocol/encoder/encoder.go +++ b/pkg/protocol/encoder/encoder.go @@ -21,12 +21,13 @@ import ( "github.com/mitchellh/mapstructure" "github.com/alibaba/ilogtail/pkg/pipeline/extensions" + "github.com/alibaba/ilogtail/pkg/protocol/encoder/common" "github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus" ) func NewEncoder(format string, options map[string]any) (extensions.Encoder, error) { switch strings.TrimSpace(strings.ToLower(format)) { - case ProtocolPrometheus: + case common.ProtocolPrometheus: var opt prometheus.Option if err := mapstructure.Decode(options, &opt); err != nil { return nil, err From bc56301f0692e0402c247036de1fb0ec4fbdb497 Mon Sep 17 00:00:00 2001 From: "yuanshuai.1900" Date: Wed, 21 Aug 2024 19:43:25 +0800 Subject: [PATCH 8/9] chore: update pkg/go.mod to solve lint failed --- pkg/go.mod | 1 + pkg/go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pkg/go.mod b/pkg/go.mod index e62336a119..5e9313b879 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -19,6 +19,7 @@ require ( github.com/influxdata/telegraf v1.20.0 github.com/json-iterator/go v1.1.12 github.com/mailru/easyjson v0.7.7 + github.com/mitchellh/mapstructure v1.4.2 github.com/narqo/go-dogstatsd-parser v0.2.0 github.com/pierrec/lz4 v2.6.1+incompatible github.com/prometheus/common v0.42.0 diff --git a/pkg/go.sum b/pkg/go.sum index 48a04d58bf..c8cced1fc8 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -946,6 +946,8 @@ github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.4.2 h1:6h7AQ0yhTcIsmFmnAwQls75jp2Gzs4iB8W7pjMO+rqo= +github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A= github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= From 73db48f3146c548446819eb8ef66e37fc9865937 Mon Sep 17 00:00:00 2001 From: "yuanshuai.1900" Date: Thu, 22 Aug 2024 10:49:43 +0800 Subject: [PATCH 9/9] fix: fix ci-lint error --- pkg/protocol/encoder/prometheus/encoder_prometheus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/protocol/encoder/prometheus/encoder_prometheus.go b/pkg/protocol/encoder/prometheus/encoder_prometheus.go index 5b4ec2d8c5..4be1ef5792 100644 --- a/pkg/protocol/encoder/prometheus/encoder_prometheus.go +++ b/pkg/protocol/encoder/prometheus/encoder_prometheus.go @@ -103,7 +103,7 @@ func (p *Encoder) EncodeV2(groupEvents *models.PipelineGroupEvents) ([][]byte, e } func (p *Encoder) EncodeBatchV2(groupEventsSlice []*models.PipelineGroupEvents) ([][]byte, error) { - if groupEventsSlice == nil || 
len(groupEventsSlice) == 0 { + if len(groupEventsSlice) == 0 { return nil, errNilOrZeroGroupEvents }
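For reference, a minimal end-to-end sketch of how the pieces introduced in this series fit together, mirroring the helpers used in the tests above; the metric name, tags, timestamp and value are illustrative, and error handling is elided:

```go
package main

import (
	"fmt"

	"github.com/alibaba/ilogtail/pkg/models"
	"github.com/alibaba/ilogtail/pkg/protocol/encoder/prometheus"
)

func main() {
	// Build one metric event, as the encoder tests do.
	tags := models.NewTagsWithMap(map[string]string{"job": "demo"})
	metric := models.NewSingleValueMetric(
		"test_metric", // becomes the __name__ label
		models.MetricTypeGauge,
		tags,
		1700000000000000000, // timestamp in unix nanoseconds
		1.1,
	)

	groupEvents := &models.PipelineGroupEvents{
		Group:  models.NewGroup(models.NewMetadata(), models.NewTags()),
		Events: []models.PipelineEvent{metric},
	}

	// Encode into Prometheus remote-write payloads, at most 1000 series each.
	enc := prometheus.NewPromEncoder(1000)
	payloads, err := enc.EncodeV2(groupEvents)
	if err != nil {
		panic(err)
	}
	fmt.Printf("encoded %d payload(s)\n", len(payloads))
}
```

This is the same path the `ext_default_encoder` extension takes when its `Format` is set to `prometheus`.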