diff --git a/CHANGELOG-6.md b/CHANGELOG-6.md
index 5551cd6094..538cd1b8f2 100644
--- a/CHANGELOG-6.md
+++ b/CHANGELOG-6.md
@@ -8,6 +8,17 @@ Versioning](http://semver.org/spec/v2.0.0.html).
 
 ## Unreleased
 
+### 2024-03-07
+
+### Added
+- Added a watcher to monitor user updates
+- Added an exit mechanism to disconnect the agent when its user is disabled
+- Added a store struct that is passed down for user configs
+
+### Changed
+- The session config now watches for user updates over the wizard bus
+
+
 ### Changed
 - Upgraded CI Go version to 1.21.3
 - Upgraded jwt version to 4.4.3
diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go
index 54f26eda2d..38cf207441 100644
--- a/backend/agentd/agentd.go
+++ b/backend/agentd/agentd.go
@@ -17,8 +17,8 @@ import (
 	"github.com/gorilla/mux"
 	"github.com/gorilla/websocket"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/sensu/sensu-go/agent"
 	corev2 "github.com/sensu/core/v2"
+	"github.com/sensu/sensu-go/agent"
 	"github.com/sensu/sensu-go/backend/apid/actions"
 	"github.com/sensu/sensu-go/backend/apid/middlewares"
 	"github.com/sensu/sensu-go/backend/apid/routers"
@@ -105,6 +105,7 @@ type Agentd struct {
 	serveWaitTime time.Duration
 	ready         func()
 	backendEntity *corev2.Entity
+	userWatcher   <-chan *store.WatchEventUserConfig
 }
 
 // Config configures an Agentd.
@@ -121,6 +122,7 @@ type Config struct {
 	EtcdClientTLSConfig *tls.Config
 	Watcher             <-chan store.WatchEventEntityConfig
 	BackendEntity       *corev2.Entity
+	UserWatcher         <-chan *store.WatchEventUserConfig
 }
 
 // Option is a functional option.
@@ -149,6 +151,7 @@ func New(c Config, opts ...Option) (*Agentd, error) {
 		etcdClientTLSConfig: c.EtcdClientTLSConfig,
 		serveWaitTime:       c.ServeWaitTime,
 		backendEntity:       c.BackendEntity,
+		userWatcher:         c.UserWatcher,
 	}
 
 	// prepare server TLS config
@@ -275,6 +278,7 @@ func (a *Agentd) Start() error {
 func (a *Agentd) runWatcher() {
 	defer func() {
 		logger.Warn("shutting down entity config watcher")
+		logger.Warn("shutting down user config watcher")
 	}()
 	for {
 		select {
@@ -287,10 +291,18 @@ func (a *Agentd) runWatcher() {
 			if err := a.handleEvent(event); err != nil {
 				logger.WithError(err).Error("error handling entity config watch event")
 			}
+		case userEvent, ok := <-a.userWatcher:
+			if !ok {
+				return
+			}
+			if err := a.handleUserEvent(userEvent); err != nil {
+				logger.WithError(err).Error("error handling user config watch event")
+			}
 		}
 	}
 }
 
+// handleEvent publishes entity config updates from the watcher to the bus for agent sessions to consume
 func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
 	if event.Entity == nil {
 		return errors.New("nil entity received from entity config watcher")
@@ -308,6 +320,24 @@ func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
 	return nil
 }
 
+// handleUserEvent publishes user config updates from the watcher to the bus for agent sessions to consume
+func (a *Agentd) handleUserEvent(event *store.WatchEventUserConfig) error {
+	if event.User == nil {
+		return errors.New("nil user received from the user config watcher")
+	}
+
+	topic := messaging.UserConfigTopic(event.User.Username)
+	if err := a.bus.Publish(topic, event); err != nil {
+		logger.WithField("topic", topic).WithError(err).
+			Error("unable to publish a user config update to the bus")
+		return err
+	}
+
+	logger.WithField("topic", topic).
+		Debug("successfully published a user config update to the bus")
+	return nil
+}
+
 // Stop Agentd.
 func (a *Agentd) Stop() error {
 	a.cancel()
diff --git a/backend/agentd/agentd_test.go b/backend/agentd/agentd_test.go
index 27a7549779..c02c1874db 100644
--- a/backend/agentd/agentd_test.go
+++ b/backend/agentd/agentd_test.go
@@ -6,11 +6,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"io/ioutil"
-	"net/http"
-	"net/http/httptest"
-	"testing"
-
 	corev2 "github.com/sensu/core/v2"
 	corev3 "github.com/sensu/core/v3"
 	"github.com/sensu/sensu-go/backend/apid/middlewares"
@@ -22,6 +17,10 @@ import (
 	"github.com/sensu/sensu-go/transport"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"testing"
 )
 
 func TestAgentdMiddlewares(t *testing.T) {
diff --git a/backend/agentd/session.go b/backend/agentd/session.go
index 034b801cfc..bd0c24524a 100644
--- a/backend/agentd/session.go
+++ b/backend/agentd/session.go
@@ -11,9 +11,9 @@ import (
 
 	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/sensu/sensu-go/agent"
 	corev2 "github.com/sensu/core/v2"
 	corev3 "github.com/sensu/core/v3"
+	"github.com/sensu/sensu-go/agent"
 	"github.com/sensu/sensu-go/backend/messaging"
 	"github.com/sensu/sensu-go/backend/metrics"
 	"github.com/sensu/sensu-go/backend/ringv2"
@@ -95,6 +95,8 @@ type Session struct {
 	marshal          agent.MarshalFunc
 	unmarshal        agent.UnmarshalFunc
 	entityConfig     *entityConfig
+	userConfig       *userConfig
+	user             string
 	mu               sync.Mutex
 	subscriptionsMap map[string]subscription
 }
@@ -111,12 +113,23 @@ type entityConfig struct {
 	updatesChannel chan interface{}
 }
 
+// userConfig is used by a session to subscribe to user config updates
+type userConfig struct {
+	subscription   chan messaging.Subscription
+	updatesChannel chan interface{}
+}
+
 // Receiver returns the channel for incoming entity updates from the entity
 // watcher
 func (e *entityConfig) Receiver() chan<- interface{} {
 	return e.updatesChannel
 }
 
+// Receiver returns the channel for incoming user config updates from the user
+// config watcher
+func (u *userConfig) Receiver() chan<- interface{} {
+	return u.updatesChannel
+}
+
 func newSessionHandler(s *Session) *handler.MessageHandler {
 	handler := handler.NewMessageHandler()
 	handler.AddHandler(transport.MessageTypeKeepalive, s.handleKeepalive)
@@ -149,6 +162,7 @@ type SessionConfig struct {
 	// with the session has been buried. Necessary when running parallel keepalived
 	// workers.
 	BurialReceiver *BurialReceiver
+	userConfig     *userConfig
 }
 
 type BurialReceiver struct {
@@ -193,10 +207,15 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
 		ringPool:     cfg.RingPool,
 		unmarshal:    cfg.Unmarshal,
 		marshal:      cfg.Marshal,
+		user:         cfg.User,
 		entityConfig: &entityConfig{
 			subscriptions:  make(chan messaging.Subscription, 1),
 			updatesChannel: make(chan interface{}, 10),
 		},
+		userConfig: &userConfig{
+			subscription:   make(chan messaging.Subscription, 1),
+			updatesChannel: make(chan interface{}, 10),
+		},
 	}
 
 	// Optionally subscribe to burial notifications
@@ -211,6 +230,13 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
 		}()
 	}
 
+	if len(cfg.User) > 0 {
+		_, err := s.bus.Subscribe(messaging.UserConfigTopic(cfg.User), cfg.AgentName, s.userConfig)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	if err := s.bus.Publish(messaging.TopicKeepalive, makeEntitySwitchBurialEvent(cfg)); err != nil {
 		return nil, err
 	}
@@ -321,6 +347,44 @@ func (s *Session) sender() {
 	for {
 		var msg *transport.Message
 		select {
+		//---- user config updates ----//
+		case u := <-s.userConfig.updatesChannel:
+			watchEvent, ok := u.(*store.WatchEventUserConfig)
+			if !ok {
+				logger.Errorf("session received unexpected user struct: %T", u)
+				continue
+			}
+			if watchEvent.User == nil {
+				logger.Error("session received nil user in watch event")
+				continue
+			}
+			// Handle the update/disable event; a disabled user terminates the session
+			switch watchEvent.Action {
+			case store.WatchUpdate:
+				if watchEvent.Disabled {
+					logger.Warn("the user associated with the agent is now disabled")
+					return
+				}
+			default:
+				logger.Warnf("session received unhandled user config watch action: %s", watchEvent.Action.String())
+				continue
+			}
+
+			lager := logger.WithFields(logrus.Fields{
+				"action":    watchEvent.Action.String(),
+				"user":      watchEvent.User.Username,
+				"namespace": watchEvent.User.GetMetadata().GetNamespace(),
+			})
+			lager.Debug("user update received")
+
+			bytes, err := s.marshal(watchEvent.User)
+			if err != nil {
+				lager.WithError(err).Error("session failed to serialize user config")
+				continue
+			}
+			msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)
+
+		//---- entity config updates ----//
 		case e := <-s.entityConfig.updatesChannel:
 			watchEvent, ok := e.(*store.WatchEventEntityConfig)
 			if !ok {
@@ -448,7 +512,9 @@ func (s *Session) sender() {
 // 2. Start receiver
 // 3. Start goroutine that waits for context cancellation, and shuts down service.
 func (s *Session) Start() (err error) {
+	defer close(s.userConfig.subscription)
 	defer close(s.entityConfig.subscriptions)
+
 	sessionCounter.WithLabelValues(s.cfg.Namespace).Inc()
 	s.wg = &sync.WaitGroup{}
 	s.wg.Add(2)
@@ -471,22 +537,45 @@ func (s *Session) Start() (err error) {
 		"agent":     s.cfg.AgentName,
 		"namespace": s.cfg.Namespace,
 	})
-
-	// Subscribe the agent to its entity_config topic
-	topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
-	lager.WithField("topic", topic).Debug("subscribing to topic")
 	// Get a unique name for the agent, which will be used as the consumer of the
 	// bus, in order to avoid problems with an agent reconnecting before its
 	// session is ended
 	agentName := agentUUID(s.cfg.Namespace, s.cfg.AgentName)
+
+	// Subscribe the agent to its user_config topic
+	userTopic := messaging.UserConfigTopic(s.cfg.User)
+	lager.WithField("topic", userTopic).Debug("subscribing to topic")
+	userSubscription, usrErr := s.bus.Subscribe(userTopic, agentName, s.userConfig)
+	if usrErr != nil {
+		lager.WithError(usrErr).Error("error starting subscription")
+		return usrErr
+	}
+	s.userConfig.subscription <- userSubscription
+
+	// Send back this user config to the agent so it uses it rather than its local config
+	watchEvent := &store.WatchEventUserConfig{
+		Action: store.WatchUpdate,
+		User:   &corev2.User{},
+	}
+	usrErr = s.bus.Publish(messaging.UserConfigTopic(s.cfg.AgentName), watchEvent)
+	if usrErr != nil {
+		lager.WithError(usrErr).Error("error publishing user config")
+		return usrErr
+	}
+
+	// Subscribe the agent to its entity_config topic
+	topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
+	lager.WithField("topic", topic).Debug("subscribing to topic")
 	subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig)
 	if err != nil {
 		lager.WithError(err).Error("error starting subscription")
 		return err
 	}
 	s.entityConfig.subscriptions <- subscription
 
 	// Determine if the entity already exists
 	req := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev3.EntityConfig{}).StoreName())
 	wrapper, err := s.storev2.Get(req)
 	if err != nil {
@@ -577,6 +666,7 @@ func (s *Session) stop() {
 			logger.WithError(err).Error("error closing session")
 		}
 	}()
+	defer close(s.userConfig.updatesChannel)
 	defer close(s.entityConfig.updatesChannel)
 	defer close(s.checkChannel)
 
@@ -602,6 +692,13 @@ func (s *Session) stop() {
 		}
 	}
 
+	// Remove the user config subscription
+	for sub := range s.userConfig.subscription {
+		if err := sub.Cancel(); err != nil {
+			logger.WithError(err).Error("unable to unsubscribe from message bus")
+		}
+	}
+
 	// Unsubscribe the session from every configured check subscriptions
 	s.unsubscribe(s.cfg.Subscriptions)
 }
diff --git a/backend/agentd/session_test.go b/backend/agentd/session_test.go
index 35d18b3acf..2c9d6f4fb9 100644
--- a/backend/agentd/session_test.go
+++ b/backend/agentd/session_test.go
@@ -1,18 +1,20 @@
 package agentd
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
+	"github.com/sirupsen/logrus"
 	"reflect"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
-	"github.com/sensu/sensu-go/agent"
 	corev2 "github.com/sensu/core/v2"
 	corev3 "github.com/sensu/core/v3"
+	"github.com/sensu/sensu-go/agent"
 	"github.com/sensu/sensu-go/backend/messaging"
 	"github.com/sensu/sensu-go/backend/store"
 	storev2 "github.com/sensu/sensu-go/backend/store/v2"
@@ -80,6 +82,117 @@ func TestSession_sender(t *testing.T) {
 	type connFunc func(*mocktransport.MockTransport, *sync.WaitGroup)
 	type storeFunc func(*storetest.Store, *sync.WaitGroup)
 
+	userTests := []struct {
+		name          string
+		connFunc      connFunc
+		watchEvent    *store.WatchEventUserConfig
+		expectedLog   string
+		expectedError bool
+		busFunc       busFunc
+		storeFunc     storeFunc
+		subscriptions []string
+	}{
+		{
+			name: "valid watchEvent received",
+			watchEvent: &store.WatchEventUserConfig{
+				Action: store.WatchUpdate,
+				User: &corev2.User{
+					Username: "testUser",
+				},
+			},
+			expectedLog: "user update received",
+		},
+		{
+			name: "watchEvent with nil user",
+			watchEvent: &store.WatchEventUserConfig{
+				Action: store.WatchCreate,
+				User:   nil,
+			},
+			expectedLog:   "session received nil user in watch event",
+			expectedError: true,
+		},
+	}
+
+	for _, tt := range userTests {
+		t.Run(tt.name, func(t *testing.T) {
+			wg := &sync.WaitGroup{}
+
+			// Mock our transport
+			conn := new(mocktransport.MockTransport)
+			if tt.connFunc != nil {
+				tt.connFunc(conn, wg)
+			}
+
+			// Mock our store
+			st := &mockstore.MockStore{}
+			storev2 := &storetest.Store{}
+			if tt.storeFunc != nil {
+				tt.storeFunc(storev2, wg)
+			}
+
+			// Mock our bus
+			bus, err := messaging.NewWizardBus(messaging.WizardBusConfig{})
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := bus.Start(); err != nil {
+				t.Fatal(err)
+			}
+
+			// Capture log output so the expected messages can be inspected
+			var logBuffer bytes.Buffer
+			testLogger := logrus.New()
+			testLogger.SetOutput(&logBuffer)
+
+			s := SessionConfig{
+				AgentName: "testing",
+				Namespace: "default",
+				Conn:      conn,
+				Bus:       bus,
+				Store:     st,
+				Storev2:   storev2,
+				Unmarshal: agent.UnmarshalJSON,
+				Marshal:   agent.MarshalJSON,
+				userConfig: &userConfig{
+					updatesChannel: make(chan interface{}),
+				},
+			}
+
+			session, err := NewSession(context.Background(), s)
+			if err != nil {
+				t.Fatal(err)
+			}
+			session.wg = &sync.WaitGroup{}
+			session.wg.Add(1)
+
+			userTopic := messaging.UserConfigTopic(session.cfg.AgentName)
+			_, err = session.bus.Subscribe(userTopic, session.cfg.AgentName, session.userConfig)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			go session.sender()
+			// Send our watch events over the wizard bus
+			if tt.busFunc != nil {
+				tt.busFunc(bus, wg)
+			}
+
+			done := make(chan struct{})
+			go func() {
+				wg.Wait()
+				close(done)
+			}()
+
+			select {
+			case <-session.ctx.Done():
+			case <-done:
+				session.Stop()
+			case <-time.After(5 * time.Second):
+				t.Fatal("session never stopped, we probably never received a user update over the channel")
+			}
+		})
+	}
+
 	tests := []struct {
 		name    string
 		busFunc busFunc
@@ -299,6 +412,12 @@ func TestSession_sender(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
+			//
+			//userTopic := messaging.UserConfigTopic(session.cfg.AgentName)
+			//_, err = session.bus.Subscribe(userTopic, session.cfg.AgentName, session.userConfig)
+			//if err != nil {
+			//	t.Fatal(err)
+			//}
 
 			go session.sender()
 
@@ -318,7 +437,7 @@ func TestSession_sender(t *testing.T) {
 			case <-done:
 				session.Stop()
 			case <-time.After(5 * time.Second):
-				t.Fatal("session never stopped, we probably never received an entity update over the channel")
+				t.Fatal("session never stopped, we probably never received an entity/user update over the channel")
 			}
 		})
 	}
diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go
index dc38220a74..c3c52185ab 100644
--- a/backend/agentd/watcher.go
+++ b/backend/agentd/watcher.go
@@ -3,7 +3,6 @@ package agentd
 import (
 	"context"
 	"errors"
-
 	"github.com/gogo/protobuf/proto"
 	corev2 "github.com/sensu/core/v2"
 	corev3 "github.com/sensu/core/v3"
@@ -70,3 +69,44 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventEntityConfig {
 
 	return ch
 }
+
+// GetUserConfigWatcher watches changes to user configs in etcd and publishes them -- git#2806
+// over the bus as store.WatchEventUserConfig
+func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan *store.WatchEventUserConfig {
+	key := etcdstorev2.StoreKey(storev2.ResourceRequest{
+		Context:   ctx,
+		StoreName: new(corev2.User).StoreName(),
+	})
+	w := etcdstore.Watch(ctx, client, key, true)
+	ch := make(chan *store.WatchEventUserConfig, 1)
+
+	go func() {
+		defer close(ch)
+		for response := range w.Result() {
+			logger.Debug("read from user config watcher channel")
+			if response.Type == store.WatchError {
+				logger.
+					WithError(errors.New(string(response.Object))).
+					Error("unexpected error while watching for user config updates")
+				ch <- &store.WatchEventUserConfig{
+					Action: response.Type,
+				}
+				continue
+			}
+
+			// unmarshal the user config
+			var userConfig corev2.User
+			if err := proto.Unmarshal(response.Object, &userConfig); err != nil {
+				logger.WithError(err).Error("unable to unmarshal user config from watch response")
+				continue
+			}
+
+			ch <- &store.WatchEventUserConfig{
+				User:     &userConfig,
+				Action:   response.Type,
+				Disabled: userConfig.Disabled,
+			}
+		}
+	}()
+	return ch
+}
diff --git a/backend/agentd/watcher_test.go b/backend/agentd/watcher_test.go
index afcf80bc60..980a2ca8f7 100644
--- a/backend/agentd/watcher_test.go
+++ b/backend/agentd/watcher_test.go
@@ -17,3 +17,13 @@ func TestGetEntityConfigWatcher(t *testing.T) {
 	ch := GetEntityConfigWatcher(context.Background(), client)
 	assert.NotNil(t, ch)
 }
+
+func TestGetUserConfigWatcher(t *testing.T) {
+	e, cleanup := etcd.NewTestEtcd(t)
+	defer cleanup()
+	client := e.NewEmbeddedClient()
+	defer client.Close()
+
+	ch := GetUserConfigWatcher(context.Background(), client)
+	assert.NotNil(t, ch)
+}
diff --git a/backend/backend.go b/backend/backend.go
index ab2ae5e95c..6785011fd8 100644
--- a/backend/backend.go
+++ b/backend/backend.go
@@ -521,6 +521,9 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) {
 	// Start the entity config watcher, so agentd sessions are notified of updates
 	entityConfigWatcher := agentd.GetEntityConfigWatcher(b.ctx, b.Client)
 
+	// Start the user config watcher, so agentd sessions are notified of updates
+	userConfigWatcher := agentd.GetUserConfigWatcher(b.ctx, b.Client)
+
 	// Prepare the etcd client TLS config
 	etcdClientTLSInfo := (transport.TLSInfo)(config.EtcdClientTLSInfo)
 	etcdClientTLSConfig, err := etcdClientTLSInfo.ClientConfig()
@@ -661,6 +664,7 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) {
 		Watcher:             entityConfigWatcher,
 		EtcdClientTLSConfig: b.EtcdClientTLSConfig,
 		BackendEntity:       backendEntity,
+		UserWatcher:         userConfigWatcher,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("error initializing %s: %s", agent.Name(), err)
diff --git a/backend/messaging/message_bus.go b/backend/messaging/message_bus.go
index 07823b25d2..9f8095326d 100644
--- a/backend/messaging/message_bus.go
+++ b/backend/messaging/message_bus.go
@@ -14,6 +14,8 @@ const (
 	// to agents
 	TopicEntityConfig = "sensu:entity-config"
 
+	// TopicUserConfig is the topic for user config updates sent to agents
+	TopicUserConfig = "sensu:user-config"
+
 	// TopicEvent is the topic for events that have been written to Etcd and
 	// normalized by eventd.
 	TopicEvent = "sensu:event"
 
@@ -104,6 +106,10 @@ func EntityConfigTopic(namespace, name string) string {
 	return fmt.Sprintf("%s:%s:%s", TopicEntityConfig, namespace, name)
 }
 
+// UserConfigTopic returns the topic for user config updates for the given user
+func UserConfigTopic(name string) string {
+	return fmt.Sprintf("%s:%s", TopicUserConfig, name)
+}
+
 // SubscriptionTopic is a helper to determine the proper topic name for a
 // subscription based on the namespace
 func SubscriptionTopic(namespace, sub string) string {
diff --git a/backend/store/store.go b/backend/store/store.go
index 8953242c89..3ce4e225d1 100644
--- a/backend/store/store.go
+++ b/backend/store/store.go
@@ -156,6 +156,14 @@ type WatchEventEntityConfig struct {
 	Action WatchActionType
 }
 
+// WatchEventUserConfig contains an updated user config and the action that
+// occurred during this modification
+type WatchEventUserConfig struct {
+	User     *corev2.User
+	Action   WatchActionType
+	Disabled bool
+}
+
 // Store is used to abstract the durable storage used by the Sensu backend
 // processses. Each Sensu resources is represented by its own interface. A
 // MockStore is available in order to mock a store implementation
diff --git a/transport/transport.go b/transport/transport.go
index a99b1c8c60..ea67356c84 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -27,6 +27,9 @@ const (
 	// MessageTypeEntityConfig is the message type sent for entity config updates
 	MessageTypeEntityConfig = "entity_config"
 
+	// MessageTypeUserConfig is the message type sent for user config updates
+	MessageTypeUserConfig = "user_config"
+
 	// HeaderKeyAgentName is the HTTP request header specifying the Agent name
 	HeaderKeyAgentName = "Sensu-AgentName"
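Note on agent-side handling (not part of the diff above): this changeset defines the user_config transport message and publishes user config updates from the backend session, but it does not show how an agent consumes that message. The sketch below is a minimal, self-contained illustration of one way an agent could react to such a payload. The handleUserConfig function, the shutdown callback, and the use of JSON decoding are assumptions for illustration only; they are not APIs introduced by this change.

// Illustrative sketch only; it mirrors the backend session's behaviour of
// stopping when the user tied to the agent is disabled.
package main

import (
	"encoding/json"
	"log"

	corev2 "github.com/sensu/core/v2"
)

// handleUserConfig decodes the user carried by a user_config message and
// invokes shutdown when that user has been disabled.
func handleUserConfig(payload []byte, shutdown func()) error {
	var user corev2.User
	if err := json.Unmarshal(payload, &user); err != nil {
		return err
	}
	if user.Disabled {
		log.Printf("user %q is disabled, shutting down the agent", user.Username)
		shutdown()
	}
	return nil
}

func main() {
	// Simulate receiving a user_config payload for a disabled user.
	payload, _ := json.Marshal(&corev2.User{Username: "agent-user", Disabled: true})
	if err := handleUserConfig(payload, func() { log.Println("agent stopping") }); err != nil {
		log.Fatal(err)
	}
}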