Skip to content

Commit

Permalink
Merge pull request #845 from forta-network/kisel/forta-1545-docker-st…
Browse files Browse the repository at this point in the history
…ats-metrics

Poll docker resources and publish as metrics
  • Loading branch information
dkeysil authored Jan 29, 2024
2 parents c097d7f + d02cf42 commit d27ce5c
Show file tree
Hide file tree
Showing 15 changed files with 337 additions and 103 deletions.
35 changes: 35 additions & 0 deletions clients/docker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,41 @@ func (d *dockerClient) GetContainerFromRemoteAddr(ctx context.Context, hostPort
return agentContainer, nil
}

// ContainerStats takes a single stats snapshot of the container with the given
// id and returns the subset of it that is reported as metrics: total CPU usage,
// memory usage, and received/sent byte counters for each network of the
// container (keyed by network name).
//
// It returns an error if the stats request fails or if the response body
// cannot be decoded as JSON.
func (d *dockerClient) ContainerStats(ctx context.Context, id string) (*ContainerResources, error) {
	stats, err := d.cli.ContainerStatsOneShot(ctx, id)
	if err != nil {
		return nil, err
	}
	// The response body must be closed by the caller; without this every call
	// leaks the underlying response stream.
	defer stats.Body.Close()

	var containerResources types.StatsJSON
	if err := json.NewDecoder(stats.Body).Decode(&containerResources); err != nil {
		return nil, err
	}

	resources := ContainerResources{
		CPUStats: CPUStats{
			CPUUsage: CPUUsage{
				TotalUsage: containerResources.CPUStats.CPUUsage.TotalUsage,
			},
		},
		MemoryStats: MemoryStats{
			Usage: containerResources.MemoryStats.Usage,
		},
		// Always non-nil, even when the container reports no networks.
		NetworkStats: make(map[string]NetworkStats, len(containerResources.Networks)),
	}

	for name, network := range containerResources.Networks {
		resources.NetworkStats[name] = NetworkStats{
			RxBytes: network.RxBytes,
			TxBytes: network.TxBytes,
		}
	}

	return &resources, nil
}

func initLabels(name string) []dockerLabel {
if len(name) == 0 {
return defaultLabels
Expand Down
26 changes: 26 additions & 0 deletions clients/docker/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package docker

// Resource usage types returned by the docker client's ContainerStats method.
type (
	// ContainerResources holds the CPU, memory and per-network counters
	// of a single container. NetworkStats is keyed by network name.
	ContainerResources struct {
		CPUStats     CPUStats                `json:"cpu_stats"`
		MemoryStats  MemoryStats             `json:"memory_stats"`
		NetworkStats map[string]NetworkStats `json:"networks"`
	}

	// CPUStats wraps the CPU usage counters of a container.
	CPUStats struct {
		CPUUsage CPUUsage `json:"cpu_usage"`
	}

	// CPUUsage carries the total CPU usage value of a container.
	CPUUsage struct {
		TotalUsage uint64 `json:"total_usage"`
	}

	// MemoryStats carries the memory usage value of a container.
	MemoryStats struct {
		Usage uint64 `json:"usage"`
	}

	// NetworkStats carries the traffic counters of one container network.
	NetworkStats struct {
		// RxBytes is the number of bytes received.
		RxBytes uint64 `json:"rx_bytes"`
		// TxBytes is the number of bytes sent.
		TxBytes uint64 `json:"tx_bytes"`
	}
)
1 change: 1 addition & 0 deletions clients/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type DockerClient interface {
GetContainerFromRemoteAddr(ctx context.Context, hostPort string) (*types.Container, error)
SetImagePullCooldown(threshold int, cooldownDuration time.Duration)
Events(ctx context.Context, since time.Time) (<-chan events.Message, <-chan error)
ContainerStats(ctx context.Context, containerID string) (*docker.ContainerResources, error)
}

// MessageClient receives and publishes messages.
Expand Down
15 changes: 15 additions & 0 deletions clients/mocks/mock_clients.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible
require (
github.com/docker/docker v1.6.2
github.com/docker/go-connections v0.4.0
github.com/forta-network/forta-core-go v0.0.0-20240110111218-b4b8774d2bb6
github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/forta-network/forta-core-go v0.0.0-20240110111218-b4b8774d2bb6 h1:fDydEruBxtBYrWG5ct+YPwXuJiuUrT+QRU6A8Xz3ids=
github.com/forta-network/forta-core-go v0.0.0-20240110111218-b4b8774d2bb6/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283 h1:MmvZ3so59eNLtsJgEnRS1cwy/uqI/PazAS0x9Xkl3+E=
github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down
2 changes: 2 additions & 0 deletions services/components/botio/bot_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ func (s *BotClientSuite) TestStartProcessStop() {
s.lifecycleMetrics.EXPECT().ActionUnsubscribe(combinerSubscriptions)

s.r.NoError(s.botClient.Close())
// Using small sleep to allow goroutines to be executed (e.g. health check)
time.Sleep(30 * time.Millisecond)
}

func (s *BotClientSuite) TestCombinerBotSubscriptions() {
Expand Down
2 changes: 1 addition & 1 deletion services/components/containers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (es *eventHandler) HandleEvent(ctx context.Context, event *events.Message)
}

metrics.SendAgentMetrics(es.msgClient, []*protocol.AgentMetric{metric})
return
}

func isOneOf(input string, values ...string) bool {
Expand All @@ -101,6 +100,7 @@ func isOneOf(input string, values ...string) bool {
}

// metricNameFrom builds the dot-separated metric name "docker.<type>.<action>"
// for a docker event. The resulting names should stay aligned with the metric
// names declared in forta-core-go/domain/metrics.go.
func metricNameFrom(event *events.Message) string {
	nameParts := []string{"docker", event.Type, event.Action}
	return strings.Join(nameParts, ".")
}

Expand Down
74 changes: 74 additions & 0 deletions services/components/containers/resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package containers

import (
"context"
"time"

"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/clients"
"github.com/forta-network/forta-node/clients/docker"
"github.com/forta-network/forta-node/services/components/metrics"
"github.com/sirupsen/logrus"
)

// defaultPollingInterval is the period between two docker resource polls.
// It is a variable rather than a constant so that tests can shorten it.
var defaultPollingInterval = 10 * time.Second

// PollDockerResources periodically publishes resource usage metrics for bot
// containers.
//
// Every defaultPollingInterval it lists the containers through dockerClient
// and, for each container carrying the docker.LabelFortaBotID label, fetches
// its stats and sends four metrics through msgClient: CPU usage, memory usage,
// and network bytes sent/received summed over all networks of the container.
// Containers without the label are skipped. Errors are logged and polling
// continues with the next container or the next tick.
//
// The function blocks until ctx is done, so callers normally run it in its own
// goroutine.
func PollDockerResources(
	ctx context.Context, dockerClient clients.DockerClient,
	msgClient clients.MessageClient,
) {
	pollingTicker := time.NewTicker(defaultPollingInterval)
	// Stop the ticker when the poller exits so it does not outlive this call.
	defer pollingTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			logrus.Info("stopping docker resources poller")
			return
		case <-pollingTicker.C:
			logrus.Info("polling docker resources")
			containers, err := dockerClient.GetContainers(ctx)
			if err != nil {
				logrus.WithError(err).Error("error while getting docker containers")
				continue
			}

			for _, container := range containers {
				// Check the label first: containers without a bot ID are never
				// reported, so there is no point in requesting their stats.
				botID, ok := container.Labels[docker.LabelFortaBotID]
				if !ok {
					continue
				}

				resources, err := dockerClient.ContainerStats(ctx, container.ID)
				if err != nil {
					// Attach the ID as a field; passing it as an extra argument
					// to Error would glue it to the message without a separator.
					logrus.WithError(err).WithField("container", container.ID).
						Error("error while getting container stats")
					continue
				}

				// Aggregate traffic over every network the container is attached to.
				var (
					bytesSent uint64
					bytesRecv uint64
				)
				for _, network := range resources.NetworkStats {
					bytesSent += network.TxBytes
					bytesRecv += network.RxBytes
				}

				metrics.SendAgentMetrics(msgClient, []*protocol.AgentMetric{
					metrics.CreateResourcesMetric(
						botID, domain.MetricDockerResourcesCPU, float64(resources.CPUStats.CPUUsage.TotalUsage)),
					metrics.CreateResourcesMetric(
						botID, domain.MetricDockerResourcesMemory, float64(resources.MemoryStats.Usage)),
					metrics.CreateResourcesMetric(
						botID, domain.MetricDockerResourcesNetworkSent, float64(bytesSent)),
					metrics.CreateResourcesMetric(
						botID, domain.MetricDockerResourcesNetworkReceive, float64(bytesRecv)),
				})
			}
		}
	}
}
86 changes: 86 additions & 0 deletions services/components/containers/resources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package containers

import (
"context"
"testing"
"time"

"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/clients/docker"
"github.com/forta-network/forta-node/clients/messaging"
mock_clients "github.com/forta-network/forta-node/clients/mocks"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)

// TestPollDockerResources runs the poller against mocked docker and message
// clients and verifies that one polling round publishes the CPU, memory and
// network metrics for a labeled bot container, in that order.
func TestPollDockerResources(t *testing.T) {
	// Shorten the interval for the test and restore it afterwards so other
	// tests in the package are not affected by the override.
	previousInterval := defaultPollingInterval
	defaultPollingInterval = 100 * time.Millisecond
	defer func() { defaultPollingInterval = previousInterval }()

	ctrl := gomock.NewController(t)
	dockerClient := mock_clients.NewMockDockerClient(ctrl)
	msgClient := mock_clients.NewMockMessageClient(ctrl)

	// A cancellable context lets the test stop the poller; otherwise the
	// goroutine would keep ticking and call the mocks after the test is done.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dockerContainerID := "test-container-id"
	agentID := "test-agent-id"

	dockerClient.EXPECT().GetContainers(ctx).Return(docker.ContainerList{
		{
			ID: dockerContainerID,
			Labels: map[string]string{
				docker.LabelFortaBotID: agentID,
			},
		},
	}, nil)

	dockerClient.EXPECT().ContainerStats(ctx, dockerContainerID).Return(&docker.ContainerResources{
		CPUStats: docker.CPUStats{
			CPUUsage: docker.CPUUsage{
				TotalUsage: 33,
			},
		},
		MemoryStats: docker.MemoryStats{
			Usage: 100,
		},
		NetworkStats: map[string]docker.NetworkStats{
			"eth0": {
				RxBytes: 123,
				TxBytes: 456,
			},
		},
	}, nil)

	doneCh := make(chan struct{})

	msgClient.EXPECT().PublishProto(messaging.SubjectMetricAgent, gomock.Any()).Do(func(v1, v2 interface{}) {
		metrics := v2.(*protocol.AgentMetricList)
		assert.Len(t, metrics.Metrics, 4)

		// CPU metric
		assert.Equal(t, agentID, metrics.Metrics[0].AgentId)
		assert.Equal(t, domain.MetricDockerResourcesCPU, metrics.Metrics[0].Name)
		assert.Equal(t, float64(33), metrics.Metrics[0].Value)

		// Memory metric
		assert.Equal(t, agentID, metrics.Metrics[1].AgentId)
		assert.Equal(t, domain.MetricDockerResourcesMemory, metrics.Metrics[1].Name)
		assert.Equal(t, float64(100), metrics.Metrics[1].Value)

		// Network bytes sent metric
		assert.Equal(t, agentID, metrics.Metrics[2].AgentId)
		assert.Equal(t, domain.MetricDockerResourcesNetworkSent, metrics.Metrics[2].Name)
		assert.Equal(t, float64(456), metrics.Metrics[2].Value)

		// Network bytes received metric
		assert.Equal(t, agentID, metrics.Metrics[3].AgentId)
		assert.Equal(t, domain.MetricDockerResourcesNetworkReceive, metrics.Metrics[3].Name)
		assert.Equal(t, float64(123), metrics.Metrics[3].Value)

		close(doneCh)
	})

	pollerStopped := make(chan struct{})
	go func() {
		defer close(pollerStopped)
		PollDockerResources(ctx, dockerClient, msgClient)
	}()

	// Fail instead of hanging forever if the metrics are never published.
	select {
	case <-doneCh:
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for docker resource metrics")
	}

	// Stop the poller before the next tick and wait for it to exit so it
	// cannot touch the mocks or the interval variable after the test returns.
	cancel()
	<-pollerStopped
}
9 changes: 9 additions & 0 deletions services/components/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ func CreateEventMetric(t time.Time, id string, metric string, details string) *p
}
}

// CreateResourcesMetric builds an agent metric named metric with the given
// value for the agent identified by id. The metric is stamped with the current
// time formatted as RFC 3339.
func CreateResourcesMetric(id, metric string, value float64) *protocol.AgentMetric {
	resourceMetric := new(protocol.AgentMetric)
	resourceMetric.AgentId = id
	resourceMetric.Timestamp = time.Now().Format(time.RFC3339)
	resourceMetric.Name = metric
	resourceMetric.Value = value
	return resourceMetric
}

func createMetrics(agt config.AgentConfig, timestamp string, metricMap map[string]float64) []*protocol.AgentMetric {
var res []*protocol.AgentMetric

Expand Down
3 changes: 2 additions & 1 deletion services/supervisor/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ func (sup *SupervisorService) start() error {
go func() {
// wait for the publisher so it can catch the metrics
time.Sleep(time.Minute)
containers.ListenToDockerEvents(sup.ctx, sup.globalClient, sup.msgClient, startTime)
go containers.ListenToDockerEvents(sup.ctx, sup.globalClient, sup.msgClient, startTime)
go containers.PollDockerResources(sup.ctx, sup.globalClient, sup.msgClient)
}()

sup.scannerContainer, err = sup.client.StartContainer(
Expand Down
Loading

0 comments on commit d27ce5c

Please sign in to comment.