From 63d81eb107d03621598e38fb1ddac9b58ddd76f3 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Wed, 13 Mar 2024 13:54:08 -0400 Subject: [PATCH 1/3] Revert "[OCC] Add sync fallback for high incarnation counts (#452)" This reverts commit 61d6380800857df4631e2e2fdd1bcf0e215da3bc. --- tasks/scheduler.go | 76 +++++++++++------------------------------ tasks/scheduler_test.go | 11 ++++-- 2 files changed, 28 insertions(+), 59 deletions(-) diff --git a/tasks/scheduler.go b/tasks/scheduler.go index f27566d88..a79042ee5 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -36,8 +36,6 @@ const ( statusValidated status = "validated" // statusWaiting tasks are waiting for another tx to complete statusWaiting status = "waiting" - // maximumIncarnation before we revert to sequential (for high conflict rates) - maximumIncarnation = 5 ) type deliverTxTask struct { @@ -46,22 +44,14 @@ type deliverTxTask struct { mx sync.RWMutex Status status - Dependencies map[int]struct{} + Dependencies []int Abort *occ.Abort Index int Incarnation int Request types.RequestDeliverTx Response *types.ResponseDeliverTx VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore -} - -// AppendDependencies appends the given indexes to the task's dependencies -func (dt *deliverTxTask) AppendDependencies(deps []int) { - dt.mx.Lock() - defer dt.mx.Unlock() - for _, taskIdx := range deps { - dt.Dependencies[taskIdx] = struct{}{} - } + ValidateCh chan status } func (dt *deliverTxTask) IsStatus(s status) bool { @@ -81,11 +71,13 @@ func (dt *deliverTxTask) Reset() { dt.Response = nil dt.Abort = nil dt.AbortCh = nil + dt.Dependencies = nil dt.VersionStores = nil } func (dt *deliverTxTask) Increment() { dt.Incarnation++ + dt.ValidateCh = make(chan status, 1) } // Scheduler processes tasks concurrently @@ -102,8 +94,6 @@ type scheduler struct { executeCh chan func() validateCh chan func() metrics *schedulerMetrics - synchronous bool // true if maxIncarnation exceeds threshold - maxIncarnation int // current highest incarnation } // NewScheduler creates a new scheduler @@ -140,18 +130,10 @@ func start(ctx context.Context, ch chan func(), workers int) { } func (s *scheduler) DoValidate(work func()) { - if s.synchronous { - work() - return - } s.validateCh <- work } func (s *scheduler) DoExecute(work func()) { - if s.synchronous { - work() - return - } s.executeCh <- work } @@ -168,7 +150,7 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { } } // any non-ok value makes valid false - valid = valid && ok + valid = ok && valid } sort.Ints(conflicts) return valid, conflicts @@ -178,10 +160,10 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { res := make([]*deliverTxTask, 0, len(reqs)) for idx, r := range reqs { res = append(res, &deliverTxTask{ - Request: r.Request, - Index: idx, - Dependencies: map[int]struct{}{}, - Status: statusPending, + Request: r.Request, + Index: idx, + Status: statusPending, + ValidateCh: make(chan status, 1), }) } return res @@ -189,9 +171,14 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { res := make([]types.ResponseDeliverTx, 0, len(tasks)) + var maxIncarnation int for _, t := range tasks { + if t.Incarnation > maxIncarnation { + maxIncarnation = t.Incarnation + } res = append(res, *t.Response) } + s.metrics.maxIncarnation = maxIncarnation return res } @@ -207,8 +194,8 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { s.multiVersionStores = mvs } -func dependenciesValidated(tasks []*deliverTxTask, deps map[int]struct{}) bool { - for i := range deps { +func indexesValidated(tasks []*deliverTxTask, idx []int) bool { + for _, i := range idx { if !tasks[i].IsStatus(statusValidated) { return false } @@ -216,16 +203,6 @@ func dependenciesValidated(tasks []*deliverTxTask, deps map[int]struct{}) bool { return true } -func filterTasks(tasks []*deliverTxTask, filter func(*deliverTxTask) bool) []*deliverTxTask { - var res []*deliverTxTask - for _, t := range tasks { - if filter(t) { - res = append(res, t) - } - } - return res -} - func allValidated(tasks []*deliverTxTask) bool { for _, t := range tasks { if !t.IsStatus(statusValidated) { @@ -288,16 +265,6 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t toExecute := tasks for !allValidated(tasks) { - // if the max incarnation >= 5, we should revert to synchronous - if s.maxIncarnation >= maximumIncarnation { - // process synchronously - s.synchronous = true - // execute all non-validated tasks (no more "waiting" status) - toExecute = filterTasks(tasks, func(t *deliverTxTask) bool { - return !t.IsStatus(statusValidated) - }) - } - var err error // execute sets statuses of tasks to either executed or aborted @@ -320,7 +287,6 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t for _, mv := range s.multiVersionStores { mv.WriteLatestToStore() } - s.metrics.maxIncarnation = s.maxIncarnation return s.collectResponses(tasks), nil } @@ -337,13 +303,13 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { // TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case if valid, conflicts := s.findConflicts(task); !valid { s.invalidateTask(task) - task.AppendDependencies(conflicts) // if the conflicts are now validated, then rerun this task - if dependenciesValidated(s.allTasks, task.Dependencies) { + if indexesValidated(s.allTasks, conflicts) { return true } else { // otherwise, wait for completion + task.Dependencies = conflicts task.SetStatus(statusWaiting) return false } @@ -357,7 +323,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { case statusWaiting: // if conflicts are done, then this task is ready to run again - return dependenciesValidated(s.allTasks, task.Dependencies) + return indexesValidated(s.allTasks, task.Dependencies) } panic("unexpected status: " + task.Status) } @@ -405,10 +371,6 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del defer mx.Unlock() t.Reset() t.Increment() - // update max incarnation for scheduler - if t.Incarnation > s.maxIncarnation { - s.maxIncarnation = t.Incarnation - } res = append(res, t) } }) diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index 22378cffa..45bf7be06 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -69,6 +69,14 @@ func initTestCtx(injectStores bool) sdk.Context { return ctx } +func generateTasks(count int) []*deliverTxTask { + var res []*deliverTxTask + for i := 0; i < count; i++ { + res = append(res, &deliverTxTask{Index: i}) + } + return res +} + func TestProcessAll(t *testing.T) { runtime.SetBlockProfileRate(1) @@ -184,7 +192,7 @@ func TestProcessAll(t *testing.T) { { name: "Test every tx accesses same key", workers: 50, - runs: 5, + runs: 1, addStores: true, requests: requestList(1000), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) (response types.ResponseDeliverTx) { @@ -285,7 +293,6 @@ func TestProcessAll(t *testing.T) { } res, err := s.ProcessAll(ctx, tt.requests) - require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIncarnation) require.Len(t, res, len(tt.requests)) if !errors.Is(err, tt.expectedErr) { From e039ece685234aac03b32f70f930fff0005c89af Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Wed, 13 Mar 2024 13:58:13 -0400 Subject: [PATCH 2/3] fix compilation --- tasks/scheduler.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/tasks/scheduler.go b/tasks/scheduler.go index a79042ee5..74be334c4 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -474,10 +474,7 @@ func (s *scheduler) executeTask(task *deliverTxTask) { // read the first abort from the channel abort, ok := <-task.AbortCh if ok { - // if there is an abort item that means we need to wait on the dependent tx - task.SetStatus(statusWaiting) task.Abort = &abort - task.AppendDependencies([]int{abort.DependentTxIdx}) } // write from version store to multiversion stores for _, v := range task.VersionStores { From 0b3e75bbe41676937b1f9492a78440123bfc872c Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Wed, 13 Mar 2024 14:05:15 -0400 Subject: [PATCH 3/3] add test --- tasks/scheduler_test.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index 45bf7be06..0d53198f3 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -4,10 +4,12 @@ import ( "context" "errors" "fmt" + "math/rand" "net/http" _ "net/http/pprof" "runtime" "testing" + "time" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/types" @@ -272,6 +274,41 @@ func TestProcessAll(t *testing.T) { }, expectedErr: nil, }, + { + name: "Test every tx accesses same key with delays", + workers: 50, + runs: 1, + addStores: true, + requests: requestList(1000), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) + wait := rand.Intn(10) + time.Sleep(time.Duration(wait) * time.Millisecond) + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + time.Sleep(time.Duration(wait) * time.Millisecond) + // write to the store with this tx's index + newVal := val + fmt.Sprintf("%d", ctx.TxIndex()) + kv.Set(itemKey, []byte(newVal)) + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: newVal, + } + }, + assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + expected := "" + for idx, response := range res { + expected = expected + fmt.Sprintf("%d", idx) + require.Equal(t, expected, response.Info) + } + // confirm last write made it to the parent store + latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey) + require.Equal(t, expected, string(latest)) + }, + expectedErr: nil, + }, } for _, tt := range tests {