From 9076f70b12a28212b39d9b07d47fbfffa5f196fe Mon Sep 17 00:00:00 2001 From: mac Date: Sat, 16 Dec 2023 22:36:10 +0800 Subject: [PATCH] feat: bytebuferpoll use mcache replace --- pkg/app/context.go | 2 +- pkg/app/server/hertz_test.go | 51 +++++++++++++++++++++++++++++++ pkg/common/bytebufferpool/pool.go | 14 ++++++++- pkg/common/config/option.go | 2 +- pkg/protocol/response.go | 24 ++++++++++++--- 5 files changed, 85 insertions(+), 8 deletions(-) diff --git a/pkg/app/context.go b/pkg/app/context.go index 4e2d4be33..c8ff39926 100644 --- a/pkg/app/context.go +++ b/pkg/app/context.go @@ -582,7 +582,7 @@ func (ctx *RequestContext) FileAttachment(filepath, filename string) { // SetBodyString sets response body to the given value. func (ctx *RequestContext) SetBodyString(body string) { - ctx.Response.SetBodyString(body) + ctx.Response.SetBodyString(body, string(ctx.URI().RequestURI())) } // SetContentTypeBytes sets response Content-Type. diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index 4a9cee3d0..0f09b1486 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -26,6 +26,7 @@ import ( "io/ioutil" "net" "net/http" + "runtime" "strings" "sync" "sync/atomic" @@ -248,6 +249,56 @@ func TestServer_Run(t *testing.T) { _ = hertz.Shutdown(ctx) } +func TestBigBodyBug(t *testing.T) { + runtime.GOMAXPROCS(3) + hertz := New(WithHostPorts("127.0.0.1:8888")) + hertz.GET("/test1", func(c context.Context, ctx *app.RequestContext) { + body := make([]byte, 1024*1024*9) + ctx.SetBodyString(string(body)) + }) + hertz.GET("/test2", func(c context.Context, ctx *app.RequestContext) { + body := make([]byte, 1024) + ctx.SetBodyString(string(body)) + }) + hertz.GET("/test3", func(c context.Context, ctx *app.RequestContext) { + body := make([]byte, 1024*2) + ctx.SetBodyString(string(body)) + }) + go hertz.Run() + + go func() { + for i := 0; i < 2; i++ { + go func() { + for true { + http.Get("http://127.0.0.1:8888/test1") + } + }() + } + }() + + go func() { + for i := 0; i < 5; i++ { + go func() { + for true { + http.Get("http://127.0.0.1:8888/test2") + } + }() + } + }() + + go func() { + for i := 0; i < 5; i++ { + go func() { + for true { + http.Get("http://127.0.0.1:8888/test3") + } + }() + } + }() + + <-make(chan struct{}) +} + func TestNotAbsolutePath(t *testing.T) { engine := New(WithHostPorts("127.0.0.1:9990")) engine.POST("/", func(c context.Context, ctx *app.RequestContext) { diff --git a/pkg/common/bytebufferpool/pool.go b/pkg/common/bytebufferpool/pool.go index 889f118dd..4dc2f533d 100644 --- a/pkg/common/bytebufferpool/pool.go +++ b/pkg/common/bytebufferpool/pool.go @@ -45,6 +45,8 @@ import ( "sort" "sync" "sync/atomic" + + "github.com/bytedance/gopkg/lang/mcache" ) const ( @@ -54,7 +56,7 @@ const ( minSize = 1 << minBitSize maxSize = 1 << (minBitSize + steps - 1) - calibrateCallsThreshold = 42000 + calibrateCallsThreshold = 10 maxPercentile = 0.95 ) @@ -96,6 +98,16 @@ func (p *Pool) Get() *ByteBuffer { } } +func (p *Pool) GetWithSize(size int) *ByteBuffer { + return &ByteBuffer{ + B: mcache.Malloc(size)[:0], + } +} + +func (p *Pool) PutWithByte(b []byte) { + mcache.Free(b) +} + // Put returns byte buffer to the pool. // // ByteBuffer.B mustn't be touched after returning it to the pool. diff --git a/pkg/common/config/option.go b/pkg/common/config/option.go index 417955fc9..250bd41e9 100644 --- a/pkg/common/config/option.go +++ b/pkg/common/config/option.go @@ -37,7 +37,7 @@ const ( defaultAddr = ":8888" defaultNetwork = "tcp" defaultBasePath = "/" - defaultMaxRequestBodySize = 4 * 1024 * 1024 + defaultMaxRequestBodySize = 0 defaultWaitExitTimeout = time.Second * 5 defaultReadBufferSize = 4 * 1024 ) diff --git a/pkg/protocol/response.go b/pkg/protocol/response.go index 8beb38597..f55cc5c74 100644 --- a/pkg/protocol/response.go +++ b/pkg/protocol/response.go @@ -42,6 +42,7 @@ package protocol import ( + "fmt" "io" "net" "sync" @@ -138,9 +139,11 @@ func (resp *Response) SetConnectionClose() { } // SetBodyString sets response body. -func (resp *Response) SetBodyString(body string) { - resp.CloseBodyStream() //nolint:errcheck - resp.BodyBuffer().SetString(body) //nolint:errcheck +func (resp *Response) SetBodyString(body, url string) { + resp.CloseBodyStream() //nolint:errcheck + // resp.BodyBuffer().SetString(body) //nolint:errcheck + // resp.BodyBuffer(url).SetString(body) //nolint:errcheck + resp.BodyBufferWithSize(len(body), url).SetString(body) //nolint:errcheck } func (resp *Response) ConstructBodyStream(body *bytebufferpool.ByteBuffer, bodyStream io.Reader) { @@ -299,7 +302,8 @@ func (resp *Response) ResetBody() { resp.body.Reset() return } - responseBodyPool.Put(resp.body) + // responseBodyPool.Put(resp.body) + responseBodyPool.PutWithByte(resp.body.B) resp.body = nil } } @@ -385,11 +389,21 @@ func (resp *Response) CloseBodyStream() error { return err } -func (resp *Response) BodyBuffer() *bytebufferpool.ByteBuffer { +func (resp *Response) BodyBufferWithSize(size int, uri string) *bytebufferpool.ByteBuffer { + if resp.body == nil { + resp.body = responseBodyPool.GetWithSize(size) + } + resp.bodyRaw = nil + fmt.Printf("flipped url=%s resp Body len=%d cap=%d\n", uri, resp.body.Len(), cap(resp.body.B)) + return resp.body +} + +func (resp *Response) BodyBuffer(url ...string) *bytebufferpool.ByteBuffer { if resp.body == nil { resp.body = responseBodyPool.Get() } resp.bodyRaw = nil + fmt.Printf("flipped url=%s resp Body len=%d cap=%d\n", url, resp.body.Len(), cap(resp.body.B)) return resp.body }