From fd668c86f485531b712f33b6d0e2f5ffec3486db Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Sat, 10 Feb 2024 06:05:18 +0530 Subject: [PATCH 01/16] initial Signed-off-by: SudhanshuBawane --- backend/agentd/agentd.go | 15 +++++++++- backend/agentd/session.go | 60 ++++++++++++++++++++++++++++++++++++++- backend/agentd/watcher.go | 51 +++++++++++++++++++++++++++++++++ backend/backend.go | 4 +++ backend/store/store.go | 7 +++++ transport/transport.go | 3 ++ 6 files changed, 138 insertions(+), 2 deletions(-) diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index 54f26eda2d..559ebc541b 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -17,8 +17,8 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/prometheus/client_golang/prometheus" - "github.com/sensu/sensu-go/agent" corev2 "github.com/sensu/core/v2" + "github.com/sensu/sensu-go/agent" "github.com/sensu/sensu-go/backend/apid/actions" "github.com/sensu/sensu-go/backend/apid/middlewares" "github.com/sensu/sensu-go/backend/apid/routers" @@ -105,6 +105,7 @@ type Agentd struct { serveWaitTime time.Duration ready func() backendEntity *corev2.Entity + userWatcher <-chan store.WatchEventUserConfig } // Config configures an Agentd. @@ -121,6 +122,7 @@ type Config struct { EtcdClientTLSConfig *tls.Config Watcher <-chan store.WatchEventEntityConfig BackendEntity *corev2.Entity + UserWatcher <-chan store.WatchEventUserConfig } // Option is a functional option. @@ -149,6 +151,7 @@ func New(c Config, opts ...Option) (*Agentd, error) { etcdClientTLSConfig: c.EtcdClientTLSConfig, serveWaitTime: c.ServeWaitTime, backendEntity: c.BackendEntity, + userWatcher: c.UserWatcher, } // prepare server TLS config @@ -291,6 +294,7 @@ func (a *Agentd) runWatcher() { } } +// adding the config updates to the etcd bus for watcher to consume func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error { if event.Entity == nil { return errors.New("nil entity received from entity config watcher") @@ -308,6 +312,15 @@ func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error { return nil } +// adding the UserConfig updates to the etcd bus for the watcher to consume +func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error { + if event.User == nil { + return errors.New("nil entry received from the user config watcher") + } + + return nil +} + // Stop Agentd. func (a *Agentd) Stop() error { a.cancel() diff --git a/backend/agentd/session.go b/backend/agentd/session.go index 034b801cfc..c2f9c51059 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -11,9 +11,9 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" - "github.com/sensu/sensu-go/agent" corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" + "github.com/sensu/sensu-go/agent" "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/metrics" "github.com/sensu/sensu-go/backend/ringv2" @@ -95,6 +95,7 @@ type Session struct { marshal agent.MarshalFunc unmarshal agent.UnmarshalFunc entityConfig *entityConfig + userConfig *userConfig mu sync.Mutex subscriptionsMap map[string]subscription } @@ -111,12 +112,23 @@ type entityConfig struct { updatesChannel chan interface{} } +// userConfig is used by a session to subscribe to entity config updates +type userConfig struct { + subscription chan messaging.Subscription + updatesChannel chan interface{} +} + // Receiver returns the channel for incoming entity updates from the entity // watcher func (e *entityConfig) Receiver() chan<- interface{} { return e.updatesChannel } +// Receiver returns the channel for incoming entity updates from the entity watcher +func (u *userConfig) Receiver() chan<- interface{} { + return u.updatesChannel +} + func newSessionHandler(s *Session) *handler.MessageHandler { handler := handler.NewMessageHandler() handler.AddHandler(transport.MessageTypeKeepalive, s.handleKeepalive) @@ -321,6 +333,52 @@ func (s *Session) sender() { for { var msg *transport.Message select { + //sudhanshu#2608 ---- user ----- + case u := <-s.userConfig.updatesChannel: + var usr *corev2.User + watchEvent, ok := u.(*store.WatchEventUserConfig) + if !ok { + logger.Errorf("Session received unexpected struct: %T", u) + continue + } + + // Handle the delete and unknown watch events + switch watchEvent.Action { + case store.WatchDelete: + //stop session + return + case store.WatchUnknown: + logger.Error("session received unknown watch event") + continue + } + + if watchEvent.User == nil { + logger.Error("session received nil user in watch event") + continue + } + + lager := logger.WithFields(logrus.Fields{ + "action": watchEvent.Action.String(), + "user": watchEvent.User.GetMetadata().Name, + "namespace": watchEvent.User.GetMetadata().Namespace, + }) + logger.Debug("User update received") + + bytes, err := s.marshal(watchEvent.User) + if err != nil { + lager.WithError(err).Error("session failed to serialize entity config") + continue + } + + // determine if user was disabled + if err := usr.Disabled; err { + lager.Debug("The user is now disabled ", err) + } + + msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) + + // -----entity ------- + case e := <-s.entityConfig.updatesChannel: watchEvent, ok := e.(*store.WatchEventEntityConfig) if !ok { diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index dc38220a74..a7fe13ee46 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -70,3 +70,54 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan return ch } + +// GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 +// over the bus as store.WatchEventUserConfig +func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { + key := etcdstorev2.StoreKey(storev2.ResourceRequest{ + Context: ctx, + StoreName: new(corev2.User).StoreName(), + }) + w := etcdstore.Watch(ctx, client, key, true) + ch := make(chan store.WatchEventUserConfig, 1) + + go func() { + defer close(ch) + for response := range w.Result() { + if response.Type == store.WatchError { + logger. + WithError(errors.New(string(response.Object))). + Error("Unexpected error while watching for the user config updates") + ch <- store.WatchEventUserConfig{ + Action: response.Type, + } + continue + } + var ( + configWrapper wrap.Wrapper + userConfig corev2.User + ) + + // Decode and unwrap the entity config + + if err := proto.Unmarshal(response.Object, &configWrapper); err != nil { + logger.WithField("key", response.Key).WithError(err). + Error("unable to unmarshal user config from key") + continue + } + + // Remove the managed_by label if the value is sensu-agent, in case the user is disabled + if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) + } + + ch <- store.WatchEventUserConfig{ + Action: response.Type, + User: &userConfig, + } + } + }() + + logger.Println("----watch metadata----", w) + return ch +} diff --git a/backend/backend.go b/backend/backend.go index ab2ae5e95c..6785011fd8 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -521,6 +521,9 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) { // Start the entity config watcher, so agentd sessions are notified of updates entityConfigWatcher := agentd.GetEntityConfigWatcher(b.ctx, b.Client) + // Start the user config watcher, so agentd sessions are notified of updates + userConfigWatcher := agentd.GetUserConfigWatcher(b.ctx, b.Client) + // Prepare the etcd client TLS config etcdClientTLSInfo := (transport.TLSInfo)(config.EtcdClientTLSInfo) etcdClientTLSConfig, err := etcdClientTLSInfo.ClientConfig() @@ -661,6 +664,7 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) { Watcher: entityConfigWatcher, EtcdClientTLSConfig: b.EtcdClientTLSConfig, BackendEntity: backendEntity, + UserWatcher: userConfigWatcher, }) if err != nil { return nil, fmt.Errorf("error initializing %s: %s", agent.Name(), err) diff --git a/backend/store/store.go b/backend/store/store.go index 8953242c89..b7e222c68f 100644 --- a/backend/store/store.go +++ b/backend/store/store.go @@ -156,6 +156,13 @@ type WatchEventEntityConfig struct { Action WatchActionType } +// WatchEventUserConfig contains and updated entity config and the action that +// occurred during this modification +type WatchEventUserConfig struct { + User *corev2.User + Action WatchActionType +} + // Store is used to abstract the durable storage used by the Sensu backend // processses. Each Sensu resources is represented by its own interface. A // MockStore is available in order to mock a store implementation diff --git a/transport/transport.go b/transport/transport.go index a99b1c8c60..ea67356c84 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -27,6 +27,9 @@ const ( // MessageTypeEntityConfig is the message type sent for entity config updates MessageTypeEntityConfig = "entity_config" + // MessageTypeUserConfig is the message type sent for entity config updates + MessageTypeUserConfig = "user_config" + // HeaderKeyAgentName is the HTTP request header specifying the Agent name HeaderKeyAgentName = "Sensu-AgentName" From 99dbc438ddec2c2226aebbd820583e72c9df2548 Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Tue, 20 Feb 2024 17:24:17 +0530 Subject: [PATCH 02/16] inital Signed-off-by: SudhanshuBawane --- backend/agentd/agentd.go | 7 +++++++ backend/agentd/session.go | 13 ++++++------- backend/agentd/watcher.go | 9 ++++++++- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index 559ebc541b..e7d5ed7301 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -290,6 +290,13 @@ func (a *Agentd) runWatcher() { if err := a.handleEvent(event); err != nil { logger.WithError(err).Error("error handling entity config watch event") } + case userEvent, ok := <-a.userWatcher: + if !ok { + return + } + if err := a.handleUserEvent(userEvent); err != nil { + logger.WithError(err).Error("error handling user config watch event") + } } } } diff --git a/backend/agentd/session.go b/backend/agentd/session.go index c2f9c51059..ef72cc0cb0 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -364,18 +364,17 @@ func (s *Session) sender() { }) logger.Debug("User update received") - bytes, err := s.marshal(watchEvent.User) - if err != nil { - lager.WithError(err).Error("session failed to serialize entity config") - continue - } + //bytes, err := s.marshal(watchEvent.User) + //if err != nil { + // lager.WithError(err).Error("session failed to serialize entity config") + // continue + //} // determine if user was disabled if err := usr.Disabled; err { lager.Debug("The user is now disabled ", err) } - - msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) + //msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) // -----entity ------- diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index a7fe13ee46..ac4418884c 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -3,10 +3,10 @@ package agentd import ( "context" "errors" - "github.com/gogo/protobuf/proto" corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" + "github.com/sensu/sensu-go/agent" "github.com/sensu/sensu-go/backend/store" etcdstore "github.com/sensu/sensu-go/backend/store/etcd" storev2 "github.com/sensu/sensu-go/backend/store/v2" @@ -74,6 +74,9 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan // GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 // over the bus as store.WatchEventUserConfig func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + key := etcdstorev2.StoreKey(storev2.ResourceRequest{ Context: ctx, StoreName: new(corev2.User).StoreName(), @@ -111,6 +114,10 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) } + if userConfig.Disabled { + agent.GracefulShutdown(cancel) + } + ch <- store.WatchEventUserConfig{ Action: response.Type, User: &userConfig, From c42375daa47aa59c68b49ada044b6ff2b0001e5d Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Thu, 22 Feb 2024 15:28:32 +0530 Subject: [PATCH 03/16] change Signed-off-by: SudhanshuBawane --- backend/agentd/session.go | 2 +- backend/agentd/watcher.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/backend/agentd/session.go b/backend/agentd/session.go index ef72cc0cb0..1dd795e461 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -333,7 +333,7 @@ func (s *Session) sender() { for { var msg *transport.Message select { - //sudhanshu#2608 ---- user ----- + //2608 ---- user ----- case u := <-s.userConfig.updatesChannel: var usr *corev2.User watchEvent, ok := u.(*store.WatchEventUserConfig) diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index ac4418884c..ae33b0b246 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -110,17 +110,17 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s } // Remove the managed_by label if the value is sensu-agent, in case the user is disabled - if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { - delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) - } + //if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + // delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) + //} if userConfig.Disabled { agent.GracefulShutdown(cancel) } ch <- store.WatchEventUserConfig{ - Action: response.Type, User: &userConfig, + Action: response.Type, } } }() From c091e1f846dc5dcee245139e97ffe62836b212c4 Mon Sep 17 00:00:00 2001 From: Francis Guimond Date: Tue, 27 Feb 2024 11:44:12 -0500 Subject: [PATCH 04/16] Sample code to use the wizard bus to tie the user watcher with the session shutdown Signed-off-by: SudhanshuBawane --- backend/agentd/agentd.go | 1 + backend/agentd/session.go | 65 ++++++++++++++++----------------------- backend/agentd/watcher.go | 2 +- 3 files changed, 29 insertions(+), 39 deletions(-) diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index e7d5ed7301..0af79565c9 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -324,6 +324,7 @@ func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error { if event.User == nil { return errors.New("nil entry received from the user config watcher") } + a.bus.Publish("userChanges", event.User.Username) return nil } diff --git a/backend/agentd/session.go b/backend/agentd/session.go index 1dd795e461..3f96bb3361 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -96,8 +96,10 @@ type Session struct { unmarshal agent.UnmarshalFunc entityConfig *entityConfig userConfig *userConfig + user string mu sync.Mutex subscriptionsMap map[string]subscription + userReceiver *UserReceiver } // subscription is used to abstract a message.Subscription and therefore allow @@ -177,6 +179,20 @@ func (b *BurialReceiver) Receiver() chan<- interface{} { return b.ch } +type UserReceiver struct { + ch chan interface{} +} + +func NewUserReceiver() *UserReceiver { + return &UserReceiver{ + ch: make(chan interface{}, 1), + } +} + +func (b *UserReceiver) Receiver() chan<- interface{} { + return b.ch +} + // NewSession creates a new Session object given the triple of a transport // connection, message bus, and store. // The Session is responsible for stopping itself, and does so when it @@ -205,6 +221,8 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) { ringPool: cfg.RingPool, unmarshal: cfg.Unmarshal, marshal: cfg.Marshal, + user: cfg.User, + userReceiver: NewUserReceiver(), entityConfig: &entityConfig{ subscriptions: make(chan messaging.Subscription, 1), updatesChannel: make(chan interface{}, 10), @@ -223,6 +241,11 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) { }() } + _, err := s.bus.Subscribe("userUpdates", cfg.AgentName, s.userReceiver) + if err != nil { + return nil, err + } + if err := s.bus.Publish(messaging.TopicKeepalive, makeEntitySwitchBurialEvent(cfg)); err != nil { return nil, err } @@ -334,48 +357,14 @@ func (s *Session) sender() { var msg *transport.Message select { //2608 ---- user ----- - case u := <-s.userConfig.updatesChannel: - var usr *corev2.User - watchEvent, ok := u.(*store.WatchEventUserConfig) + case u := <-s.userReceiver.ch: + user, ok := u.(corev2.User) if !ok { - logger.Errorf("Session received unexpected struct: %T", u) - continue - } - - // Handle the delete and unknown watch events - switch watchEvent.Action { - case store.WatchDelete: - //stop session - return - case store.WatchUnknown: - logger.Error("session received unknown watch event") - continue - } - if watchEvent.User == nil { - logger.Error("session received nil user in watch event") - continue } - - lager := logger.WithFields(logrus.Fields{ - "action": watchEvent.Action.String(), - "user": watchEvent.User.GetMetadata().Name, - "namespace": watchEvent.User.GetMetadata().Namespace, - }) - logger.Debug("User update received") - - //bytes, err := s.marshal(watchEvent.User) - //if err != nil { - // lager.WithError(err).Error("session failed to serialize entity config") - // continue - //} - - // determine if user was disabled - if err := usr.Disabled; err { - lager.Debug("The user is now disabled ", err) + if user.Disabled && user.Username == s.user { + return } - //msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) - // -----entity ------- case e := <-s.entityConfig.updatesChannel: diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index ae33b0b246..ee2e413307 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -81,7 +81,7 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s Context: ctx, StoreName: new(corev2.User).StoreName(), }) - w := etcdstore.Watch(ctx, client, key, true) + w := etcdstore.Watch(ctx, client, key, true, clientv3.WithFilterPut()) ch := make(chan store.WatchEventUserConfig, 1) go func() { From 592ec7319befe26060131de3fb12d8cf0a5d2058 Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Thu, 29 Feb 2024 20:58:45 +0530 Subject: [PATCH 05/16] WIP Signed-off-by: SudhanshuBawane --- backend/agentd/agentd.go | 9 +- backend/agentd/session.go | 140 +++++++++++++++++++++++++++++-- backend/agentd/watcher.go | 9 +- backend/messaging/message_bus.go | 6 ++ backend/store/store.go | 7 +- 5 files changed, 153 insertions(+), 18 deletions(-) diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index 0af79565c9..beb7e84b6b 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -324,8 +324,13 @@ func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error { if event.User == nil { return errors.New("nil entry received from the user config watcher") } - a.bus.Publish("userChanges", event.User.Username) - + topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.GetMetadata().Name) + if err := a.bus.Publish(topic, &event); err != nil { + return err + } + //a.bus.Publish("userChanges", event.User.Username) + logger.WithField("topic", topic). + Debug("successfully published an user config update to the bus") return nil } diff --git a/backend/agentd/session.go b/backend/agentd/session.go index 3f96bb3361..aea91dbc77 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -25,6 +25,8 @@ import ( ) const ( + UserNotFound = "not found" + deletedEventSentinel = -1 // Time to wait before force close on connection. @@ -227,6 +229,10 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) { subscriptions: make(chan messaging.Subscription, 1), updatesChannel: make(chan interface{}, 10), }, + userConfig: &userConfig{ + subscription: make(chan messaging.Subscription, 1), + updatesChannel: make(chan interface{}, 10), + }, } // Optionally subscribe to burial notifications @@ -356,17 +362,65 @@ func (s *Session) sender() { for { var msg *transport.Message select { - //2608 ---- user ----- + //---- user -----// case u := <-s.userReceiver.ch: - user, ok := u.(corev2.User) + user, ok := u.(*corev2.User) if !ok { - + logger.WithField("key", ok) } + if user.Disabled && user.Username == s.user { return } - // -----entity ------- + //case u := <-s.userConfig.updatesChannel: + // watchEvent, ok := u.(*store.WatchEventUserConfig) + // fmt.Println("========== usrConfig Updates ========", watchEvent) + // if !ok { + // logger.Errorf("session received unexoected struct : %T", u) + // continue + // } + // + // if watchEvent.User.Disabled && watchEvent.User.Username == s.user { + // return + // } + // //fmt.Println("========== usrConfig Updates ========", watchEvent) + ////// Handle the delete/disable event + ////switch watchEvent.Action { + ////case store.WatchDelete: + //// return + ////} + // + //if watchEvent.User == nil { + // logger.Error("session received nil user in watch event") + //} + //// + //lagger := logger.WithFields(logrus.Fields{ + // "action": watchEvent.Action.String(), + // "user": watchEvent.User.GetMetadata().Name, + // "namespace": watchEvent.User.GetMetadata().Namespace, + //}) + //lagger.Debug("user update received") + // + //configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User) + //wrapper, err := storev2.WrapResource(watchEvent.User) + //if err != nil { + // lagger.WithError(err).Error("could not warp the user config") + // continue + //} + // + //if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil { + // sessionErrorCounter.WithLabelValues(err.Error()).Inc() + // lagger.WithError(err).Error("could not update the user config") + //} + + //bytes, err := s.marshal(watchEvent.User) + //if err != nil { + // lagger.WithError(err).Error("session failed to serialize user config") + //} + //msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) + + // ---- entity ----// case e := <-s.entityConfig.updatesChannel: watchEvent, ok := e.(*store.WatchEventEntityConfig) if !ok { @@ -495,6 +549,8 @@ func (s *Session) sender() { // 3. Start goroutine that waits for context cancellation, and shuts down service. func (s *Session) Start() (err error) { defer close(s.entityConfig.subscriptions) + defer close(s.userConfig.subscription) + sessionCounter.WithLabelValues(s.cfg.Namespace).Inc() s.wg = &sync.WaitGroup{} s.wg.Add(2) @@ -518,21 +574,84 @@ func (s *Session) Start() (err error) { "namespace": s.cfg.Namespace, }) - // Subscribe the agent to its entity_config topic + // Subscribe the agent to its entity_config and user_config topic topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName) + userTopic := messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName) lager.WithField("topic", topic).Debug("subscribing to topic") + logger.WithField("topic", userTopic).Debug("subscribing to topic") // Get a unique name for the agent, which will be used as the consumer of the // bus, in order to avoid problems with an agent reconnecting before its // session is ended agentName := agentUUID(s.cfg.Namespace, s.cfg.AgentName) + + // Determine if user already exits + userSubscription, usrErr := s.bus.Subscribe(userTopic, agentName, s.userConfig) + if usrErr != nil { + lager.WithError(err).Error("error starting subscription") + return err + } + s.userConfig.subscription <- userSubscription + usrReq := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev2.User{}).StoreName()) + usrWrapper, err := s.storev2.Get(usrReq) + if err != nil { + // Just exit but don't send error about absence of user config + var errNotFound *store.ErrNotFound + if !errors.As(err, &errNotFound) { + lager.WithError(err).Error("error querying the user config") + return err + } + lager.Debug("no user config found") + + // Indicate to the agent that this user does not exist + meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace) + watchEvent := &store.WatchEventUserConfig{ + User: &corev2.User{ + Username: s.user, + }, + Action: store.WatchCreate, + Metadata: &meta, + } + err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) + if err != nil { + lager.WithError(err).Error("error publishing user config") + return err + } + } else { + // A user config already exists, therefore we should the stored user subscriptions + // rather than what the agent provided us for the subscriptions + lager.Debug("an user config was found") + + var storedUserConfig corev2.User + err = usrWrapper.UnwrapInto(&storedUserConfig) + if err != nil { + lager.WithError(err).Error("error unwrapping user config") + return err + } + + // Remove the managed_by label if the value is sensu-agent, in case of disabled user + if storedUserConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + delete(storedUserConfig.GetMetadata().Labels, corev2.ManagedByLabel) + } + + // Send back this user config to the agent so it uses that rather than it's local config + watchEvent := &store.WatchEventUserConfig{ + Action: store.WatchUpdate, + User: &storedUserConfig, + } + err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) + if err != nil { + lager.WithError(err).Error("error publishing user config") + return err + } + } + + // Determine if the entity already exists subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig) if err != nil { lager.WithError(err).Error("error starting subscription") return err } s.entityConfig.subscriptions <- subscription - - // Determine if the entity already exists req := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev3.EntityConfig{}).StoreName()) wrapper, err := s.storev2.Get(req) if err != nil { @@ -624,6 +743,7 @@ func (s *Session) stop() { } }() defer close(s.entityConfig.updatesChannel) + defer close(s.userConfig.updatesChannel) defer close(s.checkChannel) sessionCounter.WithLabelValues(s.cfg.Namespace).Dec() @@ -648,6 +768,12 @@ func (s *Session) stop() { } } + for sub := range s.userConfig.subscription { + if err := sub.Cancel(); err != nil { + logger.WithError(err).Error("unable to unsubscribe from message bus") + } + } + // Unsubscribe the session from every configured check subscriptions s.unsubscribe(s.cfg.Subscriptions) } diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index ee2e413307..68b703b02e 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -6,7 +6,6 @@ import ( "github.com/gogo/protobuf/proto" corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" - "github.com/sensu/sensu-go/agent" "github.com/sensu/sensu-go/backend/store" etcdstore "github.com/sensu/sensu-go/backend/store/etcd" storev2 "github.com/sensu/sensu-go/backend/store/v2" @@ -110,12 +109,8 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s } // Remove the managed_by label if the value is sensu-agent, in case the user is disabled - //if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { - // delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) - //} - - if userConfig.Disabled { - agent.GracefulShutdown(cancel) + if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) } ch <- store.WatchEventUserConfig{ diff --git a/backend/messaging/message_bus.go b/backend/messaging/message_bus.go index 07823b25d2..5e42ff99ec 100644 --- a/backend/messaging/message_bus.go +++ b/backend/messaging/message_bus.go @@ -14,6 +14,8 @@ const ( // to agents TopicEntityConfig = "sensu:entity-config" + TopicUserConfig = "sensu:user-config" + // TopicEvent is the topic for events that have been written to Etcd and // normalized by eventd. TopicEvent = "sensu:event" @@ -104,6 +106,10 @@ func EntityConfigTopic(namespace, name string) string { return fmt.Sprintf("%s:%s:%s", TopicEntityConfig, namespace, name) } +func UserConfigTopic(namespace, name string) string { + return fmt.Sprintf("%s:%s:%s", TopicUserConfig, namespace, name) +} + // SubscriptionTopic is a helper to determine the proper topic name for a // subscription based on the namespace func SubscriptionTopic(namespace, sub string) string { diff --git a/backend/store/store.go b/backend/store/store.go index b7e222c68f..fb01ae130b 100644 --- a/backend/store/store.go +++ b/backend/store/store.go @@ -6,6 +6,7 @@ import ( "fmt" corev2 "github.com/sensu/core/v2" + v2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" "github.com/sensu/sensu-go/backend/store/patch" "github.com/sensu/sensu-go/types" @@ -159,8 +160,10 @@ type WatchEventEntityConfig struct { // WatchEventUserConfig contains and updated entity config and the action that // occurred during this modification type WatchEventUserConfig struct { - User *corev2.User - Action WatchActionType + User *corev2.User + Action WatchActionType + Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"` + //Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"` } // Store is used to abstract the durable storage used by the Sensu backend From 016b699b2574e67b5a7dfdd7e2475f6e21309bdd Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Wed, 6 Mar 2024 03:53:17 +0530 Subject: [PATCH 06/16] unmarshal issue Signed-off-by: SudhanshuBawane --- backend/agentd/agentd.go | 7 ++- backend/agentd/session.go | 99 +++++++++++++++++++--------------- backend/agentd/watcher.go | 31 +++++++---- backend/agentd/watcher_test.go | 10 ++++ backend/store/store.go | 9 ++-- 5 files changed, 95 insertions(+), 61 deletions(-) diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index beb7e84b6b..4b2abc0736 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -278,6 +278,7 @@ func (a *Agentd) Start() error { func (a *Agentd) runWatcher() { defer func() { logger.Warn("shutting down entity config watcher") + logger.Warn("shutting down user config watcher") }() for { select { @@ -324,8 +325,10 @@ func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error { if event.User == nil { return errors.New("nil entry received from the user config watcher") } - topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.GetMetadata().Name) - if err := a.bus.Publish(topic, &event); err != nil { + topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.Username) + if err := a.bus.Publish(topic, event.User.Username); err != nil { + logger.WithField("topic", topic).WithError(err). + Error("unable to publish a user config update to the bus") return err } //a.bus.Publish("userChanges", event.User.Username) diff --git a/backend/agentd/session.go b/backend/agentd/session.go index aea91dbc77..25e925bcd1 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -366,59 +366,72 @@ func (s *Session) sender() { case u := <-s.userReceiver.ch: user, ok := u.(*corev2.User) if !ok { - logger.WithField("key", ok) + logger.WithField("unexpected user struct", ok) + } + + configUser, err := s.marshal(user) + if err != nil { + logger.WithError(err).Error("session failed to serialize the user config") } if user.Disabled && user.Username == s.user { return } + msg = transport.NewMessage(user.Username, configUser) + //case u := <-s.userConfig.updatesChannel: // watchEvent, ok := u.(*store.WatchEventUserConfig) // fmt.Println("========== usrConfig Updates ========", watchEvent) // if !ok { - // logger.Errorf("session received unexoected struct : %T", u) + // logger.Errorf("session received unexoected user struct : %T", u) // continue // } - // - // if watchEvent.User.Disabled && watchEvent.User.Username == s.user { + // fmt.Println("--------action --------", store.WatchCreate, store.WatchDelete, store.WatchUnknown) + // //if watchEvent.User.Disabled && watchEvent.User.Username == s.user { + // // return + // //} + // fmt.Println("========== usrConfig Updates ========", watchEvent) + // //// Handle the delete/disable event + // switch watchEvent.Action { + // case store.WatchDelete: + // fmt.Println(" ======= delete =======", store.WatchDelete) // return + // case store.WatchCreate: + // fmt.Println("======= create user ====", store.WatchCreate) + // case store.WatchUpdate: + // fmt.Println("==== user update ======", store.WatchUpdate) + // // } - // //fmt.Println("========== usrConfig Updates ========", watchEvent) - ////// Handle the delete/disable event - ////switch watchEvent.Action { - ////case store.WatchDelete: - //// return - ////} // - //if watchEvent.User == nil { - // logger.Error("session received nil user in watch event") - //} - //// - //lagger := logger.WithFields(logrus.Fields{ - // "action": watchEvent.Action.String(), - // "user": watchEvent.User.GetMetadata().Name, - // "namespace": watchEvent.User.GetMetadata().Namespace, - //}) - //lagger.Debug("user update received") + // if watchEvent.User == nil { + // logger.Error("session received nil user in watch event") + // } + // // + // lagger := logger.WithFields(logrus.Fields{ + // "action": watchEvent.Action.String(), + // "user": watchEvent.User.Username, + // "namespace": watchEvent.User.GetMetadata().Namespace, + // }) + // lagger.Debug("user update received") // - //configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User) - //wrapper, err := storev2.WrapResource(watchEvent.User) - //if err != nil { - // lagger.WithError(err).Error("could not warp the user config") - // continue - //} + // configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User) + // wrapper, err := storev2.WrapResource(watchEvent.User) + // if err != nil { + // lagger.WithError(err).Error("could not warp the user config") + // continue + // } // - //if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil { - // sessionErrorCounter.WithLabelValues(err.Error()).Inc() - // lagger.WithError(err).Error("could not update the user config") - //} - - //bytes, err := s.marshal(watchEvent.User) - //if err != nil { - // lagger.WithError(err).Error("session failed to serialize user config") - //} - //msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) + // if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil { + // sessionErrorCounter.WithLabelValues(err.Error()).Inc() + // lagger.WithError(err).Error("could not update the user config") + // } + // + // bytes, err := s.marshal(watchEvent.User) + // if err != nil { + // lagger.WithError(err).Error("session failed to serialize user config") + // } + // msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) // ---- entity ----// case e := <-s.entityConfig.updatesChannel: @@ -548,8 +561,8 @@ func (s *Session) sender() { // 2. Start receiver // 3. Start goroutine that waits for context cancellation, and shuts down service. func (s *Session) Start() (err error) { - defer close(s.entityConfig.subscriptions) defer close(s.userConfig.subscription) + defer close(s.entityConfig.subscriptions) sessionCounter.WithLabelValues(s.cfg.Namespace).Inc() s.wg = &sync.WaitGroup{} @@ -603,13 +616,11 @@ func (s *Session) Start() (err error) { lager.Debug("no user config found") // Indicate to the agent that this user does not exist - meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace) + //meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace) watchEvent := &store.WatchEventUserConfig{ - User: &corev2.User{ - Username: s.user, - }, - Action: store.WatchCreate, - Metadata: &meta, + User: &corev2.User{}, + Action: store.WatchCreate, + //Metadata: &meta, } err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) if err != nil { @@ -742,8 +753,8 @@ func (s *Session) stop() { logger.WithError(err).Error("error closing session") } }() - defer close(s.entityConfig.updatesChannel) defer close(s.userConfig.updatesChannel) + defer close(s.entityConfig.updatesChannel) defer close(s.checkChannel) sessionCounter.WithLabelValues(s.cfg.Namespace).Dec() diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index 68b703b02e..7e93ffc79f 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -3,6 +3,7 @@ package agentd import ( "context" "errors" + "fmt" "github.com/gogo/protobuf/proto" corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" @@ -73,19 +74,21 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan // GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 // over the bus as store.WatchEventUserConfig func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + //ctx, cancel := context.WithCancel(context.Background()) + //defer cancel() key := etcdstorev2.StoreKey(storev2.ResourceRequest{ Context: ctx, StoreName: new(corev2.User).StoreName(), }) - w := etcdstore.Watch(ctx, client, key, true, clientv3.WithFilterPut()) + w := etcdstore.Watch(ctx, client, key, true) + fmt.Sprintf("======= user metadata ========= w : %v, ctx : %v, client: %v, key: %v", w, ctx, client, key) ch := make(chan store.WatchEventUserConfig, 1) go func() { defer close(ch) for response := range w.Result() { + fmt.Println("======= user config response ========", response) if response.Type == store.WatchError { logger. WithError(errors.New(string(response.Object))). @@ -100,18 +103,26 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s userConfig corev2.User ) - // Decode and unwrap the entity config + // Decode and unwrap the user config - if err := proto.Unmarshal(response.Object, &configWrapper); err != nil { + //if err := proto.Unmarshal(response.Object, &configWrapper); err != nil { + // fmt.Println("====== unmarshaled user =======", configWrapper) + // logger.WithField("key", response.Key).WithError(err). + // Error("unable to unmarshal user config from key") + // continue + //} + + if err := configWrapper.UnwrapInto(&userConfig); err != nil { + fmt.Println("====== unmarshaled user =======", userConfig) logger.WithField("key", response.Key).WithError(err). - Error("unable to unmarshal user config from key") + Error("unable to unwrap entity config from key") continue } // Remove the managed_by label if the value is sensu-agent, in case the user is disabled - if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { - delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) - } + //if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + // delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) + //} ch <- store.WatchEventUserConfig{ User: &userConfig, @@ -120,6 +131,6 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s } }() - logger.Println("----watch metadata----", w) + logger.Println("========= watch metadata ========", w) return ch } diff --git a/backend/agentd/watcher_test.go b/backend/agentd/watcher_test.go index afcf80bc60..980a2ca8f7 100644 --- a/backend/agentd/watcher_test.go +++ b/backend/agentd/watcher_test.go @@ -17,3 +17,13 @@ func TestGetEntityConfigWatcher(t *testing.T) { ch := GetEntityConfigWatcher(context.Background(), client) assert.NotNil(t, ch) } + +func TestGetUserConfigWatcher(t *testing.T) { + e, cleanup := etcd.NewTestEtcd(t) + defer cleanup() + client := e.NewEmbeddedClient() + defer client.Close() + + ch := GetUserConfigWatcher(context.Background(), client) + assert.NotNil(t, ch) +} diff --git a/backend/store/store.go b/backend/store/store.go index fb01ae130b..da94e2d9b9 100644 --- a/backend/store/store.go +++ b/backend/store/store.go @@ -6,7 +6,6 @@ import ( "fmt" corev2 "github.com/sensu/core/v2" - v2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" "github.com/sensu/sensu-go/backend/store/patch" "github.com/sensu/sensu-go/types" @@ -160,10 +159,10 @@ type WatchEventEntityConfig struct { // WatchEventUserConfig contains and updated entity config and the action that // occurred during this modification type WatchEventUserConfig struct { - User *corev2.User - Action WatchActionType - Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"` - //Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"` + User *corev2.User + Action WatchActionType + //Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"` + ////Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"` } // Store is used to abstract the durable storage used by the Sensu backend From 76e1bda0b19c98b2d14dbed843bcab73e752fb5c Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Wed, 6 Mar 2024 21:42:22 +0530 Subject: [PATCH 07/16] testing Signed-off-by: SudhanshuBawane --- backend/agentd/session.go | 285 +++++++++++++++++++++----------------- backend/agentd/watcher.go | 106 ++++++++++---- backend/store/store.go | 5 +- 3 files changed, 237 insertions(+), 159 deletions(-) diff --git a/backend/agentd/session.go b/backend/agentd/session.go index 25e925bcd1..e19002a03c 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -101,7 +101,7 @@ type Session struct { user string mu sync.Mutex subscriptionsMap map[string]subscription - userReceiver *UserReceiver + //userReceiver *UserReceiver } // subscription is used to abstract a message.Subscription and therefore allow @@ -181,19 +181,19 @@ func (b *BurialReceiver) Receiver() chan<- interface{} { return b.ch } -type UserReceiver struct { - ch chan interface{} -} - -func NewUserReceiver() *UserReceiver { - return &UserReceiver{ - ch: make(chan interface{}, 1), - } -} - -func (b *UserReceiver) Receiver() chan<- interface{} { - return b.ch -} +//type UserReceiver struct { +// ch chan interface{} +//} +// +//func NewUserReceiver() *UserReceiver { +// return &UserReceiver{ +// ch: make(chan interface{}, 1), +// } +//} +// +//func (b *UserReceiver) Receiver() chan<- interface{} { +// return b.ch +//} // NewSession creates a new Session object given the triple of a transport // connection, message bus, and store. @@ -224,7 +224,7 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) { unmarshal: cfg.Unmarshal, marshal: cfg.Marshal, user: cfg.User, - userReceiver: NewUserReceiver(), + //userReceiver: NewUserReceiver(), entityConfig: &entityConfig{ subscriptions: make(chan messaging.Subscription, 1), updatesChannel: make(chan interface{}, 10), @@ -247,7 +247,7 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) { }() } - _, err := s.bus.Subscribe("userUpdates", cfg.AgentName, s.userReceiver) + _, err := s.bus.Subscribe("userUpdates", cfg.AgentName, s.userConfig) if err != nil { return nil, err } @@ -363,75 +363,84 @@ func (s *Session) sender() { var msg *transport.Message select { //---- user -----// - case u := <-s.userReceiver.ch: - user, ok := u.(*corev2.User) - if !ok { - logger.WithField("unexpected user struct", ok) - } - - configUser, err := s.marshal(user) - if err != nil { - logger.WithError(err).Error("session failed to serialize the user config") - } - - if user.Disabled && user.Username == s.user { - return - } - - msg = transport.NewMessage(user.Username, configUser) - - //case u := <-s.userConfig.updatesChannel: - // watchEvent, ok := u.(*store.WatchEventUserConfig) - // fmt.Println("========== usrConfig Updates ========", watchEvent) + //case u := <-s.userReceiver.ch: + // user, ok := u.(*corev2.User) // if !ok { - // logger.Errorf("session received unexoected user struct : %T", u) - // continue - // } - // fmt.Println("--------action --------", store.WatchCreate, store.WatchDelete, store.WatchUnknown) - // //if watchEvent.User.Disabled && watchEvent.User.Username == s.user { - // // return - // //} - // fmt.Println("========== usrConfig Updates ========", watchEvent) - // //// Handle the delete/disable event - // switch watchEvent.Action { - // case store.WatchDelete: - // fmt.Println(" ======= delete =======", store.WatchDelete) - // return - // case store.WatchCreate: - // fmt.Println("======= create user ====", store.WatchCreate) - // case store.WatchUpdate: - // fmt.Println("==== user update ======", store.WatchUpdate) - // - // } - // - // if watchEvent.User == nil { - // logger.Error("session received nil user in watch event") + // logger.WithField("unexpected user struct", ok) // } - // // - // lagger := logger.WithFields(logrus.Fields{ - // "action": watchEvent.Action.String(), - // "user": watchEvent.User.Username, - // "namespace": watchEvent.User.GetMetadata().Namespace, - // }) - // lagger.Debug("user update received") // - // configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User) - // wrapper, err := storev2.WrapResource(watchEvent.User) + // configUser, err := s.marshal(user) // if err != nil { - // lagger.WithError(err).Error("could not warp the user config") - // continue + // logger.WithError(err).Error("session failed to serialize the user config") // } // - // if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil { - // sessionErrorCounter.WithLabelValues(err.Error()).Inc() - // lagger.WithError(err).Error("could not update the user config") + // if user.Disabled && user.Username == s.user { + // return // } // - // bytes, err := s.marshal(watchEvent.User) - // if err != nil { - // lagger.WithError(err).Error("session failed to serialize user config") - // } - // msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) + // msg = transport.NewMessage(user.Username, configUser) + + case u := <-s.userConfig.updatesChannel: + + watchEvent, ok := u.(*store.WatchEventUserConfig) + fmt.Println("========== usrConfig Updates ========", watchEvent) + if !ok { + logger.Errorf("session received unexoected user struct : %T", u) + continue + } + //fmt.Println("--------action --------", store.WatchCreate, store.WatchDelete, store.WatchUnknown) + //if watchEvent.User.Disabled && watchEvent.User.Username == s.user { + // return + //} + if watchEvent.Disabled { + fmt.Println("========= the user is now disabled =======") + s.stop() + return + } + + fmt.Println("========== usrConfig Updates ========", watchEvent) + //// Handle the delete/disable event + //switch userConfig { + //case store.WatchDelete: + // fmt.Println(" ======= delete =======", store.WatchDelete) + // return + //case store.WatchCreate: + // fmt.Println("======= create user ====", store.WatchCreate) + //case store.WatchUpdate: + // fmt.Println("==== user update ======", store.WatchUpdate) + //default: + // panic("unhandled default case") + //} + + if watchEvent.User == nil { + logger.Error("session received nil user in watch event") + } + // + lagger := logger.WithFields(logrus.Fields{ + "action": watchEvent.Action.String(), + "user": watchEvent.User.Username, + "namespace": watchEvent.User.GetMetadata().Namespace, + }) + lagger.Debug("user update received") + + //configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User) + //wrapper, err := storev2.WrapResource(watchEvent.User) + //if err != nil { + // lagger.WithError(err).Error("could not warp the user config") + // continue + //} + // + //if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil { + // sessionErrorCounter.WithLabelValues(err.Error()).Inc() + // lagger.WithError(err).Error("could not update the user config") + //} + + bytes, err := s.marshal(watchEvent.User) + if err != nil { + lagger.WithError(err).Error("session failed to serialize user config") + } + //msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) + msg = transport.NewMessage(corev2.UserType, bytes) // ---- entity ----// case e := <-s.entityConfig.updatesChannel: @@ -567,7 +576,7 @@ func (s *Session) Start() (err error) { sessionCounter.WithLabelValues(s.cfg.Namespace).Inc() s.wg = &sync.WaitGroup{} s.wg.Add(2) - s.stopWG.Add(1) + s.stopWG.Add(2) go s.sender() go s.receiver() go func() { @@ -604,57 +613,79 @@ func (s *Session) Start() (err error) { return err } s.userConfig.subscription <- userSubscription - usrReq := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev2.User{}).StoreName()) - usrWrapper, err := s.storev2.Get(usrReq) + //usrReq := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev2.User{}).StoreName()) + //usrWrapper, err := s.storev2.Get(usrReq) + + //err = usrWrapper.UnwrapInto(&storedUserConfig) + //if err != nil { + // lager.WithError(err).Error("error unwrapping user config") + // return err + //} + + // Remove the managed_by label if the value is sensu-agent, in case of disabled user + //if storedUserConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + // delete(storedUserConfig.GetMetadata().Labels, corev2.ManagedByLabel) + //} + + // Send back this user config to the agent so it uses that rather than it's local config + watchEvent := &store.WatchEventUserConfig{ + Action: store.WatchUpdate, + User: &corev2.User{}, + } + err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) if err != nil { - // Just exit but don't send error about absence of user config - var errNotFound *store.ErrNotFound - if !errors.As(err, &errNotFound) { - lager.WithError(err).Error("error querying the user config") - return err - } - lager.Debug("no user config found") - - // Indicate to the agent that this user does not exist - //meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace) - watchEvent := &store.WatchEventUserConfig{ - User: &corev2.User{}, - Action: store.WatchCreate, - //Metadata: &meta, - } - err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) - if err != nil { - lager.WithError(err).Error("error publishing user config") - return err - } - } else { - // A user config already exists, therefore we should the stored user subscriptions - // rather than what the agent provided us for the subscriptions - lager.Debug("an user config was found") - - var storedUserConfig corev2.User - err = usrWrapper.UnwrapInto(&storedUserConfig) - if err != nil { - lager.WithError(err).Error("error unwrapping user config") - return err - } - - // Remove the managed_by label if the value is sensu-agent, in case of disabled user - if storedUserConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { - delete(storedUserConfig.GetMetadata().Labels, corev2.ManagedByLabel) - } - - // Send back this user config to the agent so it uses that rather than it's local config - watchEvent := &store.WatchEventUserConfig{ - Action: store.WatchUpdate, - User: &storedUserConfig, - } - err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) - if err != nil { - lager.WithError(err).Error("error publishing user config") - return err - } + lager.WithError(err).Error("error publishing user config") + return err } + //if err != nil { + // // Just exit but don't send error about absence of user config + // var errNotFound *store.ErrNotFound + // if !errors.As(err, &errNotFound) { + // lager.WithError(err).Error("error querying the user config") + // return err + // } + // lager.Debug("no user config found") + // + // // Indicate to the agent that this user does not exist + // //meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace) + // watchEvent := &store.WatchEventUserConfig{ + // User: &corev2.User{}, + // Action: store.WatchCreate, + // //Metadata: &meta, + // } + // err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) + // if err != nil { + // lager.WithError(err).Error("error publishing user config") + // return err + // } + //} else { + // // A user config already exists, therefore we should the stored user subscriptions + // // rather than what the agent provided us for the subscriptions + // lager.Debug("an user config was found") + // + // var storedUserConfig corev2.User + // err = usrWrapper.UnwrapInto(&storedUserConfig) + // if err != nil { + // lager.WithError(err).Error("error unwrapping user config") + // return err + // } + // + // // Remove the managed_by label if the value is sensu-agent, in case of disabled user + // if storedUserConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + // delete(storedUserConfig.GetMetadata().Labels, corev2.ManagedByLabel) + // } + // + // // Send back this user config to the agent so it uses that rather than it's local config + // watchEvent := &store.WatchEventUserConfig{ + // Action: store.WatchUpdate, + // User: &storedUserConfig, + // } + // err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) + // if err != nil { + // lager.WithError(err).Error("error publishing user config") + // return err + // } + //} // Determine if the entity already exists subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig) diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index 7e93ffc79f..0d05d92b76 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -73,22 +73,83 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan // GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 // over the bus as store.WatchEventUserConfig -func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { - //ctx, cancel := context.WithCancel(context.Background()) - //defer cancel() +//func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { +// //ctx, cancel := context.WithCancel(context.Background()) +// //defer cancel() +// +// key := etcdstorev2.StoreKey(storev2.ResourceRequest{ +// Context: ctx, +// StoreName: new(corev2.User).StoreName(), +// }) +// w := etcdstore.Watch(ctx, client, key, true) +// fmt.Sprintf("======= user metadata ========= w : %v, ctx : %v, client: %v, key: %v", w, ctx, client, key) +// ch := make(chan store.WatchEventUserConfig, 1) +// +// go func() { +// defer close(ch) +// for response := range w.Result() { +// fmt.Println("======= user config response ========", response) +// if response.Type == store.WatchError { +// logger. +// WithError(errors.New(string(response.Object))). +// Error("Unexpected error while watching for the user config updates") +// ch <- store.WatchEventUserConfig{ +// Action: response.Type, +// } +// continue +// } +// //var ( +// // configWrapper wrap.Wrapper +// // userConfig corev2.User +// //) +// +// // Decode and unwrap the user config +// +// //if err := proto.Unmarshal(response.Object, &configWrapper); err != nil { +// // fmt.Println("====== unmarshaled user =======", configWrapper) +// // logger.WithField("key", response.Key).WithError(err). +// // Error("unable to unmarshal user config from key") +// // continue +// //} +// +// //if err := configWrapper.UnwrapInto(&userConfig); err != nil { +// // fmt.Println("====== unmarshaled user =======", userConfig) +// // logger.WithField("key", response.Key).WithError(err). +// // Error("unable to unwrap entity config from key") +// // continue +// //} +// +// // Remove the managed_by label if the value is sensu-agent, in case the user is disabled +// //if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { +// // delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) +// //} +// +// ch <- store.WatchEventUserConfig{ +// User: &corev2.User{}, +// Action: response.Type, +// } +// } +// }() +// +// logger.Println("========= watch metadata ========", w) +// return ch +//} +// GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 +// over the bus as store.WatchEventUserConfig +func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { + var userConfig corev2.User key := etcdstorev2.StoreKey(storev2.ResourceRequest{ Context: ctx, StoreName: new(corev2.User).StoreName(), }) w := etcdstore.Watch(ctx, client, key, true) - fmt.Sprintf("======= user metadata ========= w : %v, ctx : %v, client: %v, key: %v", w, ctx, client, key) - ch := make(chan store.WatchEventUserConfig, 1) + ch := make(chan store.WatchEventUserConfig, 10) go func() { defer close(ch) for response := range w.Result() { - fmt.Println("======= user config response ========", response) + logger.Info("read from user watch channel") if response.Type == store.WatchError { logger. WithError(errors.New(string(response.Object))). @@ -98,39 +159,24 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s } continue } - var ( - configWrapper wrap.Wrapper - userConfig corev2.User - ) - // Decode and unwrap the user config + // unmarshal the user config - //if err := proto.Unmarshal(response.Object, &configWrapper); err != nil { - // fmt.Println("====== unmarshaled user =======", configWrapper) - // logger.WithField("key", response.Key).WithError(err). - // Error("unable to unmarshal user config from key") - // continue - //} - - if err := configWrapper.UnwrapInto(&userConfig); err != nil { - fmt.Println("====== unmarshaled user =======", userConfig) - logger.WithField("key", response.Key).WithError(err). - Error("unable to unwrap entity config from key") + if err := proto.Unmarshal(response.Object, &userConfig); err != nil { continue } - - // Remove the managed_by label if the value is sensu-agent, in case the user is disabled - //if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { - // delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) - //} + if userConfig.Disabled { + fmt.Println("======= user watch event in watcher ========", userConfig, string(rune(response.Type))) + } ch <- store.WatchEventUserConfig{ - User: &userConfig, - Action: response.Type, + User: &userConfig, + Action: response.Type, + Disabled: userConfig.Disabled, } } }() - logger.Println("========= watch metadata ========", w) + logger.Println("----watch metadata----", w) return ch } diff --git a/backend/store/store.go b/backend/store/store.go index da94e2d9b9..0bcf4cf6e8 100644 --- a/backend/store/store.go +++ b/backend/store/store.go @@ -159,8 +159,9 @@ type WatchEventEntityConfig struct { // WatchEventUserConfig contains and updated entity config and the action that // occurred during this modification type WatchEventUserConfig struct { - User *corev2.User - Action WatchActionType + User *corev2.User + Action WatchActionType + Disabled bool //Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"` ////Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"` } From e967cedbdd29c65bbdbe95768cd684f8e3ca0e1b Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Thu, 7 Mar 2024 00:59:29 +0530 Subject: [PATCH 08/16] testing Signed-off-by: SudhanshuBawane --- backend/agentd/agentd.go | 2 +- backend/agentd/session.go | 38 ++++++++++++++++++++------------------ backend/agentd/watcher.go | 6 +++--- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index 4b2abc0736..b47f29cc24 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -326,7 +326,7 @@ func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error { return errors.New("nil entry received from the user config watcher") } topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.Username) - if err := a.bus.Publish(topic, event.User.Username); err != nil { + if err := a.bus.Publish(topic, &event); err != nil { logger.WithField("topic", topic).WithError(err). Error("unable to publish a user config update to the bus") return err diff --git a/backend/agentd/session.go b/backend/agentd/session.go index e19002a03c..285ddee334 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -381,7 +381,6 @@ func (s *Session) sender() { // msg = transport.NewMessage(user.Username, configUser) case u := <-s.userConfig.updatesChannel: - watchEvent, ok := u.(*store.WatchEventUserConfig) fmt.Println("========== usrConfig Updates ========", watchEvent) if !ok { @@ -392,25 +391,28 @@ func (s *Session) sender() { //if watchEvent.User.Disabled && watchEvent.User.Username == s.user { // return //} - if watchEvent.Disabled { - fmt.Println("========= the user is now disabled =======") - s.stop() - return - } - - fmt.Println("========== usrConfig Updates ========", watchEvent) - //// Handle the delete/disable event - //switch userConfig { - //case store.WatchDelete: - // fmt.Println(" ======= delete =======", store.WatchDelete) + //if watchEvent.Disabled { + // fmt.Println("========= the user is now disabled =======") + // s.stop() // return - //case store.WatchCreate: - // fmt.Println("======= create user ====", store.WatchCreate) - //case store.WatchUpdate: - // fmt.Println("==== user update ======", store.WatchUpdate) - //default: - // panic("unhandled default case") //} + fmt.Println("========== usrConfig Updates ========", watchEvent) + // Handle the delete/disable event + switch watchEvent.Action { + case store.WatchDelete: + fmt.Println(" ======= delete =======", store.WatchDelete) + return + case store.WatchCreate: + fmt.Println("======= create user ====", store.WatchCreate) + case store.WatchUpdate: + if watchEvent.Disabled { + fmt.Println("========= the user is now disabled =======") + return + } + fmt.Println("==== user update ======", store.WatchUpdate) + default: + panic("unhandled default case") + } if watchEvent.User == nil { logger.Error("session received nil user in watch event") diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index 0d05d92b76..42940ef3f5 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -138,7 +138,7 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan // GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 // over the bus as store.WatchEventUserConfig func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { - var userConfig corev2.User + key := etcdstorev2.StoreKey(storev2.ResourceRequest{ Context: ctx, StoreName: new(corev2.User).StoreName(), @@ -161,12 +161,12 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s } // unmarshal the user config - + var userConfig corev2.User if err := proto.Unmarshal(response.Object, &userConfig); err != nil { continue } if userConfig.Disabled { - fmt.Println("======= user watch event in watcher ========", userConfig, string(rune(response.Type))) + fmt.Println("======= user watch event in watcher ========", userConfig.Username, userConfig.Disabled, response.Type) } ch <- store.WatchEventUserConfig{ From 22d80ea3204bd5786e5fed45d0e6fea81b501d1c Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Thu, 7 Mar 2024 13:51:42 +0530 Subject: [PATCH 09/16] working solution Signed-off-by: SudhanshuBawane --- backend/agentd/agentd.go | 13 ++--- backend/agentd/session.go | 83 +++++--------------------------- backend/agentd/watcher.go | 16 ++---- backend/messaging/message_bus.go | 4 +- backend/store/store.go | 2 - 5 files changed, 26 insertions(+), 92 deletions(-) diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index b47f29cc24..38cf207441 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -105,7 +105,7 @@ type Agentd struct { serveWaitTime time.Duration ready func() backendEntity *corev2.Entity - userWatcher <-chan store.WatchEventUserConfig + userWatcher <-chan *store.WatchEventUserConfig } // Config configures an Agentd. @@ -122,7 +122,7 @@ type Config struct { EtcdClientTLSConfig *tls.Config Watcher <-chan store.WatchEventEntityConfig BackendEntity *corev2.Entity - UserWatcher <-chan store.WatchEventUserConfig + UserWatcher <-chan *store.WatchEventUserConfig } // Option is a functional option. @@ -321,17 +321,18 @@ func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error { } // adding the UserConfig updates to the etcd bus for the watcher to consume -func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error { +func (a *Agentd) handleUserEvent(event *store.WatchEventUserConfig) error { if event.User == nil { return errors.New("nil entry received from the user config watcher") } - topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.Username) - if err := a.bus.Publish(topic, &event); err != nil { + + topic := messaging.UserConfigTopic(event.User.Username) + if err := a.bus.Publish(topic, event); err != nil { logger.WithField("topic", topic).WithError(err). Error("unable to publish a user config update to the bus") return err } - //a.bus.Publish("userChanges", event.User.Username) + logger.WithField("topic", topic). Debug("successfully published an user config update to the bus") return nil diff --git a/backend/agentd/session.go b/backend/agentd/session.go index 285ddee334..ffc877df62 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -25,8 +25,6 @@ import ( ) const ( - UserNotFound = "not found" - deletedEventSentinel = -1 // Time to wait before force close on connection. @@ -101,7 +99,6 @@ type Session struct { user string mu sync.Mutex subscriptionsMap map[string]subscription - //userReceiver *UserReceiver } // subscription is used to abstract a message.Subscription and therefore allow @@ -181,20 +178,6 @@ func (b *BurialReceiver) Receiver() chan<- interface{} { return b.ch } -//type UserReceiver struct { -// ch chan interface{} -//} -// -//func NewUserReceiver() *UserReceiver { -// return &UserReceiver{ -// ch: make(chan interface{}, 1), -// } -//} -// -//func (b *UserReceiver) Receiver() chan<- interface{} { -// return b.ch -//} - // NewSession creates a new Session object given the triple of a transport // connection, message bus, and store. // The Session is responsible for stopping itself, and does so when it @@ -224,7 +207,6 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) { unmarshal: cfg.Unmarshal, marshal: cfg.Marshal, user: cfg.User, - //userReceiver: NewUserReceiver(), entityConfig: &entityConfig{ subscriptions: make(chan messaging.Subscription, 1), updatesChannel: make(chan interface{}, 10), @@ -247,9 +229,11 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) { }() } - _, err := s.bus.Subscribe("userUpdates", cfg.AgentName, s.userConfig) - if err != nil { - return nil, err + if len(cfg.User) > 0 { + _, err := s.bus.Subscribe(messaging.UserConfigTopic(cfg.User), cfg.AgentName, s.userConfig) + if err != nil { + return nil, err + } } if err := s.bus.Publish(messaging.TopicKeepalive, makeEntitySwitchBurialEvent(cfg)); err != nil { @@ -363,53 +347,23 @@ func (s *Session) sender() { var msg *transport.Message select { //---- user -----// - //case u := <-s.userReceiver.ch: - // user, ok := u.(*corev2.User) - // if !ok { - // logger.WithField("unexpected user struct", ok) - // } - // - // configUser, err := s.marshal(user) - // if err != nil { - // logger.WithError(err).Error("session failed to serialize the user config") - // } - // - // if user.Disabled && user.Username == s.user { - // return - // } - // - // msg = transport.NewMessage(user.Username, configUser) case u := <-s.userConfig.updatesChannel: watchEvent, ok := u.(*store.WatchEventUserConfig) - fmt.Println("========== usrConfig Updates ========", watchEvent) if !ok { logger.Errorf("session received unexoected user struct : %T", u) continue } - //fmt.Println("--------action --------", store.WatchCreate, store.WatchDelete, store.WatchUnknown) - //if watchEvent.User.Disabled && watchEvent.User.Username == s.user { - // return - //} - //if watchEvent.Disabled { - // fmt.Println("========= the user is now disabled =======") - // s.stop() - // return - //} - fmt.Println("========== usrConfig Updates ========", watchEvent) // Handle the delete/disable event switch watchEvent.Action { - case store.WatchDelete: - fmt.Println(" ======= delete =======", store.WatchDelete) - return case store.WatchCreate: - fmt.Println("======= create user ====", store.WatchCreate) + logger.Println("New user has been created") case store.WatchUpdate: if watchEvent.Disabled { - fmt.Println("========= the user is now disabled =======") + logger.Warn("The user associated with the agent is now disabled") return } - fmt.Println("==== user update ======", store.WatchUpdate) + logger.Println("The update operation has been performed on user") default: panic("unhandled default case") } @@ -421,28 +375,15 @@ func (s *Session) sender() { lagger := logger.WithFields(logrus.Fields{ "action": watchEvent.Action.String(), "user": watchEvent.User.Username, - "namespace": watchEvent.User.GetMetadata().Namespace, + "namespace": watchEvent.User.GetMetadata().GetNamespace(), }) lagger.Debug("user update received") - //configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User) - //wrapper, err := storev2.WrapResource(watchEvent.User) - //if err != nil { - // lagger.WithError(err).Error("could not warp the user config") - // continue - //} - // - //if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil { - // sessionErrorCounter.WithLabelValues(err.Error()).Inc() - // lagger.WithError(err).Error("could not update the user config") - //} - bytes, err := s.marshal(watchEvent.User) if err != nil { lagger.WithError(err).Error("session failed to serialize user config") } - //msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) - msg = transport.NewMessage(corev2.UserType, bytes) + msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) // ---- entity ----// case e := <-s.entityConfig.updatesChannel: @@ -600,7 +541,7 @@ func (s *Session) Start() (err error) { // Subscribe the agent to its entity_config and user_config topic topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName) - userTopic := messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName) + userTopic := messaging.UserConfigTopic(s.cfg.AgentName) lager.WithField("topic", topic).Debug("subscribing to topic") logger.WithField("topic", userTopic).Debug("subscribing to topic") // Get a unique name for the agent, which will be used as the consumer of the @@ -634,7 +575,7 @@ func (s *Session) Start() (err error) { Action: store.WatchUpdate, User: &corev2.User{}, } - err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) + err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.AgentName), watchEvent) if err != nil { lager.WithError(err).Error("error publishing user config") return err diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index 42940ef3f5..973b2e897d 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -3,7 +3,6 @@ package agentd import ( "context" "errors" - "fmt" "github.com/gogo/protobuf/proto" corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" @@ -137,24 +136,24 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan // GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 // over the bus as store.WatchEventUserConfig -func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { +func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan *store.WatchEventUserConfig { key := etcdstorev2.StoreKey(storev2.ResourceRequest{ Context: ctx, StoreName: new(corev2.User).StoreName(), }) w := etcdstore.Watch(ctx, client, key, true) - ch := make(chan store.WatchEventUserConfig, 10) + ch := make(chan *store.WatchEventUserConfig, 1) go func() { defer close(ch) for response := range w.Result() { - logger.Info("read from user watch channel") + logger.Info("read from user config watcher channel") if response.Type == store.WatchError { logger. WithError(errors.New(string(response.Object))). Error("Unexpected error while watching for the user config updates") - ch <- store.WatchEventUserConfig{ + ch <- &store.WatchEventUserConfig{ Action: response.Type, } continue @@ -165,18 +164,13 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s if err := proto.Unmarshal(response.Object, &userConfig); err != nil { continue } - if userConfig.Disabled { - fmt.Println("======= user watch event in watcher ========", userConfig.Username, userConfig.Disabled, response.Type) - } - ch <- store.WatchEventUserConfig{ + ch <- &store.WatchEventUserConfig{ User: &userConfig, Action: response.Type, Disabled: userConfig.Disabled, } } }() - - logger.Println("----watch metadata----", w) return ch } diff --git a/backend/messaging/message_bus.go b/backend/messaging/message_bus.go index 5e42ff99ec..9f8095326d 100644 --- a/backend/messaging/message_bus.go +++ b/backend/messaging/message_bus.go @@ -106,8 +106,8 @@ func EntityConfigTopic(namespace, name string) string { return fmt.Sprintf("%s:%s:%s", TopicEntityConfig, namespace, name) } -func UserConfigTopic(namespace, name string) string { - return fmt.Sprintf("%s:%s:%s", TopicUserConfig, namespace, name) +func UserConfigTopic(name string) string { + return fmt.Sprintf("%s:%s", TopicUserConfig, name) } // SubscriptionTopic is a helper to determine the proper topic name for a diff --git a/backend/store/store.go b/backend/store/store.go index 0bcf4cf6e8..3ce4e225d1 100644 --- a/backend/store/store.go +++ b/backend/store/store.go @@ -162,8 +162,6 @@ type WatchEventUserConfig struct { User *corev2.User Action WatchActionType Disabled bool - //Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"` - ////Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"` } // Store is used to abstract the durable storage used by the Sensu backend From af8961250ecb074642c4511f02fa0c54abf766d4 Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Thu, 7 Mar 2024 18:46:28 +0530 Subject: [PATCH 10/16] with test cases Signed-off-by: SudhanshuBawane --- backend/agentd/agentd_test.go | 9 ++- backend/agentd/session.go | 52 +------------- backend/agentd/session_test.go | 127 ++++++++++++++++++++++++++++++++- 3 files changed, 130 insertions(+), 58 deletions(-) diff --git a/backend/agentd/agentd_test.go b/backend/agentd/agentd_test.go index 27a7549779..c02c1874db 100644 --- a/backend/agentd/agentd_test.go +++ b/backend/agentd/agentd_test.go @@ -6,11 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" - "net/http" - "net/http/httptest" - "testing" - corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" "github.com/sensu/sensu-go/backend/apid/middlewares" @@ -22,6 +17,10 @@ import ( "github.com/sensu/sensu-go/transport" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" ) func TestAgentdMiddlewares(t *testing.T) { diff --git a/backend/agentd/session.go b/backend/agentd/session.go index ffc877df62..79b0b92f0a 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -162,6 +162,7 @@ type SessionConfig struct { // with the session has been buried. Necessary when running parallel keepalived // workers. BurialReceiver *BurialReceiver + userConfig *userConfig } type BurialReceiver struct { @@ -356,8 +357,6 @@ func (s *Session) sender() { } // Handle the delete/disable event switch watchEvent.Action { - case store.WatchCreate: - logger.Println("New user has been created") case store.WatchUpdate: if watchEvent.Disabled { logger.Warn("The user associated with the agent is now disabled") @@ -580,55 +579,6 @@ func (s *Session) Start() (err error) { lager.WithError(err).Error("error publishing user config") return err } - //if err != nil { - // // Just exit but don't send error about absence of user config - // var errNotFound *store.ErrNotFound - // if !errors.As(err, &errNotFound) { - // lager.WithError(err).Error("error querying the user config") - // return err - // } - // lager.Debug("no user config found") - // - // // Indicate to the agent that this user does not exist - // //meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace) - // watchEvent := &store.WatchEventUserConfig{ - // User: &corev2.User{}, - // Action: store.WatchCreate, - // //Metadata: &meta, - // } - // err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) - // if err != nil { - // lager.WithError(err).Error("error publishing user config") - // return err - // } - //} else { - // // A user config already exists, therefore we should the stored user subscriptions - // // rather than what the agent provided us for the subscriptions - // lager.Debug("an user config was found") - // - // var storedUserConfig corev2.User - // err = usrWrapper.UnwrapInto(&storedUserConfig) - // if err != nil { - // lager.WithError(err).Error("error unwrapping user config") - // return err - // } - // - // // Remove the managed_by label if the value is sensu-agent, in case of disabled user - // if storedUserConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { - // delete(storedUserConfig.GetMetadata().Labels, corev2.ManagedByLabel) - // } - // - // // Send back this user config to the agent so it uses that rather than it's local config - // watchEvent := &store.WatchEventUserConfig{ - // Action: store.WatchUpdate, - // User: &storedUserConfig, - // } - // err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) - // if err != nil { - // lager.WithError(err).Error("error publishing user config") - // return err - // } - //} // Determine if the entity already exists subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig) diff --git a/backend/agentd/session_test.go b/backend/agentd/session_test.go index 35d18b3acf..8e29f96b2a 100644 --- a/backend/agentd/session_test.go +++ b/backend/agentd/session_test.go @@ -1,18 +1,20 @@ package agentd import ( + "bytes" "context" "errors" "fmt" + "github.com/sirupsen/logrus" "reflect" "sync" "testing" "time" "github.com/gogo/protobuf/proto" - "github.com/sensu/sensu-go/agent" corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" + "github.com/sensu/sensu-go/agent" "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/store" storev2 "github.com/sensu/sensu-go/backend/store/v2" @@ -75,11 +77,126 @@ func TestMakeEntitySwitchBurialEvent(t *testing.T) { } } +type UserConfig struct { + updatesChannel chan interface{} +} + func TestSession_sender(t *testing.T) { type busFunc func(*messaging.WizardBus, *sync.WaitGroup) type connFunc func(*mocktransport.MockTransport, *sync.WaitGroup) type storeFunc func(*storetest.Store, *sync.WaitGroup) + userTests := []struct { + name string + connFunc connFunc + watchEvent *store.WatchEventUserConfig + expectedLog string + expectedError bool + busFunc busFunc + storeFunc storeFunc + subscriptions []string + }{ + { + name: "valid watchEvent received", + watchEvent: &store.WatchEventUserConfig{ + Action: store.WatchUpdate, + User: &corev2.User{ + Username: "testUser", + }, + }, + expectedLog: "user update received", + }, + { + name: "watchEvent with nil user", + watchEvent: &store.WatchEventUserConfig{ + Action: store.WatchCreate, + User: nil, + }, + expectedLog: "session received nil user in watch event", + expectedError: true, + }, + } + + for _, tt := range userTests { + t.Run(tt.name, func(t *testing.T) { + wg := &sync.WaitGroup{} + + // Mock our transport + conn := new(mocktransport.MockTransport) + if tt.connFunc != nil { + tt.connFunc(conn, wg) + } + + // Mock our store + st := &mockstore.MockStore{} + storev2 := &storetest.Store{} + if tt.storeFunc != nil { + tt.storeFunc(storev2, wg) + } + + // Mock our bus + bus, err := messaging.NewWizardBus(messaging.WizardBusConfig{}) + if err != nil { + t.Fatal(err) + } + if err := bus.Start(); err != nil { + t.Fatal(err) + } + + // Mocking logger + var logBuffer bytes.Buffer + logger := logrus.New() + logger.SetOutput(&logBuffer) + + s := SessionConfig{ + AgentName: "testing", + Namespace: "default", + Conn: conn, + Bus: bus, + Store: st, + Storev2: storev2, + Unmarshal: agent.UnmarshalJSON, + Marshal: agent.MarshalJSON, + userConfig: &userConfig{ + updatesChannel: make(chan interface{}), + }, + } + + session, err := NewSession(context.Background(), s) + if err != nil { + t.Fatal(err) + } + session.wg = &sync.WaitGroup{} + session.wg.Add(1) + + userTopic := messaging.UserConfigTopic(session.cfg.AgentName) + _, err = session.bus.Subscribe(userTopic, session.cfg.AgentName, session.userConfig) + if err != nil { + t.Fatal(err) + } + + go session.sender() + // Send our watch events over the wizard bus + if tt.busFunc != nil { + tt.busFunc(bus, wg) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-session.ctx.Done(): + case <-done: + session.Stop() + case <-time.After(5 * time.Second): + t.Fatal("session never stopped, we probably never received an user update over the channel") + } + }) + } + tests := []struct { name string busFunc busFunc @@ -299,6 +416,12 @@ func TestSession_sender(t *testing.T) { if err != nil { t.Fatal(err) } + // + //userTopic := messaging.UserConfigTopic(session.cfg.AgentName) + //_, err = session.bus.Subscribe(userTopic, session.cfg.AgentName, session.userConfig) + //if err != nil { + // t.Fatal(err) + //} go session.sender() @@ -318,7 +441,7 @@ func TestSession_sender(t *testing.T) { case <-done: session.Stop() case <-time.After(5 * time.Second): - t.Fatal("session never stopped, we probably never received an entity update over the channel") + t.Fatal("session never stopped, we probably never received an entity/user update over the channel") } }) } From 8c10bd42d535544b07a09b5a0497ced5d9d3fcc2 Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Thu, 7 Mar 2024 18:50:54 +0530 Subject: [PATCH 11/16] with test cases Signed-off-by: SudhanshuBawane --- backend/agentd/session_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backend/agentd/session_test.go b/backend/agentd/session_test.go index 8e29f96b2a..2c9d6f4fb9 100644 --- a/backend/agentd/session_test.go +++ b/backend/agentd/session_test.go @@ -77,10 +77,6 @@ func TestMakeEntitySwitchBurialEvent(t *testing.T) { } } -type UserConfig struct { - updatesChannel chan interface{} -} - func TestSession_sender(t *testing.T) { type busFunc func(*messaging.WizardBus, *sync.WaitGroup) type connFunc func(*mocktransport.MockTransport, *sync.WaitGroup) From 1fbe73b1f5f0cc2e0d4bed97cd92d652f04d0345 Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Thu, 7 Mar 2024 19:28:00 +0530 Subject: [PATCH 12/16] final Signed-off-by: SudhanshuBawane --- CHANGELOG-6.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG-6.md b/CHANGELOG-6.md index 5551cd6094..538cd1b8f2 100644 --- a/CHANGELOG-6.md +++ b/CHANGELOG-6.md @@ -8,6 +8,17 @@ Versioning](http://semver.org/spec/v2.0.0.html). ## Unreleased +### 2024-03-07 + +### Added +- Addition of a new watcher configuration to monitor the user updates +- Added the exit mechanism to disconnect agent when user is disabled +- Added a struct in store which will get passed down for userConfigs + +### Changed +- The session config to watch over the user updated and the wizzard bus + + ### Changed - Upgraded CI Go version to 1.21.3 - Upgraded jwt version to 4.4.3 From cc0df4bd2f70cc59bb582b763173cc7acd4cb5e4 Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Fri, 8 Mar 2024 13:58:04 +0530 Subject: [PATCH 13/16] testing Signed-off-by: SudhanshuBawane --- backend/agentd/session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/agentd/session.go b/backend/agentd/session.go index 79b0b92f0a..cc54468ba6 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -518,7 +518,7 @@ func (s *Session) Start() (err error) { sessionCounter.WithLabelValues(s.cfg.Namespace).Inc() s.wg = &sync.WaitGroup{} s.wg.Add(2) - s.stopWG.Add(2) + s.stopWG.Add(1) go s.sender() go s.receiver() go func() { From 21755434b3a08b406e905738aff4b9ffb88453ec Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Fri, 8 Mar 2024 14:10:00 +0530 Subject: [PATCH 14/16] testing Signed-off-by: SudhanshuBawane --- backend/agentd/watcher.go | 64 --------------------------------------- 1 file changed, 64 deletions(-) diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index 973b2e897d..c3c52185ab 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -70,70 +70,6 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan return ch } -// GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 -// over the bus as store.WatchEventUserConfig -//func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig { -// //ctx, cancel := context.WithCancel(context.Background()) -// //defer cancel() -// -// key := etcdstorev2.StoreKey(storev2.ResourceRequest{ -// Context: ctx, -// StoreName: new(corev2.User).StoreName(), -// }) -// w := etcdstore.Watch(ctx, client, key, true) -// fmt.Sprintf("======= user metadata ========= w : %v, ctx : %v, client: %v, key: %v", w, ctx, client, key) -// ch := make(chan store.WatchEventUserConfig, 1) -// -// go func() { -// defer close(ch) -// for response := range w.Result() { -// fmt.Println("======= user config response ========", response) -// if response.Type == store.WatchError { -// logger. -// WithError(errors.New(string(response.Object))). -// Error("Unexpected error while watching for the user config updates") -// ch <- store.WatchEventUserConfig{ -// Action: response.Type, -// } -// continue -// } -// //var ( -// // configWrapper wrap.Wrapper -// // userConfig corev2.User -// //) -// -// // Decode and unwrap the user config -// -// //if err := proto.Unmarshal(response.Object, &configWrapper); err != nil { -// // fmt.Println("====== unmarshaled user =======", configWrapper) -// // logger.WithField("key", response.Key).WithError(err). -// // Error("unable to unmarshal user config from key") -// // continue -// //} -// -// //if err := configWrapper.UnwrapInto(&userConfig); err != nil { -// // fmt.Println("====== unmarshaled user =======", userConfig) -// // logger.WithField("key", response.Key).WithError(err). -// // Error("unable to unwrap entity config from key") -// // continue -// //} -// -// // Remove the managed_by label if the value is sensu-agent, in case the user is disabled -// //if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { -// // delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) -// //} -// -// ch <- store.WatchEventUserConfig{ -// User: &corev2.User{}, -// Action: response.Type, -// } -// } -// }() -// -// logger.Println("========= watch metadata ========", w) -// return ch -//} - // GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806 // over the bus as store.WatchEventUserConfig func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan *store.WatchEventUserConfig { From 901c11f46d3133611160812b4cee065e359dd076 Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Tue, 12 Mar 2024 10:08:32 +0530 Subject: [PATCH 15/16] final Signed-off-by: SudhanshuBawane --- backend/agentd/session.go | 36 ++++++++++++++---------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/backend/agentd/session.go b/backend/agentd/session.go index cc54468ba6..a3f12b6741 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -25,6 +25,8 @@ import ( ) const ( + userNotFound = "not found" + deletedEventSentinel = -1 // Time to wait before force close on connection. @@ -537,49 +539,38 @@ func (s *Session) Start() (err error) { "agent": s.cfg.AgentName, "namespace": s.cfg.Namespace, }) - - // Subscribe the agent to its entity_config and user_config topic - topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName) - userTopic := messaging.UserConfigTopic(s.cfg.AgentName) - lager.WithField("topic", topic).Debug("subscribing to topic") - logger.WithField("topic", userTopic).Debug("subscribing to topic") // Get a unique name for the agent, which will be used as the consumer of the // bus, in order to avoid problems with an agent reconnecting before its // session is ended agentName := agentUUID(s.cfg.Namespace, s.cfg.AgentName) - // Determine if user already exits + // Subscribe the agent to its user_config topic + + userTopic := messaging.UserConfigTopic(s.cfg.User) + logger.WithField("topic", userTopic).Debug("subscribing to topic") userSubscription, usrErr := s.bus.Subscribe(userTopic, agentName, s.userConfig) if usrErr != nil { lager.WithError(err).Error("error starting subscription") return err } s.userConfig.subscription <- userSubscription - //usrReq := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev2.User{}).StoreName()) - //usrWrapper, err := s.storev2.Get(usrReq) - - //err = usrWrapper.UnwrapInto(&storedUserConfig) - //if err != nil { - // lager.WithError(err).Error("error unwrapping user config") - // return err - //} - - // Remove the managed_by label if the value is sensu-agent, in case of disabled user - //if storedUserConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { - // delete(storedUserConfig.GetMetadata().Labels, corev2.ManagedByLabel) - //} // Send back this user config to the agent so it uses that rather than it's local config watchEvent := &store.WatchEventUserConfig{ Action: store.WatchUpdate, User: &corev2.User{}, } - err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.AgentName), watchEvent) - if err != nil { + usrErr = s.bus.Publish(messaging.UserConfigTopic(s.cfg.AgentName), watchEvent) + if usrErr != nil { lager.WithError(err).Error("error publishing user config") return err } + // Subscribe the agent to its entity_config topic + + topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName) + lager.WithField("topic", topic).Debug("subscribing to topic") + // Determine if the entity already exists subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig) if err != nil { @@ -703,6 +694,7 @@ func (s *Session) stop() { } } + // Remove the user config subscription for sub := range s.userConfig.subscription { if err := sub.Cancel(); err != nil { logger.WithError(err).Error("unable to unsubscribe from message bus") From aae8558605c0f3a97e1915589d685f12575ce6eb Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Tue, 12 Mar 2024 10:22:31 +0530 Subject: [PATCH 16/16] final Signed-off-by: SudhanshuBawane --- backend/agentd/session.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/agentd/session.go b/backend/agentd/session.go index a3f12b6741..bd0c24524a 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -25,8 +25,6 @@ import ( ) const ( - userNotFound = "not found" - deletedEventSentinel = -1 // Time to wait before force close on connection.