diff --git a/agent/agent.go b/agent/agent.go index ae75c340..3d69816e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -48,6 +48,7 @@ type ( recorder *SpanRecorder recorderFilename string flushFrequency time.Duration + concurrencyLevel int optionalRecorders []tracer.SpanRecorder @@ -64,8 +65,8 @@ type ( var ( version = "0.1.16-pre2" - testingModeFrequency = time.Second - nonTestingModeFrequency = time.Minute + testingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequencyInTestMode.Value) * time.Millisecond + nonTestingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequency.Value) * time.Millisecond ) func WithApiKey(apiKey string) Option { @@ -201,6 +202,7 @@ func NewAgent(options ...Option) (*Agent, error) { agent.userAgent = fmt.Sprintf("scope-agent-go/%s", agent.version) agent.panicAsFail = false agent.failRetriesCount = 0 + agent.concurrencyLevel = env.ScopeTracerDispatcherConcurrencyLevel.Value for _, opt := range options { opt(agent) diff --git a/agent/recorder.go b/agent/recorder.go index 400e085a..d8087c83 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -48,6 +48,10 @@ type ( logger *log.Logger stats *RecorderStats statsOnce sync.Once + + concurrencyLevel int + workerJobs chan *workerJob + workerResults chan *workerResult } RecorderStats struct { totalSpans int64 @@ -66,6 +70,18 @@ type ( PayloadSpan map[string]interface{} PayloadEvent map[string]interface{} + + workerJob struct { + spans []PayloadSpan + totalSpans int + events []PayloadEvent + totalEvents int + } + workerResult struct { + workerId int + error error + shouldExit bool + } ) func NewSpanRecorder(agent *Agent) *SpanRecorder { @@ -79,9 +95,19 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder { r.metadata = agent.metadata r.logger = agent.logger r.flushFrequency = agent.flushFrequency + r.concurrencyLevel = agent.concurrencyLevel r.url = agent.getUrl("api/agent/ingest") r.client = &http.Client{} r.stats = &RecorderStats{} + 
r.logger.Printf("recorder frequency: %v", agent.flushFrequency) + r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel) + + // start workers + r.workerJobs = make(chan *workerJob, r.concurrencyLevel) + r.workerResults = make(chan *workerResult, r.concurrencyLevel) + for i := 0; i < r.concurrencyLevel; i++ { + go r.worker(i + 1) + } r.t.Go(r.loop) return r } @@ -144,51 +170,104 @@ func (r *SpanRecorder) sendSpans() (error, bool) { atomic.AddInt64(&r.stats.sendSpansCalls, 1) const batchSize = 1000 var lastError error + var jobs int for { spans, spMore, spTotal := r.popPayloadSpan(batchSize) events, evMore, evTotal := r.popPayloadEvents(batchSize) - payload := map[string]interface{}{ - "metadata": r.metadata, - "spans": spans, - "events": events, - tags.AgentID: r.agentId, + r.workerJobs <- &workerJob{ + spans: spans, + totalSpans: spTotal, + events: events, + totalEvents: evTotal, } - buf, err := encodePayload(payload) - if err != nil { - atomic.AddInt64(&r.stats.sendSpansKo, 1) - atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans))) - return err, false - } - - var testSpans int64 - for _, span := range spans { - if isTestSpan(span) { - testSpans++ + jobs++ + + if len(r.workerResults) > 0 { + // Check whether a previous job has already reported a result, to decide if we need to cancel everything + result := <-r.workerResults + lastError = result.error + jobs-- + if result.shouldExit { + r.logger.Printf("worker %d: received a should exit response", result.workerId) + for i := 0; i < jobs; i++ { + <-r.workerResults + } + return result.error, result.shouldExit } } - r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal) - statusCode, err := r.callIngest(buf) - if err != nil { - atomic.AddInt64(&r.stats.sendSpansKo, 1) - atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans))) - atomic.AddInt64(&r.stats.testSpansNotSent, testSpans) - } else { - atomic.AddInt64(&r.stats.sendSpansOk, 1) - atomic.AddInt64(&r.stats.spansSent, 
int64(len(spans))) - atomic.AddInt64(&r.stats.testSpansSent, testSpans) + if !spMore && !evMore { + break } - if statusCode == 401 { - return err, true + } + shouldExit := false + for i := 0; i < jobs; i++ { + result := <-r.workerResults + lastError = result.error + if result.shouldExit { + r.logger.Printf("worker %d: received a should exit response", result.workerId) + shouldExit = true } - lastError = err + } + return lastError, shouldExit +} - if !spMore && !evMore { - break +func (r *SpanRecorder) worker(id int) { + for { + select { + case j, ok := <-r.workerJobs: + if !ok { + if r.debugMode { + r.logger.Printf("exiting from worker: %d", id) + } + return + } + + payload := map[string]interface{}{ + "metadata": r.metadata, + "spans": j.spans, + "events": j.events, + tags.AgentID: r.agentId, + } + + buf, err := encodePayload(payload) + if err != nil { + atomic.AddInt64(&r.stats.sendSpansKo, 1) + atomic.AddInt64(&r.stats.spansNotSent, int64(len(j.spans))) + r.workerResults <- &workerResult{ + workerId: id, + error: err, + shouldExit: false, + } + continue + } + + var testSpans int64 + for _, span := range j.spans { + if isTestSpan(span) { + testSpans++ + } + } + + r.logger.Printf("worker %d: sending %d/%d spans with %d/%d events", id, len(j.spans), j.totalSpans, len(j.events), j.totalEvents) + statusCode, err := r.callIngest(buf) + if err != nil { + atomic.AddInt64(&r.stats.sendSpansKo, 1) + atomic.AddInt64(&r.stats.spansNotSent, int64(len(j.spans))) + atomic.AddInt64(&r.stats.testSpansNotSent, testSpans) + } else { + atomic.AddInt64(&r.stats.sendSpansOk, 1) + atomic.AddInt64(&r.stats.spansSent, int64(len(j.spans))) + atomic.AddInt64(&r.stats.testSpansSent, testSpans) + } + r.workerResults <- &workerResult{ + workerId: id, + error: err, + shouldExit: statusCode == 401, + } } } - return lastError, false } // Stop recorder @@ -198,6 +277,8 @@ func (r *SpanRecorder) Stop() { } r.t.Kill(nil) _ = r.t.Wait() + close(r.workerJobs) + close(r.workerResults) if r.debugMode 
{ r.writeStats() } diff --git a/env/vars.go b/env/vars.go index bd726a20..3804a2c2 100644 --- a/env/vars.go +++ b/env/vars.go @@ -3,24 +3,27 @@ package env import "go.undefinedlabs.com/scopeagent/tags" var ( - ScopeDsn = newStringEnvVar("", "SCOPE_DSN") - ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY") - ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT") - ScopeService = newStringEnvVar("default", "SCOPE_SERVICE") - ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY") - ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA") - ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH") - ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT") - ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH") - ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG") - ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER") - ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE") - ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES") - ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL") - ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION") - ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA") - ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS") - ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE") - ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES") - ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE") + ScopeDsn = newStringEnvVar("", "SCOPE_DSN") + ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY") + ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT") + ScopeService = newStringEnvVar("default", "SCOPE_SERVICE") + 
ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY") + ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA") + ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH") + ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT") + ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH") + ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG") + ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER") + ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE") + ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES") + ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL") + ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION") + ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA") + ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS") + ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE") + ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES") + ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE") + ScopeTracerDispatcherHealthcheckFrequency = newIntEnvVar(60000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY") + ScopeTracerDispatcherHealthcheckFrequencyInTestMode = newIntEnvVar(1000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY_IN_TESTMODE") + ScopeTracerDispatcherConcurrencyLevel = newIntEnvVar(5, "SCOPE_TRACER_DISPATCHER_CONCURRENCY_LEVEL") )