diff --git a/common/interfaces.go b/common/interfaces.go index e3066a3..5023bcf 100644 --- a/common/interfaces.go +++ b/common/interfaces.go @@ -15,7 +15,7 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []*worker.Process) // Exec payload - Exec(ctx context.Context, p *payload.Payload) (*payload.Payload, error) + Exec(ctx context.Context, p *payload.Payload, stopCh chan struct{}) (chan *staticPool.PExec, error) // Reset kill all workers inside the watcher and replaces with new Reset(ctx context.Context) error // Destroy all underlying stack (but let them complete the task). diff --git a/go.mod b/go.mod index 9081654..f91fe65 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/roadrunner-server/http/v4 go 1.20 require ( - github.com/caddyserver/certmagic v0.19.0 + github.com/caddyserver/certmagic v0.19.1 github.com/goccy/go-json v0.10.2 github.com/google/go-cmp v0.5.9 github.com/mholt/acmez v1.2.0 @@ -12,13 +12,13 @@ require ( github.com/roadrunner-server/endure/v2 v2.3.0 github.com/roadrunner-server/errors v1.2.0 github.com/roadrunner-server/goridge/v3 v3.6.3 - github.com/roadrunner-server/sdk/v4 v4.3.1 + github.com/roadrunner-server/sdk/v4 v4.4.0-beta.2 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.25.0 golang.org/x/net v0.12.0 golang.org/x/sys v0.10.0 ) @@ -40,7 +40,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect - github.com/prometheus/procfs v0.11.0 // indirect + github.com/prometheus/procfs v0.11.1 // indirect github.com/roadrunner-server/tcplisten v1.3.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect @@ -48,7 +48,6 @@ require ( github.com/yusufpapurcu/wmi v1.2.3 // indirect github.com/zeebo/blake3 v0.2.3 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect - go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.11.0 // indirect golang.org/x/mod v0.12.0 // indirect diff --git a/go.sum b/go.sum index cb1c7dd..f85556f 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,14 @@ -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/caddyserver/certmagic v0.19.0 h1:HuJ1Yf1H1jAfmBGrSSQN1XRkafnWcpDtyIiyMV6vmpM= -github.com/caddyserver/certmagic v0.19.0/go.mod h1:fsL01NomQ6N+kE2j37ZCnig2MFosG+MIO4ztnmG/zz8= +github.com/caddyserver/certmagic v0.19.1 h1:4jyOYm2DHvQI8YM0sk6qm62Gl5XznHxiMBMWjMTlQkw= +github.com/caddyserver/certmagic v0.19.1/go.mod h1:fsL01NomQ6N+kE2j37ZCnig2MFosG+MIO4ztnmG/zz8= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -28,9 +27,6 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -44,11 +40,6 @@ github.com/mholt/acmez v1.2.0 h1:1hhLxSgY5FvH5HCnGUuwbKY2VQVo8IU7rxXKSnZ7F30= github.com/mholt/acmez v1.2.0/go.mod h1:VT9YwH1xgNX1kmYY89gY8xPJC84BFAisjo8Egigt4kE= github.com/miekg/dns v1.1.55 h1:GoQ4hpsj0nFLYe+bWiCToyrBEJXkQfOOIvFGFy0lEgo= github.com/miekg/dns v1.1.55/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= @@ -57,8 +48,8 @@ github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUo github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= -github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= -github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/roadrunner-server/api/v4 v4.5.0 h1:OUAcCwLeQbgRj7E2/6M6W2nxOnbG6XYPSS6LjW6COAQ= github.com/roadrunner-server/api/v4 v4.5.0/go.mod h1:nzJvLrDMYT0K9hgPFmeL8dh6q2EvrJEaCHy2XRqz20c= github.com/roadrunner-server/endure/v2 v2.3.0 h1:ctsXL3BjcgHJ0kyO42B2QIaKeZa0modVV9jYx3qSxqo= @@ -67,14 +58,13 @@ github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM0 github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY= github.com/roadrunner-server/goridge/v3 v3.6.3 h1:8hCuPVK9BxIE4IGyNphK6KPAy9Kg6t5tHaItBIQKh2o= github.com/roadrunner-server/goridge/v3 v3.6.3/go.mod h1:hB5+lHhl8msuHrngjKQ+Wx8B705AU0/2DlYGFXbjtgU= -github.com/roadrunner-server/sdk/v4 v4.3.1 h1:DwmyzcKbprXz6JLnyR4fbOmgSC0qr528xC4uSwxRVSY= -github.com/roadrunner-server/sdk/v4 v4.3.1/go.mod h1:YiYFMLx2zVcDjy52P8i/c++VJIY/qaUSdboN0PiPGok= +github.com/roadrunner-server/sdk/v4 v4.4.0-beta.2 h1:YsAJaS5Sdnw7Z9ULiejLh+g0MrMLgqE4yrZ/kduew/0= +github.com/roadrunner-server/sdk/v4 v4.4.0-beta.2/go.mod h1:QcZBTccDGT8zhbHkbzqM7SORktVtvh6Jigkz3hy6kBk= github.com/roadrunner-server/tcplisten v1.3.0 h1:VDd6IbP8oIjm5vKvMVozeZgeHgOcoP0XYLOyOqcZHCY= github.com/roadrunner-server/tcplisten v1.3.0/go.mod h1:VR6Ob5am0oEuLMOeLiVvQxG9ShykAEgrlvZddX8EfoU= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= @@ -100,13 +90,11 @@ go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26 go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= -go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= -go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= +go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= @@ -133,6 +121,5 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handler/handler.go b/handler/handler.go index 5fdc3da..2763aef 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -46,10 +46,10 @@ type Handler struct { gid int // internal - reqPool sync.Pool - respPool sync.Pool - pldPool sync.Pool - errPool sync.Pool + reqPool sync.Pool + respPool sync.Pool + pldPool sync.Pool + stopChPool sync.Pool } // NewHandler return handle interface implementation @@ -69,9 +69,9 @@ func NewHandler(cfg *config.Config, pool common.Pool, log *zap.Logger) (*Handler uid: cfg.UID, gid: cfg.GID, - errPool: sync.Pool{ + stopChPool: sync.Pool{ New: func() any { - return make(chan error, 1) + return make(chan struct{}, 1) }, }, reqPool: sync.Pool{ @@ -141,29 +141,48 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - wResp, err := h.pool.Exec(context.Background(), pld) + stopCh := h.getCh() + wResp, err := h.pool.Exec(context.Background(), pld, stopCh) if err != nil { req.Close(h.log, r) h.putReq(req) h.putPld(pld) + h.putCh(stopCh) h.handleError(w, err) h.log.Error("execute", zap.Time("start", start), zap.Duration("elapsed", time.Since(start)), zap.Error(err)) return } - err = h.Write(wResp, w) - if err != nil { - req.Close(h.log, r) - h.putReq(req) - h.putPld(pld) - h.handleError(w, err) - h.log.Error("write response error", zap.Time("start", start), zap.Duration("elapsed", time.Since(start)), zap.Error(err)) - return + for recv := range wResp { + if recv.Error() != nil { + req.Close(h.log, r) + h.putReq(req) + h.putPld(pld) + h.putCh(stopCh) + h.handleError(w, err) + h.log.Error("write response error", zap.Time("start", start), zap.Duration("elapsed", time.Since(start)), zap.Error(err)) + return + } + + err = h.Write(recv.Payload(), w) + if err != nil { + // send stop signal to the workers pool + stopCh <- struct{}{} + + req.Close(h.log, r) + h.putReq(req) + h.putPld(pld) + h.handleError(w, err) + h.log.Error("write response error", zap.Time("start", start), zap.Duration("elapsed", time.Since(start)), zap.Error(err)) + h.putCh(stopCh) + return + } } h.putPld(pld) req.Close(h.log, r) h.putReq(req) + h.putCh(stopCh) } func (h *Handler) Dispose() {} @@ -249,3 +268,18 @@ func (h *Handler) getPld() *payload.Payload { pld.Codec = frame.CodecJSON return pld } + +func (h *Handler) getCh() chan struct{} { + ch := h.stopChPool.Get().(chan struct{}) + // just check if the chan is not empty + select { + case <-ch: + default: + } + + return ch +} + +func (h *Handler) putCh(ch chan struct{}) { + h.stopChPool.Put(ch) +}