Skip to content

Commit

Permalink
Merge pull request #174 from coroot/invalidate_ingnored_containers_cache
Browse files Browse the repository at this point in the history
containers: invalidate ignored containers cache
  • Loading branch information
apetruhin authored Feb 4, 2025
2 parents ff54f40 + 0789560 commit b64d949
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
5 changes: 5 additions & 0 deletions cgroup/cgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func TestContainerByCgroup(t *testing.T) {
as.Equal("49f9e8e5395d57c1083996c09e2e6f042d5fe1ec0310facab32f94912b35ce59", id)
as.Nil(err)

typ, id, err = containerByCgroup("/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podea19ff5d_943a_4466_a07e_a71e9e50cc62.slice/crio-21572039dd8398ff8272b031fa5422a40165145ab37f2f8794e1e7f844fe8118.scope/container")
as.Equal(typ, ContainerTypeCrio)
as.Equal("21572039dd8398ff8272b031fa5422a40165145ab37f2f8794e1e7f844fe8118", id)
as.Nil(err)

typ, id, err = containerByCgroup("/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3e61c214bc3ed9ff81e21474dd6cba17.slice/cri-containerd-c74b0f5062f0bc726cae1e9369ad4a95deed6b298d247f0407475adb23fa3190")
as.Equal(typ, ContainerTypeContainerd)
as.Equal("c74b0f5062f0bc726cae1e9369ad4a95deed6b298d247f0407475adb23fa3190", id)
Expand Down
55 changes: 34 additions & 21 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
"k8s.io/klog/v2"
)

const MinTrafficStatsUpdateInterval = 5 * time.Second
const (
MinTrafficStatsUpdateInterval = 5 * time.Second
IgnoredContainersCacheTTL = 15 * time.Second
)

var (
selfNetNs = netns.None()
Expand All @@ -44,11 +47,12 @@ type Registry struct {
tracer *ebpftracer.Tracer
events chan ebpftracer.Event

containersById map[ContainerID]*Container
containersByCgroupId map[string]*Container
containersByPid map[uint32]*Container
ip2fqdn map[netaddr.IP]string
ip2fqdnLock sync.RWMutex
containersById map[ContainerID]*Container
containersByCgroupId map[string]*Container
containersByPid map[uint32]*Container
containersByPidIgnored map[uint32]*time.Time
ip2fqdn map[netaddr.IP]string
ip2fqdnLock sync.RWMutex

processInfoCh chan<- ProcessInfo

Expand Down Expand Up @@ -79,29 +83,30 @@ func NewRegistry(reg prometheus.Registerer, processInfoCh chan<- ProcessInfo) (*
if err != nil {
return nil, err
}
if err := cgroup.Init(); err != nil {
if err = cgroup.Init(); err != nil {
return nil, err
}
if err := DockerdInit(); err != nil {
if err = DockerdInit(); err != nil {
klog.Warningln(err)
}
if err := ContainerdInit(); err != nil {
if err = ContainerdInit(); err != nil {
klog.Warningln(err)
}
if err := CrioInit(); err != nil {
if err = CrioInit(); err != nil {
klog.Warningln(err)
}
if err := JournaldInit(); err != nil {
if err = JournaldInit(); err != nil {
klog.Warningln(err)
}

r := &Registry{
reg: reg,
events: make(chan ebpftracer.Event, 10000),
containersById: map[ContainerID]*Container{},
containersByCgroupId: map[string]*Container{},
containersByPid: map[uint32]*Container{},
ip2fqdn: map[netaddr.IP]string{},
reg: reg,
events: make(chan ebpftracer.Event, 10000),
containersById: map[ContainerID]*Container{},
containersByCgroupId: map[string]*Container{},
containersByPid: map[uint32]*Container{},
containersByPidIgnored: map[uint32]*time.Time{},
ip2fqdn: map[netaddr.IP]string{},

processInfoCh: processInfoCh,

Expand Down Expand Up @@ -158,6 +163,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
c.onProcessExit(pid, false)
}
}
r.containersByPidIgnored = map[uint32]*time.Time{}
activeIPs := map[netaddr.IP]struct{}{}
for id, c := range r.containersById {
for dst := range c.lastConnectionAttempts {
Expand Down Expand Up @@ -287,10 +293,16 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
}

func (r *Registry) getOrCreateContainer(pid uint32) *Container {
if c, seen := r.containersByPid[pid]; c != nil {
if c := r.containersByPid[pid]; c != nil {
return c
} else if seen { // ignored
return nil
} else {
if t := r.containersByPidIgnored[pid]; t != nil {
if time.Since(*t) < IgnoredContainersCacheTTL {
return nil
} else {
delete(r.containersByPidIgnored, pid)
}
}
}
cg, err := proc.ReadCgroup(pid)
if err != nil {
Expand Down Expand Up @@ -326,7 +338,8 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
klog.InfoS("ignoring without persisting", "cg", cg.Id, "pid", pid)
} else {
klog.InfoS("ignoring", "cg", cg.Id, "pid", pid)
r.containersByPid[pid] = nil
t := time.Now()
r.containersByPidIgnored[pid] = &t
}
return nil
}
Expand Down

0 comments on commit b64d949

Please sign in to comment.