Skip to content

Commit

Permalink
feat(config): wildcard interface name support & refactor lazybind (#758)
Browse files Browse the repository at this point in the history
Co-authored-by: mzz <[email protected]>
  • Loading branch information
LostAttractor and mzz2017 authored Mar 3, 2025
1 parent 79d3fd2 commit fa7318f
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 80 deletions.
151 changes: 151 additions & 0 deletions component/interface_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package component

import (
"context"
"path"
"sync"

"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
)

type callbackSet struct {
pattern string
newCallback func(netlink.Link)
delCallback func(netlink.Link)
}

type InterfaceManager struct {
log *logrus.Logger
closed context.Context
close context.CancelFunc
mu sync.Mutex
callbacks []callbackSet
upLinks map[string]bool
}

func NewInterfaceManager(log *logrus.Logger) *InterfaceManager {
closed, toClose := context.WithCancel(context.Background())
mgr := &InterfaceManager{
log: log,
callbacks: make([]callbackSet, 0),
closed: closed,
close: toClose,
upLinks: make(map[string]bool),
}

ch := make(chan netlink.LinkUpdate)
done := make(chan struct{})
if e := netlink.LinkSubscribeWithOptions(ch, done, netlink.LinkSubscribeOptions{
ErrorCallback: func(err error) {
log.Debug("LinkSubscribe:", err)
},
ListExisting: true,
}); e != nil {
log.Errorf("Failed to subscribe to link updates: %v", e)
}

go mgr.monitor(ch, done)
return mgr
}

func (m *InterfaceManager) monitor(ch <-chan netlink.LinkUpdate, done chan struct{}) {
for {
select {
case <-m.closed.Done():
close(done)
return
case update := <-ch:
ifName := update.Link.Attrs().Name

switch update.Header.Type {
case unix.RTM_NEWLINK:
m.mu.Lock()
_, exists := m.upLinks[ifName]
if exists {
m.mu.Unlock()
continue
}
m.upLinks[ifName] = true
for _, callback := range m.callbacks {
matched, err := path.Match(callback.pattern, ifName)
if err != nil || !matched {
continue
}
if callback.newCallback != nil {
callback.newCallback(update.Link)
}
}
m.mu.Unlock()

case unix.RTM_DELLINK:
m.mu.Lock()
delete(m.upLinks, ifName)
for _, callback := range m.callbacks {
matched, err := path.Match(callback.pattern, ifName)
if err != nil || !matched {
continue
}
if callback.delCallback != nil {
callback.delCallback(update.Link)
}
}
m.mu.Unlock()
}
}
}
}

func (m *InterfaceManager) RegisterWithPattern(pattern string, initCallback func(netlink.Link), newCallback func(netlink.Link), delCallback func(netlink.Link)) {
m.mu.Lock()
defer m.mu.Unlock()

links, err := netlink.LinkList()
if err == nil {
for _, link := range links {
ifname := link.Attrs().Name
if matched, err := path.Match(pattern, ifname); err == nil && matched {
m.upLinks[ifname] = true

if initCallback != nil {
initCallback(link)
}
}
}
} else {
m.log.Errorf("Failed to get link list: %v", err)
}

m.callbacks = append(m.callbacks, callbackSet{
pattern: pattern,
newCallback: newCallback,
delCallback: delCallback,
})
}

func (m *InterfaceManager) Register(ifname string, initCallback func(netlink.Link), newCallback func(netlink.Link), delCallback func(netlink.Link)) {
m.mu.Lock()
defer m.mu.Unlock()

link, err := netlink.LinkByName(ifname)
if err == nil {
m.upLinks[ifname] = true

if initCallback != nil {
initCallback(link)
}
}

m.callbacks = append(m.callbacks, callbackSet{
pattern: ifname,
newCallback: newCallback,
delCallback: delCallback,
})
}

// Close cancels the context to stop the monitor goroutine
func (m *InterfaceManager) Close() error {
m.close()
return nil
}
8 changes: 2 additions & 6 deletions control/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,7 @@ func NewControlPlane(
}
global.LanInterface = common.Deduplicate(global.LanInterface)
for _, ifname := range global.LanInterface {
if err = core.bindLan(ifname, global.AutoConfigKernelParameter); err != nil {
return nil, fmt.Errorf("bindLan: %v: %w", ifname, err)
}
core.bindLan(ifname, global.AutoConfigKernelParameter)
}
}
// Bind to WAN
Expand All @@ -249,9 +247,7 @@ func NewControlPlane(
}
}
}
if err = core.bindWan(ifname, global.AutoConfigKernelParameter); err != nil {
return nil, fmt.Errorf("bindWan: %v: %w", ifname, err)
}
core.bindWan(ifname, global.AutoConfigKernelParameter)
}
}
// Bind to dae0 and dae0peer
Expand Down
133 changes: 59 additions & 74 deletions control/control_plane_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package control

import (
"context"
"errors"
"fmt"
"net/netip"
"os"
Expand All @@ -19,6 +18,7 @@ import (
ciliumLink "github.com/cilium/ebpf/link"
"github.com/daeuniverse/dae/common"
"github.com/daeuniverse/dae/common/consts"
"github.com/daeuniverse/dae/component"
internal "github.com/daeuniverse/dae/pkg/ebpf_internal"
dnsmessage "github.com/miekg/dns"
"github.com/mohae/deepcopy"
Expand Down Expand Up @@ -47,6 +47,7 @@ type controlPlaneCore struct {

closed context.Context
close context.CancelFunc
ifmgr *component.InterfaceManager
}

func newControlPlaneCore(log *logrus.Logger,
Expand All @@ -63,6 +64,8 @@ func newControlPlaneCore(log *logrus.Logger,
deferFuncs = append(deferFuncs, bpf.Close)
}
closed, toClose := context.WithCancel(context.Background())
ifmgr := component.NewInterfaceManager(log)
deferFuncs = append(deferFuncs, ifmgr.Close)
return &controlPlaneCore{
log: log,
deferFuncs: deferFuncs,
Expand All @@ -72,6 +75,7 @@ func newControlPlaneCore(log *logrus.Logger,
flip: coreFlip,
isReload: isReload,
bpfEjected: false,
ifmgr: ifmgr,
closed: closed,
close: toClose,
}
Expand Down Expand Up @@ -192,86 +196,40 @@ func (c *controlPlaneCore) delQdisc(ifname string) error {
return nil
}

func (c *controlPlaneCore) addLinkCb(_ifname string, rtmType uint16, cb func()) error {
ch := make(chan netlink.LinkUpdate)
done := make(chan struct{})
if e := netlink.LinkSubscribeWithOptions(ch, done, netlink.LinkSubscribeOptions{
ErrorCallback: func(err error) {
c.log.Debug("LinkSubscribe:", err)
},
ListExisting: true,
}); e != nil {
return e
}
go func(ctx context.Context, ch <-chan netlink.LinkUpdate, done chan struct{}) {
for {
select {
case <-ctx.Done():
close(done)
return
case <-done:
return
case update := <-ch:
if update.Header.Type == rtmType {
ifname := update.Link.Attrs().Name
if ifname == _ifname {
cb()
close(done)
return
}
}
}
}
}(c.closed, ch, done)
return nil
}

// addNewLinkBindLanCb waits for NEWLINK msg of given `ifname` and invokes `bindLan`.
func (c *controlPlaneCore) addNewLinkBindLanCb(ifname string, autoConfigKernelParameter bool) error {
return c.addLinkCb(ifname, unix.RTM_NEWLINK, func() {
c.log.Warnf("New link creation of '%v' is detected. Bind LAN program to it.", ifname)
if err := c.addQdisc(ifname); err != nil {
c.log.Errorf("addQdisc: %v", err)
// bindLan automatically configures kernel parameters and bind to lan interface `ifname`.
// bindLan supports lazy-bind if interface `ifname` is not found.
// bindLan supports rebinding when the interface `ifname` is detected in the future.
func (c *controlPlaneCore) bindLan(ifname string, autoConfigKernelParameter bool) {
initlinkCallback := func(link netlink.Link) {
if link.Attrs().Name == HostVethName {
return
}
if err := c.bindLan(ifname, autoConfigKernelParameter); err != nil {
if autoConfigKernelParameter {
SetSendRedirects(link.Attrs().Name, "0")
SetForwarding(link.Attrs().Name, "1")
}
if err := c._bindLan(link.Attrs().Name); err != nil {
c.log.Errorf("bindLan: %v", err)
}
})
}

// bindLan automatically configures kernel parameters and bind to lan interface `ifname`.
// bindLan supports lazy-bind if interface `ifname` is not found.
// bindLan supports rebinding when the interface `ifname` is deleted in the future.
func (c *controlPlaneCore) bindLan(ifname string, autoConfigKernelParameter bool) error {
if autoConfigKernelParameter {
SetSendRedirects(ifname, "0")
SetForwarding(ifname, "1")
}
if err := c._bindLan(ifname); err != nil {
var notFoundErr netlink.LinkNotFoundError
if !errors.As(err, &notFoundErr) {
return err
}
newlinkCallback := func(link netlink.Link) {
if link.Attrs().Name == HostVethName {
return
}
// Not found error.

// Listen for `NEWLINK` to bind.
c.log.Warnf("Link '%v' is not found. Bind LAN program to it once it is created.", ifname)
if e := c.addNewLinkBindLanCb(ifname, autoConfigKernelParameter); e != nil {
return fmt.Errorf("%w: %v", err, e)
c.log.Warnf("New link creation of '%v' is detected. Bind LAN program to it.", link.Attrs().Name)
if err := c.addQdisc(link.Attrs().Name); err != nil {
c.log.Errorf("addQdisc: %v", err)
return
}
return nil
initlinkCallback(link)
}
// Listen for `DELLINK` and add `NEWLINK` callback to re-bind.
if err := c.addLinkCb(ifname, unix.RTM_DELLINK, func() {
c.log.Warnf("Link deletion of '%v' is detected. Bind LAN program to it once it is re-created.", ifname)
if e := c.addNewLinkBindLanCb(ifname, autoConfigKernelParameter); e != nil {
c.log.Errorf("Failed to add callback for re-bind LAN program to '%v': %v", ifname, e)
dellinkCallback := func(link netlink.Link) {
if link.Attrs().Name == HostVethName {
return
}
}); err != nil {
return fmt.Errorf("failed to add re-bind callback: %w", err)
c.log.Warnf("Link deletion of '%v' is detected. Bind LAN program to it once it is re-created.", link.Attrs().Name)
}
return nil
c.ifmgr.RegisterWithPattern(ifname, initlinkCallback, newlinkCallback, dellinkCallback)
}

func (c *controlPlaneCore) _bindLan(ifname string) error {
Expand Down Expand Up @@ -441,8 +399,35 @@ func (c *controlPlaneCore) setupLocalTcpFastRedirect() (err error) {

}

func (c *controlPlaneCore) bindWan(ifname string, autoConfigKernelParameter bool) error {
return c._bindWan(ifname)
// bindWan supports lazy-bind if interface `ifname` is not found.
// bindWan supports rebinding when the interface `ifname` is detected in the future.
func (c *controlPlaneCore) bindWan(ifname string, autoConfigKernelParameter bool) {
initlinkCallback := func(link netlink.Link) {
if link.Attrs().Name == HostVethName {
return
}
if err := c._bindWan(link.Attrs().Name); err != nil {
c.log.Errorf("bindWan: %v", err)
}
}
newlinkCallback := func(link netlink.Link) {
if link.Attrs().Name == HostVethName {
return
}
c.log.Warnf("New link creation of '%v' is detected. Bind WAN program to it.", link.Attrs().Name)
if err := c.addQdisc(link.Attrs().Name); err != nil {
c.log.Errorf("addQdisc: %v", err)
return
}
initlinkCallback(link)
}
dellinkCallback := func(link netlink.Link) {
if link.Attrs().Name == HostVethName {
return
}
c.log.Warnf("Link deletion of '%v' is detected. Bind WAN program to it once it is re-created.", link.Attrs().Name)
}
c.ifmgr.RegisterWithPattern(ifname, initlinkCallback, newlinkCallback, dellinkCallback)
}

func (c *controlPlaneCore) _bindWan(ifname string) error {
Expand Down

0 comments on commit fa7318f

Please sign in to comment.