diff --git a/pkg/agent/config.go b/pkg/agent/config.go
index a555ef8..de80dd7 100644
--- a/pkg/agent/config.go
+++ b/pkg/agent/config.go
@@ -167,6 +167,7 @@ func (cfg Config) Manager() (*Manager, error) {
 	tm, err := strongswan.New(
 		strongswan.StartAction("clear"),
 		strongswan.DpdDelay("10s"),
+		strongswan.DpdAction("trap"),
 		strongswan.InitTimeout(cfg.TunnelInitTimeout),
 	)
 	if err != nil {
diff --git a/pkg/cloud-agent/cloud_agent.go b/pkg/cloud-agent/cloud_agent.go
index 1f75f63..49c92aa 100644
--- a/pkg/cloud-agent/cloud_agent.go
+++ b/pkg/cloud-agent/cloud_agent.go
@@ -44,7 +44,7 @@ var (
 	errAtLeaseOneConnector = fmt.Errorf("at least one connector node address is needed")
 )
 
-type cloudAgent struct {
+type CloudAgent struct {
 	debounce func(f func())
 	iph      *IptablesHandler
 	iph6     *IptablesHandler
@@ -71,7 +71,7 @@ func Execute() {
 		os.Exit(1)
 	}
 
-	agent, err := newCloudAgent()
+	agent, err := NewCloudAgent()
 	if err != nil {
 		logger.Error(err, "failed to create cloud agent")
 		os.Exit(1)
@@ -79,7 +79,7 @@ func Execute() {
 
 	var mc *memberlist.Client
 	for {
-		mc, err = memberlist.New(initMembers, agent.handleMessage, agent.handleNodeLeave)
+		mc, err = memberlist.New(initMembers, agent.HandleMessage, agent.HandleNodeLeave)
 		if err == nil {
 			break
 		}
@@ -107,7 +107,7 @@ func Execute() {
 	}
 }
 
-func newCloudAgent() (*cloudAgent, error) {
+func NewCloudAgent() (*CloudAgent, error) {
 	iph, err := newIptableHandler(iptables.ProtocolIPv4)
 	if err != nil {
 		return nil, err
@@ -122,7 +122,7 @@ func newCloudAgent() (*cloudAgent, error) {
 		return nil, fmt.Errorf("at lease one iptablesHandler is required")
 	}
 
-	return &cloudAgent{
+	return &CloudAgent{
 		iph:      iph,
 		iph6:     iph6,
 		debounce: debounce.New(10 * time.Second),
@@ -130,7 +130,7 @@ func newCloudAgent() (*cloudAgent, error) {
 	}, nil
 }
 
-func (a *cloudAgent) addAndSaveRoutes(cp routing.ConnectorPrefixes) {
+func (a *CloudAgent) addAndSaveRoutes(cp routing.ConnectorPrefixes) {
 	if a.iph != nil {
 		go a.iph.maintainRules(cp.RemotePrefixes)
 	}
@@ -162,7 +162,7 @@ func (a *cloudAgent) addAndSaveRoutes(cp routing.ConnectorPrefixes) {
 	logger.V(5).Info("routes are synced", "routes", routes)
 }
 
-func (a *cloudAgent) syncRoutes(localPrefixes []string, remotePrefixes []string) []netlink.Route {
+func (a *CloudAgent) syncRoutes(localPrefixes []string, remotePrefixes []string) []netlink.Route {
 	if len(localPrefixes) == 0 || len(remotePrefixes) == 0 {
 		logger.V(5).Info("no localPrefixes or no remotePrefixes, skip synchronizing routes")
 		return nil
@@ -202,7 +202,7 @@ func (a *cloudAgent) syncRoutes(localPrefixes []string, remotePrefixes []string)
 	return routes
 }
 
-func (a *cloudAgent) handleMessage(msgBytes []byte) {
+func (a *CloudAgent) HandleMessage(msgBytes []byte) {
 	a.debounce(func() {
 		var cp routing.ConnectorPrefixes
 		if err := json.Unmarshal(msgBytes, &cp); err != nil {
@@ -216,7 +216,7 @@ func (a *cloudAgent) handleMessage(msgBytes []byte) {
 	})
 }
 
-func (a *cloudAgent) deleteRoutesByHost(host string) {
+func (a *CloudAgent) deleteRoutesByHost(host string) {
 	routes := func() []netlink.Route {
 		a.routesLock.Lock()
 		a.routesLock.Unlock()
@@ -236,14 +236,39 @@ func (a *cloudAgent) deleteRoutesByHost(host string) {
 	}
 }
 
-func (a *cloudAgent) handleNodeLeave(name string) {
+func (a *CloudAgent) HandleNodeLeave(name string) {
 	logger.V(5).Info("A node has left, to delete all routes via it", "node", name)
 	go a.deleteRoutesByHost(name)
 }
 
+func (a *CloudAgent) CleanAll() {
+	if a.iph != nil {
+		if err := a.iph.clearRules(); err != nil {
+			logger.Error(err, "failed to clear iptables rules")
+		}
+	}
+
+	if a.iph6 != nil {
+		if err := a.iph6.clearRules(); err != nil {
+			logger.Error(err, "failed to clear iptables rules")
+		}
+	}
+
+	var hosts []string
+	a.routesLock.Lock()
+	for host := range a.routesByHost {
+		hosts = append(hosts, host)
+	}
+
+	a.routesLock.Unlock()
+	for _, host := range hosts {
+		a.deleteRoutesByHost(host)
+	}
+}
+
 // if there is data in routesByHost, this cloud-agent must have lost
 // connection to connector
-func (a *cloudAgent) isConnectorLost() bool {
+func (a *CloudAgent) isConnectorLost() bool {
 	a.routesLock.RLock()
 	defer a.routesLock.RUnlock()
diff --git a/pkg/cloud-agent/iptables.go b/pkg/cloud-agent/iptables.go
index 2cb5e34..d11166e 100644
--- a/pkg/cloud-agent/iptables.go
+++ b/pkg/cloud-agent/iptables.go
@@ -151,3 +151,11 @@ func (h IptablesHandler) syncRemotePodCIDRSet(remotePodCIDRs []string) error {
 
 	return h.ipset.EnsureIPSet(set, sets.NewString(remotePodCIDRs...))
 }
+
+func (h IptablesHandler) clearRules() error {
+	if err := h.ipt.ClearChain(TableNat, ChainFabEdgePostRouting); err != nil {
+		return err
+	}
+
+	return h.ipt.ClearChain(TableFilter, ChainFabEdgeForward)
+}
diff --git a/pkg/connector/manager.go b/pkg/connector/manager.go
index 4e91d53..700dad8 100644
--- a/pkg/connector/manager.go
+++ b/pkg/connector/manager.go
@@ -15,17 +15,28 @@ package connector
 
 import (
+	"context"
 	"encoding/json"
+	"net/http"
 	"os"
 	"os/signal"
 	"syscall"
 	"time"
 
 	debpkg "github.com/bep/debounce"
+	"github.com/go-chi/chi/v5"
+	"github.com/go-chi/chi/v5/middleware"
 	"github.com/go-logr/logr"
 	"github.com/spf13/pflag"
+	"go.uber.org/atomic"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/klog/v2/klogr"
 
+	cloud_agent "github.com/fabedge/fabedge/pkg/cloud-agent"
 	"github.com/fabedge/fabedge/pkg/common/about"
 	"github.com/fabedge/fabedge/pkg/connector/routing"
 	"github.com/fabedge/fabedge/pkg/tunnel"
@@ -44,6 +55,11 @@ type Manager struct {
 	mc  *memberlist.Client
 	log logr.Logger
 
+	kubeClient *clientset.Clientset
+	isLeader   *atomic.Bool
+
+	cloudAgent *cloud_agent.CloudAgent
+
 	events   chan struct{}
 	debounce func(func())
 }
@@ -57,10 +73,15 @@ type Config struct {
 	CNIType           string
 	InitMembers       []string
 	TunnelInitTimeout uint
-}
+	ListenAddress     string
 
-func msgHandler(b []byte) {}
-func nodeLeveHandler(name string) {}
+	LeaderElection struct {
+		LockName      string
+		LeaseDuration time.Duration
+		RenewDeadline time.Duration
+		RetryPeriod   time.Duration
+	}
+}
 
 func (c *Config) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&c.TunnelConfigFile, "tunnel-config", "/etc/fabedge/tunnels.yaml", "tunnel config file")
@@ -71,6 +92,11 @@ func (c *Config) AddFlags(fs *pflag.FlagSet) {
 	fs.DurationVar(&c.DebounceDuration, "debounce-duration", 5*time.Second, "period to sync routes/rules")
 	fs.StringSliceVar(&c.InitMembers, "connector-node-addresses", []string{}, "internal address of all connector nodes")
 	fs.UintVar(&c.TunnelInitTimeout, "tunnel-init-timeout", 10, "The timeout of tunnel initiation. Unit: second")
+	fs.StringVar(&c.LeaderElection.LockName, "leader-lock-name", "connector", "The name of leader lock")
+	fs.DurationVar(&c.LeaderElection.LeaseDuration, "leader-lease-duration", 15*time.Second, "The duration that non-leader candidates will wait to force acquire leadership")
+	fs.DurationVar(&c.LeaderElection.RenewDeadline, "leader-renew-deadline", 10*time.Second, "The duration that the acting controlplane will retry refreshing leadership before giving up")
+	fs.DurationVar(&c.LeaderElection.RetryPeriod, "leader-retry-period", 2*time.Second, "The duration that the LeaderElector clients should wait between tries of actions")
+	fs.StringVar(&c.ListenAddress, "listen-address", "127.0.0.1:30306", "The address of http server")
 }
 
 func (c Config) Manager() (*Manager, error) {
@@ -87,32 +113,52 @@ func (c Config) Manager() (*Manager, error) {
 		return nil, err
 	}
 
-	mc, err := memberlist.New(c.InitMembers, msgHandler, nodeLeveHandler)
+	ipt, err := newIP4TablesHandler()
 	if err != nil {
 		return nil, err
 	}
 
-	ipt, err := newIP4TablesHandler()
+	ipt6, err := newIP6TablesHandler()
 	if err != nil {
 		return nil, err
 	}
 
-	ipt6, err := newIP6TablesHandler()
+	config, err := rest.InClusterConfig()
 	if err != nil {
 		return nil, err
 	}
+	client := clientset.NewForConfigOrDie(config)
 
-	return &Manager{
+	cloudAgent, err := cloud_agent.NewCloudAgent()
+	if err != nil {
+		return nil, err
+	}
+
+	manager := &Manager{
 		Config:      c,
 		tm:          tm,
 		iptHandler:  ipt,
 		ipt6Handler: ipt6,
 		router:      router,
-		mc:          mc,
-		log:         klogr.New().WithName("manager"),
-		events:      make(chan struct{}),
-		debounce:    debpkg.New(c.DebounceDuration),
-	}, nil
+
+		kubeClient: client,
+		isLeader:   atomic.NewBool(false),
+
+		cloudAgent: cloudAgent,
+
+		log: klogr.New().WithName("manager"),
+
+		events:   make(chan struct{}),
+		debounce: debpkg.New(c.DebounceDuration),
+	}
+
+	mc, err := memberlist.New(c.InitMembers, manager.handleMessage, manager.handleNodeLeave)
+	if err != nil {
+		return nil, err
+	}
+	manager.mc = mc
+
+	return manager, nil
 }
 
 func (m *Manager) startTick() {
@@ -136,6 +182,8 @@ func (m *Manager) Start() {
 
 	m.clearFabEdgeIptablesChains()
 
+	go m.runLeaderElection()
+	go m.runHTTPServer()
 	go m.workLoop()
 	go m.startTick()
 	go m.onConfigFileChange(m.TunnelConfigFile)
@@ -151,16 +199,60 @@ func (m *Manager) Start() {
 	m.log.Info("manager stopped")
 }
 
+func (m *Manager) runLeaderElection() {
+	lock := newLock(m.LeaderElection.LockName, m.kubeClient)
+	leaderID := lock.Identity()
+
+	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
+		Lock:            lock,
+		ReleaseOnCancel: true,
+		LeaseDuration:   m.LeaderElection.LeaseDuration,
+		RenewDeadline:   m.LeaderElection.RenewDeadline,
+		RetryPeriod:     m.LeaderElection.RetryPeriod,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(c context.Context) {
+				m.log.V(3).Info("Get leader role, clear iptables rules generated as cloud agent")
+				m.cloudAgent.CleanAll()
+				m.isLeader.Store(true)
+				m.notify()
+			},
+			OnStoppedLeading: func() {
+				m.log.V(3).Info("Lose leader role, clear iptables and routes")
+				m.clearAll()
+				m.isLeader.Store(false)
+			},
+			OnNewLeader: func(currentID string) {
+				if currentID == leaderID {
+					m.log.V(5).Info("Still be the leader!")
+				} else {
+					m.log.V(3).Info("Leader has changed", "NewLeaderID", currentID)
+				}
+			},
+		},
+	})
+}
+
 func (m *Manager) gracefulShutdown() {
 	err := m.router.CleanRoutes(m.connections)
 	if err != nil {
 		m.log.Error(err, "failed to clean routers")
 	}
 
-	m.CleanSNatIPTablesRules()
+	m.cleanSNatIPTablesRules()
 }
 
-func (m *Manager) CleanSNatIPTablesRules() {
+func (m *Manager) clearAll() {
+	err := m.router.CleanRoutes(m.connections)
+	if err != nil {
+		m.log.Error(err, "failed to clean routers")
+	}
+
+	m.cleanSNatIPTablesRules()
+	m.clearFabEdgeIptablesChains()
+	m.clearConnections()
+}
+
+func (m *Manager) cleanSNatIPTablesRules() {
 	for _, ipt := range []*IPTablesHandler{m.iptHandler, m.ipt6Handler} {
 		if err := ipt.CleanSNatIPTablesRules(); err != nil {
 			m.log.Error(err, "failed to clean iptables")
@@ -176,31 +268,16 @@ func (m *Manager) clearFabEdgeIptablesChains() {
 	}
 }
 
-func (m *Manager) mainRoutes() {
-	active, err := m.tm.IsActive()
-	if err != nil {
-		m.log.Error(err, "failed to get tunnel manager status")
+func (m *Manager) maintainRoutes() {
+	m.log.V(5).Info("tunnel manager is active, try to synchronize routes in table 220")
+	if err := m.router.SyncRoutes(m.connections); err != nil {
+		m.log.Error(err, "failed to sync routes")
 		return
 	}
-
-	if active {
-		m.log.V(5).Info("tunnel manager is active, try to synchronize routes in table 220")
-		if err = m.router.SyncRoutes(m.connections); err != nil {
-			m.log.Error(err, "failed to sync routes")
-			return
-		}
-	} else {
-		m.log.V(5).Info("tunnel manager is not active, try to clean routes in route table 220")
-		if err = m.router.CleanRoutes(m.connections); err != nil {
-			m.log.Error(err, "failed to clean routes")
-			return
-		}
-	}
-
 	m.log.V(5).Info("routes are synced")
 }
 
-func (m *Manager) mainTunnels() {
+func (m *Manager) maintainTunnels() {
 	if err := m.syncConnections(); err != nil {
 		m.log.Error(err, "error when to sync tunnels")
 	} else {
@@ -228,14 +305,85 @@ func (m *Manager) broadcastConnectorPrefixes() {
 	log.V(5).Info("connector prefixes is broadcast to cloud-agents")
 }
 
+func (m *Manager) handleMessage(msgBytes []byte) {
+	if m.isLeader.Load() {
+		return
+	}
+
+	m.cloudAgent.HandleMessage(msgBytes)
+}
+
+func (m *Manager) handleNodeLeave(name string) {
+	m.cloudAgent.HandleNodeLeave(name)
+}
+
 func (m *Manager) workLoop() {
 	for range m.events {
-		m.mainTunnels()
-		m.mainRoutes()
+		if !m.isLeader.Load() {
+			continue
+		}
+
+		m.maintainTunnels()
+		m.maintainRoutes()
 		m.broadcastConnectorPrefixes()
-		m.iptHandler.maintainIPTables()
-		m.ipt6Handler.maintainIPTables()
+		m.iptHandler.maintainIPSet()
+		m.iptHandler.maintainIPTables()
+		m.ipt6Handler.maintainIPSet()
+		m.ipt6Handler.maintainIPTables()
+	}
+}
+
+func (m *Manager) runHTTPServer() {
+	r := chi.NewRouter()
+	r.Use(middleware.Recoverer)
+	r.Get("/is-leader", func(w http.ResponseWriter, r *http.Request) {
+		w.Write([]byte(m.isLeader.String()))
+	})
+	server := &http.Server{
+		Addr:    m.ListenAddress,
+		Handler: r,
+	}
+
+	for {
+		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			m.log.Error(err, "failed to start http server")
+		}
+		break
+	}
+}
+
+// getConnectorName will return a valid name as leader election ID
+func getConnectorName() string {
+	hostname, _ := os.Hostname()
+	if hostname != "" {
+		return hostname
+	}
+
+	hostname = os.Getenv("HOSTNAME")
+	if hostname != "" {
+		return hostname
+	}
+
+	podName := os.Getenv("POD_NAME")
+	return podName
+}
+
+// getNamespace return the namespace where connector pod is running
+func getNamespace() string {
+	return os.Getenv("NAMESPACE")
+}
+
+func newLock(lockName string, client *clientset.Clientset) *resourcelock.LeaseLock {
+	return &resourcelock.LeaseLock{
+		LeaseMeta: metav1.ObjectMeta{
+			Name:      lockName,
+			Namespace: getNamespace(),
+		},
+		Client: client.CoordinationV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity: getConnectorName(),
+		},
 	}
 }
diff --git a/pkg/connector/tunnel.go b/pkg/connector/tunnel.go
index a1d7aa0..41082bf 100644
--- a/pkg/connector/tunnel.go
+++ b/pkg/connector/tunnel.go
@@ -193,6 +193,22 @@ func (m *Manager) syncConnections() error {
 	return nil
 }
 
+func (m *Manager) clearConnections() {
+	oldNames, err := m.tm.ListConnNames()
+	if err != nil {
+		m.log.Error(err, "failed to get existing connection from tunnel manager")
+		return
+	}
+
+	for _, name := range oldNames {
+		if err = m.tm.UnloadConn(name); err != nil {
+			m.log.Error(err, "failed to unload tunnel connection", "name", name)
+		} else {
+			m.log.V(5).Info("A staled tunnel connection is unloaded", "name", name)
+		}
+	}
+}
+
 // isIP6 check if ip is an IP6 address or a CIDR address
 func isIPv6(ip string) bool {
 	return strings.IndexByte(ip, ':') != -1
diff --git a/pkg/operator/controllers/agent/confighandler.go b/pkg/operator/controllers/agent/confighandler.go
index c12f3cf..d6791d9 100644
--- a/pkg/operator/controllers/agent/confighandler.go
+++ b/pkg/operator/controllers/agent/confighandler.go
@@ -124,9 +124,7 @@ func (handler *configHandler) buildNetworkConf(nodeName string) netconf.NetworkC
 		Peers: make([]apis.Endpoint, 0, len(peerEndpoints)),
 	}
 
-	for _, ep := range peerEndpoints {
-		conf.Peers = append(conf.Peers, ep)
-	}
+	conf.Peers = append(conf.Peers, peerEndpoints...)
 
 	mediator, found := store.GetEndpoint(constants.DefaultMediatorName)
 	if found {
diff --git a/pkg/operator/controllers/connector/controller.go b/pkg/operator/controllers/connector/controller.go
index 8ad2d11..377da90 100644
--- a/pkg/operator/controllers/connector/controller.go
+++ b/pkg/operator/controllers/connector/controller.go
@@ -336,9 +336,7 @@ func (ctl *controller) getPeers() []apis.Endpoint {
 	endpoints := ctl.Store.GetEndpoints(nameSet.List()...)
 
 	peers := make([]apis.Endpoint, 0, len(endpoints))
-	for _, ep := range endpoints {
-		peers = append(peers, ep)
-	}
+	peers = append(peers, endpoints...)
 
 	return peers
 }