From e2adb18d3a04334c6c687131f3ea061921afc2e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Caner=20=C3=87=C4=B1dam?= Date: Wed, 20 Sep 2023 17:39:00 +0300 Subject: [PATCH 1/2] report docker events as metrics --- Makefile | 2 - clients/docker/client.go | 11 +- clients/interfaces.go | 8 +- clients/mocks/mock_clients.go | 16 + services/components/containers/definitions.go | 2 +- services/components/containers/events.go | 117 +++++++ services/components/containers/events_test.go | 287 ++++++++++++++++++ services/components/metrics/metrics.go | 10 + services/publisher/publisher.go | 2 +- services/supervisor/services.go | 9 + 10 files changed, 454 insertions(+), 10 deletions(-) create mode 100644 services/components/containers/events.go create mode 100644 services/components/containers/events_test.go diff --git a/Makefile b/Makefile index 52a8785c..4fae1499 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,9 @@ containers: docker build -t forta-network/forta-node -f Dockerfile.node . - docker pull nats:2.3.2 containers-dev: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o forta-node cmd/node/main.go DOCKER_BUILDKIT=1 docker build --no-cache --network=host -t forta-network/forta-node -f Dockerfile.buildkit.dev.node . - docker pull nats:2.3.2 main: docker build -t build-forta -f Dockerfile.cli . 
diff --git a/clients/docker/client.go b/clients/docker/client.go index cda73b6f..a67ed5dc 100644 --- a/clients/docker/client.go +++ b/clients/docker/client.go @@ -12,16 +12,16 @@ import ( "strings" "time" - "github.com/goccy/go-json" - "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" "github.com/forta-network/forta-core-go/utils/workers" "github.com/forta-network/forta-node/clients/cooldown" "github.com/forta-network/forta-node/config" + "github.com/goccy/go-json" log "github.com/sirupsen/logrus" ) @@ -843,6 +843,13 @@ func makeLabelFilter(labels []dockerLabel) filters.Args { return filter } +// Events returns channels that send the Docker events and listening/decoding errors. +func (d *dockerClient) Events(ctx context.Context, since time.Time) (<-chan events.Message, <-chan error) { + return d.cli.Events(ctx, types.EventsOptions{ + Since: since.Format(time.RFC3339), + }) +} + func (d *dockerClient) GetContainerFromRemoteAddr(ctx context.Context, hostPort string) (*types.Container, error) { containers, err := d.GetContainers(ctx) if err != nil { diff --git a/clients/interfaces.go b/clients/interfaces.go index ccc5e7ff..bc820ab9 100644 --- a/clients/interfaces.go +++ b/clients/interfaces.go @@ -4,13 +4,12 @@ import ( "context" "time" - "github.com/forta-network/forta-core-go/domain" - "github.com/docker/docker/api/types" - "github.com/golang/protobuf/proto" - + "github.com/docker/docker/api/types/events" + "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-node/clients/docker" "github.com/forta-network/forta-node/config" + "github.com/golang/protobuf/proto" ) // DockerClient is a client interface for interacting with docker @@ -47,6 +46,7 @@ type DockerClient interface { GetContainerLogs(ctx context.Context, containerID, tail string, truncate 
int) (string, error) GetContainerFromRemoteAddr(ctx context.Context, hostPort string) (*types.Container, error) SetImagePullCooldown(threshold int, cooldownDuration time.Duration) + Events(ctx context.Context, since time.Time) (<-chan events.Message, <-chan error) } // MessageClient receives and publishes messages. diff --git a/clients/mocks/mock_clients.go b/clients/mocks/mock_clients.go index 1b37422c..24314432 100644 --- a/clients/mocks/mock_clients.go +++ b/clients/mocks/mock_clients.go @@ -10,6 +10,7 @@ import ( time "time" types "github.com/docker/docker/api/types" + events "github.com/docker/docker/api/types/events" domain "github.com/forta-network/forta-core-go/domain" docker "github.com/forta-network/forta-node/clients/docker" config "github.com/forta-network/forta-node/config" @@ -126,6 +127,21 @@ func (mr *MockDockerClientMockRecorder) EnsurePublicNetwork(ctx, name interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsurePublicNetwork", reflect.TypeOf((*MockDockerClient)(nil).EnsurePublicNetwork), ctx, name) } +// Events mocks base method. +func (m *MockDockerClient) Events(ctx context.Context, since time.Time) (<-chan events.Message, <-chan error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Events", ctx, since) + ret0, _ := ret[0].(<-chan events.Message) + ret1, _ := ret[1].(<-chan error) + return ret0, ret1 +} + +// Events indicates an expected call of Events. +func (mr *MockDockerClientMockRecorder) Events(ctx, since interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Events", reflect.TypeOf((*MockDockerClient)(nil).Events), ctx, since) +} + // GetContainerByID mocks base method. 
func (m *MockDockerClient) GetContainerByID(ctx context.Context, id string) (*types.Container, error) { m.ctrl.T.Helper() diff --git a/services/components/containers/definitions.go b/services/components/containers/definitions.go index b010191f..dd93070b 100644 --- a/services/components/containers/definitions.go +++ b/services/components/containers/definitions.go @@ -15,7 +15,7 @@ const ( LabelValueFortaIsBot = "true" // LabelValueStrategyVersion is for versioning the critical changes in container management strategy. // It's effective in deciding if a bot container should be re-created or not. - LabelValueStrategyVersion = "2023-06-16T15:00:00Z" + LabelValueStrategyVersion = "2023-09-20T12:00:00Z" ) // Limits define container limits. diff --git a/services/components/containers/events.go b/services/components/containers/events.go new file mode 100644 index 00000000..b7a1c243 --- /dev/null +++ b/services/components/containers/events.go @@ -0,0 +1,117 @@ +package containers + +import ( + "context" + "strings" + "time" + + "github.com/docker/docker/api/types/events" + "github.com/forta-network/forta-core-go/protocol" + "github.com/forta-network/forta-node/clients" + "github.com/forta-network/forta-node/clients/docker" + "github.com/forta-network/forta-node/services/components/metrics" + "github.com/sirupsen/logrus" +) + +// ListenToDockerEvents listens to Docker events starting from the given time and reports the relevant ones as metrics, restarting the listener on errors, until the context is done. 
+func ListenToDockerEvents( + ctx context.Context, dockerClient clients.DockerClient, msgClient clients.MessageClient, + startFrom time.Time, +) { + handler := &eventHandler{ + dockerClient: dockerClient, + msgClient: msgClient, + } + + for { + select { + case <-ctx.Done(): + logrus.Info("stopping docker events listener") + return + default: + } + + events, errs := dockerClient.Events(ctx, startFrom) + + var restartListening bool + for { + select { + case event := <-events: + handler.HandleEvent(ctx, &event) + + case err := <-errs: + logrus.WithError(err).Error("error while listening to docker events") + // set the start time and restart listening + startFrom = time.Now().Add(-1 * time.Second) + restartListening = true + + case <-ctx.Done(): + logrus.Info("stopping docker events listener") + return + } + if restartListening { + break + } + } + } +} + +type eventHandler struct { + dockerClient clients.DockerClient + msgClient clients.MessageClient +} + +func (es *eventHandler) HandleEvent(ctx context.Context, event *events.Message) { + var metric *protocol.AgentMetric + ts := time.Unix(0, event.TimeNano) + switch event.Type { + case "image": + if event.Action != "pull" { + return + } + imageRef := getEventAttribute(event, "name") + metric = metrics.CreateEventMetric(ts, "system", metricNameFrom(event), imageRef) + + case "container", "network": + if !isOneOf(event.Action, "create", "destroy", "connect", "disconnect") { + return + } + botID, ok := getBotID(event) + if !ok { + botID = "system" + } + containerName := getEventAttribute(event, "name") + metric = metrics.CreateEventMetric(ts, botID, metricNameFrom(event), containerName) + + default: + return + } + + metrics.SendAgentMetrics(es.msgClient, []*protocol.AgentMetric{metric}) + return +} + +func isOneOf(input string, values ...string) bool { + for _, value := range values { + if input == value { + return true + } + } + return false +} + +func metricNameFrom(event *events.Message) string { + return 
strings.Join([]string{"docker", event.Type, event.Action}, ".") +} + +func getBotID(event *events.Message) (string, bool) { + val := getEventAttribute(event, docker.LabelFortaBotID) + return val, len(val) > 0 +} + +func getEventAttribute(event *events.Message, attr string) string { + if event.Actor.Attributes == nil { + return "" + } + return event.Actor.Attributes[attr] +} diff --git a/services/components/containers/events_test.go b/services/components/containers/events_test.go new file mode 100644 index 00000000..6c67236b --- /dev/null +++ b/services/components/containers/events_test.go @@ -0,0 +1,287 @@ +package containers + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/docker/docker/api/types/events" + "github.com/forta-network/forta-core-go/protocol" + "github.com/forta-network/forta-node/clients/docker" + "github.com/forta-network/forta-node/clients/messaging" + mock_clients "github.com/forta-network/forta-node/clients/mocks" + "github.com/golang/mock/gomock" +) + +var ( + testEventBotID = "0x12345" + testEventContainerName = "test-container-name" + testImageName = "test-image-name" + testEventTime = time.Now() +) + +// type metricsMatcher struct { +// metrics protocol.AgentMetricList +// } + +// func (m *metricsMatcher) Match(x interface{}) bool { +// incoming, ok := x.(protocol.AgentMetricList) +// if !ok { +// return false +// } +// m1 := m.metrics.Metrics[0] +// m2 := incoming.Metrics[0] + +// sameBot := m1.AgentId == m2.AgentId +// sameDetails := m2. 
+// } + +// func (m *metricsMatcher) String() string { +// return "metrics" +// } + +func TestListenToDockerEvents(t *testing.T) { + //r := require.New(t) + + ctrl := gomock.NewController(t) + dockerClient := mock_clients.NewMockDockerClient(ctrl) + msgClient := mock_clients.NewMockMessageClient(ctrl) + + eventsSince := time.Now() + ctx := context.Background() + eventCh := make(chan events.Message) + errCh := make(chan error) + + doneCh := make(chan struct{}) + dockerClient.EXPECT().Events(ctx, eventsSince).Return(eventCh, errCh).Do(func(v1, v2 interface{}) { + close(doneCh) + }) + resettedSince := gomock.Not(eventsSince) + dockerClient.EXPECT().Events(ctx, resettedSince).Return(eventCh, errCh) + go ListenToDockerEvents(ctx, dockerClient, msgClient, eventsSince) + errCh <- errors.New("test error") // causes the second dockerClient.Events() call + <-doneCh + + doneCh = make(chan struct{}) + msgClient.EXPECT().PublishProto(messaging.SubjectMetricAgent, gomock.Any()).Do(func(v1, v2 interface{}) { + close(doneCh) + }) + eventCh <- events.Message{ + Type: "container", + Action: "create", + Actor: events.Actor{ + Attributes: map[string]string{ + docker.LabelFortaBotID: testEventBotID, + }, + }, + } + <-doneCh +} + +func TestHandleEvent(t *testing.T) { + testCases := []struct { + incomingEvent events.Message + producedMetric *protocol.AgentMetric + }{ + { + incomingEvent: events.Message{ + Type: "image", + Action: "foo", // causes skip + Actor: events.Actor{ + Attributes: map[string]string{ + docker.LabelFortaBotID: testEventBotID, + "name": testEventContainerName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + producedMetric: nil, + }, + { + incomingEvent: events.Message{ + Type: "foo", // causes skip + Action: "bar", + Actor: events.Actor{ + Attributes: map[string]string{ + docker.LabelFortaBotID: testEventBotID, + "name": testEventContainerName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + 
producedMetric: nil, + }, + { + incomingEvent: events.Message{ + Type: "image", + Action: "pull", + Actor: events.Actor{ + Attributes: map[string]string{ + "name": testImageName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + producedMetric: &protocol.AgentMetric{ + AgentId: "system", + Timestamp: testEventTime.Format(time.RFC3339), + Name: "docker.image.pull", + Value: 1, + Details: testImageName, + }, + }, + { + incomingEvent: events.Message{ + Type: "container", + Action: "create", + Actor: events.Actor{ + Attributes: map[string]string{ + docker.LabelFortaBotID: testEventBotID, + "name": testEventContainerName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + producedMetric: &protocol.AgentMetric{ + AgentId: testEventBotID, + Timestamp: testEventTime.Format(time.RFC3339), + Name: "docker.container.create", + Value: 1, + Details: testEventContainerName, + }, + }, + { + incomingEvent: events.Message{ + Type: "container", + Action: "destroy", + Actor: events.Actor{ + Attributes: map[string]string{ + docker.LabelFortaBotID: testEventBotID, + "name": testEventContainerName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + producedMetric: &protocol.AgentMetric{ + AgentId: testEventBotID, + Timestamp: testEventTime.Format(time.RFC3339), + Name: "docker.container.destroy", + Value: 1, + Details: testEventContainerName, + }, + }, + { + incomingEvent: events.Message{ + Type: "network", + Action: "create", + Actor: events.Actor{ + Attributes: map[string]string{ + "name": testEventContainerName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + producedMetric: &protocol.AgentMetric{ + AgentId: "system", + Timestamp: testEventTime.Format(time.RFC3339), + Name: "docker.network.create", + Value: 1, + Details: testEventContainerName, + }, + }, + { + incomingEvent: events.Message{ + Type: "network", + Action: "destroy", + Actor: 
events.Actor{ + Attributes: map[string]string{ + "name": testEventContainerName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + producedMetric: &protocol.AgentMetric{ + AgentId: "system", + Timestamp: testEventTime.Format(time.RFC3339), + Name: "docker.network.destroy", + Value: 1, + Details: testEventContainerName, + }, + }, + { + incomingEvent: events.Message{ + Type: "network", + Action: "connect", + Actor: events.Actor{ + Attributes: map[string]string{ + "name": testEventContainerName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + producedMetric: &protocol.AgentMetric{ + AgentId: "system", // disregards bot id in the label + Timestamp: testEventTime.Format(time.RFC3339), + Name: "docker.network.connect", + Value: 1, + Details: testEventContainerName, + }, + }, + { + incomingEvent: events.Message{ + Type: "network", + Action: "disconnect", + Actor: events.Actor{ + Attributes: map[string]string{ + "name": testEventContainerName, + }, + }, + Time: testEventTime.Unix(), + TimeNano: testEventTime.UnixNano(), + }, + producedMetric: &protocol.AgentMetric{ + AgentId: "system", // disregards bot id in the label + Timestamp: testEventTime.Format(time.RFC3339), + Name: "docker.network.disconnect", + Value: 1, + Details: testEventContainerName, + }, + }, + } + + for _, testCase := range testCases { + t.Run(fmt.Sprintf("%s_%s", testCase.incomingEvent.Type, testCase.incomingEvent.Action), func(t *testing.T) { + ctrl := gomock.NewController(t) + dockerClient := mock_clients.NewMockDockerClient(ctrl) + msgClient := mock_clients.NewMockMessageClient(ctrl) + handler := &eventHandler{ + dockerClient: dockerClient, + msgClient: msgClient, + } + ctx := context.Background() + + if testCase.producedMetric != nil { + msgClient.EXPECT().PublishProto( + messaging.SubjectMetricAgent, newMetrics(testCase.producedMetric), + ) + } + handler.HandleEvent(ctx, &testCase.incomingEvent) + }) + } +} + +func newMetrics(ms 
...*protocol.AgentMetric) *protocol.AgentMetricList { + return &protocol.AgentMetricList{ + Metrics: ms, + } +} diff --git a/services/components/metrics/metrics.go b/services/components/metrics/metrics.go index 543ea762..d803f48c 100644 --- a/services/components/metrics/metrics.go +++ b/services/components/metrics/metrics.go @@ -62,6 +62,16 @@ func CreateAgentMetric(agt config.AgentConfig, metric string, value float64) *pr } } +func CreateEventMetric(t time.Time, id string, metric string, details string) *protocol.AgentMetric { + return &protocol.AgentMetric{ + AgentId: id, + Timestamp: t.Format(time.RFC3339), + Name: metric, + Value: 1, + Details: details, + } +} + func createMetrics(agt config.AgentConfig, timestamp string, metricMap map[string]float64) []*protocol.AgentMetric { var res []*protocol.AgentMetric diff --git a/services/publisher/publisher.go b/services/publisher/publisher.go index 0b860ec7..baf24d28 100644 --- a/services/publisher/publisher.go +++ b/services/publisher/publisher.go @@ -303,7 +303,7 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo }, ) if err != nil { - logger.WithError(err).Error("failed to sign cid") + logger.WithError(err).Error("failed to create batch jwt") return false, err } resp, err = pub.alertClient.PostBatch(&domain.AlertBatchRequest{ diff --git a/services/supervisor/services.go b/services/supervisor/services.go index dbfb2287..4e935957 100644 --- a/services/supervisor/services.go +++ b/services/supervisor/services.go @@ -155,6 +155,9 @@ func (sup *SupervisorService) start() error { } commonNodeImage := supervisorContainer.Image + // start of service network and container launch + startTime := time.Now() + nodeNetworkID, err := sup.client.EnsurePublicNetwork(sup.ctx, config.DockerNetworkName) if err != nil { return err @@ -388,6 +391,12 @@ func (sup *SupervisorService) start() error { log.Info("inspection to completed") } + go func() { + // wait for the publisher so it can catch the metrics 
+ time.Sleep(time.Minute) + containers.ListenToDockerEvents(sup.ctx, sup.globalClient, sup.msgClient, startTime) + }() + sup.scannerContainer, err = sup.client.StartContainer( sup.ctx, docker.ContainerConfig{ Name: config.DockerScannerContainerName, From 75583fbadb6eba2d22605abb5c6113a1f990a4d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Caner=20=C3=87=C4=B1dam?= Date: Wed, 20 Sep 2023 17:40:14 +0300 Subject: [PATCH 2/2] remove commented out code --- services/components/containers/events_test.go | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/services/components/containers/events_test.go b/services/components/containers/events_test.go index 6c67236b..710845d1 100644 --- a/services/components/containers/events_test.go +++ b/services/components/containers/events_test.go @@ -22,29 +22,7 @@ var ( testEventTime = time.Now() ) -// type metricsMatcher struct { -// metrics protocol.AgentMetricList -// } - -// func (m *metricsMatcher) Match(x interface{}) bool { -// incoming, ok := x.(protocol.AgentMetricList) -// if !ok { -// return false -// } -// m1 := m.metrics.Metrics[0] -// m2 := incoming.Metrics[0] - -// sameBot := m1.AgentId == m2.AgentId -// sameDetails := m2. -// } - -// func (m *metricsMatcher) String() string { -// return "metrics" -// } - func TestListenToDockerEvents(t *testing.T) { - //r := require.New(t) - ctrl := gomock.NewController(t) dockerClient := mock_clients.NewMockDockerClient(ctrl) msgClient := mock_clients.NewMockMessageClient(ctrl)