Skip to content

Commit

Permalink
[exporter/elasticsearch] [chore] move esIndex into elasticsearch.Index (
Browse files Browse the repository at this point in the history
#37468)

#### Description

This moves the internal `esIndex` struct into `elasticsearch.Index`, so
other internal packages can use it.
  • Loading branch information
dmathieu authored Jan 24, 2025
1 parent de003ec commit 8c275f4
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 75 deletions.
30 changes: 6 additions & 24 deletions exporter/elasticsearchexporter/data_stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"unicode"

"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
)

var receiverRegex = regexp.MustCompile(`/receiver/(\w*receiver)`)
Expand Down Expand Up @@ -46,15 +48,15 @@ func routeWithDefaults(defaultDSType string) func(
string,
bool,
string,
) esIndex {
) elasticsearch.Index {
return func(
recordAttr pcommon.Map,
scopeAttr pcommon.Map,
resourceAttr pcommon.Map,
fIndex string,
otel bool,
scopeName string,
) esIndex {
) elasticsearch.Index {
// Order:
// 1. read data_stream.* from attributes
// 2. read elasticsearch.index.* from attributes
Expand All @@ -67,7 +69,7 @@ func routeWithDefaults(defaultDSType string) func(
prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr)
suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr)
if prefixExists || suffixExists {
return esIndex{Index: fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)}
return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)}
}
}

Expand All @@ -89,30 +91,10 @@ func routeWithDefaults(defaultDSType string) func(

dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix)
namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "")
return newDataStream(defaultDSType, dataset, namespace)
}
}

type esIndex struct {
Index string
Type string
Dataset string
Namespace string
}

func newDataStream(typ, dataset, namespace string) esIndex {
return esIndex{
Index: fmt.Sprintf("%s-%s-%s", typ, dataset, namespace),
Type: typ,
Dataset: dataset,
Namespace: namespace,
return elasticsearch.NewDataStreamIndex(defaultDSType, dataset, namespace)
}
}

func (i esIndex) isDataStream() bool {
return i.Type != "" && i.Dataset != "" && i.Namespace != ""
}

var (
// routeLogRecord returns the name of the index to send the log record to according to data stream routing related attributes.
// This function may mutate record attributes.
Expand Down
8 changes: 5 additions & 3 deletions exporter/elasticsearchexporter/data_stream_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ import (

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
)

type routeTestCase struct {
name string
otel bool
scopeName string
want esIndex
want elasticsearch.Index
}

func createRouteTests(dsType string) []routeTestCase {
renderWantRoute := func(dsType, dsDataset string, otel bool) esIndex {
renderWantRoute := func(dsType, dsDataset string, otel bool) elasticsearch.Index {
if otel {
dsDataset += ".otel"
}
return newDataStream(dsType, dsDataset, defaultDataStreamNamespace)
return elasticsearch.NewDataStreamIndex(dsType, dsDataset, defaultDataStreamNamespace)
}

return []routeTestCase{
Expand Down
23 changes: 12 additions & 11 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
)

Expand Down Expand Up @@ -167,7 +168,7 @@ func (e *elasticsearchExporter) pushLogRecord(
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
fIndex := esIndex{Index: e.index}
fIndex := elasticsearch.Index{Index: e.index}
if e.dynamicIndex {
fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name())
}
Expand All @@ -177,7 +178,7 @@ func (e *elasticsearchExporter) pushLogRecord(
if err != nil {
return err
}
fIndex = esIndex{Index: formattedIndex}
fIndex = elasticsearch.Index{Index: formattedIndex}
}

buf := e.bufferPool.NewPooledBuffer()
Expand Down Expand Up @@ -216,7 +217,7 @@ func (e *elasticsearchExporter) pushMetricsData(
var validationErrs []error // log instead of returning these so that upstream does not retry
scopeMetrics := scopeMetrics.At(j)
scope := scopeMetrics.Scope()
groupedDataPointsByIndex := make(map[esIndex]map[uint32][]dataPoint)
groupedDataPointsByIndex := make(map[elasticsearch.Index]map[uint32][]dataPoint)
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)

Expand Down Expand Up @@ -334,18 +335,18 @@ func (e *elasticsearchExporter) getMetricDataPointIndex(
resource pcommon.Resource,
scope pcommon.InstrumentationScope,
dataPoint dataPoint,
) (esIndex, error) {
fIndex := esIndex{Index: e.index}
) (elasticsearch.Index, error) {
fIndex := elasticsearch.Index{Index: e.index}
if e.dynamicIndex {
fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name())
}

if e.logstashFormat.Enabled {
formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now())
if err != nil {
return esIndex{}, err
return elasticsearch.Index{}, err
}
fIndex = esIndex{Index: formattedIndex}
fIndex = elasticsearch.Index{Index: formattedIndex}
}
return fIndex, nil
}
Expand Down Expand Up @@ -409,7 +410,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
fIndex := esIndex{Index: e.index}
fIndex := elasticsearch.Index{Index: e.index}
if e.dynamicIndex {
fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, span.Name())
}
Expand All @@ -419,7 +420,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
if err != nil {
return err
}
fIndex = esIndex{Index: formattedIndex}
fIndex = elasticsearch.Index{Index: formattedIndex}
}

buf := e.bufferPool.NewPooledBuffer()
Expand All @@ -442,7 +443,7 @@ func (e *elasticsearchExporter) pushSpanEvent(
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
fIndex := esIndex{Index: e.index}
fIndex := elasticsearch.Index{Index: e.index}
if e.dynamicIndex {
fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name())
}
Expand All @@ -452,7 +453,7 @@ func (e *elasticsearchExporter) pushSpanEvent(
if err != nil {
return err
}
fIndex = esIndex{Index: formattedIndex}
fIndex = elasticsearch.Index{Index: formattedIndex}
}
buf := e.bufferPool.NewPooledBuffer()
e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, fIndex, buf.Buffer)
Expand Down
26 changes: 26 additions & 0 deletions exporter/elasticsearchexporter/internal/elasticsearch/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearch // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"

import "fmt"

// Index identifies the Elasticsearch index or data stream that a document
// is routed to. When Type, Dataset, and Namespace are all populated, Index
// follows the data stream naming scheme "<type>-<dataset>-<namespace>";
// otherwise only the Index field is meaningful.
type Index struct {
	Index     string
	Type      string
	Dataset   string
	Namespace string
}

// NewDataStreamIndex builds an Index following the Elasticsearch data
// stream naming scheme from its type, dataset, and namespace components.
func NewDataStreamIndex(typ, dataset, namespace string) Index {
	idx := Index{
		Type:      typ,
		Dataset:   dataset,
		Namespace: namespace,
	}
	idx.Index = fmt.Sprintf("%s-%s-%s", typ, dataset, namespace)
	return idx
}

// IsDataStream reports whether the index targets a data stream, i.e. all
// three data stream naming components (type, dataset, namespace) are set.
func (idx Index) IsDataStream() bool {
	if idx.Type == "" || idx.Dataset == "" || idx.Namespace == "" {
		return false
	}
	return true
}
30 changes: 15 additions & 15 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ var resourceAttrsToPreserve = map[string]bool{
var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode")

type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, esIndex, *bytes.Buffer) error
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, esIndex, *bytes.Buffer) error
encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer)
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, elasticsearch.Index, *bytes.Buffer) error
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, elasticsearch.Index, *bytes.Buffer) error
encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer)
hashDataPoint(dataPoint) uint32
encodeDocument(objmodel.Document, *bytes.Buffer) error
encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error)
encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error)
}

// encodeModel tries to keep the event as close to the original open telemetry semantics as is.
Expand Down Expand Up @@ -112,7 +112,7 @@ const (
attributeField = "attribute"
)

func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) error {
func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer) error {
var document objmodel.Document
switch m.mode {
case MappingECS:
Expand All @@ -129,7 +129,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str
return document.Serialize(buf, m.dedot)
}

func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document {
func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx elasticsearch.Index) objmodel.Document {
var document objmodel.Document

docTimeStamp := record.Timestamp()
Expand Down Expand Up @@ -160,7 +160,7 @@ func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord, buf *bytes.Buf
return nil
}

func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document {
func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx elasticsearch.Index) objmodel.Document {
var document objmodel.Document

// First, try to map resource-level attributes to ECS fields.
Expand Down Expand Up @@ -224,7 +224,7 @@ func (m *encodeModel) hashDataPoint(dp dataPoint) uint32 {
}
}

func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) {
func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) {
dp0 := dataPoints[0]
var document objmodel.Document
encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve)
Expand All @@ -245,15 +245,15 @@ func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoi
return document.DynamicTemplates(), err
}

func addDataStreamAttributes(document *objmodel.Document, key string, idx esIndex) {
if idx.isDataStream() {
func addDataStreamAttributes(document *objmodel.Document, key string, idx elasticsearch.Index) {
if idx.IsDataStream() {
document.AddString(key+"data_stream.type", idx.Type)
document.AddString(key+"data_stream.dataset", idx.Dataset)
document.AddString(key+"data_stream.namespace", idx.Namespace)
}
}

func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) {
func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) {
switch m.mode {
case MappingOTel:
return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors, idx, buf)
Expand Down Expand Up @@ -494,7 +494,7 @@ func (dp numberDataPoint) Metric() pmetric.Metric {

var errInvalidNumberDataPoint = errors.New("invalid number data point")

func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) error {
func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer) error {
var document objmodel.Document
switch m.mode {
case MappingOTel:
Expand All @@ -507,7 +507,7 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL st
return err
}

func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document {
func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope, idx elasticsearch.Index) objmodel.Document {
var document objmodel.Document
document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTimestamp("EndTimestamp", span.EndTimestamp())
Expand All @@ -527,7 +527,7 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra
return document
}

func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) {
func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer) {
if m.mode != MappingOTel {
// Currently span events are stored separately only in OTel mapping mode.
// In other modes, they are stored within the span document.
Expand All @@ -536,7 +536,7 @@ func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaU
serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent, idx, buf)
}

func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map, idx esIndex) {
func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map, idx elasticsearch.Index) {
key := "Attributes"
if m.mode == MappingRaw {
key = ""
Expand Down
Loading

0 comments on commit 8c275f4

Please sign in to comment.