Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
Signed-off-by: SudhanshuBawane <[email protected]>
  • Loading branch information
SudhanshuBawane committed Feb 10, 2024
1 parent 4731b2b commit 5b0a1b5
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 2 deletions.
15 changes: 14 additions & 1 deletion backend/agentd/agentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/sensu/sensu-go/agent"
corev2 "github.com/sensu/core/v2"
"github.com/sensu/sensu-go/agent"
"github.com/sensu/sensu-go/backend/apid/actions"
"github.com/sensu/sensu-go/backend/apid/middlewares"
"github.com/sensu/sensu-go/backend/apid/routers"
Expand Down Expand Up @@ -105,6 +105,7 @@ type Agentd struct {
serveWaitTime time.Duration
ready func()
backendEntity *corev2.Entity
userWatcher <-chan store.WatchEventUserConfig
}

// Config configures an Agentd.
Expand All @@ -121,6 +122,7 @@ type Config struct {
EtcdClientTLSConfig *tls.Config
Watcher <-chan store.WatchEventEntityConfig
BackendEntity *corev2.Entity
UserWatcher <-chan store.WatchEventUserConfig
}

// Option is a functional option.
Expand Down Expand Up @@ -149,6 +151,7 @@ func New(c Config, opts ...Option) (*Agentd, error) {
etcdClientTLSConfig: c.EtcdClientTLSConfig,
serveWaitTime: c.ServeWaitTime,
backendEntity: c.BackendEntity,
userWatcher: c.UserWatcher,
}

// prepare server TLS config
Expand Down Expand Up @@ -291,6 +294,7 @@ func (a *Agentd) runWatcher() {
}
}

// adding the config updates to the etcd bus for watcher to consume
func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
if event.Entity == nil {
return errors.New("nil entity received from entity config watcher")
Expand All @@ -308,6 +312,15 @@ func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
return nil
}

// adding the UserConfig updates to the etcd bus for the watcher to consume
func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error {

Check failure on line 316 in backend/agentd/agentd.go

View workflow job for this annotation

GitHub Actions / staticcheck (project)

func (*Agentd).handleUserEvent is unused (U1000)
if event.User == nil {
return errors.New("nil entry received from the user config watcher")
}

return nil
}

// Stop Agentd.
func (a *Agentd) Stop() error {
a.cancel()
Expand Down
60 changes: 59 additions & 1 deletion backend/agentd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/sensu/sensu-go/agent"
corev2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
"github.com/sensu/sensu-go/agent"
"github.com/sensu/sensu-go/backend/messaging"
"github.com/sensu/sensu-go/backend/metrics"
"github.com/sensu/sensu-go/backend/ringv2"
Expand Down Expand Up @@ -95,6 +95,7 @@ type Session struct {
marshal agent.MarshalFunc
unmarshal agent.UnmarshalFunc
entityConfig *entityConfig
userConfig *userConfig
mu sync.Mutex
subscriptionsMap map[string]subscription
}
Expand All @@ -111,12 +112,23 @@ type entityConfig struct {
updatesChannel chan interface{}
}

// userConfig is used by a session to subscribe to entity config updates
type userConfig struct {
subscription chan messaging.Subscription

Check failure on line 117 in backend/agentd/session.go

View workflow job for this annotation

GitHub Actions / staticcheck (project)

field subscription is unused (U1000)
updatesChannel chan interface{}
}

// Receiver returns the channel for incoming entity updates from the entity
// watcher
func (e *entityConfig) Receiver() chan<- interface{} {
return e.updatesChannel
}

// Receiver returns the channel for incoming entity updates from the entity watcher
func (u *userConfig) Receiver() chan<- interface{} {
return u.updatesChannel
}

func newSessionHandler(s *Session) *handler.MessageHandler {
handler := handler.NewMessageHandler()
handler.AddHandler(transport.MessageTypeKeepalive, s.handleKeepalive)
Expand Down Expand Up @@ -321,6 +333,52 @@ func (s *Session) sender() {
for {
var msg *transport.Message
select {
//sudhanshu#2608 ---- user -----
case u := <-s.userConfig.updatesChannel:
var usr *corev2.User
watchEvent, ok := u.(*store.WatchEventUserConfig)
if !ok {
logger.Errorf("Session received unexpected struct: %T", u)
continue
}

// Handle the delete and unknown watch events
switch watchEvent.Action {
case store.WatchDelete:
//stop session
return
case store.WatchUnknown:
logger.Error("session received unknown watch event")
continue
}

if watchEvent.User == nil {
logger.Error("session received nil user in watch event")
continue
}

lager := logger.WithFields(logrus.Fields{
"action": watchEvent.Action.String(),
"user": watchEvent.User.GetMetadata().Name,
"namespace": watchEvent.User.GetMetadata().Namespace,
})
logger.Debug("User update received")

bytes, err := s.marshal(watchEvent.User)
if err != nil {
lager.WithError(err).Error("session failed to serialize entity config")
continue
}

// determine if user was disabled
if err := usr.Disabled; err {
lager.Debug("The user is now disabled ", err)
}

msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)

// -----entity -------

case e := <-s.entityConfig.updatesChannel:
watchEvent, ok := e.(*store.WatchEventEntityConfig)
if !ok {
Expand Down
51 changes: 51 additions & 0 deletions backend/agentd/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,54 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan

return ch
}

// GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806
// over the bus as store.WatchEventUserConfig
func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig {
key := etcdstorev2.StoreKey(storev2.ResourceRequest{
Context: ctx,
StoreName: new(corev2.User).StoreName(),
})
w := etcdstore.Watch(ctx, client, key, true)
ch := make(chan store.WatchEventUserConfig, 1)

go func() {
defer close(ch)
for response := range w.Result() {
if response.Type == store.WatchError {
logger.
WithError(errors.New(string(response.Object))).
Error("Unexpected error while watching for the user config updates")
ch <- store.WatchEventUserConfig{
Action: response.Type,
}
continue
}
var (
configWrapper wrap.Wrapper
userConfig corev2.User
)

// Decode and unwrap the entity config

if err := proto.Unmarshal(response.Object, &configWrapper); err != nil {
logger.WithField("key", response.Key).WithError(err).
Error("unable to unmarshal user config from key")
continue
}

// Remove the managed_by label if the value is sensu-agent, in case the user is disabled
if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" {
delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel)
}

ch <- store.WatchEventUserConfig{
Action: response.Type,
User: &userConfig,
}
}
}()

logger.Println("----watch metadata----", w)
return ch
}
4 changes: 4 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,9 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) {
// Start the entity config watcher, so agentd sessions are notified of updates
entityConfigWatcher := agentd.GetEntityConfigWatcher(b.ctx, b.Client)

// Start the user config watcher, so agentd sessions are notified of updates
userConfigWatcher := agentd.GetUserConfigWatcher(b.ctx, b.Client)

// Prepare the etcd client TLS config
etcdClientTLSInfo := (transport.TLSInfo)(config.EtcdClientTLSInfo)
etcdClientTLSConfig, err := etcdClientTLSInfo.ClientConfig()
Expand Down Expand Up @@ -661,6 +664,7 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) {
Watcher: entityConfigWatcher,
EtcdClientTLSConfig: b.EtcdClientTLSConfig,
BackendEntity: backendEntity,
UserWatcher: userConfigWatcher,
})
if err != nil {
return nil, fmt.Errorf("error initializing %s: %s", agent.Name(), err)
Expand Down
7 changes: 7 additions & 0 deletions backend/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ type WatchEventEntityConfig struct {
Action WatchActionType
}

// WatchEventUserConfig contains and updated entity config and the action that
// occurred during this modification
type WatchEventUserConfig struct {
User *corev2.User
Action WatchActionType
}

// Store is used to abstract the durable storage used by the Sensu backend
// processses. Each Sensu resources is represented by its own interface. A
// MockStore is available in order to mock a store implementation
Expand Down
3 changes: 3 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const (
// MessageTypeEntityConfig is the message type sent for entity config updates
MessageTypeEntityConfig = "entity_config"

// MessageTypeUserConfig is the message type sent for entity config updates
MessageTypeUserConfig = "user_config"

// HeaderKeyAgentName is the HTTP request header specifying the Agent name
HeaderKeyAgentName = "Sensu-AgentName"

Expand Down

0 comments on commit 5b0a1b5

Please sign in to comment.