Skip to content

Commit

Permalink
[receiver/otlpjsonfile] append token.Attributes to scoped attribute o…
Browse files Browse the repository at this point in the history
…f the signal (#36715)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
It loops through token.Attributes and appends it to the scoped attribute
of the signal

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #36641

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Added UT

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
khushijain21 authored Jan 24, 2025
1 parent fc2ad32 commit bb6e0ef
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 4 deletions.
27 changes: 27 additions & 0 deletions .chloggen/receiver-otlpjson-token-attr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otlpjsonfilereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Include file attributes and append it to the log record

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36641]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
50 changes: 50 additions & 0 deletions receiver/otlpjsonfilereceiver/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
Expand Down Expand Up @@ -87,6 +88,17 @@ func createLogsReceiver(_ context.Context, settings receiver.Settings, configura
ctx = obsrecv.StartLogsOp(ctx)
var l plog.Logs
l, err = logsUnmarshaler.UnmarshalLogs(token.Body)
// Appends token.Attributes
for i := 0; i < l.ResourceLogs().Len(); i++ {
resourceLog := l.ResourceLogs().At(i)
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLog := resourceLog.ScopeLogs().At(j)
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
LogRecords := scopeLog.LogRecords().At(k)
appendToMap(token, LogRecords.Attributes())
}
}
}
if err != nil {
obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, err)
} else {
Expand Down Expand Up @@ -124,6 +136,17 @@ func createMetricsReceiver(_ context.Context, settings receiver.Settings, config
ctx = obsrecv.StartMetricsOp(ctx)
var m pmetric.Metrics
m, err = metricsUnmarshaler.UnmarshalMetrics(token.Body)
// Appends token.Attributes
for i := 0; i < m.ResourceMetrics().Len(); i++ {
resourceMetric := m.ResourceMetrics().At(i)
for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
ScopeMetric := resourceMetric.ScopeMetrics().At(j)
for k := 0; k < ScopeMetric.Metrics().Len(); k++ {
metric := ScopeMetric.Metrics().At(k)
appendToMap(token, metric.Metadata())
}
}
}
if err != nil {
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, err)
} else {
Expand Down Expand Up @@ -160,6 +183,17 @@ func createTracesReceiver(_ context.Context, settings receiver.Settings, configu
ctx = obsrecv.StartTracesOp(ctx)
var t ptrace.Traces
t, err = tracesUnmarshaler.UnmarshalTraces(token.Body)
// Appends token.Attributes
for i := 0; i < t.ResourceSpans().Len(); i++ {
resourceSpan := t.ResourceSpans().At(i)
for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
scopeSpan := resourceSpan.ScopeSpans().At(j)
for k := 0; k < scopeSpan.Spans().Len(); k++ {
spans := scopeSpan.Spans().At(k)
appendToMap(token, spans.Attributes())
}
}
}
if err != nil {
obsrecv.EndTracesOp(ctx, metadata.Type.String(), 0, err)
} else {
Expand All @@ -186,6 +220,7 @@ func createProfilesReceiver(_ context.Context, settings receiver.Settings, confi
}
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
p, _ := profilesUnmarshaler.UnmarshalProfiles(token.Body)
// TODO Append token.Attributes
if p.ResourceProfiles().Len() != 0 {
_ = profiles.ConsumeProfiles(ctx, p)
}
Expand All @@ -197,3 +232,18 @@ func createProfilesReceiver(_ context.Context, settings receiver.Settings, confi

return &otlpjsonfilereceiver{input: input, id: settings.ID, storageID: cfg.StorageID}, nil
}

func appendToMap(token emit.Token, attr pcommon.Map) {
for key, value := range token.Attributes {
switch v := value.(type) {
case string:
attr.PutStr(key, v)
case int:
attr.PutInt(key, int64(v))
case float64:
attr.PutDouble(key, float64(v))
case bool:
attr.PutBool(key, v)
}
}
}
19 changes: 15 additions & 4 deletions receiver/otlpjsonfilereceiver/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestFileProfilesReceiver(t *testing.T) {
err = receiver.Start(context.Background(), nil)
require.NoError(t, err)

pd := testdata.GenerateProfiles(5)
pd := testdata.GenerateProfiles(1)
marshaler := &pprofile.JSONMarshaler{}
b, err := marshaler.MarshalProfiles(pd)
assert.NoError(t, err)
Expand All @@ -76,7 +76,7 @@ func TestFileTracesReceiver(t *testing.T) {
err = receiver.Start(context.Background(), nil)
require.NoError(t, err)

td := testdata.GenerateTraces(2)
td := testdata.GenerateTraces(1)
marshaler := &ptrace.JSONMarshaler{}
b, err := marshaler.MarshalTraces(td)
assert.NoError(t, err)
Expand All @@ -85,6 +85,9 @@ func TestFileTracesReceiver(t *testing.T) {
assert.NoError(t, err)
time.Sleep(1 * time.Second)

// include_file_name is true by default
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("log.file.name", "traces.json")

require.Len(t, sink.AllTraces(), 1)
assert.EqualValues(t, td, sink.AllTraces()[0])
err = receiver.Shutdown(context.Background())
Expand All @@ -103,7 +106,7 @@ func TestFileMetricsReceiver(t *testing.T) {
err = receiver.Start(context.Background(), nil)
assert.NoError(t, err)

md := testdata.GenerateMetrics(5)
md := testdata.GenerateMetrics(1)
marshaler := &pmetric.JSONMarshaler{}
b, err := marshaler.MarshalMetrics(md)
assert.NoError(t, err)
Expand All @@ -112,6 +115,9 @@ func TestFileMetricsReceiver(t *testing.T) {
assert.NoError(t, err)
time.Sleep(1 * time.Second)

// include_file_name is true by default
md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Metadata().PutStr("log.file.name", "metrics.json")

require.Len(t, sink.AllMetrics(), 1)
assert.EqualValues(t, md, sink.AllMetrics()[0])
err = receiver.Shutdown(context.Background())
Expand All @@ -126,6 +132,7 @@ func TestFileMetricsReceiverWithReplay(t *testing.T) {
cfg.Config.StartAt = "beginning"
cfg.ReplayFile = true
cfg.Config.PollInterval = 5 * time.Second
cfg.IncludeFileName = false

sink := new(consumertest.MetricsSink)
receiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(), cfg, sink)
Expand Down Expand Up @@ -168,7 +175,7 @@ func TestFileLogsReceiver(t *testing.T) {
err = receiver.Start(context.Background(), nil)
assert.NoError(t, err)

ld := testdata.GenerateLogs(5)
ld := testdata.GenerateLogs(1)
marshaler := &plog.JSONMarshaler{}
b, err := marshaler.MarshalLogs(ld)
assert.NoError(t, err)
Expand All @@ -177,6 +184,9 @@ func TestFileLogsReceiver(t *testing.T) {
assert.NoError(t, err)
time.Sleep(1 * time.Second)

// include_file_name is true by default
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("log.file.name", "logs.json")

require.Len(t, sink.AllLogs(), 1)
assert.EqualValues(t, ld, sink.AllLogs()[0])
err = receiver.Shutdown(context.Background())
Expand Down Expand Up @@ -226,6 +236,7 @@ func TestFileMixedSignals(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Config.Include = []string{filepath.Join(tempFolder, "*")}
cfg.Config.StartAt = "beginning"
cfg.IncludeFileName = false
cs := receivertest.NewNopSettings()
ms := new(consumertest.MetricsSink)
mr, err := factory.CreateMetrics(context.Background(), cs, cfg, ms)
Expand Down

0 comments on commit bb6e0ef

Please sign in to comment.