Skip to content

Commit

Permalink
context propagation: OneShotAcquisition(); enable contextcheck linter
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Oct 25, 2024
1 parent 9d6ccb0 commit 208762a
Show file tree
Hide file tree
Showing 23 changed files with 55 additions and 50 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ linters:
#

- containedctx # containedctx is a linter that detects struct contained context.Context field
- contextcheck # check whether the function uses a non-inherited context
- errname # Checks that sentinel errors are prefixed with the `Err` and error types are suffixed with the `Error`.
- ireturn # Accept Interfaces, Return Concrete Types
- mnd # An analyzer to detect magic numbers.
Expand Down
4 changes: 2 additions & 2 deletions pkg/acquisition/acquisition.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type DataSource interface {
ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string // Get the name of the module
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
OneShotAcquisition(context.Context, chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
GetUuid() string // Get the unique identifier of the datasource
Expand Down Expand Up @@ -433,7 +433,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ
if subsrc.GetMode() == configuration.TAIL_MODE {
err = subsrc.StreamingAcquisition(ctx, outChan, AcquisTomb)
} else {
err = subsrc.OneShotAcquisition(outChan, AcquisTomb)
err = subsrc.OneShotAcquisition(ctx, outChan, AcquisTomb)
}

if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/acquisition/acquisition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int)
return nil
}
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) OneShotAcquisition(context.Context, chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return nil
}
Expand Down Expand Up @@ -320,7 +320,7 @@ func (f *MockCat) Configure(cfg []byte, logger *log.Entry, metricsLevel int) err
func (f *MockCat) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockCat) GetName() string { return "mock_cat" }
func (f *MockCat) GetMode() string { return "cat" }
func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error {
func (f *MockCat) OneShotAcquisition(ctx context.Context, out chan types.Event, tomb *tomb.Tomb) error {
for range 10 {
evt := types.Event{}
evt.Line.Src = "test"
Expand Down Expand Up @@ -365,7 +365,7 @@ func (f *MockTail) Configure(cfg []byte, logger *log.Entry, metricsLevel int) er
func (f *MockTail) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockTail) GetName() string { return "mock_tail" }
func (f *MockTail) GetMode() string { return "tail" }
func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error {
func (f *MockTail) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error {
return errors.New("can't run in cat mode")
}

Expand Down Expand Up @@ -508,7 +508,7 @@ func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel
return nil
}
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) OneShotAcquisition(context.Context, chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/appsec/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (w *AppsecSource) GetName() string {
return "appsec"
}

func (w *AppsecSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (w *AppsecSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error {

Check warning on line 240 in pkg/acquisition/modules/appsec/appsec.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/appsec/appsec.go#L240

Added line #L240 was not covered by tests
return errors.New("AppSec datasource does not support command line acquisition")
}

Expand Down
21 changes: 9 additions & 12 deletions pkg/acquisition/modules/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ func (cw *CloudwatchSource) StreamingAcquisition(ctx context.Context, out chan t
monitChan := make(chan LogStreamTailConfig)

t.Go(func() error {
return cw.LogStreamManager(monitChan, out)
return cw.LogStreamManager(ctx, monitChan, out)
})

return cw.WatchLogGroupForStreams(monitChan)
return cw.WatchLogGroupForStreams(ctx, monitChan)
}

func (cw *CloudwatchSource) GetMetrics() []prometheus.Collector {
Expand All @@ -289,7 +289,7 @@ func (cw *CloudwatchSource) Dump() interface{} {
return cw
}

func (cw *CloudwatchSource) WatchLogGroupForStreams(out chan LogStreamTailConfig) error {
func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out chan LogStreamTailConfig) error {
cw.logger.Debugf("Starting to watch group (interval:%s)", cw.Config.PollNewStreamInterval)
ticker := time.NewTicker(*cw.Config.PollNewStreamInterval)

Expand All @@ -307,7 +307,6 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(out chan LogStreamTailConfig
for hasMoreStreams {
cw.logger.Tracef("doing the call to DescribeLogStreamsPagesWithContext")

ctx := context.Background()
// there can be a lot of streams in a group, and we're only interested in those recently written to, so we sort by LastEventTime
err := cw.cwClient.DescribeLogStreamsPagesWithContext(
ctx,
Expand Down Expand Up @@ -372,7 +371,7 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(out chan LogStreamTailConfig
}

// LogStreamManager receives the potential streams to monitor, and starts a go routine when needed
func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outChan chan types.Event) error {
func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStreamTailConfig, outChan chan types.Event) error {
cw.logger.Debugf("starting to monitor streams for %s", cw.Config.GroupName)
pollDeadStreamInterval := time.NewTicker(def_PollDeadStreamInterval)

Expand Down Expand Up @@ -422,7 +421,7 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
newStream.logger = cw.logger.WithField("stream", newStream.StreamName)
cw.logger.Debugf("starting tail of stream %s", newStream.StreamName)
newStream.t.Go(func() error {
return cw.TailLogStream(&newStream, outChan)
return cw.TailLogStream(ctx, &newStream, outChan)
})
cw.monitoredStreams = append(cw.monitoredStreams, &newStream)
}
Expand Down Expand Up @@ -457,7 +456,7 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
}
}

func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan types.Event) error {
func (cw *CloudwatchSource) TailLogStream(ctx context.Context, cfg *LogStreamTailConfig, outChan chan types.Event) error {
var startFrom *string
lastReadMessage := time.Now().UTC()
ticker := time.NewTicker(cfg.PollStreamInterval)
Expand All @@ -479,7 +478,6 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan
for hasMorePages {
/*for the first call, we only consume the last item*/
cfg.logger.Tracef("calling GetLogEventsPagesWithContext")
ctx := context.Background()
err := cw.cwClient.GetLogEventsPagesWithContext(ctx,
&cloudwatchlogs.GetLogEventsInput{
Limit: aws.Int64(cfg.GetLogEventsPagesLimit),
Expand Down Expand Up @@ -633,7 +631,7 @@ func (cw *CloudwatchSource) ConfigureByDSN(dsn string, labels map[string]string,
return nil
}

func (cw *CloudwatchSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (cw *CloudwatchSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
// StreamName string, Start time.Time, End time.Time
config := LogStreamTailConfig{
GroupName: cw.Config.GroupName,
Expand All @@ -648,10 +646,10 @@ func (cw *CloudwatchSource) OneShotAcquisition(out chan types.Event, t *tomb.Tom
Labels: cw.Config.Labels,
ExpectMode: types.TIMEMACHINE,
}
return cw.CatLogStream(&config, out)
return cw.CatLogStream(ctx, &config, out)
}

func (cw *CloudwatchSource) CatLogStream(cfg *LogStreamTailConfig, outChan chan types.Event) error {
func (cw *CloudwatchSource) CatLogStream(ctx context.Context, cfg *LogStreamTailConfig, outChan chan types.Event) error {
var startFrom *string
head := true
/*convert the times*/
Expand All @@ -667,7 +665,6 @@ func (cw *CloudwatchSource) CatLogStream(cfg *LogStreamTailConfig, outChan chan
if startFrom != nil {
cfg.logger.Tracef("next_token: %s", *startFrom)
}
ctx := context.Background()
err := cw.cwClient.GetLogEventsPagesWithContext(ctx,
&cloudwatchlogs.GetLogEventsInput{
Limit: aws.Int64(10),
Expand Down
6 changes: 4 additions & 2 deletions pkg/acquisition/modules/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ stream_name: test_stream`),
case "tail":
err = cw.StreamingAcquisition(ctx, out, &tmb)
case "cat":
err = cw.OneShotAcquisition(out, &tmb)
err = cw.OneShotAcquisition(ctx, out, &tmb)
}

cstest.RequireErrorContains(t, err, tc.expectedStartErr)
Expand Down Expand Up @@ -637,6 +637,8 @@ func TestConfigureByDSN(t *testing.T) {
}

func TestOneShotAcquisition(t *testing.T) {
ctx := context.Background()

if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
Expand Down Expand Up @@ -768,7 +770,7 @@ func TestOneShotAcquisition(t *testing.T) {
var rcvdEvts []types.Event

dbgLogger.Infof("running StreamingAcquisition")
err = cw.OneShotAcquisition(out, &tmb)
err = cw.OneShotAcquisition(ctx, out, &tmb)
dbgLogger.Infof("acquis done")
cstest.RequireErrorContains(t, err, tc.expectedStartErr)
close(out)
Expand Down
3 changes: 1 addition & 2 deletions pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ func (d *DockerSource) SupportedModes() []string {
}

// OneShotAcquisition reads a set of file and returns when done
func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
ctx := context.TODO()
func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
d.logger.Debug("In oneshot")
runningContainer, err := d.Client.ContainerList(ctx, dockerTypes.ContainerListOptions{})
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/acquisition/modules/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ func (cli *mockDockerCli) ContainerInspect(ctx context.Context, c string) (docke
}

func TestOneShot(t *testing.T) {
ctx := context.Background()

log.Infof("Test 'TestOneShot'")

tests := []struct {
Expand Down Expand Up @@ -321,7 +323,7 @@ func TestOneShot(t *testing.T) {
dockerClient.Client = new(mockDockerCli)
out := make(chan types.Event, 100)
tomb := tomb.Tomb{}
err := dockerClient.OneShotAcquisition(out, &tomb)
err := dockerClient.OneShotAcquisition(ctx, out, &tomb)
cstest.AssertErrorContains(t, err, ts.expectedErr)

// else we do the check before actualLines is incremented ...
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (f *FileSource) SupportedModes() []string {
}

// OneShotAcquisition reads a set of file and returns when done
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *FileSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
f.logger.Debug("In oneshot")

for _, file := range f.files {
Expand Down
4 changes: 3 additions & 1 deletion pkg/acquisition/modules/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func TestConfigureDSN(t *testing.T) {
}

func TestOneShot(t *testing.T) {
ctx := context.Background()

permDeniedFile := "/etc/shadow"
permDeniedError := "failed opening /etc/shadow: open /etc/shadow: permission denied"

Expand Down Expand Up @@ -224,7 +226,7 @@ filename: test_files/test_delete.log`,
if tc.afterConfigure != nil {
tc.afterConfigure()
}
err = f.OneShotAcquisition(out, &tomb)
err = f.OneShotAcquisition(ctx, out, &tomb)
actualLines := len(out)
cstest.RequireErrorContains(t, err, tc.expectedErr)

Expand Down
10 changes: 5 additions & 5 deletions pkg/acquisition/modules/journalctl/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func readLine(scanner *bufio.Scanner, out chan string, errChan chan error) error
return nil
}

func (j *JournalCtlSource) runJournalCtl(out chan types.Event, t *tomb.Tomb) error {
ctx, cancel := context.WithCancel(context.Background())
func (j *JournalCtlSource) runJournalCtl(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
ctx, cancel := context.WithCancel(ctx)

cmd := exec.CommandContext(ctx, journalctlCmd, j.args...)
stdout, err := cmd.StdoutPipe()
Expand Down Expand Up @@ -262,17 +262,17 @@ func (j *JournalCtlSource) GetName() string {
return "journalctl"
}

func (j *JournalCtlSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (j *JournalCtlSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
defer trace.CatchPanic("crowdsec/acquis/journalctl/oneshot")
err := j.runJournalCtl(out, t)
err := j.runJournalCtl(ctx, out, t)
j.logger.Debug("Oneshot journalctl acquisition is done")
return err
}

func (j *JournalCtlSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/journalctl/streaming")
return j.runJournalCtl(out, t)
return j.runJournalCtl(ctx, out, t)
})
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/acquisition/modules/journalctl/journalctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func TestConfigureDSN(t *testing.T) {
}

func TestOneShot(t *testing.T) {
ctx := context.Background()

if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
Expand Down Expand Up @@ -165,7 +167,7 @@ journalctl_filter:
t.Fatalf("Unexpected error : %s", err)
}

err = j.OneShotAcquisition(out, &tomb)
err = j.OneShotAcquisition(ctx, out, &tomb)
cstest.AssertErrorContains(t, err, ts.expectedErr)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (k *KafkaSource) GetName() string {
return dataSourceName
}

func (k *KafkaSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (k *KafkaSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error {

Check warning on line 130 in pkg/acquisition/modules/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/kafka/kafka.go#L130

Added line #L130 was not covered by tests
return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (k *KinesisSource) GetName() string {
return "kinesis"
}

func (k *KinesisSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (k *KinesisSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error {

Check warning on line 185 in pkg/acquisition/modules/kinesis/kinesis.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/kinesis/kinesis.go#L185

Added line #L185 was not covered by tests
return errors.New("kinesis datasource does not support one-shot acquisition")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/kubernetesaudit/k8s_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (ka *KubernetesAuditSource) GetName() string {
return "k8s-audit"
}

func (ka *KubernetesAuditSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (ka *KubernetesAuditSource) OneShotAcquisition(_ context.Context, _ chan types.Event, _ *tomb.Tomb) error {

Check warning on line 134 in pkg/acquisition/modules/kubernetesaudit/k8s_audit.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/kubernetesaudit/k8s_audit.go#L134

Added line #L134 was not covered by tests
return errors.New("k8s-audit datasource does not support one-shot acquisition")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/acquisition/modules/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,17 @@ func (l *LokiSource) GetName() string {
}

// OneShotAcquisition reads a set of file and returns when done
func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (l *LokiSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
l.logger.Debug("Loki one shot acquisition")
l.Client.SetTomb(t)
readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer cancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel = context.WithCancel(ctx)
c := l.Client.QueryRange(ctx, false)

for {
Expand Down
6 changes: 3 additions & 3 deletions pkg/acquisition/modules/loki/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ func feedLoki(ctx context.Context, logger *log.Entry, n int, title string) error
}

func TestOneShotAcquisition(t *testing.T) {
ctx := context.Background()

if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
Expand Down Expand Up @@ -346,8 +348,6 @@ since: 1h
t.Fatalf("Unexpected error : %s", err)
}

ctx := context.Background()

err = feedLoki(ctx, subLogger, 20, title)
if err != nil {
t.Fatalf("Unexpected error : %s", err)
Expand All @@ -366,7 +366,7 @@ since: 1h

lokiTomb := tomb.Tomb{}

err = lokiSource.OneShotAcquisition(out, &lokiTomb)
err = lokiSource.OneShotAcquisition(ctx, out, &lokiTomb)
if err != nil {
t.Fatalf("Unexpected error : %s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/acquisition/modules/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,10 +643,10 @@ func (s *S3Source) GetName() string {
return "s3"
}

func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (s *S3Source) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
s.logger.Infof("starting acquisition of %s/%s/%s", s.Config.BucketName, s.Config.Prefix, s.Config.Key)
s.out = out
s.ctx, s.cancel = context.WithCancel(context.Background())
s.ctx, s.cancel = context.WithCancel(ctx)
s.Config.UseTimeMachine = true
s.t = t
if s.Config.Key != "" {
Expand Down
Loading

0 comments on commit 208762a

Please sign in to comment.