diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index 54f26eda2d..559ebc541b 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -17,8 +17,8 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/prometheus/client_golang/prometheus" - "github.com/sensu/sensu-go/agent" corev2 "github.com/sensu/core/v2" + "github.com/sensu/sensu-go/agent" "github.com/sensu/sensu-go/backend/apid/actions" "github.com/sensu/sensu-go/backend/apid/middlewares" "github.com/sensu/sensu-go/backend/apid/routers" @@ -105,6 +105,7 @@ type Agentd struct { serveWaitTime time.Duration ready func() backendEntity *corev2.Entity + userWatcher <-chan store.WatchEventUserConfig } // Config configures an Agentd. @@ -121,6 +122,7 @@ type Config struct { EtcdClientTLSConfig *tls.Config Watcher <-chan store.WatchEventEntityConfig BackendEntity *corev2.Entity + UserWatcher <-chan store.WatchEventUserConfig } // Option is a functional option. @@ -149,6 +151,7 @@ func New(c Config, opts ...Option) (*Agentd, error) { etcdClientTLSConfig: c.EtcdClientTLSConfig, serveWaitTime: c.ServeWaitTime, backendEntity: c.BackendEntity, + userWatcher: c.UserWatcher, } // prepare server TLS config @@ -291,6 +294,7 @@ func (a *Agentd) runWatcher() { } } +// adding the config updates to the etcd bus for watcher to consume func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error { if event.Entity == nil { return errors.New("nil entity received from entity config watcher") @@ -308,6 +312,15 @@ func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error { return nil } +// adding the UserConfig updates to the etcd bus for the watcher to consume +func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error { + if event.User == nil { + return errors.New("nil entry received from the user config watcher") + } + + return nil +} + // Stop Agentd. func (a *Agentd) Stop() error { a.cancel() diff --git a/backend/agentd/session.go b/backend/agentd/session.go index 034b801cfc..c2f9c51059 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -11,9 +11,9 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" - "github.com/sensu/sensu-go/agent" corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" + "github.com/sensu/sensu-go/agent" "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/metrics" "github.com/sensu/sensu-go/backend/ringv2" @@ -95,6 +95,7 @@ type Session struct { marshal agent.MarshalFunc unmarshal agent.UnmarshalFunc entityConfig *entityConfig + userConfig *userConfig mu sync.Mutex subscriptionsMap map[string]subscription } @@ -111,12 +112,23 @@ type entityConfig struct { updatesChannel chan interface{} } +// userConfig is used by a session to subscribe to entity config updates +type userConfig struct { + subscription chan messaging.Subscription + updatesChannel chan interface{} +} + // Receiver returns the channel for incoming entity updates from the entity // watcher func (e *entityConfig) Receiver() chan<- interface{} { return e.updatesChannel } +// Receiver returns the channel for incoming entity updates from the entity watcher +func (u *userConfig) Receiver() chan<- interface{} { + return u.updatesChannel +} + func newSessionHandler(s *Session) *handler.MessageHandler { handler := handler.NewMessageHandler() handler.AddHandler(transport.MessageTypeKeepalive, s.handleKeepalive) @@ -321,6 +333,52 @@ func (s *Session) sender() { for { var msg *transport.Message select { + //sudhanshu#2608 ---- user ----- + case u := <-s.userConfig.updatesChannel: + var usr *corev2.User + watchEvent, ok := u.(*store.WatchEventUserConfig) + if !ok { + logger.Errorf("Session received unexpected struct: %T", u) + continue + } + + // Handle the delete and unknown watch events + switch watchEvent.Action { + case store.WatchDelete: + //stop session + return + case store.WatchUnknown: + logger.Error("session received unknown watch event") + continue + } + + if watchEvent.User == nil { + logger.Error("session received nil user in watch event") + continue + } + + lager := logger.WithFields(logrus.Fields{ + "action": watchEvent.Action.String(), + "user": watchEvent.User.GetMetadata().Name, + "namespace": watchEvent.User.GetMetadata().Namespace, + }) + logger.Debug("User update received") + + bytes, err := s.marshal(watchEvent.User) + if err != nil { + lager.WithError(err).Error("session failed to serialize entity config") + continue + } + + // determine if user was disabled + if err := usr.Disabled; err { + lager.Debug("The user is now disabled ", err) + } + + msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) + + // -----entity ------- + case e := <-s.entityConfig.updatesChannel: watchEvent, ok := e.(*store.WatchEventEntityConfig) if !ok { diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index dc38220a74..a7fe13ee46 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -70,3 +70,54 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan return ch } + +// GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 +// over the bus as store.WatchEventUserConfig +func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { + key := etcdstorev2.StoreKey(storev2.ResourceRequest{ + Context: ctx, + StoreName: new(corev2.User).StoreName(), + }) + w := etcdstore.Watch(ctx, client, key, true) + ch := make(chan store.WatchEventUserConfig, 1) + + go func() { + defer close(ch) + for response := range w.Result() { + if response.Type == store.WatchError { + logger. + WithError(errors.New(string(response.Object))). + Error("Unexpected error while watching for the user config updates") + ch <- store.WatchEventUserConfig{ + Action: response.Type, + } + continue + } + var ( + configWrapper wrap.Wrapper + userConfig corev2.User + ) + + // Decode and unwrap the entity config + + if err := proto.Unmarshal(response.Object, &configWrapper); err != nil { + logger.WithField("key", response.Key).WithError(err). + Error("unable to unmarshal user config from key") + continue + } + + // Remove the managed_by label if the value is sensu-agent, in case the user is disabled + if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) + } + + ch <- store.WatchEventUserConfig{ + Action: response.Type, + User: &userConfig, + } + } + }() + + logger.Println("----watch metadata----", w) + return ch +} diff --git a/backend/backend.go b/backend/backend.go index ab2ae5e95c..6785011fd8 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -521,6 +521,9 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) { // Start the entity config watcher, so agentd sessions are notified of updates entityConfigWatcher := agentd.GetEntityConfigWatcher(b.ctx, b.Client) + // Start the user config watcher, so agentd sessions are notified of updates + userConfigWatcher := agentd.GetUserConfigWatcher(b.ctx, b.Client) + // Prepare the etcd client TLS config etcdClientTLSInfo := (transport.TLSInfo)(config.EtcdClientTLSInfo) etcdClientTLSConfig, err := etcdClientTLSInfo.ClientConfig() @@ -661,6 +664,7 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) { Watcher: entityConfigWatcher, EtcdClientTLSConfig: b.EtcdClientTLSConfig, BackendEntity: backendEntity, + UserWatcher: userConfigWatcher, }) if err != nil { return nil, fmt.Errorf("error initializing %s: %s", agent.Name(), err) diff --git a/backend/store/store.go b/backend/store/store.go index 8953242c89..b7e222c68f 100644 --- a/backend/store/store.go +++ b/backend/store/store.go @@ -156,6 +156,13 @@ type WatchEventEntityConfig struct { Action WatchActionType } +// WatchEventUserConfig contains and updated entity config and the action that +// occurred during this modification +type WatchEventUserConfig struct { + User *corev2.User + Action WatchActionType +} + // Store is used to abstract the durable storage used by the Sensu backend // processses. Each Sensu resources is represented by its own interface. A // MockStore is available in order to mock a store implementation diff --git a/transport/transport.go b/transport/transport.go index a99b1c8c60..ea67356c84 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -27,6 +27,9 @@ const ( // MessageTypeEntityConfig is the message type sent for entity config updates MessageTypeEntityConfig = "entity_config" + // MessageTypeUserConfig is the message type sent for entity config updates + MessageTypeUserConfig = "user_config" + // HeaderKeyAgentName is the HTTP request header specifying the Agent name HeaderKeyAgentName = "Sensu-AgentName"