支持大文件异步传输 #822
Replies: 5 comments
-
可以考虑先将body数据读取到内存中,并设置一个阈值:如果body大小超过该阈值,则直接暂存到磁盘,等待客户端传输完毕后再转发给后端节点。目前我们这边遇到的困境是:由于body较大且网速较慢,后端Java线程会被长时间阻塞。 |
Beta Was this translation helpful? Give feedback.
-
@iwannay 其它场景(例如流量镜像)也有类似需求,需要将请求body临时存放在内存及磁盘中; |
Beta Was this translation helpful? Give feedback.
-
@iyangsj 我这边实现了类似的功能 func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic.Request) (action int) {
var err error
var res *bfe_http.Response
var hl *bfe_module.HandlerList
var retVal int
var clusterName string
var cluster *bfe_cluster.BfeCluster
var outreq *bfe_http.Request
var serverConf *bfe_route.ServerDataConf
var writeTimer *time.Timer
req := basicReq.HttpRequest
isRedirect := false
resFlushInterval := time.Duration(0)
cancelOnClientClose := false
// get instance of BfeServer
srv := p.server
// set clientip of original user for request
setClientAddr(basicReq)
// Callback for HandleBeforeLocation
hl = srv.CallBacks.GetHandlerList(bfe_module.HandleBeforeLocation)
if hl != nil {
retVal, res = hl.FilterRequest(basicReq)
basicReq.HttpResponse = res
switch retVal {
case bfe_module.BfeHandlerClose:
// close the connection directly (with no response)
action = closeDirectly
return
case bfe_module.BfeHandlerFinish:
// close the connection after response
action = closeAfterReply
basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
return
case bfe_module.BfeHandlerRedirect:
// make redirect
Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header)
isRedirect = true
basicReq.BfeStatusCode = basicReq.Redirect.Code
goto send_response
case bfe_module.BfeHandlerResponse:
goto response_got
}
}
// find product
if err := srv.findProduct(basicReq); err != nil {
basicReq.ErrCode = bfe_basic.ErrBkFindProduct
basicReq.ErrMsg = err.Error()
p.proxyState.ErrBkFindProduct.Inc(1)
log.Logger.Info("FindProduct error[%s] host[%s] vip[%s] clientip[%s]", err.Error(),
basicReq.HttpRequest.Host, basicReq.Session.Vip, basicReq.ClientAddr)
// close connection
res = bfe_basic.CreateInternalSrvErrResp(basicReq)
action = closeAfterReply
goto response_got
}
// Callback for HandleFoundProduct
hl = srv.CallBacks.GetHandlerList(bfe_module.HandleFoundProduct)
if hl != nil {
retVal, res = hl.FilterRequest(basicReq)
basicReq.HttpResponse = res
switch retVal {
case bfe_module.BfeHandlerClose:
// close the connection directly (with no response)
action = closeDirectly
return
case bfe_module.BfeHandlerFinish:
// close the connection after response
action = closeAfterReply
basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
return
case bfe_module.BfeHandlerRedirect:
// make redirect
Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header)
isRedirect = true
basicReq.BfeStatusCode = basicReq.Redirect.Code
goto send_response
case bfe_module.BfeHandlerResponse:
goto response_got
}
}
// find cluster
if err = srv.findCluster(basicReq); err != nil {
basicReq.ErrCode = bfe_basic.ErrBkFindLocation
basicReq.ErrMsg = err.Error()
p.proxyState.ErrBkFindLocation.Inc(1)
log.Logger.Info("FindLocation error[%s] host[%s]", err, basicReq.HttpRequest.Host)
// close connection
res = bfe_basic.CreateInternalSrvErrResp(basicReq)
action = closeAfterReply
goto response_got
}
clusterName = basicReq.Route.ClusterName
// look up for cluster
serverConf = basicReq.SvrDataConf.(*bfe_route.ServerDataConf)
cluster, err = serverConf.ClusterTable.Lookup(clusterName)
if err != nil {
log.Logger.Warn("no cluster for %s", clusterName)
basicReq.Stat.ResponseStart = time.Now()
basicReq.ErrCode = bfe_basic.ErrBkNoCluster
basicReq.ErrMsg = err.Error()
p.proxyState.ErrBkNoCluster.Inc(1)
res = bfe_basic.CreateInternalSrvErrResp(basicReq)
action = closeAfterReply
goto response_got
}
basicReq.Backend.ClusterName = clusterName
// set deadline to finish read client request body
p.setTimeout(bfe_basic.StageReadReqBody, basicReq.Connection, req, cluster.TimeoutReadClient())
// Callback for HandleAfterLocation
hl = srv.CallBacks.GetHandlerList(bfe_module.HandleAfterLocation)
if hl != nil {
retVal, res = hl.FilterRequest(basicReq)
basicReq.HttpResponse = res
switch retVal {
case bfe_module.BfeHandlerClose:
// close the connection directly (with no response)
action = closeDirectly
return
case bfe_module.BfeHandlerFinish:
// close the connection after response
action = closeAfterReply
basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
return
case bfe_module.BfeHandlerRedirect:
// make redirect
Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header)
isRedirect = true
basicReq.BfeStatusCode = basicReq.Redirect.Code
goto send_response
case bfe_module.BfeHandlerResponse:
goto response_got
}
}
if bfe_debug.DebugServHTTP {
log.Logger.Debug("ReverseProxy.ServeHTTP(): cluster name = %s", clusterName)
}
// prepare out request to downstream RS backend
outreq = new(bfe_http.Request)
*outreq = *req // includes shallow copies of maps, but okay
basicReq.OutRequest = outreq
// set http proto for out request
httpProtoSet(outreq)
// remove hop-by-hop headers
hopByHopHeaderRemove(outreq, req)
if outreq.Body != nil {
body, err := multibuf.New(req.Body, multibuf.MaxBytes(cluster.MaxRequestBodyBytes()),
multibuf.MemBytes(cluster.MemRequestBodyBytes()))
var failed bool
var totalSize int64
if err != nil || body == nil {
failed = true
} else {
totalSize, err = body.Size()
if err != nil {
failed = true
}
}
defer func() {
if body != nil {
errc := body.Close()
if errc != nil {
log.Logger.Error("buffer: failed to close req body, err: %v", errc)
}
}
}()
if failed {
outreq.Body.Close()
basicReq.ErrCode = bfe_basic.ErrClientReadBody
basicReq.ErrMsg = err.Error()
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
p.proxyState.ErrClientTimeout.Inc(1)
res = bfe_basic.CreateStatusRequestTimeout(basicReq)
} else {
p.proxyState.ErrClientReqFailReadBody.Inc(1)
res = bfe_basic.CreateStatusBadRequestResp(basicReq)
}
// close connection
action = closeAfterReply
goto response_got
}
if body == nil || totalSize == 0 {
outreq.Body = ioutil.NopCloser(req.Body)
} else {
outreq.Body = ioutil.NopCloser(body.(io.Reader))
}
outreq.ContentLength = totalSize
}
// invoke cluster to get response
res, action, err = p.clusterInvoke(srv, cluster, basicReq, rw)
basicReq.HttpResponse = res
// Note: The runtime will not GC the objects referenced by basicReq.SvrDataConf until the request
// has been processed. But the request may last a long time. It's better to remove the reference
// to objects which are not used any more.
basicReq.SvrDataConf = nil
if err != nil || res == nil {
basicReq.Stat.ResponseStart = time.Now()
basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
res = bfe_basic.CreateInternalSrvErrResp(basicReq)
goto response_got
}
resFlushInterval = cluster.ResFlushInterval()
cancelOnClientClose = cluster.CancelOnClientClose()
if resFlushInterval == 0 && basicReq.HttpRequest.Header.Get("Accept") == "text/event-stream" {
resFlushInterval = cluster.DefaultSSEFlushInterval()
}
// timeout for write response to client
// Note: we use io.Copy() to read from backend and write to client.
// For avoid from blocking on client conn or backend conn forever,
// we must timeout both conns after specified duration.
p.setTimeout(bfe_basic.StageWriteClient, basicReq.Connection, req, cluster.TimeoutWriteClient())
writeTimer = time.AfterFunc(cluster.TimeoutWriteClient(), func() {
transport := basicReq.Trans.Transport.(*bfe_http.Transport)
transport.CancelRequest(basicReq.OutRequest) // force close connection to backend
})
defer writeTimer.Stop()
// for read next request
defer p.setTimeout(bfe_basic.StageEndRequest, basicReq.Connection, req, cluster.TimeoutReadClientAgain())
response_got:
defer res.Body.Close()
// Callback for HandleReadResponse
hl = srv.CallBacks.GetHandlerList(bfe_module.HandleReadResponse)
if hl != nil {
retVal = hl.FilterResponse(basicReq, res)
switch retVal {
case bfe_module.BfeHandlerFinish:
// close the connection after response
action = closeAfterReply
basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
return
case bfe_module.BfeHandlerRedirect:
// make redirect
Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header)
isRedirect = true
basicReq.BfeStatusCode = basicReq.Redirect.Code
goto send_response
}
}
send_response:
// send http response to client
basicReq.Stat.ResponseStart = time.Now()
if !isRedirect && res != nil {
if res.Body != nil {
body, err := multibuf.New(res.Body, multibuf.MaxBytes(cluster.MaxResponseBodyBytes()),
multibuf.MemBytes(cluster.MemResponseBodyBytes()))
var failed bool
if err != nil || body == nil {
failed = true
}
defer func() {
if body != nil {
errc := body.Close()
if errc != nil {
log.Logger.Error("buffer: failed to close resp body, err: %v", errc)
}
}
}()
if failed {
p.proxyState.ErrBkRespFailReadBody.Inc(1)
}
if body == nil {
res.Body = ioutil.NopCloser(res.Body)
} else {
res.Body = ioutil.NopCloser(body.(io.Reader))
}
}
err = p.sendResponse(rw, res, resFlushInterval, cancelOnClientClose)
if err != nil {
// Note: for h2/spdy protocol, not close client conn when send
// response error. h2/spdy module will close conn/stream properly
if !CheckSupportMultiplex(basicReq.Session.Proto) {
action = closeAfterReply
}
basicReq.ErrCode = bfe_basic.ErrClientWrite
basicReq.ErrMsg = err.Error()
p.proxyState.ErrClientWrite.Inc(1)
}
}
return
} 投入生产环境运行平稳,但不知是否符合bfe设计思路,是否应该提交pr |
Beta Was this translation helpful? Give feedback.
-
@iwannay 感谢您的贡献,可按如下流程提交PR: 实现建议:应包含功能开关,并可选启用该机制 |
Beta Was this translation helpful? Give feedback.
-
@iyangsj ok |
Beta Was this translation helpful? Give feedback.
-
作者您好,由于bfe目前client->bfe->backend这个数据流是同步传输的,大文件上传时会造成后端线程长时间等待。在已知的反向代理产品中,nginx支持异步传输,可以等待文件上传完毕后再转发给backend,从而有效避免后端线程长时间等待。请问bfe后期是否有计划支持异步传输?
Beta Was this translation helpful? Give feedback.
All reactions