Skip to content

Commit

Permalink
feat(open-telemetry#5408): add otlplogfile exporter
Browse files Browse the repository at this point in the history
This commit adds a new experimental exporter `otlplogfile`, that outputs log records to a JSON line file.
It is based on the following specification: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md

Signed-off-by: thomasgouveia <[email protected]>
  • Loading branch information
thomasgouveia committed Aug 27, 2024
1 parent f079b03 commit 1f72ae3
Show file tree
Hide file tree
Showing 17 changed files with 1,670 additions and 0 deletions.
3 changes: 3 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# OTLP Log File Exporter

[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile)](https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile)
54 changes: 54 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"

import "time"

type fnOpt func(config) config

func (f fnOpt) applyOption(c config) config { return f(c) }

// Option sets the configuration value for an Exporter.
type Option interface {
applyOption(config) config
}

// config contains options for the OTLP Log file exporter.
type config struct {
// Path to a file on disk where records must be appended.
// This file is preferably a json line file as stated in the specification.
// See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file
// See: https://jsonlines.org
path string
// Duration represents the interval when the buffer should be flushed.
flushInterval time.Duration
}

func newConfig(options []Option) config {
c := config{
path: "/var/log/opentelemetry/logs.jsonl",
flushInterval: 5 * time.Second,
}
for _, opt := range options {
c = opt.applyOption(c)
}
return c
}

// WithFlushInterval configures the duration after which the buffer is periodically flushed to the disk.
func WithFlushInterval(flushInterval time.Duration) Option {
return fnOpt(func(c config) config {
c.flushInterval = flushInterval
return c
})
}

// WithPath defines a path to a file where the log records will be written.
// If not set, will default to /var/log/opentelemetry/logs.jsonl.
func WithPath(path string) Option {
return fnOpt(func(c config) config {
c.path = path
return c
})
}
12 changes: 12 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

/*
Package otlplogfile provides an OTLP log exporter that outputs log records to a JSON line file. The exporter uses a buffered
file writer to write log records to file to reduce I/O and improve performance.
All Exporters must be created with [New].
See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md
*/
package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"
94 changes: 94 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"

import (
"context"
"sync"

"google.golang.org/protobuf/encoding/protojson"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/transform"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/writer"
"go.opentelemetry.io/otel/sdk/log"
lpb "go.opentelemetry.io/proto/otlp/logs/v1"
)

// Exporter is an OpenTelemetry log exporter that outputs log records
// into JSON files. The implementation is based on the specification
// defined here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md
type Exporter struct {
mu sync.Mutex
fw *writer.FileWriter
stopped bool
}

// Compile-time check that the implementation satisfies the interface.
var _ log.Exporter = &Exporter{}

// New returns a new [Exporter].
func New(options ...Option) (*Exporter, error) {
cfg := newConfig(options)

fw, err := writer.NewFileWriter(cfg.path, cfg.flushInterval)
if err != nil {
return nil, err
}

return &Exporter{
fw: fw,
stopped: false,
}, nil
}

// Export exports logs records to the file.
func (e *Exporter) Export(ctx context.Context, records []log.Record) error {
// Honor context cancellation
if err := ctx.Err(); err != nil {
return err
}

e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return nil
}

data := &lpb.LogsData{
ResourceLogs: transform.ResourceLogs(records),
}

by, err := protojson.Marshal(data)
if err != nil {
return err
}

return e.fw.Export(by)
}

// ForceFlush flushes data to the destination.
func (e *Exporter) ForceFlush(_ context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return nil
}
return e.fw.Flush()
}

// Shutdown shuts down the exporter. Data buffered will be written to disk,
// and opened resources such as file will be closed.
func (e *Exporter) Shutdown(_ context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return nil
}

e.stopped = true
return e.fw.Shutdown()
}
139 changes: 139 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"
import (
"context"
"fmt"
"os"
"path"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/log"

sdklog "go.opentelemetry.io/otel/sdk/log"
)

// tempFile creates a temporary file for the given test case and returns its path on disk.
// The file is automatically cleaned up when the test ends.
func tempFile(tb testing.TB) string {
f, err := os.CreateTemp(tb.TempDir(), tb.Name())
assert.NoError(tb, err, "must not error when creating temporary file")
tb.Cleanup(func() {
assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "must clean up files after being written")
})
return f.Name()
}

// makeRecords is a helper function to generate an array of log record with the desired size.
func makeRecords(count int, message string) []sdklog.Record {
var records []sdklog.Record
for i := 0; i < count; i++ {
r := sdklog.Record{}
r.SetSeverityText("INFO")
r.SetSeverity(log.SeverityInfo)
r.SetBody(log.StringValue(message))
r.SetTimestamp(time.Now())
r.SetObservedTimestamp(time.Now())
records = append(records, r)
}
return records
}

func TestExporter(t *testing.T) {
filepath := tempFile(t)
records := makeRecords(1, "hello, world!")

exporter, err := New(WithPath(filepath))
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, exporter.Shutdown(context.TODO()))
})

err = exporter.Export(context.TODO(), records)
assert.NoError(t, err)
err = exporter.ForceFlush(context.TODO())
assert.NoError(t, err)
}

func TestExporterConcurrentSafe(t *testing.T) {
filepath := tempFile(t)
exporter, err := New(WithPath(filepath))
require.NoError(t, err, "New()")

const goroutines = 10

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
runs := new(uint64)
for i := 0; i < goroutines; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
_ = exporter.Export(ctx, makeRecords(1, fmt.Sprintf("log from goroutine %d", i)))
_ = exporter.ForceFlush(ctx)
atomic.AddUint64(runs, 1)
}
}
}()
}

for atomic.LoadUint64(runs) == 0 {
runtime.Gosched()
}

assert.NoError(t, exporter.Shutdown(ctx), "must not error when shutting down")
cancel()
wg.Wait()
}

func BenchmarkExporter(b *testing.B) {
for _, logCount := range []int{
10,
100,
500,
1000,
} {
records := makeRecords(logCount, "benchmark")

for name, interval := range map[string]time.Duration{
"no-flush": 0,
"flush-10ms": 10 * time.Millisecond,
"flush-100ms": 100 * time.Millisecond,
"flush-1s": time.Second,
"flush-10s": 10 * time.Second,
} {
filepath := tempFile(b)
exporter, err := New(WithPath(filepath), WithFlushInterval(interval))
require.NoError(b, err, "must not error when calling New()")

b.Run(fmt.Sprintf("%s/%d-logs", name, logCount), func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
if err := exporter.Export(context.Background(), records); err != nil {
b.Fatalf("failed to export records: %v", err)
}
}
})

if err := exporter.Shutdown(context.Background()); err != nil {
b.Fatalf("failed to shutdown exporter: %v", err)
}
}
}
}
37 changes: 37 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
module go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile

go 1.21

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/log v0.4.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/sdk/log v0.4.0
go.opentelemetry.io/otel/trace v1.28.0
go.opentelemetry.io/proto/otlp v1.3.1
google.golang.org/protobuf v1.34.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
golang.org/x/sys v0.22.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/otel => ../../../..

replace go.opentelemetry.io/otel/sdk/log => ../../../../sdk/log

replace go.opentelemetry.io/otel/sdk => ../../../../sdk

replace go.opentelemetry.io/otel/log => ../../../../log

replace go.opentelemetry.io/otel/trace => ../../../../trace

replace go.opentelemetry.io/otel/metric => ../../../../metric
25 changes: 25 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
9 changes: 9 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/internal/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal"

//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlplog/transform/attr_test.go.tmpl "--data={}" --out=transform/attr_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlplog/transform/log.go.tmpl "--data={}" --out=transform/log.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlplog/transform/log_attr_test.go.tmpl "--data={}" --out=transform/log_attr_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlplog/transform/log_test.go.tmpl "--data={}" --out=transform/log_test.go
Loading

0 comments on commit 1f72ae3

Please sign in to comment.