From 208762a081fada9d114af19ad294c89f8c0c0810 Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 11 Oct 2024 15:56:19 +0200 Subject: [PATCH] context propagation: OneShotAcquisition(); enable contextcheck linter --- .golangci.yml | 1 - pkg/acquisition/acquisition.go | 4 ++-- pkg/acquisition/acquisition_test.go | 8 +++---- pkg/acquisition/modules/appsec/appsec.go | 2 +- .../modules/cloudwatch/cloudwatch.go | 21 ++++++++----------- .../modules/cloudwatch/cloudwatch_test.go | 6 ++++-- pkg/acquisition/modules/docker/docker.go | 3 +-- pkg/acquisition/modules/docker/docker_test.go | 4 +++- pkg/acquisition/modules/file/file.go | 2 +- pkg/acquisition/modules/file/file_test.go | 4 +++- .../modules/journalctl/journalctl.go | 10 ++++----- .../modules/journalctl/journalctl_test.go | 4 +++- pkg/acquisition/modules/kafka/kafka.go | 2 +- pkg/acquisition/modules/kinesis/kinesis.go | 2 +- .../modules/kubernetesaudit/k8s_audit.go | 2 +- pkg/acquisition/modules/loki/loki.go | 6 +++--- pkg/acquisition/modules/loki/loki_test.go | 6 +++--- pkg/acquisition/modules/s3/s3.go | 4 ++-- pkg/acquisition/modules/s3/s3_test.go | 3 ++- pkg/acquisition/modules/syslog/syslog.go | 2 +- .../modules/wineventlog/wineventlog.go | 2 +- .../wineventlog/wineventlog_windows.go | 4 ++-- .../wineventlog/wineventlog_windows_test.go | 3 ++- 23 files changed, 55 insertions(+), 50 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 4909d3e60c0..271e3a57d34 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -254,7 +254,6 @@ linters: # - containedctx # containedctx is a linter that detects struct contained context.Context field - - contextcheck # check whether the function uses a non-inherited context - errname # Checks that sentinel errors are prefixed with the `Err` and error types are suffixed with the `Error`. - ireturn # Accept Interfaces, Return Concrete Types - mnd # An analyzer to detect magic numbers. diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 4519ea7392b..1ad385105d3 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -47,7 +47,7 @@ type DataSource interface { ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource GetMode() string // Get the mode (TAIL, CAT or SERVER) GetName() string // Get the name of the module - OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file) + OneShotAcquisition(context.Context, chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file) CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense) GetUuid() string // Get the unique identifier of the datasource @@ -433,7 +433,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ if subsrc.GetMode() == configuration.TAIL_MODE { err = subsrc.StreamingAcquisition(ctx, outChan, AcquisTomb) } else { - err = subsrc.OneShotAcquisition(outChan, AcquisTomb) + err = subsrc.OneShotAcquisition(ctx, outChan, AcquisTomb) } if err != nil { diff --git a/pkg/acquisition/acquisition_test.go b/pkg/acquisition/acquisition_test.go index e82b3df54c2..95b3033ba2e 100644 --- a/pkg/acquisition/acquisition_test.go +++ b/pkg/acquisition/acquisition_test.go @@ -58,7 +58,7 @@ func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int) return nil } func (f *MockSource) GetMode() string { return f.Mode } -func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil } +func (f *MockSource) OneShotAcquisition(context.Context, chan types.Event, *tomb.Tomb) error { return nil } func (f *MockSource) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error { return nil } @@ -320,7 +320,7 @@ func (f *MockCat) Configure(cfg []byte, logger *log.Entry, metricsLevel int) err func (f *MockCat) UnmarshalConfig(cfg []byte) error { return nil } func (f *MockCat) GetName() string { return "mock_cat" } func (f *MockCat) GetMode() string { return "cat" } -func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error { +func (f *MockCat) OneShotAcquisition(ctx context.Context, out chan types.Event, tomb *tomb.Tomb) error { for range 10 { evt := types.Event{} evt.Line.Src = "test" @@ -365,7 +365,7 @@ func (f *MockTail) Configure(cfg []byte, logger *log.Entry, metricsLevel int) er func (f *MockTail) UnmarshalConfig(cfg []byte) error { return nil } func (f *MockTail) GetName() string { return "mock_tail" } func (f *MockTail) GetMode() string { return "tail" } -func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error { +func (f *MockTail) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error { return errors.New("can't run in cat mode") } @@ -508,7 +508,7 @@ func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel return nil } func (f *MockSourceByDSN) GetMode() string { return f.Mode } -func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil } +func (f *MockSourceByDSN) OneShotAcquisition(context.Context, chan types.Event, *tomb.Tomb) error { return nil } func (f *MockSourceByDSN) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error { return nil } diff --git a/pkg/acquisition/modules/appsec/appsec.go b/pkg/acquisition/modules/appsec/appsec.go index 4ab980ee860..a6dcffe89a2 100644 --- a/pkg/acquisition/modules/appsec/appsec.go +++ b/pkg/acquisition/modules/appsec/appsec.go @@ -237,7 +237,7 @@ func (w *AppsecSource) GetName() string { return "appsec" } -func (w *AppsecSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (w *AppsecSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error { return errors.New("AppSec datasource does not support command line acquisition") } diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch.go b/pkg/acquisition/modules/cloudwatch/cloudwatch.go index e4b6c95d77f..2df70b3312b 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch.go @@ -259,10 +259,10 @@ func (cw *CloudwatchSource) StreamingAcquisition(ctx context.Context, out chan t monitChan := make(chan LogStreamTailConfig) t.Go(func() error { - return cw.LogStreamManager(monitChan, out) + return cw.LogStreamManager(ctx, monitChan, out) }) - return cw.WatchLogGroupForStreams(monitChan) + return cw.WatchLogGroupForStreams(ctx, monitChan) } func (cw *CloudwatchSource) GetMetrics() []prometheus.Collector { @@ -289,7 +289,7 @@ func (cw *CloudwatchSource) Dump() interface{} { return cw } -func (cw *CloudwatchSource) WatchLogGroupForStreams(out chan LogStreamTailConfig) error { +func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out chan LogStreamTailConfig) error { cw.logger.Debugf("Starting to watch group (interval:%s)", cw.Config.PollNewStreamInterval) ticker := time.NewTicker(*cw.Config.PollNewStreamInterval) @@ -307,7 +307,6 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(out chan LogStreamTailConfig for hasMoreStreams { cw.logger.Tracef("doing the call to DescribeLogStreamsPagesWithContext") - ctx := context.Background() // there can be a lot of streams in a group, and we're only interested in those recently written to, so we sort by LastEventTime err := cw.cwClient.DescribeLogStreamsPagesWithContext( ctx, @@ -372,7 +371,7 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(out chan LogStreamTailConfig } // LogStreamManager receives the potential streams to monitor, and starts a go routine when needed -func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outChan chan types.Event) error { +func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStreamTailConfig, outChan chan types.Event) error { cw.logger.Debugf("starting to monitor streams for %s", cw.Config.GroupName) pollDeadStreamInterval := time.NewTicker(def_PollDeadStreamInterval) @@ -422,7 +421,7 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha newStream.logger = cw.logger.WithField("stream", newStream.StreamName) cw.logger.Debugf("starting tail of stream %s", newStream.StreamName) newStream.t.Go(func() error { - return cw.TailLogStream(&newStream, outChan) + return cw.TailLogStream(ctx, &newStream, outChan) }) cw.monitoredStreams = append(cw.monitoredStreams, &newStream) } @@ -457,7 +456,7 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha } } -func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan types.Event) error { +func (cw *CloudwatchSource) TailLogStream(ctx context.Context, cfg *LogStreamTailConfig, outChan chan types.Event) error { var startFrom *string lastReadMessage := time.Now().UTC() ticker := time.NewTicker(cfg.PollStreamInterval) @@ -479,7 +478,6 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan for hasMorePages { /*for the first call, we only consume the last item*/ cfg.logger.Tracef("calling GetLogEventsPagesWithContext") - ctx := context.Background() err := cw.cwClient.GetLogEventsPagesWithContext(ctx, &cloudwatchlogs.GetLogEventsInput{ Limit: aws.Int64(cfg.GetLogEventsPagesLimit), @@ -633,7 +631,7 @@ func (cw *CloudwatchSource) ConfigureByDSN(dsn string, labels map[string]string, return nil } -func (cw *CloudwatchSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (cw *CloudwatchSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { // StreamName string, Start time.Time, End time.Time config := LogStreamTailConfig{ GroupName: cw.Config.GroupName, @@ -648,10 +646,10 @@ func (cw *CloudwatchSource) OneShotAcquisition(out chan types.Event, t *tomb.Tom Labels: cw.Config.Labels, ExpectMode: types.TIMEMACHINE, } - return cw.CatLogStream(&config, out) + return cw.CatLogStream(ctx, &config, out) } -func (cw *CloudwatchSource) CatLogStream(cfg *LogStreamTailConfig, outChan chan types.Event) error { +func (cw *CloudwatchSource) CatLogStream(ctx context.Context, cfg *LogStreamTailConfig, outChan chan types.Event) error { var startFrom *string head := true /*convert the times*/ @@ -667,7 +665,6 @@ func (cw *CloudwatchSource) CatLogStream(cfg *LogStreamTailConfig, outChan chan if startFrom != nil { cfg.logger.Tracef("next_token: %s", *startFrom) } - ctx := context.Background() err := cw.cwClient.GetLogEventsPagesWithContext(ctx, &cloudwatchlogs.GetLogEventsInput{ Limit: aws.Int64(10), diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go index d62c3f6e3dd..3d638896537 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go @@ -579,7 +579,7 @@ stream_name: test_stream`), case "tail": err = cw.StreamingAcquisition(ctx, out, &tmb) case "cat": - err = cw.OneShotAcquisition(out, &tmb) + err = cw.OneShotAcquisition(ctx, out, &tmb) } cstest.RequireErrorContains(t, err, tc.expectedStartErr) @@ -637,6 +637,8 @@ func TestConfigureByDSN(t *testing.T) { } func TestOneShotAcquisition(t *testing.T) { + ctx := context.Background() + if runtime.GOOS == "windows" { t.Skip("Skipping test on windows") } @@ -768,7 +770,7 @@ func TestOneShotAcquisition(t *testing.T) { var rcvdEvts []types.Event dbgLogger.Infof("running StreamingAcquisition") - err = cw.OneShotAcquisition(out, &tmb) + err = cw.OneShotAcquisition(ctx, out, &tmb) dbgLogger.Infof("acquis done") cstest.RequireErrorContains(t, err, tc.expectedStartErr) close(out) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 57ec7c7abda..2f79d4dcee6 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -286,8 +286,7 @@ func (d *DockerSource) SupportedModes() []string { } // OneShotAcquisition reads a set of file and returns when done -func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { - ctx := context.TODO() +func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { d.logger.Debug("In oneshot") runningContainer, err := d.Client.ContainerList(ctx, dockerTypes.ContainerListOptions{}) if err != nil { diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go index e394c9cbe79..5d8208637e8 100644 --- a/pkg/acquisition/modules/docker/docker_test.go +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -267,6 +267,8 @@ func (cli *mockDockerCli) ContainerInspect(ctx context.Context, c string) (docke } func TestOneShot(t *testing.T) { + ctx := context.Background() + log.Infof("Test 'TestOneShot'") tests := []struct { @@ -321,7 +323,7 @@ func TestOneShot(t *testing.T) { dockerClient.Client = new(mockDockerCli) out := make(chan types.Event, 100) tomb := tomb.Tomb{} - err := dockerClient.OneShotAcquisition(out, &tomb) + err := dockerClient.OneShotAcquisition(ctx, out, &tomb) cstest.AssertErrorContains(t, err, ts.expectedErr) // else we do the check before actualLines is incremented ... diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index 2d2df3ff4d4..f752d04aada 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -280,7 +280,7 @@ func (f *FileSource) SupportedModes() []string { } // OneShotAcquisition reads a set of file and returns when done -func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (f *FileSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { f.logger.Debug("In oneshot") for _, file := range f.files { diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index 3db0042ba2f..a26e44cc9c7 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -101,6 +101,8 @@ func TestConfigureDSN(t *testing.T) { } func TestOneShot(t *testing.T) { + ctx := context.Background() + permDeniedFile := "/etc/shadow" permDeniedError := "failed opening /etc/shadow: open /etc/shadow: permission denied" @@ -224,7 +226,7 @@ filename: test_files/test_delete.log`, if tc.afterConfigure != nil { tc.afterConfigure() } - err = f.OneShotAcquisition(out, &tomb) + err = f.OneShotAcquisition(ctx, out, &tomb) actualLines := len(out) cstest.RequireErrorContains(t, err, tc.expectedErr) diff --git a/pkg/acquisition/modules/journalctl/journalctl.go b/pkg/acquisition/modules/journalctl/journalctl.go index b9cda54a472..e7a35d5a3ba 100644 --- a/pkg/acquisition/modules/journalctl/journalctl.go +++ b/pkg/acquisition/modules/journalctl/journalctl.go @@ -65,8 +65,8 @@ func readLine(scanner *bufio.Scanner, out chan string, errChan chan error) error return nil } -func (j *JournalCtlSource) runJournalCtl(out chan types.Event, t *tomb.Tomb) error { - ctx, cancel := context.WithCancel(context.Background()) +func (j *JournalCtlSource) runJournalCtl(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { + ctx, cancel := context.WithCancel(ctx) cmd := exec.CommandContext(ctx, journalctlCmd, j.args...) stdout, err := cmd.StdoutPipe() @@ -262,9 +262,9 @@ func (j *JournalCtlSource) GetName() string { return "journalctl" } -func (j *JournalCtlSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (j *JournalCtlSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { defer trace.CatchPanic("crowdsec/acquis/journalctl/oneshot") - err := j.runJournalCtl(out, t) + err := j.runJournalCtl(ctx, out, t) j.logger.Debug("Oneshot journalctl acquisition is done") return err } @@ -272,7 +272,7 @@ func (j *JournalCtlSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb func (j *JournalCtlSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { t.Go(func() error { defer trace.CatchPanic("crowdsec/acquis/journalctl/streaming") - return j.runJournalCtl(out, t) + return j.runJournalCtl(ctx, out, t) }) return nil } diff --git a/pkg/acquisition/modules/journalctl/journalctl_test.go b/pkg/acquisition/modules/journalctl/journalctl_test.go index c416bb5d23e..687067c1881 100644 --- a/pkg/acquisition/modules/journalctl/journalctl_test.go +++ b/pkg/acquisition/modules/journalctl/journalctl_test.go @@ -107,6 +107,8 @@ func TestConfigureDSN(t *testing.T) { } func TestOneShot(t *testing.T) { + ctx := context.Background() + if runtime.GOOS == "windows" { t.Skip("Skipping test on windows") } @@ -165,7 +167,7 @@ journalctl_filter: t.Fatalf("Unexpected error : %s", err) } - err = j.OneShotAcquisition(out, &tomb) + err = j.OneShotAcquisition(ctx, out, &tomb) cstest.AssertErrorContains(t, err, ts.expectedErr) if err != nil { diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go index d08a0ae4e4d..a9a5e13e958 100644 --- a/pkg/acquisition/modules/kafka/kafka.go +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -127,7 +127,7 @@ func (k *KafkaSource) GetName() string { return dataSourceName } -func (k *KafkaSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (k *KafkaSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error { return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName) } diff --git a/pkg/acquisition/modules/kinesis/kinesis.go b/pkg/acquisition/modules/kinesis/kinesis.go index ca3a847dbfb..3cfc224aa25 100644 --- a/pkg/acquisition/modules/kinesis/kinesis.go +++ b/pkg/acquisition/modules/kinesis/kinesis.go @@ -182,7 +182,7 @@ func (k *KinesisSource) GetName() string { return "kinesis" } -func (k *KinesisSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (k *KinesisSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error { return errors.New("kinesis datasource does not support one-shot acquisition") } diff --git a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go index 0d64345a4a0..30fc5c467ea 100644 --- a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go +++ b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go @@ -131,7 +131,7 @@ func (ka *KubernetesAuditSource) GetName() string { return "k8s-audit" } -func (ka *KubernetesAuditSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (ka *KubernetesAuditSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error { return errors.New("k8s-audit datasource does not support one-shot acquisition") } diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index f867feeb84b..e39c76af22c 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -261,17 +261,17 @@ func (l *LokiSource) GetName() string { } // OneShotAcquisition reads a set of file and returns when done -func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (l *LokiSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { l.logger.Debug("Loki one shot acquisition") l.Client.SetTomb(t) - readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady) + readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady) defer cancel() err := l.Client.Ready(readyCtx) if err != nil { return fmt.Errorf("loki is not ready: %w", err) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(ctx) c := l.Client.QueryRange(ctx, false) for { diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 627200217f5..cacdda32d80 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -312,6 +312,8 @@ func feedLoki(ctx context.Context, logger *log.Entry, n int, title string) error } func TestOneShotAcquisition(t *testing.T) { + ctx := context.Background() + if runtime.GOOS == "windows" { t.Skip("Skipping test on windows") } @@ -346,8 +348,6 @@ since: 1h t.Fatalf("Unexpected error : %s", err) } - ctx := context.Background() - err = feedLoki(ctx, subLogger, 20, title) if err != nil { t.Fatalf("Unexpected error : %s", err) @@ -366,7 +366,7 @@ since: 1h lokiTomb := tomb.Tomb{} - err = lokiSource.OneShotAcquisition(out, &lokiTomb) + err = lokiSource.OneShotAcquisition(ctx, out, &lokiTomb) if err != nil { t.Fatalf("Unexpected error : %s", err) } diff --git a/pkg/acquisition/modules/s3/s3.go b/pkg/acquisition/modules/s3/s3.go index ed1964edebf..acd78ceba8f 100644 --- a/pkg/acquisition/modules/s3/s3.go +++ b/pkg/acquisition/modules/s3/s3.go @@ -643,10 +643,10 @@ func (s *S3Source) GetName() string { return "s3" } -func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (s *S3Source) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { s.logger.Infof("starting acquisition of %s/%s/%s", s.Config.BucketName, s.Config.Prefix, s.Config.Key) s.out = out - s.ctx, s.cancel = context.WithCancel(context.Background()) + s.ctx, s.cancel = context.WithCancel(ctx) s.Config.UseTimeMachine = true s.t = t if s.Config.Key != "" { diff --git a/pkg/acquisition/modules/s3/s3_test.go b/pkg/acquisition/modules/s3/s3_test.go index 05a974517a0..367048aa33a 100644 --- a/pkg/acquisition/modules/s3/s3_test.go +++ b/pkg/acquisition/modules/s3/s3_test.go @@ -208,6 +208,7 @@ func (msqs mockSQSClientNotif) DeleteMessage(input *sqs.DeleteMessageInput) (*sq } func TestDSNAcquis(t *testing.T) { + ctx := context.Background() tests := []struct { name string dsn string @@ -260,7 +261,7 @@ func TestDSNAcquis(t *testing.T) { f.s3Client = mockS3Client{} tmb := tomb.Tomb{} - err = f.OneShotAcquisition(out, &tmb) + err = f.OneShotAcquisition(ctx, out, &tmb) if err != nil { t.Fatalf("unexpected error: %s", err.Error()) } diff --git a/pkg/acquisition/modules/syslog/syslog.go b/pkg/acquisition/modules/syslog/syslog.go index 5315096fb9b..33a2f1542db 100644 --- a/pkg/acquisition/modules/syslog/syslog.go +++ b/pkg/acquisition/modules/syslog/syslog.go @@ -84,7 +84,7 @@ func (s *SyslogSource) ConfigureByDSN(dsn string, labels map[string]string, logg return errors.New("syslog datasource does not support one shot acquisition") } -func (s *SyslogSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (s *SyslogSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error { return errors.New("syslog datasource does not support one shot acquisition") } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog.go b/pkg/acquisition/modules/wineventlog/wineventlog.go index 6d522d8d8cb..3023a371576 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog.go @@ -40,7 +40,7 @@ func (w *WinEventLogSource) SupportedModes() []string { return []string{configuration.TAIL_MODE, configuration.CAT_MODE} } -func (w *WinEventLogSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { +func (w *WinEventLogSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error { return nil } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go index ca40363155b..7be9932b21b 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go @@ -388,8 +388,7 @@ func (w *WinEventLogSource) SupportedModes() []string { return []string{configuration.TAIL_MODE, configuration.CAT_MODE} } -func (w *WinEventLogSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { - +func (w *WinEventLogSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { handle, err := wevtapi.EvtQuery(localMachine, w.evtConfig.ChannelPath, w.evtConfig.Query, w.evtConfig.Flags) if err != nil { @@ -436,6 +435,7 @@ OUTER_LOOP: } } } + return nil } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_windows_test.go b/pkg/acquisition/modules/wineventlog/wineventlog_windows_test.go index 9afef963669..8d3163fe0cd 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_windows_test.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_windows_test.go @@ -224,6 +224,7 @@ event_ids: } func TestOneShotAcquisition(t *testing.T) { + ctx := context.Background() tests := []struct { name string dsn string @@ -289,7 +290,7 @@ func TestOneShotAcquisition(t *testing.T) { } }() - err = f.OneShotAcquisition(c, to) + err = f.OneShotAcquisition(ctx, c, to) if test.expectedErr != "" { assert.Contains(t, err.Error(), test.expectedErr) } else {