Skip to content

Commit

Permalink
feat: support edge-triggered I/O
Browse files Browse the repository at this point in the history
Fixes #573
  • Loading branch information
panjf2000 committed Apr 17, 2024
1 parent 601bfdf commit f934133
Show file tree
Hide file tree
Showing 14 changed files with 740 additions and 187 deletions.
6 changes: 5 additions & 1 deletion acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
}

c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
if err = el.poller.AddRead(&c.pollAttachment); err != nil {
addEvents := el.poller.AddRead
if el.engine.opts.EdgeTriggeredIO {
addEvents = el.poller.AddReadWrite
}
if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
return err
}
el.connections.addConn(c, el.idx)
Expand Down
178 changes: 140 additions & 38 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type connHandler struct {
type clientEvents struct {
*BuiltinEventEngine
tester *testing.T
svr *testClientServer
svr *testClient
packetLen int
}

Expand Down Expand Up @@ -87,117 +87,219 @@ func (ev *clientEvents) OnShutdown(e Engine) {
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
}

func TestServeWithGnetClient(t *testing.T) {
func TestClient(t *testing.T) {
// start an engine
// connect 10 clients
// each client will pipe random data for 1-3 seconds.
// the writes to the engine will be random sizes. 0KB - 1MB.
// the engine will echo back the data.
// waits for graceful connection closing.
t.Run("poll", func(t *testing.T) {
t.Run("poll-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
})
})
})

t.Run("poll-reuseport", func(t *testing.T) {
t.Run("poll-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", true, false, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", true, false, true, true, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", true, false, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", true, false, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", true, false, true, false, 10, SourceAddrHash)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", true, false, true, true, 10, SourceAddrHash)
})
})
})

t.Run("poll-LT-reuseport", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
})
})
})

t.Run("poll-ET-reuseport", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
})
})
})
}

type testClientServer struct {
type testClient struct {
*BuiltinEventEngine
client *Client
tester *testing.T
Expand All @@ -215,20 +317,20 @@ type testClientServer struct {
udpReadHeader int32
}

func (s *testClientServer) OnBoot(eng Engine) (action Action) {
func (s *testClient) OnBoot(eng Engine) (action Action) {
s.eng = eng
return
}

func (s *testClientServer) OnOpen(c Conn) (out []byte, action Action) {
func (s *testClient) OnOpen(c Conn) (out []byte, action Action) {
c.SetContext(&sync.Once{})
atomic.AddInt32(&s.connected, 1)
require.NotNil(s.tester, c.LocalAddr(), "nil local addr")
require.NotNil(s.tester, c.RemoteAddr(), "nil remote addr")
return
}

func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
func (s *testClient) OnClose(c Conn, err error) (action Action) {
if err != nil {
logging.Debugf("error occurred on closed, %v\n", err)
}
Expand All @@ -246,13 +348,13 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
return
}

func (s *testClientServer) OnShutdown(Engine) {
func (s *testClient) OnShutdown(Engine) {
if s.network == "udp" {
require.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader))
}
}

func (s *testClientServer) OnTraffic(c Conn) (action Action) {
func (s *testClient) OnTraffic(c Conn) (action Action) {
readHeader := func() {
ping := make([]byte, len(pingMsg))
n, err := io.ReadFull(c, ping)
Expand Down Expand Up @@ -302,7 +404,7 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) {
return
}

func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
func (s *testClient) OnTick() (delay time.Duration, action Action) {
delay = time.Second / 5
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
Expand All @@ -321,8 +423,8 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
return
}

func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async bool, nclients int, lb LoadBalancing) {
ts := &testClientServer{
func runClient(t *testing.T, network, addr string, et, reuseport, multicore, async bool, nclients int, lb LoadBalancing) {
ts := &testClient{
tester: t,
network: network,
addr: addr,
Expand All @@ -347,10 +449,10 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus

err = Run(ts,
network+"://"+addr,
WithEdgeTriggeredIO(et),
WithLockOSThread(async),
WithMulticore(multicore),
WithReusePort(reuseport),
WithReuseAddr(reuseaddr),
WithTicker(true),
WithTCPKeepAlive(time.Minute*1),
WithTCPNoDelay(TCPDelay),
Expand Down
9 changes: 8 additions & 1 deletion client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,16 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
options.WriteBufferCap = math.CeilToPowerOfTwo(wbc)
}

el.buffer = make([]byte, options.ReadBufferCap)
el.connections.init()
el.eventHandler = eh
el.read = el.readLT
el.write = el.writeLT
if options.EdgeTriggeredIO {
el.read = el.readET
el.write = el.writeET

Check warning on line 115 in client_unix.go

View check run for this annotation

Codecov / codecov/patch

client_unix.go#L114-L115

Added lines #L114 - L115 were not covered by tests
} else {
el.buffer = make([]byte, options.ReadBufferCap)
}
cli.el = &el
return
}
Expand Down
Loading

0 comments on commit f934133

Please sign in to comment.