Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated worker.go and config.go of the load generator to simulate tra… #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 86 additions & 80 deletions tracegen/internal/tracegen/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,40 @@
package tracegen // import "github.com/open-telemetry/opentelemetry-collector-contrib/tracegen/internal/tracegen"

import (
"flag"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"flag"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/zap"
"golang.org/x/time/rate"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// Config describes the test scenario.
type Config struct {
WorkerCount int
NumTraces int
PropagateContext bool
Rate int64
TotalDuration time.Duration
ServiceName string

// OTLP config
Endpoint string
Insecure bool
UseHTTP bool
Headers HeaderValue
WorkerCount int
NumTraces int
PropagateContext bool
Rate int64
TotalDuration time.Duration
ServiceName string

// OTLP config
Endpoint string
Insecure bool
UseHTTP bool
Headers HeaderValue

// Other config
TotalTraceTypes int
Exp *otlptrace.Exporter
}

Expand All @@ -55,38 +58,40 @@ type HeaderValue map[string]string
var _ flag.Value = (*HeaderValue)(nil)

func (v *HeaderValue) String() string {
return ""
return ""
}

func (v *HeaderValue) Set(s string) error {
kv := strings.SplitN(s, "=", 2)
if len(kv) != 2 {
return fmt.Errorf("value should be of the format key=value")
}
(*v)[kv[0]] = kv[1]
return nil
kv := strings.SplitN(s, "=", 2)
if len(kv) != 2 {
return fmt.Errorf("value should be of the format key=value")
}
(*v)[kv[0]] = kv[1]
return nil
}

// Flags registers config flags.
func (c *Config) Flags(fs *flag.FlagSet) {
fs.IntVar(&c.WorkerCount, "workers", 1, "Number of workers (goroutines) to run")
fs.IntVar(&c.NumTraces, "traces", 1, "Number of traces to generate in each worker (ignored if duration is provided")
fs.BoolVar(&c.PropagateContext, "marshal", false, "Whether to marshal trace context via HTTP headers")
fs.Int64Var(&c.Rate, "rate", 0, "Approximately how many traces per second each worker should generate. Zero means no throttling.")
fs.DurationVar(&c.TotalDuration, "duration", 0, "For how long to run the test")
fs.StringVar(&c.ServiceName, "service", "tracegen", "Service name to use")

// unfortunately, at this moment, the otel-go client doesn't support configuring OTLP via env vars
fs.StringVar(&c.Endpoint, "otlp-endpoint", "localhost:4317", "Target to which the exporter is going to send spans or metrics. This MAY be configured to include a path (e.g. example.com/v1/traces)")
fs.BoolVar(&c.Insecure, "otlp-insecure", false, "Whether to enable client transport security for the exporter's grpc or http connection")
fs.BoolVar(&c.UseHTTP, "otlp-http", false, "Whether to use HTTP exporter rather than a gRPC one")

// custom headers
c.Headers = make(map[string]string)
fs.Var(&c.Headers, "otlp-header", "Custom header to be passed along with each OTLP request. The value is expected in the format key=value."+
"Flag may be repeated to set multiple headers (e.g -otlp-header key1=value1 -otlp-header key2=value2)")
fs.IntVar(&c.WorkerCount, "workers", 1, "Number of workers (goroutines) to run")
fs.IntVar(&c.NumTraces, "traces", 5, "Number of traces to generate in each worker (ignored if duration is provided")
fs.IntVar(&c.TotalTraceTypes, "types", 5, "Number of trace types to generate in each worker")
fs.BoolVar(&c.PropagateContext, "marshal", false, "Whether to marshal trace context via HTTP headers")
fs.Int64Var(&c.Rate, "rate", 0, "Approximately how many traces per second each worker should generate. Zero means no throttling.")
fs.DurationVar(&c.TotalDuration, "duration", 0, "For how long to run the test")
fs.StringVar(&c.ServiceName, "service", "tracegen", "Service name to use")

// unfortunately, at this moment, the otel-go client doesn't support configuring OTLP via env vars
fs.StringVar(&c.Endpoint, "otlp-endpoint", "localhost:4317", "Target to which the exporter is going to send spans or metrics. This MAY be configured to include a path (e.g. example.com/v1/traces)")
fs.BoolVar(&c.Insecure, "otlp-insecure", false, "Whether to enable client transport security for the exporter's grpc or http connection")
fs.BoolVar(&c.UseHTTP, "otlp-http", false, "Whether to use HTTP exporter rather than a gRPC one")

// custom headers
c.Headers = make(map[string]string)
fs.Var(&c.Headers, "otlp-header", "Custom header to be passed along with each OTLP request. The value is expected in the format key=value."+
"Flag may be repeated to set multiple headers (e.g -otlp-header key1=value1 -otlp-header key2=value2)")
}


// Run executes the test scenario.
func Run(c *Config, logger *zap.Logger) error {
// TODO: make this not hard-coded
Expand All @@ -105,41 +110,42 @@ func Run(c *Config, logger *zap.Logger) error {
}
otel.SetTracerProvider(tracerProviders[0])

if c.TotalDuration > 0 {
c.NumTraces = 0
} else if c.NumTraces <= 0 {
return fmt.Errorf("either `traces` or `duration` must be greater than 0")
}

limit := rate.Limit(c.Rate)
if c.Rate == 0 {
limit = rate.Inf
logger.Info("generation of traces isn't being throttled")
} else {
logger.Info("generation of traces is limited", zap.Float64("per-second", float64(limit)))
}

wg := sync.WaitGroup{}
var running uint32 = 1
for i := 0; i < c.WorkerCount; i++ {
wg.Add(1)
w := worker{
numTraces: c.NumTraces,
propagateContext: c.PropagateContext,
limitPerSecond: limit,
totalDuration: c.TotalDuration,
running: &running,
wg: &wg,
logger: logger.With(zap.Int("worker", i)),
if c.TotalDuration > 0 {
c.NumTraces = 0
} else if c.NumTraces <= 0 {
return fmt.Errorf("either `traces` or `duration` must be greater than 0")
}

limit := rate.Limit(c.Rate)
if c.Rate == 0 {
limit = rate.Inf
logger.Info("generation of traces isn't being throttled")
} else {
logger.Info("generation of traces is limited", zap.Float64("per-second", float64(limit)))
}

wg := sync.WaitGroup{}
var running uint32 = 1
for i := 0; i < c.WorkerCount; i++ {
wg.Add(1)
w := worker{
numTraces: c.NumTraces,
propagateContext: c.PropagateContext,
limitPerSecond: limit,
totalDuration: c.TotalDuration,
running: &running,
wg: &wg,
logger: logger.With(zap.Int("worker", i)),
traceTypes: c.TotalTraceTypes,
serviceNames: services,
tracerProviders: tracerProviders,
}

go w.simulateTraces()
}
if c.TotalDuration > 0 {
time.Sleep(c.TotalDuration)
atomic.StoreUint32(&running, 0)
}
wg.Wait()
return nil
}
}
go w.simulateTraces()
}
if c.TotalDuration > 0 {
time.Sleep(c.TotalDuration)
atomic.StoreUint32(&running, 0)
}
wg.Wait()
return nil
}
Loading