From f841bf15ef0edff39e2bd7a94a3e5b603cfbc0b2 Mon Sep 17 00:00:00 2001 From: lbbniu Date: Fri, 12 Jan 2024 14:12:33 +0800 Subject: [PATCH 1/2] perf(transport): optimize server handle --- tars/transport/tcphandler.go | 2 +- tars/transport/udphandler.go | 42 ++++++++++++++++++++---------------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/tars/transport/tcphandler.go b/tars/transport/tcphandler.go index 11f8f873..380a3197 100755 --- a/tars/transport/tcphandler.go +++ b/tars/transport/tcphandler.go @@ -72,6 +72,7 @@ func (t *tcpHandler) getConnContext(connSt *connInfo) context.Context { func (t *tcpHandler) handleConn(connSt *connInfo, pkg []byte) { // recvPkgTs are more accurate ctx := t.getConnContext(connSt) + atomic.AddInt32(&connSt.numInvoke, 1) handler := func() { defer atomic.AddInt32(&connSt.numInvoke, -1) rsp := t.server.invoke(ctx, pkg) @@ -262,7 +263,6 @@ func (t *tcpHandler) recv(connSt *connInfo) { break } if status == PackageFull { - atomic.AddInt32(&connSt.numInvoke, 1) pkg := make([]byte, pkgLen) copy(pkg, currBuffer[:pkgLen]) currBuffer = currBuffer[pkgLen:] diff --git a/tars/transport/udphandler.go b/tars/transport/udphandler.go index a14ee2a6..b7a32668 100755 --- a/tars/transport/udphandler.go +++ b/tars/transport/udphandler.go @@ -38,6 +38,28 @@ func (u *udpHandler) getConnContext(udpAddr *net.UDPAddr) context.Context { return ctx } +func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) { + ctx := u.getConnContext(udpAddr) + atomic.AddInt32(&u.server.numInvoke, 1) + go func() { + defer atomic.AddInt32(&u.server.numInvoke, -1) + rsp := u.server.invoke(ctx, pkg) // no need to check package + + cPacketType, ok := current.GetPacketTypeFromContext(ctx) + if !ok { + TLOG.Error("Failed to GetPacketTypeFromContext") + } + + if cPacketType == basef.TARSONEWAY { + return + } + + if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil { + TLOG.Errorf("send pkg to %v failed %v", udpAddr, err) + } + }() +} + func (u *udpHandler) Handle() error { atomic.AddInt32(&u.server.numConn, 1) // wait invoke done @@ -67,25 +89,7 @@ func (u *udpHandler) Handle() error { } pkg := make([]byte, n) copy(pkg, buffer[0:n]) - ctx := u.getConnContext(udpAddr) - go func() { - atomic.AddInt32(&u.server.numInvoke, 1) - defer atomic.AddInt32(&u.server.numInvoke, -1) - rsp := u.server.invoke(ctx, pkg) // no need to check package - - cPacketType, ok := current.GetPacketTypeFromContext(ctx) - if !ok { - TLOG.Error("Failed to GetPacketTypeFromContext") - } - - if cPacketType == basef.TARSONEWAY { - return - } - - if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil { - TLOG.Errorf("send pkg to %v failed %v", udpAddr, err) - } - }() + u.handleUDPAddr(udpAddr, pkg) } } From 0277e02cdda60c7991945fc4b8556d32236511c8 Mon Sep 17 00:00:00 2001 From: lbbniu Date: Fri, 12 Jan 2024 14:28:43 +0800 Subject: [PATCH 2/2] perf(transport): udp server handle supports coroutine pools --- tars/transport/udphandler.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/tars/transport/udphandler.go b/tars/transport/udphandler.go index b7a32668..0a2d6af3 100755 --- a/tars/transport/udphandler.go +++ b/tars/transport/udphandler.go @@ -7,6 +7,8 @@ import ( "sync/atomic" "time" + "github.com/TarsCloud/TarsGo/tars/util/gpool" + "github.com/TarsCloud/TarsGo/tars/protocol/res/basef" "github.com/TarsCloud/TarsGo/tars/util/current" "github.com/TarsCloud/TarsGo/tars/util/grace" @@ -17,6 +19,7 @@ type udpHandler struct { server *TarsServer conn *net.UDPConn + pool *gpool.Pool } func (u *udpHandler) Listen() (err error) { @@ -26,6 +29,11 @@ func (u *udpHandler) Listen() (err error) { return err } TLOG.Info("UDP listen", u.conn.LocalAddr()) + + // init goroutine pool + if cfg.MaxInvoke > 0 { + u.pool = gpool.NewPool(int(cfg.MaxInvoke), cfg.QueueCap) + } return nil } @@ -41,7 +49,7 @@ func (u *udpHandler) getConnContext(udpAddr *net.UDPAddr) context.Context { func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) { ctx := u.getConnContext(udpAddr) atomic.AddInt32(&u.server.numInvoke, 1) - go func() { + handler := func() { defer atomic.AddInt32(&u.server.numInvoke, -1) rsp := u.server.invoke(ctx, pkg) // no need to check package @@ -57,7 +65,14 @@ func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) { if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil { TLOG.Errorf("send pkg to %v failed %v", udpAddr, err) } - }() + } + + cfg := u.config + if cfg.MaxInvoke > 0 { // use goroutine pool + u.pool.JobQueue <- handler + } else { + go handler() + } } func (u *udpHandler) Handle() error {