Skip to content

Commit

Permalink
Merge branch 'FabEdge:main' into iptables-helper
Browse files Browse the repository at this point in the history
  • Loading branch information
lostcharlie authored Sep 11, 2023
2 parents cb3f894 + a0b8cba commit ece170f
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 55 deletions.
1 change: 1 addition & 0 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (cfg Config) Manager() (*Manager, error) {
tm, err := strongswan.New(
strongswan.StartAction("clear"),
strongswan.DpdDelay("10s"),
strongswan.DpdAction("trap"),
strongswan.InitTimeout(cfg.TunnelInitTimeout),
)
if err != nil {
Expand Down
47 changes: 36 additions & 11 deletions pkg/cloud-agent/cloud_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
errAtLeaseOneConnector = fmt.Errorf("at least one connector node address is needed")
)

type cloudAgent struct {
type CloudAgent struct {
debounce func(f func())
iph *IptablesHandler
iph6 *IptablesHandler
Expand All @@ -71,15 +71,15 @@ func Execute() {
os.Exit(1)
}

agent, err := newCloudAgent()
agent, err := NewCloudAgent()
if err != nil {
logger.Error(err, "failed to create cloud agent")
os.Exit(1)
}

var mc *memberlist.Client
for {
mc, err = memberlist.New(initMembers, agent.handleMessage, agent.handleNodeLeave)
mc, err = memberlist.New(initMembers, agent.HandleMessage, agent.HandleNodeLeave)
if err == nil {
break
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func Execute() {
}
}

func newCloudAgent() (*cloudAgent, error) {
func NewCloudAgent() (*CloudAgent, error) {
iph, err := newIptableHandler(iptables.ProtocolIPv4)
if err != nil {
return nil, err
Expand All @@ -122,15 +122,15 @@ func newCloudAgent() (*cloudAgent, error) {
return nil, fmt.Errorf("at lease one iptablesHandler is required")
}

return &cloudAgent{
return &CloudAgent{
iph: iph,
iph6: iph6,
debounce: debounce.New(10 * time.Second),
routesByHost: make(map[string][]netlink.Route),
}, nil
}

func (a *cloudAgent) addAndSaveRoutes(cp routing.ConnectorPrefixes) {
func (a *CloudAgent) addAndSaveRoutes(cp routing.ConnectorPrefixes) {
if a.iph != nil {
go a.iph.maintainRules(cp.RemotePrefixes)
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func (a *cloudAgent) addAndSaveRoutes(cp routing.ConnectorPrefixes) {
logger.V(5).Info("routes are synced", "routes", routes)
}

func (a *cloudAgent) syncRoutes(localPrefixes []string, remotePrefixes []string) []netlink.Route {
func (a *CloudAgent) syncRoutes(localPrefixes []string, remotePrefixes []string) []netlink.Route {
if len(localPrefixes) == 0 || len(remotePrefixes) == 0 {
logger.V(5).Info("no localPrefixes or no remotePrefixes, skip synchronizing routes")
return nil
Expand Down Expand Up @@ -202,7 +202,7 @@ func (a *cloudAgent) syncRoutes(localPrefixes []string, remotePrefixes []string)
return routes
}

func (a *cloudAgent) handleMessage(msgBytes []byte) {
func (a *CloudAgent) HandleMessage(msgBytes []byte) {
a.debounce(func() {
var cp routing.ConnectorPrefixes
if err := json.Unmarshal(msgBytes, &cp); err != nil {
Expand All @@ -216,7 +216,7 @@ func (a *cloudAgent) handleMessage(msgBytes []byte) {
})
}

func (a *cloudAgent) deleteRoutesByHost(host string) {
func (a *CloudAgent) deleteRoutesByHost(host string) {
routes := func() []netlink.Route {
a.routesLock.Lock()
a.routesLock.Unlock()
Expand All @@ -236,14 +236,39 @@ func (a *cloudAgent) deleteRoutesByHost(host string) {
}
}

func (a *cloudAgent) handleNodeLeave(name string) {
func (a *CloudAgent) HandleNodeLeave(name string) {
logger.V(5).Info("A node has left, to delete all routes via it", "node", name)
go a.deleteRoutesByHost(name)
}

func (a *CloudAgent) CleanAll() {
if a.iph != nil {
if err := a.iph.clearRules(); err != nil {
logger.Error(err, "failed to clear iptables rules")
}
}

if a.iph6 != nil {
if err := a.iph6.clearRules(); err != nil {
logger.Error(err, "failed to clear iptables rules")
}
}

var hosts []string
a.routesLock.Lock()
for host := range a.routesByHost {
hosts = append(hosts, host)
}

a.routesLock.Unlock()
for _, host := range hosts {
a.deleteRoutesByHost(host)
}
}

// if there is data in routesByHost, this cloud-agent must have lost
// connection to connector
func (a *cloudAgent) isConnectorLost() bool {
func (a *CloudAgent) isConnectorLost() bool {
a.routesLock.RLock()
defer a.routesLock.RUnlock()

Expand Down
8 changes: 8 additions & 0 deletions pkg/cloud-agent/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,11 @@ func (h IptablesHandler) syncRemotePodCIDRSet(remotePodCIDRs []string) error {

return h.ipset.EnsureIPSet(set, sets.NewString(remotePodCIDRs...))
}

func (h IptablesHandler) clearRules() error {
if err := h.ipt.ClearChain(TableNat, ChainFabEdgePostRouting); err != nil {

Check failure on line 156 in pkg/cloud-agent/iptables.go

View workflow job for this annotation

GitHub Actions / Unit Test

undefined: TableNat

Check failure on line 156 in pkg/cloud-agent/iptables.go

View workflow job for this annotation

GitHub Actions / Unit Test

undefined: ChainFabEdgePostRouting
return err
}

return h.ipt.ClearChain(TableFilter, ChainFabEdgeForward)

Check failure on line 160 in pkg/cloud-agent/iptables.go

View workflow job for this annotation

GitHub Actions / Unit Test

undefined: TableFilter

Check failure on line 160 in pkg/cloud-agent/iptables.go

View workflow job for this annotation

GitHub Actions / Unit Test

undefined: ChainFabEdgeForward
}
Loading

0 comments on commit ece170f

Please sign in to comment.