Skip to content

Commit

Permalink
check both context for request
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <[email protected]>
  • Loading branch information
Chengxuan committed Aug 13, 2024
1 parent 8d5a8ed commit 2e424e9
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 39 deletions.
42 changes: 19 additions & 23 deletions pkg/rpcbackend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Back

type RPCClient struct {
client *resty.Client
batchDispatcherContext context.Context
concurrencySlots chan bool
requestCounter int64
requestBatchQueue chan *batchRequest
Expand Down Expand Up @@ -167,53 +168,42 @@ type batchRequest struct {
}

func (rc *RPCClient) startBatchDispatcher(dispatcherRootContext context.Context, batchTimeout time.Duration, batchSize int) {
rc.batchDispatcherContext = dispatcherRootContext
if rc.requestBatchQueue == nil { // avoid orphaned dispatcher
requestQueue := make(chan *batchRequest)
go func() {
var batch []*batchRequest
var ticker *time.Ticker
var tickerChannel <-chan time.Time
var timeoutChannel <-chan time.Time
for {
select {
case req := <-requestQueue:
batch = append(batch, req)
if ticker == nil {
// first request received start a ticker
ticker = time.NewTicker(batchTimeout)
defer ticker.Stop()
tickerChannel = ticker.C
if timeoutChannel == nil {
// first request received, start a batch timeout
timeoutChannel = time.After(batchTimeout)
}

if len(batch) >= batchSize {
rc.dispatchBatch(dispatcherRootContext, batch)
rc.dispatchBatch(rc.batchDispatcherContext, batch)
batch = nil
ticker.Stop() // batch dispatched, stop the ticker, the next request will start a new ticker if needed
ticker = nil
tickerChannel = nil
timeoutChannel = nil // stop the timeout and let it get reset by the next request
}

case <-tickerChannel:
case <-timeoutChannel:
if len(batch) > 0 {
rc.dispatchBatch(dispatcherRootContext, batch)
rc.dispatchBatch(rc.batchDispatcherContext, batch)
batch = nil

ticker.Stop() // batch dispatched, stop the ticker, the next request will start a new ticker if needed
ticker = nil
tickerChannel = nil
}

case <-dispatcherRootContext.Done():
if ticker != nil {
ticker.Stop() // clean up the ticker
timeoutChannel = nil // stop the timeout and let it get reset by the next request
}
case <-rc.batchDispatcherContext.Done():
select { // drain the queue
case req := <-requestQueue:
batch = append(batch, req)
default:
}
for i, req := range batch {
// mark all queueing requests as failed
cancelCtxErr := i18n.NewError(dispatcherRootContext, signermsgs.MsgRequestCanceledContext, req.rpcReq.ID)
cancelCtxErr := i18n.NewError(rc.batchDispatcherContext, signermsgs.MsgRequestCanceledContext, req.rpcReq.ID)
batch[i].rpcErr <- cancelCtxErr
}

Expand Down Expand Up @@ -412,6 +402,9 @@ func (rc *RPCClient) batchSyncRequest(ctx context.Context, rpcReq *RPCRequest) (

select {
case rc.requestBatchQueue <- req:
case <-rc.batchDispatcherContext.Done():
err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID)
return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err
case <-ctx.Done():
err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID)
return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err
Expand All @@ -422,6 +415,9 @@ func (rc *RPCClient) batchSyncRequest(ctx context.Context, rpcReq *RPCRequest) (
return rpcRes, nil
case err := <-req.rpcErr:
return nil, err
case <-rc.batchDispatcherContext.Done():
err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID)
return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err
case <-ctx.Done():
err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID)
return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err
Expand Down
81 changes: 66 additions & 15 deletions pkg/rpcbackend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,23 +479,74 @@ func TestBatchSyncRequestCanceledContextWhenQueueing(t *testing.T) {
time.Sleep(50 * time.Millisecond) // wait for the request to be queued and start the ticker
cancelCtx()
<-checkDone

ctx2, cancelCtx2 := context.WithCancel(context.Background())

rpcOptions = ReadConfig(ctx2, rpcConfig)
rb = NewRPCClientWithOption(c, rpcOptions).(*RPCClient)

checkDone = make(chan bool)
go func() {
reqContext := context.Background()
_, err = rb.SyncRequest(reqContext, &RPCRequest{})
assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context
close(checkDone)
}()
cancelCtx2() // cancel context straight away to check the pending request are drained correctly
<-checkDone
}

// func TestBatchSyncRequestCanceledContextFlushTheQueueWhenRootContextIsCancelled(t *testing.T) {

// ctx, cancelCtx := context.WithCancel(context.Background())

// // Define the expected server response to the batch
// rpcServerResponseBatchBytes := []byte(`[
// {
// "jsonrpc": "2.0",
// "id": 1,
// "result": {
// "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1",
// "nonce": "0x24"
// }
// }
// ]`)

// server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// w.Header().Add("Content-Type", "application/json")
// w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes)))
// w.WriteHeader(200)
// w.Write(rpcServerResponseBatchBytes)
// }))
// defer server.Close()

// signerconfig.Reset()
// prefix := signerconfig.BackendConfig
// prefix.Set(ffresty.HTTPConfigURL, server.URL)
// c, err := ffresty.New(ctx, signerconfig.BackendConfig)
// assert.NoError(t, err)

// rpcConfig := config.RootSection("unittest")
// InitConfig(rpcConfig)
// rpcConfig.Set(ConfigBatchEnabled, true)
// rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay
// rpcConfig.Set(ConfigBatchSize, 1)
// rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) // set max concurrency to 1 so it can be jammed easily

// rpcOptions := ReadConfig(ctx, rpcConfig)

// rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient)

// checkDone := make(chan bool)
// // occupy the concurrent slot, so the first request is queued at batching
// rb.requestBatchConcurrencySlots <- true
// // emit the first request
// go func() {
// reqContext := context.Background()
// _, err = rb.SyncRequest(reqContext, &RPCRequest{
// Method: "eth_getTransactionByHash",
// Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)},
// })
// assert.NoError(t, err)
// }()

// // emit the second request, which is now blocked on joining the batch queue
// go func() {
// defer close(checkDone)
// reqContext := context.Background()
// _, err = rb.SyncRequest(reqContext, &RPCRequest{})
// assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context
// }()
// cancelCtx() // cancel the context
// <-rb.requestBatchConcurrencySlots // unblock the concurrency slot
// <-checkDone
// }

func TestBatchSyncRequestCanceledContextWhenDispatchingABatch(t *testing.T) {

blocked := make(chan struct{})
Expand Down
3 changes: 2 additions & 1 deletion pkg/rpcbackend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
)

const (
DefaultMaxConcurrentRequests = 50
DefaultConfigBatchSize = 500
DefaultConfigTimeout = "50ms"
DefaultConfigDispatchConcurrency = 50
Expand All @@ -57,7 +58,7 @@ type RPCClientOptions struct {

func InitConfig(section config.Section) {
section.AddKnownKey(ConfigBatchEnabled, false)
section.AddKnownKey(ConfigMaxConcurrentRequests, 0)
section.AddKnownKey(ConfigMaxConcurrentRequests, DefaultMaxConcurrentRequests)
section.AddKnownKey(ConfigBatchSize, DefaultConfigBatchSize)
section.AddKnownKey(ConfigBatchTimeout, DefaultConfigTimeout)
section.AddKnownKey(ConfigBatchMaxDispatchConcurrency, DefaultConfigDispatchConcurrency)
Expand Down

0 comments on commit 2e424e9

Please sign in to comment.