Skip to content

Commit

Permalink
working solution
Browse files Browse the repository at this point in the history
Signed-off-by: SudhanshuBawane <[email protected]>
  • Loading branch information
SudhanshuBawane committed Mar 7, 2024
1 parent e967ced commit 22d80ea
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 92 deletions.
13 changes: 7 additions & 6 deletions backend/agentd/agentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type Agentd struct {
serveWaitTime time.Duration
ready func()
backendEntity *corev2.Entity
userWatcher <-chan store.WatchEventUserConfig
userWatcher <-chan *store.WatchEventUserConfig
}

// Config configures an Agentd.
Expand All @@ -122,7 +122,7 @@ type Config struct {
EtcdClientTLSConfig *tls.Config
Watcher <-chan store.WatchEventEntityConfig
BackendEntity *corev2.Entity
UserWatcher <-chan store.WatchEventUserConfig
UserWatcher <-chan *store.WatchEventUserConfig
}

// Option is a functional option.
Expand Down Expand Up @@ -321,17 +321,18 @@ func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
}

// adding the UserConfig updates to the etcd bus for the watcher to consume
func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error {
func (a *Agentd) handleUserEvent(event *store.WatchEventUserConfig) error {
if event.User == nil {
return errors.New("nil entry received from the user config watcher")
}
topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.Username)
if err := a.bus.Publish(topic, &event); err != nil {

topic := messaging.UserConfigTopic(event.User.Username)
if err := a.bus.Publish(topic, event); err != nil {
logger.WithField("topic", topic).WithError(err).
Error("unable to publish a user config update to the bus")
return err
}
//a.bus.Publish("userChanges", event.User.Username)

logger.WithField("topic", topic).
Debug("successfully published an user config update to the bus")
return nil
Expand Down
83 changes: 12 additions & 71 deletions backend/agentd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
)

const (
UserNotFound = "not found"

deletedEventSentinel = -1

// Time to wait before force close on connection.
Expand Down Expand Up @@ -101,7 +99,6 @@ type Session struct {
user string
mu sync.Mutex
subscriptionsMap map[string]subscription
//userReceiver *UserReceiver
}

// subscription is used to abstract a message.Subscription and therefore allow
Expand Down Expand Up @@ -181,20 +178,6 @@ func (b *BurialReceiver) Receiver() chan<- interface{} {
return b.ch
}

//type UserReceiver struct {
// ch chan interface{}
//}
//
//func NewUserReceiver() *UserReceiver {
// return &UserReceiver{
// ch: make(chan interface{}, 1),
// }
//}
//
//func (b *UserReceiver) Receiver() chan<- interface{} {
// return b.ch
//}

// NewSession creates a new Session object given the triple of a transport
// connection, message bus, and store.
// The Session is responsible for stopping itself, and does so when it
Expand Down Expand Up @@ -224,7 +207,6 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
unmarshal: cfg.Unmarshal,
marshal: cfg.Marshal,
user: cfg.User,
//userReceiver: NewUserReceiver(),
entityConfig: &entityConfig{
subscriptions: make(chan messaging.Subscription, 1),
updatesChannel: make(chan interface{}, 10),
Expand All @@ -247,9 +229,11 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
}()
}

_, err := s.bus.Subscribe("userUpdates", cfg.AgentName, s.userConfig)
if err != nil {
return nil, err
if len(cfg.User) > 0 {
_, err := s.bus.Subscribe(messaging.UserConfigTopic(cfg.User), cfg.AgentName, s.userConfig)
if err != nil {
return nil, err
}
}

if err := s.bus.Publish(messaging.TopicKeepalive, makeEntitySwitchBurialEvent(cfg)); err != nil {
Expand Down Expand Up @@ -363,53 +347,23 @@ func (s *Session) sender() {
var msg *transport.Message
select {
//---- user -----//
//case u := <-s.userReceiver.ch:
// user, ok := u.(*corev2.User)
// if !ok {
// logger.WithField("unexpected user struct", ok)
// }
//
// configUser, err := s.marshal(user)
// if err != nil {
// logger.WithError(err).Error("session failed to serialize the user config")
// }
//
// if user.Disabled && user.Username == s.user {
// return
// }
//
// msg = transport.NewMessage(user.Username, configUser)

case u := <-s.userConfig.updatesChannel:
watchEvent, ok := u.(*store.WatchEventUserConfig)
fmt.Println("========== usrConfig Updates ========", watchEvent)
if !ok {
logger.Errorf("session received unexoected user struct : %T", u)
continue
}
//fmt.Println("--------action --------", store.WatchCreate, store.WatchDelete, store.WatchUnknown)
//if watchEvent.User.Disabled && watchEvent.User.Username == s.user {
// return
//}
//if watchEvent.Disabled {
// fmt.Println("========= the user is now disabled =======")
// s.stop()
// return
//}
fmt.Println("========== usrConfig Updates ========", watchEvent)
// Handle the delete/disable event
switch watchEvent.Action {
case store.WatchDelete:
fmt.Println(" ======= delete =======", store.WatchDelete)
return
case store.WatchCreate:
fmt.Println("======= create user ====", store.WatchCreate)
logger.Println("New user has been created")
case store.WatchUpdate:
if watchEvent.Disabled {
fmt.Println("========= the user is now disabled =======")
logger.Warn("The user associated with the agent is now disabled")
return
}
fmt.Println("==== user update ======", store.WatchUpdate)
logger.Println("The update operation has been performed on user")
default:
panic("unhandled default case")
}
Expand All @@ -421,28 +375,15 @@ func (s *Session) sender() {
lagger := logger.WithFields(logrus.Fields{
"action": watchEvent.Action.String(),
"user": watchEvent.User.Username,
"namespace": watchEvent.User.GetMetadata().Namespace,
"namespace": watchEvent.User.GetMetadata().GetNamespace(),
})
lagger.Debug("user update received")

//configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User)
//wrapper, err := storev2.WrapResource(watchEvent.User)
//if err != nil {
// lagger.WithError(err).Error("could not warp the user config")
// continue
//}
//
//if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil {
// sessionErrorCounter.WithLabelValues(err.Error()).Inc()
// lagger.WithError(err).Error("could not update the user config")
//}

bytes, err := s.marshal(watchEvent.User)
if err != nil {
lagger.WithError(err).Error("session failed to serialize user config")
}
//msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)
msg = transport.NewMessage(corev2.UserType, bytes)
msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)

// ---- entity ----//
case e := <-s.entityConfig.updatesChannel:
Expand Down Expand Up @@ -600,7 +541,7 @@ func (s *Session) Start() (err error) {

// Subscribe the agent to its entity_config and user_config topic
topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
userTopic := messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
userTopic := messaging.UserConfigTopic(s.cfg.AgentName)
lager.WithField("topic", topic).Debug("subscribing to topic")
logger.WithField("topic", userTopic).Debug("subscribing to topic")
// Get a unique name for the agent, which will be used as the consumer of the
Expand Down Expand Up @@ -634,7 +575,7 @@ func (s *Session) Start() (err error) {
Action: store.WatchUpdate,
User: &corev2.User{},
}
err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent)
err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.AgentName), watchEvent)
if err != nil {
lager.WithError(err).Error("error publishing user config")
return err
Expand Down
16 changes: 5 additions & 11 deletions backend/agentd/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package agentd
import (
"context"
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
corev2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
Expand Down Expand Up @@ -137,24 +136,24 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan

// GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806
// over the bus as store.WatchEventUserConfig
func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig {
func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan *store.WatchEventUserConfig {

key := etcdstorev2.StoreKey(storev2.ResourceRequest{
Context: ctx,
StoreName: new(corev2.User).StoreName(),
})
w := etcdstore.Watch(ctx, client, key, true)
ch := make(chan store.WatchEventUserConfig, 10)
ch := make(chan *store.WatchEventUserConfig, 1)

go func() {
defer close(ch)
for response := range w.Result() {
logger.Info("read from user watch channel")
logger.Info("read from user config watcher channel")
if response.Type == store.WatchError {
logger.
WithError(errors.New(string(response.Object))).
Error("Unexpected error while watching for the user config updates")
ch <- store.WatchEventUserConfig{
ch <- &store.WatchEventUserConfig{
Action: response.Type,
}
continue
Expand All @@ -165,18 +164,13 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s
if err := proto.Unmarshal(response.Object, &userConfig); err != nil {
continue
}
if userConfig.Disabled {
fmt.Println("======= user watch event in watcher ========", userConfig.Username, userConfig.Disabled, response.Type)
}

ch <- store.WatchEventUserConfig{
ch <- &store.WatchEventUserConfig{
User: &userConfig,
Action: response.Type,
Disabled: userConfig.Disabled,
}
}
}()

logger.Println("----watch metadata----", w)
return ch
}
4 changes: 2 additions & 2 deletions backend/messaging/message_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func EntityConfigTopic(namespace, name string) string {
return fmt.Sprintf("%s:%s:%s", TopicEntityConfig, namespace, name)
}

func UserConfigTopic(namespace, name string) string {
return fmt.Sprintf("%s:%s:%s", TopicUserConfig, namespace, name)
func UserConfigTopic(name string) string {
return fmt.Sprintf("%s:%s", TopicUserConfig, name)
}

// SubscriptionTopic is a helper to determine the proper topic name for a
Expand Down
2 changes: 0 additions & 2 deletions backend/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ type WatchEventUserConfig struct {
User *corev2.User
Action WatchActionType
Disabled bool
//Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"`
////Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"`
}

// Store is used to abstract the durable storage used by the Sensu backend
Expand Down

0 comments on commit 22d80ea

Please sign in to comment.