From 91d99cc928edb880a797e856174cafc784f10397 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 10:32:04 +0100 Subject: [PATCH 1/9] recorder concurrency support using channels. --- agent/agent.go | 6 +- agent/recorder.go | 146 ++++++++++++++++++++++++++++++++++++---------- env/vars.go | 43 +++++++------- 3 files changed, 141 insertions(+), 54 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ae75c340..3d69816e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -48,6 +48,7 @@ type ( recorder *SpanRecorder recorderFilename string flushFrequency time.Duration + concurrencyLevel int optionalRecorders []tracer.SpanRecorder @@ -64,8 +65,8 @@ type ( var ( version = "0.1.16-pre2" - testingModeFrequency = time.Second - nonTestingModeFrequency = time.Minute + testingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequencyInTestMode.Value) * time.Millisecond + nonTestingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequency.Value) * time.Millisecond ) func WithApiKey(apiKey string) Option { @@ -201,6 +202,7 @@ func NewAgent(options ...Option) (*Agent, error) { agent.userAgent = fmt.Sprintf("scope-agent-go/%s", agent.version) agent.panicAsFail = false agent.failRetriesCount = 0 + agent.concurrencyLevel = env.ScopeTracerDispatcherConcurrencyLevel.Value for _, opt := range options { opt(agent) diff --git a/agent/recorder.go b/agent/recorder.go index 400e085a..b19176d8 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -48,6 +48,10 @@ type ( logger *log.Logger stats *RecorderStats statsOnce sync.Once + + concurrencyLevel int + workerJobs chan *workerJob + workerResults chan *workerResult } RecorderStats struct { totalSpans int64 @@ -66,6 +70,18 @@ type ( PayloadSpan map[string]interface{} PayloadEvent map[string]interface{} + + workerJob struct { + spans []PayloadSpan + totalSpans int + events []PayloadEvent + totalEvents int + } + workerResult struct { + workerId int + error error + shouldExit bool + } ) func NewSpanRecorder(agent *Agent) *SpanRecorder { @@ -79,9 +95,12 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder { r.metadata = agent.metadata r.logger = agent.logger r.flushFrequency = agent.flushFrequency + r.concurrencyLevel = agent.concurrencyLevel r.url = agent.getUrl("api/agent/ingest") r.client = &http.Client{} r.stats = &RecorderStats{} + r.logger.Printf("recorder frequency: %v", agent.flushFrequency) + r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel) r.t.Go(r.loop) return r } @@ -103,8 +122,18 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { func (r *SpanRecorder) loop() error { defer func() { + close(r.workerJobs) + close(r.workerResults) r.logger.Println("recorder has been stopped.") }() + + // start workers + r.workerJobs = make(chan *workerJob, r.concurrencyLevel) + r.workerResults = make(chan *workerResult, r.concurrencyLevel) + for i := 0; i < r.concurrencyLevel; i++ { + go r.worker(i + 1) + } + ticker := time.NewTicker(1 * time.Second) cTime := time.Now() for { @@ -144,51 +173,104 @@ func (r *SpanRecorder) sendSpans() (error, bool) { atomic.AddInt64(&r.stats.sendSpansCalls, 1) const batchSize = 1000 var lastError error + var jobs int for { spans, spMore, spTotal := r.popPayloadSpan(batchSize) events, evMore, evTotal := r.popPayloadEvents(batchSize) - payload := map[string]interface{}{ - "metadata": r.metadata, - "spans": spans, - "events": events, - tags.AgentID: r.agentId, + r.workerJobs <- &workerJob{ + spans: spans, + totalSpans: spTotal, + events: events, + totalEvents: evTotal, } - buf, err := encodePayload(payload) - if err != nil { - atomic.AddInt64(&r.stats.sendSpansKo, 1) - atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans))) - return err, false - } - - var testSpans int64 - for _, span := range spans { - if isTestSpan(span) { - testSpans++ + jobs++ + + if len(r.workerResults) > 0 { + // We check if a previous result call the cancellation of the send + result := <-r.workerResults + lastError = result.error + jobs-- + if result.shouldExit { + r.logger.Printf("worker %d: received a should exit response", result.workerId) + for i := 0; i < jobs; i++ { + <-r.workerResults + } + return result.error, result.shouldExit } } - r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal) - statusCode, err := r.callIngest(buf) - if err != nil { - atomic.AddInt64(&r.stats.sendSpansKo, 1) - atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans))) - atomic.AddInt64(&r.stats.testSpansNotSent, testSpans) - } else { - atomic.AddInt64(&r.stats.sendSpansOk, 1) - atomic.AddInt64(&r.stats.spansSent, int64(len(spans))) - atomic.AddInt64(&r.stats.testSpansSent, testSpans) + if !spMore && !evMore { + break } - if statusCode == 401 { - return err, true + } + shouldExit := false + for i := 0; i < jobs; i++ { + result := <-r.workerResults + lastError = result.error + if result.shouldExit { + r.logger.Printf("worker %d: received a should exit response", result.workerId) + shouldExit = true } - lastError = err + } + return lastError, shouldExit +} - if !spMore && !evMore { - break +func (r *SpanRecorder) worker(id int) { + for { + select { + case j, ok := <-r.workerJobs: + if !ok { + if r.debugMode { + r.logger.Printf("exiting from worker: %d", id) + } + return + } + + payload := map[string]interface{}{ + "metadata": r.metadata, + "spans": j.spans, + "events": j.events, + tags.AgentID: r.agentId, + } + + buf, err := encodePayload(payload) + if err != nil { + atomic.AddInt64(&r.stats.sendSpansKo, 1) + atomic.AddInt64(&r.stats.spansNotSent, int64(len(j.spans))) + r.workerResults <- &workerResult{ + workerId: id, + error: err, + shouldExit: false, + } + continue + } + + var testSpans int64 + for _, span := range j.spans { + if isTestSpan(span) { + testSpans++ + } + } + + r.logger.Printf("worker %d: sending %d/%d spans with %d/%d events", id, len(j.spans), j.totalSpans, len(j.events), j.totalEvents) + statusCode, err := r.callIngest(buf) + if err != nil { + atomic.AddInt64(&r.stats.sendSpansKo, 1) + atomic.AddInt64(&r.stats.spansNotSent, int64(len(j.spans))) + atomic.AddInt64(&r.stats.testSpansNotSent, testSpans) + } else { + atomic.AddInt64(&r.stats.sendSpansOk, 1) + atomic.AddInt64(&r.stats.spansSent, int64(len(j.spans))) + atomic.AddInt64(&r.stats.testSpansSent, testSpans) + } + r.workerResults <- &workerResult{ + workerId: id, + error: err, + shouldExit: statusCode == 401, + } } } - return lastError, false } // Stop recorder diff --git a/env/vars.go b/env/vars.go index bd726a20..3804a2c2 100644 --- a/env/vars.go +++ b/env/vars.go @@ -3,24 +3,27 @@ package env import "go.undefinedlabs.com/scopeagent/tags" var ( - ScopeDsn = newStringEnvVar("", "SCOPE_DSN") - ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY") - ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT") - ScopeService = newStringEnvVar("default", "SCOPE_SERVICE") - ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY") - ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA") - ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH") - ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT") - ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH") - ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG") - ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER") - ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE") - ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES") - ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL") - ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION") - ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA") - ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS") - ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE") - ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES") - ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE") + ScopeDsn = newStringEnvVar("", "SCOPE_DSN") + ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY") + ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT") + ScopeService = newStringEnvVar("default", "SCOPE_SERVICE") + ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY") + ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA") + ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH") + ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT") + ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH") + ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG") + ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER") + ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE") + ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES") + ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL") + ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION") + ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA") + ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS") + ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE") + ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES") + ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE") + ScopeTracerDispatcherHealthcheckFrequency = newIntEnvVar(60000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY") + ScopeTracerDispatcherHealthcheckFrequencyInTestMode = newIntEnvVar(1000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY_IN_TESTMODE") + ScopeTracerDispatcherConcurrencyLevel = newIntEnvVar(5, "SCOPE_TRACER_DISPATCHER_CONCURRENCY_LEVEL") ) From fe6b852342a7fb4e7c56e0d7281daadd0b219af7 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 10:42:44 +0100 Subject: [PATCH 2/9] create channel outside the tomb --- agent/recorder.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/agent/recorder.go b/agent/recorder.go index b19176d8..8ecdf1ba 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -101,6 +101,13 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder { r.stats = &RecorderStats{} r.logger.Printf("recorder frequency: %v", agent.flushFrequency) r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel) + + // start workers + r.workerJobs = make(chan *workerJob, r.concurrencyLevel) + r.workerResults = make(chan *workerResult, r.concurrencyLevel) + for i := 0; i < r.concurrencyLevel; i++ { + go r.worker(i + 1) + } r.t.Go(r.loop) return r } @@ -122,18 +129,8 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { func (r *SpanRecorder) loop() error { defer func() { - close(r.workerJobs) - close(r.workerResults) r.logger.Println("recorder has been stopped.") }() - - // start workers - r.workerJobs = make(chan *workerJob, r.concurrencyLevel) - r.workerResults = make(chan *workerResult, r.concurrencyLevel) - for i := 0; i < r.concurrencyLevel; i++ { - go r.worker(i + 1) - } - ticker := time.NewTicker(1 * time.Second) cTime := time.Now() for { @@ -280,6 +277,8 @@ func (r *SpanRecorder) Stop() { } r.t.Kill(nil) _ = r.t.Wait() + close(r.workerJobs) + close(r.workerResults) if r.debugMode { r.writeStats() } From b8dcd84c8d1e49f2ff44320fd35f1c22d5931301 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 10:47:33 +0100 Subject: [PATCH 3/9] go.mod fix --- go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/go.mod b/go.mod index a990d6d7..0dd1d56e 100644 --- a/go.mod +++ b/go.mod @@ -22,3 +22,4 @@ require ( gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) + From b1953c6f40ec92cc2cf1cf0da9ede0697b833c1d Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 14:45:35 +0100 Subject: [PATCH 4/9] go mod tidy --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index 0dd1d56e..a990d6d7 100644 --- a/go.mod +++ b/go.mod @@ -22,4 +22,3 @@ require ( gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) - From 13bbbe24f6a3723f4df2a7f12d24d07f03eb475c Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 17:46:59 +0100 Subject: [PATCH 5/9] fix comment --- agent/recorder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/recorder.go b/agent/recorder.go index 8ecdf1ba..d8087c83 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -184,7 +184,7 @@ func (r *SpanRecorder) sendSpans() (error, bool) { jobs++ if len(r.workerResults) > 0 { - // We check if a previous result call the cancellation of the send + // We check if a previous result to check if we need to cancel all result := <-r.workerResults lastError = result.error jobs-- From 3a4a84c9f4e97bf717498cfc25e8d555ee40a110 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 10:32:04 +0100 Subject: [PATCH 6/9] recorder concurrency support using channels. --- agent/recorder.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/agent/recorder.go b/agent/recorder.go index d8087c83..d5a9f823 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -101,7 +101,6 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder { r.stats = &RecorderStats{} r.logger.Printf("recorder frequency: %v", agent.flushFrequency) r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel) - // start workers r.workerJobs = make(chan *workerJob, r.concurrencyLevel) r.workerResults = make(chan *workerResult, r.concurrencyLevel) @@ -129,8 +128,18 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { func (r *SpanRecorder) loop() error { defer func() { + close(r.workerJobs) + close(r.workerResults) r.logger.Println("recorder has been stopped.") }() + + // start workers + r.workerJobs = make(chan *workerJob, r.concurrencyLevel) + r.workerResults = make(chan *workerResult, r.concurrencyLevel) + for i := 0; i < r.concurrencyLevel; i++ { + go r.worker(i + 1) + } + ticker := time.NewTicker(1 * time.Second) cTime := time.Now() for { From b1a1ae39d20a3d3ec720b7431fed0077f8c5f767 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 10:42:44 +0100 Subject: [PATCH 7/9] create channel outside the tomb --- agent/recorder.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/agent/recorder.go b/agent/recorder.go index d5a9f823..d8087c83 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -101,6 +101,7 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder { r.stats = &RecorderStats{} r.logger.Printf("recorder frequency: %v", agent.flushFrequency) r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel) + // start workers r.workerJobs = make(chan *workerJob, r.concurrencyLevel) r.workerResults = make(chan *workerResult, r.concurrencyLevel) @@ -128,18 +129,8 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { func (r *SpanRecorder) loop() error { defer func() { - close(r.workerJobs) - close(r.workerResults) r.logger.Println("recorder has been stopped.") }() - - // start workers - r.workerJobs = make(chan *workerJob, r.concurrencyLevel) - r.workerResults = make(chan *workerResult, r.concurrencyLevel) - for i := 0; i < r.concurrencyLevel; i++ { - go r.worker(i + 1) - } - ticker := time.NewTicker(1 * time.Second) cTime := time.Now() for { From a795f7b5aed235826123fa097ef3eea6757e7cff Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 10:47:33 +0100 Subject: [PATCH 8/9] go.mod fix --- go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/go.mod b/go.mod index a990d6d7..0dd1d56e 100644 --- a/go.mod +++ b/go.mod @@ -22,3 +22,4 @@ require ( gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) + From 15e97fd1c516eadc52a7708da1a83cfe5e630ebb Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 17 Mar 2020 14:45:35 +0100 Subject: [PATCH 9/9] go mod tidy --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index 0dd1d56e..a990d6d7 100644 --- a/go.mod +++ b/go.mod @@ -22,4 +22,3 @@ require ( gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) -