Skip to content

Commit

Permalink
optimize gc and add tests for GC
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Aug 1, 2024
1 parent 9f99064 commit f763d6d
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 15 deletions.
73 changes: 73 additions & 0 deletions aio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,76 @@ func BenchmarkContextSwitch(b *testing.B) {
}
close(die)
}

func TestGC(t *testing.T) {
par := 1024
msgsize := 65536
t.Log("testing GC:", par, "connections")
ln := echoServer(t, msgsize)
defer ln.Close()

w, err := NewWatcher()
if err != nil {
t.Fatal(err)
}
defer w.Close()

for i := 0; i < par; i++ {
go func() {
data := make([]byte, msgsize)
conn, err := net.Dial("tcp", ln.Addr().String())
if err != nil {
log.Fatal(err)
}

// send
err = w.Write(nil, conn, data)
if err != nil {
log.Fatal(err)
}

conn = nil
}()
}

count := 0
LOOP:
for {
results, err := w.WaitIO()
if err != nil {
t.Fatal("waitio:", err)
return
}

for _, res := range results {
switch res.Operation {
case OpWrite:
case OpRead:
}
res.Conn = nil

count++
if count >= par {
break LOOP
}
}
}

found, closed := w.GetGC()
t.Logf("GC found:%d closed:%d", found, closed)
<-time.After(2 * time.Second)
runtime.GC()

found, closed = w.GetGC()
t.Logf("GC found:%d closed:%d", found, closed)
<-time.After(2 * time.Second)
runtime.GC()

found, closed = w.GetGC()
t.Logf("GC found:%d closed:%d", found, closed)
<-time.After(2 * time.Second)
runtime.GC()

found, closed = w.GetGC()
t.Logf("GC found:%d closed:%d", found, closed)
}
46 changes: 31 additions & 15 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ type watcher struct {
timer *time.Timer // Timer for handling timeouts

// Garbage collection
gc []net.Conn // List of connections to be garbage collected
gc []uintptr // List of connections to be garbage collected
gcMutex sync.Mutex // Mutex to synchronize access to the gc list
gcNotify chan struct{} // Channel to notify the GC processor
gcFound uint32 // number of net.Conn objects found unreachable by runtime
gcClosed uint32 // record number of objects closed successfully

// Shutdown and cleanup
die chan struct{} // Channel for signaling shutdown
Expand Down Expand Up @@ -571,19 +573,8 @@ func (w *watcher) loop() {
break
}
}

case <-w.gcNotify: // GC recycled net.Conn
w.gcMutex.Lock()
for i, c := range w.gc {
ptr := reflect.ValueOf(c).Pointer()
if ident, ok := w.connIdents[ptr]; ok {
w.releaseConn(ident)
}
w.gc[i] = nil
}
w.gc = w.gc[:0]
w.gcMutex.Unlock()

case <-w.gcNotify:
w.handleGC()
case cpuid := <-w.chCPUID:
setAffinity(cpuid)

Expand All @@ -593,6 +584,22 @@ func (w *watcher) loop() {
}
}

// handleGC processes the garbage collection of net.Conn objects.
func (w *watcher) handleGC() {
runtime.GC()
w.gcMutex.Lock()
if len(w.gc) > 0 {
for _, ptr := range w.gc {
if ident, ok := w.connIdents[ptr]; ok {
w.releaseConn(ident)
}
}
w.gcClosed += uint32(len(w.gc))
w.gc = w.gc[:0]
}
w.gcMutex.Unlock()
}

// handlePending processes new requests, acting as a reception desk.
func (w *watcher) handlePending(pending []*aiocb) {
PENDING:
Expand Down Expand Up @@ -641,7 +648,9 @@ PENDING:
// if not it will never be GC-ed.
runtime.SetFinalizer(pcb.conn, func(c net.Conn) {
w.gcMutex.Lock()
w.gc = append(w.gc, c)
ptr := reflect.ValueOf(c).Pointer()
w.gc = append(w.gc, ptr)
w.gcFound++
w.gcMutex.Unlock()

// notify gc processor
Expand Down Expand Up @@ -741,3 +750,10 @@ func (w *watcher) handleEvents(events pollerEvents) {
}
}
}

// read gcFound & gcClosed
func (w *watcher) GetGC() (found uint32, closed uint32) {
w.gcMutex.Lock()
defer w.gcMutex.Unlock()
return w.gcFound, w.gcClosed
}

0 comments on commit f763d6d

Please sign in to comment.