Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

context propagation: appsec, docker, kafka, k8s datasources #3284

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# we measure coverage but don't enforce it
# https://docs.codecov.com/docs/codecov-yaml
codecov:
require_ci_to_pass: false

coverage:
status:
patch:
default:
target: 0%
project:
default:
target: 0%

# if a directory is ignored, there is no way to un-ignore files like pkg/models/helpers.go
# so we make a full list
ignore:
- "./pkg/modelscapi/success_response.go"
- "./pkg/modelscapi/get_decisions_stream_response_deleted.go"
- "./pkg/modelscapi/login_request.go"
- "./pkg/modelscapi/get_decisions_stream_response_links.go"
- "./pkg/modelscapi/login_response.go"
- "./pkg/modelscapi/add_signals_request_item.go"
- "./pkg/modelscapi/blocklist_link.go"
- "./pkg/modelscapi/get_decisions_stream_response_deleted_item.go"
- "./pkg/modelscapi/decisions_sync_request.go"
- "./pkg/modelscapi/get_decisions_stream_response.go"
- "./pkg/modelscapi/metrics_request_machines_item.go"
- "./pkg/modelscapi/metrics_request.go"
- "./pkg/modelscapi/get_decisions_stream_response_new.go"
- "./pkg/modelscapi/add_signals_request_item_decisions_item.go"
- "./pkg/modelscapi/metrics_request_bouncers_item.go"
- "./pkg/modelscapi/decisions_sync_request_item_decisions_item.go"
- "./pkg/modelscapi/decisions_delete_request_item.go"
- "./pkg/modelscapi/get_decisions_stream_response_new_item.go"
- "./pkg/modelscapi/decisions_sync_request_item.go"
- "./pkg/modelscapi/add_signals_request.go"
- "./pkg/modelscapi/reset_password_request.go"
- "./pkg/modelscapi/add_signals_request_item_decisions.go"
- "./pkg/modelscapi/decisions_sync_request_item_source.go"
- "./pkg/modelscapi/error_response.go"
- "./pkg/modelscapi/decisions_delete_request.go"
- "./pkg/modelscapi/decisions_sync_request_item_decisions.go"
- "./pkg/modelscapi/enroll_request.go"
- "./pkg/modelscapi/register_request.go"
- "./pkg/modelscapi/add_signals_request_item_source.go"
- "./pkg/models/success_response.go"
- "./pkg/models/hub_items.go"
- "./pkg/models/alert.go"
- "./pkg/models/metrics_bouncer_info.go"
- "./pkg/models/add_signals_request_item.go"
- "./pkg/models/metrics_meta.go"
- "./pkg/models/metrics_detail_item.go"
- "./pkg/models/add_signals_request_item_decisions_item.go"
- "./pkg/models/hub_item.go"
- "./pkg/models/get_alerts_response.go"
- "./pkg/models/metrics_labels.go"
- "./pkg/models/watcher_auth_request.go"
- "./pkg/models/add_alerts_request.go"
- "./pkg/models/event.go"
- "./pkg/models/decisions_delete_request_item.go"
- "./pkg/models/meta.go"
- "./pkg/models/detailed_metrics.go"
- "./pkg/models/delete_alerts_response.go"
- "./pkg/models/remediation_components_metrics.go"
- "./pkg/models/console_options.go"
- "./pkg/models/topx_response.go"
- "./pkg/models/add_signals_request.go"
- "./pkg/models/delete_decision_response.go"
- "./pkg/models/get_decisions_response.go"
- "./pkg/models/add_signals_request_item_decisions.go"
- "./pkg/models/source.go"
- "./pkg/models/decisions_stream_response.go"
- "./pkg/models/error_response.go"
- "./pkg/models/all_metrics.go"
- "./pkg/models/o_sversion.go"
- "./pkg/models/decision.go"
- "./pkg/models/decisions_delete_request.go"
- "./pkg/models/flush_decision_response.go"
- "./pkg/models/watcher_auth_response.go"
- "./pkg/models/lapi_metrics.go"
- "./pkg/models/watcher_registration_request.go"
- "./pkg/models/metrics_agent_info.go"
- "./pkg/models/log_processors_metrics.go"
- "./pkg/models/add_signals_request_item_source.go"
- "./pkg/models/base_metrics.go"
- "./pkg/models/add_alerts_response.go"
- "./pkg/models/metrics.go"
- "./pkg/protobufs/notifier.pb.go"
- "./pkg/protobufs/notifier_grpc.pb.go"
- "./pkg/database/ent/metric_update.go"
- "./pkg/database/ent/machine_delete.go"
- "./pkg/database/ent/decision_query.go"
- "./pkg/database/ent/meta_query.go"
- "./pkg/database/ent/metric/where.go"
- "./pkg/database/ent/metric/metric.go"
- "./pkg/database/ent/machine_create.go"
- "./pkg/database/ent/alert.go"
- "./pkg/database/ent/event_update.go"
- "./pkg/database/ent/alert_create.go"
- "./pkg/database/ent/alert_query.go"
- "./pkg/database/ent/metric_delete.go"
- "./pkg/database/ent/lock_create.go"
- "./pkg/database/ent/bouncer_update.go"
- "./pkg/database/ent/meta_update.go"
- "./pkg/database/ent/decision_create.go"
- "./pkg/database/ent/configitem_update.go"
- "./pkg/database/ent/machine_query.go"
- "./pkg/database/ent/client.go"
- "./pkg/database/ent/predicate/predicate.go"
- "./pkg/database/ent/lock/where.go"
- "./pkg/database/ent/lock/lock.go"
- "./pkg/database/ent/mutation.go"
- "./pkg/database/ent/migrate/migrate.go"
- "./pkg/database/ent/migrate/schema.go"
- "./pkg/database/ent/configitem.go"
- "./pkg/database/ent/metric_query.go"
- "./pkg/database/ent/event.go"
- "./pkg/database/ent/event_query.go"
- "./pkg/database/ent/lock_update.go"
- "./pkg/database/ent/meta.go"
- "./pkg/database/ent/configitem_query.go"
- "./pkg/database/ent/bouncer.go"
- "./pkg/database/ent/alert_update.go"
- "./pkg/database/ent/meta/meta.go"
- "./pkg/database/ent/meta/where.go"
- "./pkg/database/ent/decision_update.go"
- "./pkg/database/ent/alert_delete.go"
- "./pkg/database/ent/lock.go"
- "./pkg/database/ent/runtime/runtime.go"
- "./pkg/database/ent/alert/alert.go"
- "./pkg/database/ent/alert/where.go"
- "./pkg/database/ent/runtime.go"
- "./pkg/database/ent/bouncer/bouncer.go"
- "./pkg/database/ent/bouncer/where.go"
- "./pkg/database/ent/hook/hook.go"
- "./pkg/database/ent/metric.go"
- "./pkg/database/ent/configitem_create.go"
- "./pkg/database/ent/configitem_delete.go"
- "./pkg/database/ent/tx.go"
- "./pkg/database/ent/decision.go"
- "./pkg/database/ent/lock_delete.go"
- "./pkg/database/ent/decision_delete.go"
- "./pkg/database/ent/machine/where.go"
- "./pkg/database/ent/machine/machine.go"
- "./pkg/database/ent/event_create.go"
- "./pkg/database/ent/metric_create.go"
- "./pkg/database/ent/decision/where.go"
- "./pkg/database/ent/decision/decision.go"
- "./pkg/database/ent/enttest/enttest.go"
- "./pkg/database/ent/lock_query.go"
- "./pkg/database/ent/bouncer_create.go"
- "./pkg/database/ent/event_delete.go"
- "./pkg/database/ent/bouncer_delete.go"
- "./pkg/database/ent/event/event.go"
- "./pkg/database/ent/event/where.go"
- "./pkg/database/ent/machine.go"
- "./pkg/database/ent/ent.go"
- "./pkg/database/ent/meta_create.go"
- "./pkg/database/ent/bouncer_query.go"
- "./pkg/database/ent/meta_delete.go"
- "./pkg/database/ent/machine_update.go"
- "./pkg/database/ent/configitem/configitem.go"
- "./pkg/database/ent/configitem/where.go"
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,3 @@ msi
__pycache__
*.py[cod]
*.egg-info

# automatically generated before running codecov
.github/codecov.yml
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/appsec/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@
w.logger.Info("Shutting down Appsec server")
// xx let's clean up the appsec runners :)
appsec.AppsecRulesDetails = make(map[int]appsec.RulesDetails)
w.server.Shutdown(context.TODO())
w.server.Shutdown(ctx)

Check warning on line 297 in pkg/acquisition/modules/appsec/appsec.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/appsec/appsec.go#L297

Added line #L297 was not covered by tests
return nil
})
return nil
Expand Down
47 changes: 24 additions & 23 deletions pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,9 @@

// OneShotAcquisition reads a set of file and returns when done
func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
ctx := context.TODO()
d.logger.Debug("In oneshot")
runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
runningContainer, err := d.Client.ContainerList(ctx, dockerTypes.ContainerListOptions{})
if err != nil {
return err
}
Expand All @@ -298,10 +299,10 @@
d.logger.Debugf("container with id %s is already being read from", container.ID)
continue
}
if containerConfig := d.EvalContainer(container); containerConfig != nil {
if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
d.logger.Infof("reading logs from container %s", containerConfig.Name)
d.logger.Debugf("logs options: %+v", *d.containerLogsOptions)
dockerReader, err := d.Client.ContainerLogs(context.Background(), containerConfig.ID, *d.containerLogsOptions)
dockerReader, err := d.Client.ContainerLogs(ctx, containerConfig.ID, *d.containerLogsOptions)
if err != nil {
d.logger.Errorf("unable to read logs from container: %+v", err)
return err
Expand Down Expand Up @@ -372,26 +373,26 @@
return nil
}

func (d *DockerSource) getContainerTTY(containerId string) bool {
containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId)
func (d *DockerSource) getContainerTTY(ctx context.Context, containerId string) bool {
containerDetails, err := d.Client.ContainerInspect(ctx, containerId)
if err != nil {
return false
}
return containerDetails.Config.Tty
}

func (d *DockerSource) getContainerLabels(containerId string) map[string]interface{} {
containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId)
func (d *DockerSource) getContainerLabels(ctx context.Context, containerId string) map[string]interface{} {
containerDetails, err := d.Client.ContainerInspect(ctx, containerId)

Check warning on line 385 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L384-L385

Added lines #L384 - L385 were not covered by tests
if err != nil {
return map[string]interface{}{}
}
return parseLabels(containerDetails.Config.Labels)
}

func (d *DockerSource) EvalContainer(container dockerTypes.Container) *ContainerConfig {
func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes.Container) *ContainerConfig {
for _, containerID := range d.Config.ContainerID {
if containerID == container.ID {
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}

Check warning on line 395 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L395

Added line #L395 was not covered by tests
}
}

Expand All @@ -401,27 +402,27 @@
name = name[1:]
}
if name == containerName {
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}
}
}
}

for _, cont := range d.compiledContainerID {
if matched := cont.MatchString(container.ID); matched {
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}

Check warning on line 412 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L412

Added line #L412 was not covered by tests
}
}

for _, cont := range d.compiledContainerName {
for _, name := range container.Names {
if matched := cont.MatchString(name); matched {
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}
}
}
}

if d.Config.UseContainerLabels {
parsedLabels := d.getContainerLabels(container.ID)
parsedLabels := d.getContainerLabels(ctx, container.ID)

Check warning on line 425 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L425

Added line #L425 was not covered by tests
if len(parsedLabels) == 0 {
d.logger.Tracef("container has no 'crowdsec' labels set, ignoring container: %s", container.ID)
return nil
Expand Down Expand Up @@ -458,13 +459,13 @@
}
d.logger.Errorf("label %s is not a string", k)
}
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: labels, Tty: d.getContainerTTY(ctx, container.ID)}

Check warning on line 462 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L462

Added line #L462 was not covered by tests
}

return nil
}

func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
ticker := time.NewTicker(d.CheckIntervalDuration)
d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())
for {
Expand All @@ -475,7 +476,7 @@
case <-ticker.C:
// to track for garbage collection
runningContainersID := make(map[string]bool)
runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
runningContainer, err := d.Client.ContainerList(ctx, dockerTypes.ContainerListOptions{})
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
for idx, container := range d.runningContainerState {
Expand All @@ -501,7 +502,7 @@
if _, ok := d.runningContainerState[container.ID]; ok {
continue
}
if containerConfig := d.EvalContainer(container); containerConfig != nil {
if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
monitChan <- containerConfig
}
}
Expand All @@ -524,10 +525,10 @@
deleteChan := make(chan *ContainerConfig)
d.logger.Infof("Starting docker acquisition")
t.Go(func() error {
return d.DockerManager(monitChan, deleteChan, out)
return d.DockerManager(ctx, monitChan, deleteChan, out)
})

return d.WatchContainer(monitChan, deleteChan)
return d.WatchContainer(ctx, monitChan, deleteChan)
}

func (d *DockerSource) Dump() interface{} {
Expand All @@ -541,9 +542,9 @@
return scanner.Err()
}

func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error {
func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error {
container.logger.Infof("start tail for container %s", container.Name)
dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions)
dockerReader, err := d.Client.ContainerLogs(ctx, container.ID, *d.containerLogsOptions)
if err != nil {
container.logger.Errorf("unable to read logs from container: %+v", err)
return err
Expand Down Expand Up @@ -601,7 +602,7 @@
}
}

func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
func (d *DockerSource) DockerManager(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
d.logger.Info("DockerSource Manager started")
for {
select {
Expand All @@ -610,7 +611,7 @@
newContainer.t = &tomb.Tomb{}
newContainer.logger = d.logger.WithField("container_name", newContainer.Name)
newContainer.t.Go(func() error {
return d.TailDocker(newContainer, outChan, deleteChan)
return d.TailDocker(ctx, newContainer, outChan, deleteChan)
})
d.runningContainerState[newContainer.ID] = newContainer
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/acquisition/modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ func (k *KafkaSource) Dump() interface{} {
return k
}

func (k *KafkaSource) ReadMessage(out chan types.Event) error {
func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) error {
// Start processing from latest Offset
k.Reader.SetOffsetAt(context.Background(), time.Now())
k.Reader.SetOffsetAt(ctx, time.Now())
for {
k.logger.Tracef("reading message from topic '%s'", k.Config.Topic)
m, err := k.Reader.ReadMessage(context.Background())
m, err := k.Reader.ReadMessage(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
Expand Down Expand Up @@ -184,10 +184,10 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
}
}

func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
func (k *KafkaSource) RunReader(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
k.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config)
t.Go(func() error {
return k.ReadMessage(out)
return k.ReadMessage(ctx, out)
})
//nolint //fp
for {
Expand All @@ -207,7 +207,7 @@ func (k *KafkaSource) StreamingAcquisition(ctx context.Context, out chan types.E

t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/kafka/live")
return k.RunReader(out, t)
return k.RunReader(ctx, out, t)
})

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/kubernetesaudit/k8s_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (ka *KubernetesAuditSource) StreamingAcquisition(ctx context.Context, out c
})
<-t.Dying()
ka.logger.Infof("Stopping k8s-audit server on %s:%d%s", ka.config.ListenAddr, ka.config.ListenPort, ka.config.WebhookPath)
ka.server.Shutdown(context.TODO())
ka.server.Shutdown(ctx)
return nil
})
return nil
Expand Down