Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prototype] Auto Instrumentation #1540

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions apps/autoinstrumentation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package apps

import (
"context"
"fmt"

"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
)

type ReceiverAutoInstrumentation struct {
confgenerator.ConfigComponent `yaml:",inline"`
GenerateToPath string `yaml:"generate_to_path" validate:"required"`
GenerationType string `yaml:"generation_type" validate:"required"`
TargetReceiver string `yaml:"target_receiver" validate:"required"`
MetricsEnabled bool `yaml:"metrics_enabled"`
TraceEnabled bool `yaml:"trace_enabled"`
ServiceName string `yaml:"-"`
Endpoint string `yaml:"-"`
}

func (r *ReceiverAutoInstrumentation) Pipelines(_ context.Context) []otel.ReceiverPipeline {
return nil
}

func (r *ReceiverAutoInstrumentation) Type() string {
return "autoinstrumentation"
}

func (r *ReceiverAutoInstrumentation) validateTargetReceiver(uc confgenerator.UnifiedConfig) bool {
if _, ok := uc.Combined.Receivers[r.TargetReceiver]; ok {
return true
}
return false
}

func (r *ReceiverAutoInstrumentation) GenerateConfig() (map[string]interface{}, error) {
config := map[string]interface{}{
"otel.traces.exporter": "otlp",
"otel.metrics.exporter": "otlp",
"otel.logs.exporter": "none",
"otel.exporter.otlp.traces.endpoint": fmt.Sprintf("http://%s", defaultGRPCEndpoint),
"otel.exporter.otlp.metrics.endpoint": fmt.Sprintf("http://%s", defaultGRPCEndpoint),
"otel.service.name": r.ServiceName,
}

if r.Endpoint != "" {
endpoint := fmt.Sprintf("http://%s", r.Endpoint)
config["otel.exporter.otlp.traces.endpoint"] = endpoint
config["otel.exporter.otlp.metrics.endpoint"] = endpoint
}

return config, nil
}

func (r *ReceiverAutoInstrumentation) GetTargetReceiver() string {
return r.TargetReceiver
}

func (r *ReceiverAutoInstrumentation) GetGenerationType() string {
return r.GenerationType
}

func (r *ReceiverAutoInstrumentation) GetGenerateToPath() string {
return r.GenerateToPath
}

func (r *ReceiverAutoInstrumentation) SetServiceName(name string) {
r.ServiceName = name
}

func (r *ReceiverAutoInstrumentation) SetEndpoint(endpoint string) {
r.Endpoint = endpoint
}

func init() {
confgenerator.CombinedReceiverTypes.RegisterType(func() confgenerator.CombinedReceiver { return &ReceiverAutoInstrumentation{} })
}
4 changes: 4 additions & 0 deletions apps/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func (r ReceiverOTLP) Pipelines(_ context.Context) []otel.ReceiverPipeline {
}}
}

func (r ReceiverOTLP) GetEndpoint() string {
return r.GRPCEndpoint
}

func init() {
confgenerator.CombinedReceiverTypes.RegisterType(func() confgenerator.CombinedReceiver { return &ReceiverOTLP{} })
}
79 changes: 79 additions & 0 deletions confgenerator/confgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"log"
"path"
"regexp"
"sort"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/internal/platform"
"github.com/GoogleCloudPlatform/ops-agent/internal/set"
"github.com/gookit/properties"
)

func googleCloudExporter(userAgent string, instrumentationLabels bool) otel.Component {
Expand Down Expand Up @@ -87,6 +90,7 @@ func googleManagedPrometheusExporter(userAgent string) otel.Component {
}

func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context) (string, error) {
log.Println("Generating Otel Config")
p := platform.FromContext(ctx)
userAgent, _ := p.UserAgent("Google-Cloud-Ops-Agent-Metrics")
metricVersionLabel, _ := p.VersionLabel("google-cloud-ops-agent-metrics")
Expand Down Expand Up @@ -141,11 +145,54 @@ func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context) (string, error)
return otelConfig, nil
}

type AutoInstrumentationConfig struct {
GeneratedFile []byte
FileLocation string
}

func (uc *UnifiedConfig) GenerateAutoInstrumentationConfigs() ([]AutoInstrumentationConfig, error) {
autoComps := uc.AutoInstrumentationComponents()
var configs []AutoInstrumentationConfig
for _, comp := range autoComps {
var conf map[string]interface{}
conf, err := comp.GenerateConfig()
if err != nil {
return nil, err
}
autoConfig, err := marshalAutoInstrumentationByType(conf, comp.GetGenerationType())
if err != nil {
return nil, err
}
config := AutoInstrumentationConfig{
GeneratedFile: autoConfig,
FileLocation: comp.GetGenerateToPath(),
}
configs = append(configs, config)
}
return configs, nil
}

func marshalAutoInstrumentationByType(conf map[string]interface{}, generationType string) ([]byte, error) {
switch generationType {
case "properties":
{
c, err := properties.Marshal(&conf)
if err != nil {
return nil, err
}
return c, nil
}
default:
return nil, fmt.Errorf("unknown autoinstrumentation generation_type")
}
}

// generateOtelPipelines generates a map of OTel pipeline names to OTel pipelines.
func (uc *UnifiedConfig) generateOtelPipelines(ctx context.Context) (map[string]otel.ReceiverPipeline, map[string]otel.Pipeline, error) {
m := uc.Metrics
outR := make(map[string]otel.ReceiverPipeline)
outP := make(map[string]otel.Pipeline)
otlpReceiverIDsWithAutoInstrumentation := set.Set[string]{}
addReceiver := func(pipelineType, pID, rID string, receiver OTelReceiver, processorIDs []string) error {
for i, receiverPipeline := range receiver.Pipelines(ctx) {
receiverPipelineName := strings.ReplaceAll(rID, "_", "__")
Expand All @@ -159,6 +206,17 @@ func (uc *UnifiedConfig) generateOtelPipelines(ctx context.Context) (map[string]
prefix = fmt.Sprintf("%s_%s", pipelineType, prefix)
}

if receiver.Type() == "otlp" {
if _, ok := otlpReceiverIDsWithAutoInstrumentation[rID]; ok {
receiverPipeline.Processors["metrics"] = append(
receiverPipeline.Processors["metrics"],
otel.TransformationMetrics(
otel.FlattenResourceAttribute("service.name", "service_name"),
),
)
}
}

outR[receiverPipelineName] = receiverPipeline

pipeline := otel.Pipeline{
Expand All @@ -180,6 +238,7 @@ func (uc *UnifiedConfig) generateOtelPipelines(ctx context.Context) (map[string]
return fmt.Errorf("processor %q not found", pID)
}
pipeline.Processors = append(pipeline.Processors, processor.Processors()...)

}
outP[prefix] = pipeline
}
Expand All @@ -190,6 +249,26 @@ func (uc *UnifiedConfig) generateOtelPipelines(ctx context.Context) (map[string]
if err != nil {
return nil, nil, err
}

// set AutoInstrumentation receivers service name
for name, receiver := range receivers {
if aiReceiver, ok := receiver.(AutoInstrumentationComponent); ok {
targetReceiver := aiReceiver.GetTargetReceiver()
aiReceiver.SetServiceName(name)
r, exists := receivers[targetReceiver]
if exists {
otlpReceiverIDsWithAutoInstrumentation.Add(targetReceiver)
} else {
return nil, nil, fmt.Errorf("cannot find autoinstrumentation target receiver: %s, please make sure there is an otlp receiver with the same name", targetReceiver)
}
if aicc, ok := r.(AutoInstrumentationCollectorComponent); ok {
aiReceiver.SetEndpoint(aicc.GetEndpoint())
} else {
return nil, nil, fmt.Errorf("%s is not a valid AutoInstrumentation target receiver", r.Type())
}
}
}

for pID, p := range m.Service.Pipelines {
for _, rID := range p.ReceiverIDs {
receiver, ok := receivers[rID]
Expand Down
11 changes: 11 additions & 0 deletions confgenerator/confgenerator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io/fs"
"os"
"path/filepath"
"sort"
"strings"
"testing"

Expand Down Expand Up @@ -215,6 +216,16 @@ func generateConfigs(pc platformConfig, testDir string) (got map[string]string,
}
got["otel.yaml"] = otelGeneratedConfig

autoInstrumentationConfigs, err := uc.GenerateAutoInstrumentationConfigs()
if err != nil {
return
}
for _, config := range autoInstrumentationConfigs {
lines := strings.Split(string(config.GeneratedFile), "\n")
sort.Strings(lines)
got[config.FileLocation] = strings.TrimSpace(strings.Join(lines, "\n"))
}

inputBytes, err := os.ReadFile(filepath.Join("testdata", testDir, inputFileName))

userConf, err := confgenerator.UnmarshalYamlToUnifiedConfig(ctx, inputBytes)
Expand Down
29 changes: 29 additions & 0 deletions confgenerator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,18 @@ func (uc *UnifiedConfig) ValidateCombined() error {
return nil
}

func (uc *UnifiedConfig) AutoInstrumentationComponents() []AutoInstrumentationComponent {
var comps []AutoInstrumentationComponent
if uc.Combined != nil {
for _, v := range uc.Combined.Receivers {
if c, ok := v.(AutoInstrumentationComponent); ok {
comps = append(comps, c)
}
}
}
return comps
}

func (uc *UnifiedConfig) MetricsReceivers() (map[string]MetricsReceiver, error) {
validReceivers := map[string]MetricsReceiver{}
for k, v := range uc.Metrics.Receivers {
Expand Down Expand Up @@ -1137,3 +1149,20 @@ func validateSSLConfig(receivers metricsReceiverMap, ctx context.Context) error
func parameterErrorPrefix(subagent string, kind string, id string, componentType string, parameter string) string {
return fmt.Sprintf(`parameter %q in %q type %s %s %q`, parameter, componentType, subagent, kind, id)
}

type ComponentProperties interface {
Properties() ([]byte, string, error)
}

// AutoInstrumentationComponent cannot cast to AutoInstrumentationReceiver because of cyclical dependency
type AutoInstrumentationComponent interface {
GetTargetReceiver() string
SetServiceName(name string)
GenerateConfig() (map[string]interface{}, error)
GetGenerationType() string
GetGenerateToPath() string
SetEndpoint(endpoint string)
}
type AutoInstrumentationCollectorComponent interface {
GetEndpoint() string
}
11 changes: 11 additions & 0 deletions confgenerator/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ func (uc *UnifiedConfig) GenerateFilesFromConfig(ctx context.Context, service, l
if err = WriteConfigFile([]byte(otelConfig), filepath.Join(outDir, "otel.yaml")); err != nil {
return err
}

configs, err := uc.GenerateAutoInstrumentationConfigs()
if err != nil {
return err
}
for _, config := range configs {
if err = WriteConfigFile(config.GeneratedFile, config.FileLocation); err != nil {
return err
}
}

default:
return fmt.Errorf("unknown service %q", service)
}
Expand Down
3 changes: 3 additions & 0 deletions confgenerator/testdata/feature/golden.csv
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ App,Field,Override,
*apps.MetricsReceiverVault,confgenerator.MetricsReceiverSharedTLS.InsecureSkipVerify,
*apps.MetricsReceiverWildfly,confgenerator.ConfigComponent.Type,
*apps.MetricsReceiverZookeeper,confgenerator.ConfigComponent.Type,
*apps.ReceiverAutoInstrumentation,MetricsEnabled,
*apps.ReceiverAutoInstrumentation,TraceEnabled,
*apps.ReceiverAutoInstrumentation,confgenerator.ConfigComponent.Type,
*apps.ReceiverOTLP,GRPCEndpoint,endpoint
*apps.ReceiverOTLP,MetricsMode,
*apps.ReceiverOTLP,confgenerator.ConfigComponent.Type,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

function process(tag, timestamp, record)
local __field_0 = (function()
return record["severity"]
end)();
(function(value)
record["severity"] = value
end)(nil);
local v = __field_0;
if v == "debug" then v = "DEBUG"
elseif v == "error" then v = "ERROR"
elseif v == "info" then v = "INFO"
elseif v == "warn" then v = "WARNING"
end
(function(value)
record["logging.googleapis.com/severity"] = value
end)(v)
return 2, timestamp, record
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

function shallow_merge(record, parsedRecord)
-- If no exiting record exists
if (record == nil) then
return parsedRecord
end

for k, v in pairs(parsedRecord) do
record[k] = v
end

return record
end

function merge(record, parsedRecord)
-- If no exiting record exists
if record == nil then
return parsedRecord
end

-- Potentially overwrite or merge the original records.
for k, v in pairs(parsedRecord) do
-- If there is no conflict
if k == "logging.googleapis.com/logName" then
-- Ignore the parsed payload since the logName is controlled
-- by the OpsAgent.
elseif k == "logging.googleapis.com/labels" then
-- LogEntry.labels are basically a map[string]string and so only require a
-- shallow merge (one level deep merge).
record[k] = shallow_merge(record[k], v)
else
record[k] = v
end
end

return record
end

function parser_merge_record(tag, timestamp, record)
originalPayload = record["logging.googleapis.com/__tmp"]
if originalPayload == nil then
return 0, timestamp, record
end

-- Remove original payload
record["logging.googleapis.com/__tmp"] = nil
record = merge(originalPayload, record)
return 2, timestamp, record
end
Loading
Loading