diff --git a/services/components/components.go b/services/components/components.go index fdecb315..8c0e8a68 100644 --- a/services/components/components.go +++ b/services/components/components.go @@ -4,7 +4,9 @@ import ( "context" "fmt" + "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" + "github.com/forta-network/forta-core-go/clients/agentlogs" "github.com/forta-network/forta-core-go/utils" "github.com/forta-network/forta-node/clients" "github.com/forta-network/forta-node/clients/agentgrpc" @@ -72,6 +74,7 @@ type BotLifecycleConfig struct { ScannerAddress common.Address MessageClient clients.MessageClient BotRegistry registry.BotRegistry + Key *keystore.Key } // BotLifecycle contains the bot lifecycle components. @@ -79,10 +82,14 @@ type BotLifecycle struct { BotManager lifecycle.BotLifecycleManager BotClient containers.BotClient ImageCleanup containers.ImageCleanup + BotLogger lifecycle.BotLogger } // GetBotLifecycleComponents returns the bot lifecycle management components. -func GetBotLifecycleComponents(ctx context.Context, botLifeConfig BotLifecycleConfig) (BotLifecycle, error) { +func GetBotLifecycleComponents( + ctx context.Context, + botLifeConfig BotLifecycleConfig, +) (BotLifecycle, error) { cfg := botLifeConfig.Config // bot image client is helpful for loading local mode agents from a restricted container registry var ( @@ -120,10 +127,17 @@ func GetBotLifecycleComponents(ctx context.Context, botLifeConfig BotLifecycleCo lifecycleMetrics, botMonitor, ) imageCleanup := containers.NewImageCleanup(dockerClient, botLifeConfig.BotRegistry) + botLogger := lifecycle.NewBotLogger( + botClient, + dockerClient, + botLifeConfig.Key, + agentlogs.NewClient(botLifeConfig.Config.AgentLogsConfig.URL).SendLogs, + ) return BotLifecycle{ BotManager: botManager, BotClient: botClient, ImageCleanup: imageCleanup, + BotLogger: botLogger, }, nil } diff --git a/services/components/lifecycle/bot_logger.go b/services/components/lifecycle/bot_logger.go new file mode 100644 index 00000000..da7620ac --- /dev/null +++ b/services/components/lifecycle/bot_logger.go @@ -0,0 +1,112 @@ +package lifecycle + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/forta-network/forta-core-go/clients/agentlogs" + "github.com/forta-network/forta-core-go/security" + "github.com/forta-network/forta-node/clients" + "github.com/forta-network/forta-node/clients/docker" + "github.com/forta-network/forta-node/services/components/containers" + log "github.com/sirupsen/logrus" +) + +// BotLogger manages bots logging. +type BotLogger interface { + SendBotLogs(ctx context.Context) error +} + +type botLogger struct { + botClient containers.BotClient + dockerClient clients.DockerClient + key *keystore.Key + prevAgentLogs agentlogs.Agents + + sendAgentLogs func(agents agentlogs.Agents, authToken string) error +} + +var _ BotLogger = &botLogger{} + +func NewBotLogger( + botClient containers.BotClient, + dockerClient clients.DockerClient, + key *keystore.Key, + sendAgentLogs func(agents agentlogs.Agents, authToken string) error, +) *botLogger { + return &botLogger{ + botClient: botClient, + dockerClient: dockerClient, + key: key, + sendAgentLogs: sendAgentLogs, + } +} + +// adjust these better with auto-upgrade later +const ( + defaultAgentLogSendInterval = time.Minute + defaultAgentLogTailLines = 50 + defaultAgentLogAvgMaxCharsPerLine = 200 +) + +func (bl *botLogger) SendBotLogs(ctx context.Context) error { + var ( + sendLogs agentlogs.Agents + keepLogs agentlogs.Agents + ) + + botContainers, err := bl.botClient.LoadBotContainers(ctx) + if err != nil { + return fmt.Errorf("failed to load the bot containers: %v", err) + } + + for _, container := range botContainers { + if container.Labels[docker.LabelFortaSettingsAgentLogsEnable] != "true" { + continue + } + logs, err := bl.dockerClient.GetContainerLogs( + ctx, container.ID, + strconv.Itoa(defaultAgentLogTailLines), + defaultAgentLogAvgMaxCharsPerLine*defaultAgentLogTailLines, + ) + if err != nil { + log.WithError(err).Warn("failed to get agent container logs") + continue + } + + agent := &agentlogs.Agent{ + ID: container.Labels[docker.LabelFortaBotID], + Logs: logs, + } + // don't send if it's the same with previous logs but keep it for next time + // so we can check + keepLogs = append(keepLogs, agent) + if !bl.prevAgentLogs.Has(agent.ID, logs) { + log.WithField("agent", agent.ID).Debug("new agent logs found") + sendLogs = append(sendLogs, agent) + } else { + log.WithField("agent", agent.ID).Debug("no new agent logs") + } + } + + if len(sendLogs) > 0 { + scannerJwt, err := security.CreateScannerJWT(bl.key, map[string]interface{}{ + "access": "bot_logger", + }) + if err != nil { + return fmt.Errorf("failed to create scanner token: %v", err) + } + if err := bl.sendAgentLogs(sendLogs, scannerJwt); err != nil { + return fmt.Errorf("failed to send agent logs: %v", err) + } + log.WithField("count", len(sendLogs)).Debug("successfully sent new agent logs") + } else { + log.Debug("no new agent logs were found - not sending") + } + + bl.prevAgentLogs = keepLogs + return nil +} diff --git a/services/components/lifecycle/bot_logger_test.go b/services/components/lifecycle/bot_logger_test.go new file mode 100644 index 00000000..55e84946 --- /dev/null +++ b/services/components/lifecycle/bot_logger_test.go @@ -0,0 +1,200 @@ +package lifecycle + +import ( + "context" + "errors" + "strconv" + "testing" + + "github.com/docker/docker/api/types" + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/forta-network/forta-core-go/clients/agentlogs" + "github.com/forta-network/forta-core-go/security" + "github.com/forta-network/forta-node/clients/docker" + mock_clients "github.com/forta-network/forta-node/clients/mocks" + mock_containers "github.com/forta-network/forta-node/services/components/containers/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +func TestSendBotLogsSuite(t *testing.T) { + suite.Run(t, &BotLoggerSuite{}) +} + +type BotLoggerSuite struct { + r *require.Assertions + + botLogger *botLogger + botClient *mock_containers.MockBotClient + dockerClient *mock_clients.MockDockerClient + key *keystore.Key + suite.Suite +} + +func (s *BotLoggerSuite) SetupTest() { + t := s.T() + ctrl := gomock.NewController(s.T()) + r := s.Require() + + botClient := mock_containers.NewMockBotClient(ctrl) + dockerClient := mock_clients.NewMockDockerClient(ctrl) + + dir := t.TempDir() + ks := keystore.NewKeyStore(dir, keystore.StandardScryptN, keystore.StandardScryptP) + + _, err := ks.NewAccount("Forta123") + r.NoError(err) + + key, err := security.LoadKeyWithPassphrase(dir, "Forta123") + r.NoError(err) + + s.botClient = botClient + s.dockerClient = dockerClient + s.key = key + s.r = r +} + +func (s *BotLoggerSuite) TestSendBotLogs() { + botLogger := NewBotLogger( + s.botClient, s.dockerClient, s.key, + func(agents agentlogs.Agents, authToken string) error { + s.r.Equal(2, len(agents)) + s.r.Equal("bot1ID", agents[0].ID) + s.r.Equal("bot2ID", agents[1].ID) + return nil + }, + ) + ctx := context.Background() + + mockContainers := []types.Container{ + { + ID: "bot1", + Image: "forta/bot:latest", + Labels: map[string]string{ + docker.LabelFortaSettingsAgentLogsEnable: "true", + docker.LabelFortaBotID: "bot1ID", + }, + }, + { + ID: "bot2", + Image: "forta/bot:latest", + Labels: map[string]string{ + docker.LabelFortaSettingsAgentLogsEnable: "true", + docker.LabelFortaBotID: "bot2ID", + }, + }, + } + s.dockerClient.EXPECT().GetContainerLogs( + ctx, "bot1", + strconv.Itoa(defaultAgentLogTailLines), + defaultAgentLogAvgMaxCharsPerLine*defaultAgentLogTailLines, + ).Return("some log", nil).Times(1) + + s.dockerClient.EXPECT().GetContainerLogs( + ctx, "bot2", + strconv.Itoa(defaultAgentLogTailLines), + defaultAgentLogAvgMaxCharsPerLine*defaultAgentLogTailLines, + ).Return("some log", nil).Times(1) + + s.botClient.EXPECT().LoadBotContainers(ctx).Return(mockContainers, nil) + s.r.NoError(botLogger.SendBotLogs(ctx)) +} + +// should fail if there is an error loading +// bot containers +func (s *BotLoggerSuite) TestLoadBotContainersError() { + botLogger := NewBotLogger( + s.botClient, s.dockerClient, s.key, + func(agents agentlogs.Agents, authToken string) error { + return nil + }, + ) + ctx := context.Background() + + mockContainers := []types.Container{} + + s.botClient.EXPECT().LoadBotContainers(ctx).Return(mockContainers, errors.New("test")) + s.r.EqualError(botLogger.SendBotLogs(ctx), "failed to load the bot containers: test") +} + +// Should not send agent logs if fails +// to get container logs but continue processing +func (s *BotLoggerSuite) TestGetContainerLogsError() { + botLogger := NewBotLogger( + s.botClient, s.dockerClient, s.key, + func(agents agentlogs.Agents, authToken string) error { + s.r.Equal(1, len(agents)) + s.r.Equal("bot2ID", agents[0].ID) + s.r.Equal("some log", agents[0].Logs) + return nil + }, + ) + ctx := context.Background() + + mockContainers := []types.Container{ + { + ID: "bot1", + Image: "forta/bot:latest", + Labels: map[string]string{ + docker.LabelFortaSettingsAgentLogsEnable: "true", + }, + }, + { + ID: "bot2", + Image: "forta/bot:latest", + Labels: map[string]string{ + docker.LabelFortaSettingsAgentLogsEnable: "true", + docker.LabelFortaBotID: "bot2ID", + }, + }, + } + + s.botClient.EXPECT().LoadBotContainers(ctx).Return(mockContainers, nil) + + s.dockerClient.EXPECT().GetContainerLogs( + ctx, "bot1", + strconv.Itoa(defaultAgentLogTailLines), + defaultAgentLogAvgMaxCharsPerLine*defaultAgentLogTailLines, + ).Return("", errors.New("test")).Times(1) + + s.dockerClient.EXPECT().GetContainerLogs( + ctx, "bot2", + strconv.Itoa(defaultAgentLogTailLines), + defaultAgentLogAvgMaxCharsPerLine*defaultAgentLogTailLines, + ).Return("some log", nil).Times(1) + + s.r.NoError(botLogger.SendBotLogs(ctx)) +} + +// Fails sending agent logs +func (s *BotLoggerSuite) TestFailsToSendLogs() { + botLogger := NewBotLogger( + s.botClient, s.dockerClient, s.key, + func(agents agentlogs.Agents, authToken string) error { + return errors.New("test") + }, + ) + ctx := context.Background() + + mockContainers := []types.Container{ + { + ID: "bot1", + Image: "forta/bot:latest", + Labels: map[string]string{ + docker.LabelFortaSettingsAgentLogsEnable: "true", + docker.LabelFortaBotID: "bot1ID", + }, + }, + } + + s.botClient.EXPECT().LoadBotContainers(ctx).Return(mockContainers, nil) + + s.dockerClient.EXPECT().GetContainerLogs( + ctx, "bot1", + strconv.Itoa(defaultAgentLogTailLines), + defaultAgentLogAvgMaxCharsPerLine*defaultAgentLogTailLines, + ).Return("some log", nil).Times(1) + + s.r.EqualError(botLogger.SendBotLogs(ctx), "failed to send agent logs: test") +} diff --git a/services/components/lifecycle/bot_pool.go b/services/components/lifecycle/bot_pool.go index 7dc891b3..348f4ed6 100644 --- a/services/components/lifecycle/bot_pool.go +++ b/services/components/lifecycle/bot_pool.go @@ -91,7 +91,7 @@ func (bp *botPool) UpdateBotsWithLatestConfigs(latestConfigs messaging.AgentPayl // add new bots var latestBotClients []botio.BotClient for _, botConfig := range latestConfigs { - logger := botLogger(botConfig) + logger := botLog(botConfig) botClient, ok := bp.getBotClient(botConfig.ContainerName()) if ok && !botClient.IsClosed() { logger.Debug("bot client already exists - skipping update") @@ -112,7 +112,7 @@ func (bp *botPool) UpdateBotsWithLatestConfigs(latestConfigs messaging.AgentPayl // updated the config of the bots that have different config updatedBotConfigs := FindUpdatedBots(bp.getConfigsUnsafe(), latestConfigs) for _, updatedBotConfig := range updatedBotConfigs { - logger := botLogger(updatedBotConfig) + logger := botLog(updatedBotConfig) botClient, ok := bp.getBotClient(updatedBotConfig.ContainerName()) if !ok { logger.Info("could not find the updated bot! skipping") @@ -153,7 +153,7 @@ func (bp *botPool) RemoveBotsWithConfigs(removedBotConfigs messaging.AgentPayloa // close and discard the removed bots for _, removedBotConfig := range removedBotConfigs { - logger := botLogger(removedBotConfig) + logger := botLog(removedBotConfig) botClient, ok := bp.getBotClient(removedBotConfig.ContainerName()) if !ok { logger.Info("could not find the removed bot! skipping") @@ -213,7 +213,7 @@ func (bp *botPool) WaitForAll() { bp.botWg.Wait() } -func botLogger(botConfig config.AgentConfig) *log.Entry { +func botLog(botConfig config.AgentConfig) *log.Entry { return log.WithField("bot", botConfig.ID).WithField("container", botConfig.ContainerName()) } diff --git a/services/supervisor/agent_logs.go b/services/supervisor/agent_logs.go index cc307fb7..27352884 100644 --- a/services/supervisor/agent_logs.go +++ b/services/supervisor/agent_logs.go @@ -1,28 +1,16 @@ package supervisor import ( - "fmt" - "strconv" "time" - "github.com/forta-network/forta-core-go/clients/agentlogs" - "github.com/forta-network/forta-core-go/security" - "github.com/forta-network/forta-node/clients/docker" log "github.com/sirupsen/logrus" ) -// adjust these better with auto-upgrade later -const ( - defaultAgentLogSendInterval = time.Minute - defaultAgentLogTailLines = 50 - defaultAgentLogAvgMaxCharsPerLine = 200 -) - func (sup *SupervisorService) syncAgentLogs() { interval := time.Duration(sup.botLifecycleConfig.Config.AgentLogsConfig.SendIntervalSeconds) * time.Second ticker := time.NewTicker(interval) for range ticker.C { - err := sup.doSyncAgentLogs() + err := sup.botLifecycle.BotLogger.SendBotLogs(sup.ctx) sup.lastAgentLogsRequest.Set() sup.lastAgentLogsRequestError.Set(err) if err != nil { @@ -30,65 +18,3 @@ func (sup *SupervisorService) syncAgentLogs() { } } } - -func (sup *SupervisorService) doSyncAgentLogs() error { - sup.mu.RLock() - defer sup.mu.RUnlock() - - var ( - sendLogs agentlogs.Agents - keepLogs agentlogs.Agents - ) - - botContainers, err := sup.botLifecycle.BotClient.LoadBotContainers(sup.ctx) - if err != nil { - return fmt.Errorf("failed to load the bot containers: %v", err) - } - - for _, container := range botContainers { - if container.Labels[docker.LabelFortaSettingsAgentLogsEnable] != "true" { - continue - } - logs, err := sup.client.GetContainerLogs( - sup.ctx, container.ID, - strconv.Itoa(defaultAgentLogTailLines), - defaultAgentLogAvgMaxCharsPerLine*defaultAgentLogTailLines, - ) - if err != nil { - log.WithError(err).Warn("failed to get agent container logs") - continue - } - - agent := &agentlogs.Agent{ - ID: container.Labels[docker.LabelFortaBotID], - Logs: logs, - } - // don't send if it's the same with previous logs but keep it for next time - // so we can check - keepLogs = append(keepLogs, agent) - if !sup.prevAgentLogs.Has(agent.ID, logs) { - log.WithField("agent", agent.ID).Debug("new agent logs found") - sendLogs = append(sendLogs, agent) - } else { - log.WithField("agent", agent.ID).Debug("no new agent logs") - } - } - - if len(sendLogs) > 0 { - scannerJwt, err := security.CreateScannerJWT(sup.config.Key, map[string]interface{}{ - "access": "agent_logs", - }) - if err != nil { - return fmt.Errorf("failed to create scanner token: %v", err) - } - if err := sup.sendAgentLogs(sendLogs, scannerJwt); err != nil { - return fmt.Errorf("failed to send agent logs: %v", err) - } - log.WithField("count", len(sendLogs)).Debug("successfully sent new agent logs") - } else { - log.Debug("no new agent logs were found - not sending") - } - - sup.prevAgentLogs = keepLogs - return nil -} diff --git a/services/supervisor/services.go b/services/supervisor/services.go index 938c0bd2..ccb94d0c 100644 --- a/services/supervisor/services.go +++ b/services/supervisor/services.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ethereum/go-ethereum/accounts/keystore" - "github.com/forta-network/forta-core-go/clients/agentlogs" "github.com/forta-network/forta-core-go/clients/health" "github.com/forta-network/forta-core-go/manifest" "github.com/forta-network/forta-core-go/protocol" @@ -78,9 +77,7 @@ type SupervisorService struct { healthClient health.HealthClient - sendAgentLogs func(agents agentlogs.Agents, authToken string) error - prevAgentLogs agentlogs.Agents - inspectionCh chan *protocol.InspectionResults + inspectionCh chan *protocol.InspectionResults } type SupervisorServiceConfig struct { @@ -786,7 +783,6 @@ func NewSupervisorService(ctx context.Context, cfg SupervisorServiceConfig) (*Su botLifecycleConfig: cfg.BotLifecycleConfig, config: cfg, healthClient: health.NewClient(), - sendAgentLogs: agentlogs.NewClient(cfg.Config.AgentLogsConfig.URL).SendLogs, inspectionCh: make(chan *protocol.InspectionResults), } sup.autoUpdatesDisabled.Set(strconv.FormatBool(cfg.Config.AutoUpdate.Disable)) diff --git a/services/supervisor/services_test.go b/services/supervisor/services_test.go index 395db990..ebc923c6 100644 --- a/services/supervisor/services_test.go +++ b/services/supervisor/services_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/ethereum/go-ethereum/accounts/keystore" - "github.com/forta-network/forta-core-go/clients/agentlogs" "github.com/forta-network/forta-core-go/release" "github.com/forta-network/forta-core-go/security" "github.com/forta-network/forta-core-go/utils" @@ -251,19 +250,6 @@ func (s *Suite) TestStartServices() { s.r.NoError(s.supervisor.start()) } -func (s *Suite) TestDoSyncAgentLogs() { - s.botClient.EXPECT().LoadBotContainers(gomock.Any()).Return([]types.Container{{ - Labels: map[string]string{ - docker.LabelFortaSettingsAgentLogsEnable: "true", - }, - }}, nil) - s.dockerClient.EXPECT().GetContainerLogs(gomock.Any(), gomock.Any(), "50", 10000) - s.supervisor.sendAgentLogs = func(agents agentlogs.Agents, authToken string) error { - return nil - } - s.r.NoError(s.supervisor.doSyncAgentLogs()) -} - func (s *Suite) TestDoHealthCheck() { s.r.NoError(s.supervisor.doHealthCheck()) }