Revert "[OCC] Add sync fallback for high incarnation counts" #459

Closed
wants to merge 3 commits
79 changes: 19 additions & 60 deletions tasks/scheduler.go
@@ -36,8 +36,6 @@ const (
 	statusValidated status = "validated"
 	// statusWaiting tasks are waiting for another tx to complete
 	statusWaiting status = "waiting"
-	// maximumIncarnation before we revert to sequential (for high conflict rates)
-	maximumIncarnation = 5
 )

 type deliverTxTask struct {
@@ -46,22 +44,14 @@ type deliverTxTask struct {

 	mx sync.RWMutex
 	Status status
-	Dependencies map[int]struct{}
+	Dependencies []int
 	Abort *occ.Abort
 	Index int
 	Incarnation int
 	Request types.RequestDeliverTx
 	Response *types.ResponseDeliverTx
 	VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore
-}
-
-// AppendDependencies appends the given indexes to the task's dependencies
-func (dt *deliverTxTask) AppendDependencies(deps []int) {
-	dt.mx.Lock()
-	defer dt.mx.Unlock()
-	for _, taskIdx := range deps {
-		dt.Dependencies[taskIdx] = struct{}{}
-	}
+	ValidateCh chan status
 }

 func (dt *deliverTxTask) IsStatus(s status) bool {
@@ -81,11 +71,13 @@ func (dt *deliverTxTask) Reset() {
 	dt.Response = nil
 	dt.Abort = nil
 	dt.AbortCh = nil
+	dt.Dependencies = nil
 	dt.VersionStores = nil
 }

 func (dt *deliverTxTask) Increment() {
 	dt.Incarnation++
+	dt.ValidateCh = make(chan status, 1)
 }

 // Scheduler processes tasks concurrently
@@ -102,8 +94,6 @@ type scheduler struct {
 	executeCh chan func()
 	validateCh chan func()
 	metrics *schedulerMetrics
-	synchronous bool // true if maxIncarnation exceeds threshold
-	maxIncarnation int // current highest incarnation
 }

 // NewScheduler creates a new scheduler
@@ -140,18 +130,10 @@ func start(ctx context.Context, ch chan func(), workers int) {
 }

 func (s *scheduler) DoValidate(work func()) {
-	if s.synchronous {
-		work()
-		return
-	}
 	s.validateCh <- work
 }

 func (s *scheduler) DoExecute(work func()) {
-	if s.synchronous {
-		work()
-		return
-	}
 	s.executeCh <- work
 }

@@ -168,7 +150,7 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
 			}
 		}
 		// any non-ok value makes valid false
-		valid = valid && ok
+		valid = ok && valid
 	}
 	sort.Ints(conflicts)
 	return valid, conflicts
@@ -178,20 +160,25 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask {
 	res := make([]*deliverTxTask, 0, len(reqs))
 	for idx, r := range reqs {
 		res = append(res, &deliverTxTask{
-			Request: r.Request,
-			Index: idx,
-			Dependencies: map[int]struct{}{},
-			Status: statusPending,
+			Request: r.Request,
+			Index: idx,
+			Status: statusPending,
+			ValidateCh: make(chan status, 1),
 		})
 	}
 	return res
 }

 func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
 	res := make([]types.ResponseDeliverTx, 0, len(tasks))
+	var maxIncarnation int
 	for _, t := range tasks {
+		if t.Incarnation > maxIncarnation {
+			maxIncarnation = t.Incarnation
+		}
 		res = append(res, *t.Response)
 	}
+	s.metrics.maxIncarnation = maxIncarnation
 	return res
 }

@@ -207,25 +194,15 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) {
 	s.multiVersionStores = mvs
 }

-func dependenciesValidated(tasks []*deliverTxTask, deps map[int]struct{}) bool {
-	for i := range deps {
+func indexesValidated(tasks []*deliverTxTask, idx []int) bool {
+	for _, i := range idx {
 		if !tasks[i].IsStatus(statusValidated) {
 			return false
 		}
 	}
 	return true
 }

-func filterTasks(tasks []*deliverTxTask, filter func(*deliverTxTask) bool) []*deliverTxTask {
-	var res []*deliverTxTask
-	for _, t := range tasks {
-		if filter(t) {
-			res = append(res, t)
-		}
-	}
-	return res
-}
-
 func allValidated(tasks []*deliverTxTask) bool {
 	for _, t := range tasks {
 		if !t.IsStatus(statusValidated) {
@@ -288,16 +265,6 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {

 	toExecute := tasks
 	for !allValidated(tasks) {
-		// if the max incarnation >= 5, we should revert to synchronous
-		if s.maxIncarnation >= maximumIncarnation {
-			// process synchronously
-			s.synchronous = true
-			// execute all non-validated tasks (no more "waiting" status)
-			toExecute = filterTasks(tasks, func(t *deliverTxTask) bool {
-				return !t.IsStatus(statusValidated)
-			})
-		}
-
 		var err error

 		// execute sets statuses of tasks to either executed or aborted
@@ -320,7 +287,6 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
 	for _, mv := range s.multiVersionStores {
 		mv.WriteLatestToStore()
 	}
-	s.metrics.maxIncarnation = s.maxIncarnation
 	return s.collectResponses(tasks), nil
 }

@@ -337,13 +303,13 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
 		// TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case
 		if valid, conflicts := s.findConflicts(task); !valid {
 			s.invalidateTask(task)
-			task.AppendDependencies(conflicts)

 			// if the conflicts are now validated, then rerun this task
-			if dependenciesValidated(s.allTasks, task.Dependencies) {
+			if indexesValidated(s.allTasks, conflicts) {
 				return true
 			} else {
 				// otherwise, wait for completion
+				task.Dependencies = conflicts
 				task.SetStatus(statusWaiting)
 				return false
 			}
@@ -357,7 +323,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {

 	case statusWaiting:
 		// if conflicts are done, then this task is ready to run again
-		return dependenciesValidated(s.allTasks, task.Dependencies)
+		return indexesValidated(s.allTasks, task.Dependencies)
 	}
 	panic("unexpected status: " + task.Status)
 }
@@ -405,10 +371,6 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) {
 			defer mx.Unlock()
 			t.Reset()
 			t.Increment()
-			// update max incarnation for scheduler
-			if t.Incarnation > s.maxIncarnation {
-				s.maxIncarnation = t.Incarnation
-			}
 			res = append(res, t)
 		}
 	})
@@ -512,10 +474,7 @@ func (s *scheduler) executeTask(task *deliverTxTask) {
 	// read the first abort from the channel
 	abort, ok := <-task.AbortCh
 	if ok {
-		// if there is an abort item that means we need to wait on the dependent tx
-		task.SetStatus(statusWaiting)
 		task.Abort = &abort
-		task.AppendDependencies([]int{abort.DependentTxIdx})
 	}
 	// write from version store to multiversion stores
 	for _, v := range task.VersionStores {
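For context on the restored flow: instead of flipping the whole scheduler into a synchronous mode once any task reaches `maximumIncarnation`, a conflicted task records the indexes it conflicted with in `Dependencies`, goes back to `statusWaiting`, and reruns only once every one of those indexes validates. A minimal, self-contained sketch of that gate — simplified stand-in types, not this package's actual API:

```go
package main

import "fmt"

type status string

const (
	statusWaiting   status = "waiting"
	statusValidated status = "validated"
)

type task struct {
	Status       status
	Dependencies []int // conflicting tx indexes, overwritten on each failed validation
}

// indexesValidated mirrors the restored helper above: a waiting task may
// rerun only once every index it conflicted with has been validated.
func indexesValidated(tasks []*task, idx []int) bool {
	for _, i := range idx {
		if tasks[i].Status != statusValidated {
			return false
		}
	}
	return true
}

func main() {
	tasks := []*task{
		{Status: statusValidated},                       // tx 0: already validated
		{Status: statusWaiting, Dependencies: []int{0}}, // tx 1: conflicted with tx 0
	}
	fmt.Println(indexesValidated(tasks, tasks[1].Dependencies)) // true -> tx 1 may rerun
}
```

Overwriting `Dependencies` with the latest `conflicts` slice (rather than accumulating into a map, as the reverted code did) appears sufficient because `shouldRerun` recomputes conflicts on every validation pass.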
48 changes: 46 additions & 2 deletions tasks/scheduler_test.go
@@ -4,10 +4,12 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
 	"net/http"
 	_ "net/http/pprof"
 	"runtime"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/require"
 	"github.com/tendermint/tendermint/abci/types"
@@ -69,6 +71,14 @@ func initTestCtx(injectStores bool) sdk.Context {
 	return ctx
 }

+func generateTasks(count int) []*deliverTxTask {
+	var res []*deliverTxTask
+	for i := 0; i < count; i++ {
+		res = append(res, &deliverTxTask{Index: i})
+	}
+	return res
+}
+
 func TestProcessAll(t *testing.T) {
 	runtime.SetBlockProfileRate(1)

@@ -184,7 +194,7 @@ func TestProcessAll(t *testing.T) {
 		{
 			name: "Test every tx accesses same key",
 			workers: 50,
-			runs: 5,
+			runs: 1,
 			addStores: true,
 			requests: requestList(1000),
 			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) (response types.ResponseDeliverTx) {
},
expectedErr: nil,
},
{
name: "Test every tx accesses same key with delays",
workers: 50,
runs: 1,
addStores: true,
requests: requestList(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
wait := rand.Intn(10)
time.Sleep(time.Duration(wait) * time.Millisecond)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))
time.Sleep(time.Duration(wait) * time.Millisecond)
// write to the store with this tx's index
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: newVal,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
expected := ""
for idx, response := range res {
expected = expected + fmt.Sprintf("%d", idx)
require.Equal(t, expected, response.Info)
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, expected, string(latest))
},
expectedErr: nil,
},
}

 	for _, tt := range tests {
@@ -285,7 +330,6 @@ func TestProcessAll(t *testing.T) {
 			}

 			res, err := s.ProcessAll(ctx, tt.requests)
-			require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIncarnation)
 			require.Len(t, res, len(tt.requests))

 			if !errors.Is(err, tt.expectedErr) {
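The added stress case defers `abortRecoveryFunc(&res)`, a helper defined elsewhere in this test file and not shown in the diff. Assuming an OCC abort surfaces as a panic inside the handler — an assumption, not something this diff confirms — such a wrapper would look roughly like this sketch (relying on the `types` and `fmt` imports already present in the file; the name, `Code` value, and panic-based abort delivery are all illustrative):

```go
// Hypothetical sketch in the spirit of the test file's abortRecoveryFunc.
func abortRecoveryFuncSketch(res *types.ResponseDeliverTx) {
	if r := recover(); r != nil {
		// Turn the abort into a non-zero response instead of failing the test;
		// the scheduler reruns aborted txs, so the response that ultimately
		// lands in the results should still be the clean, rerun one.
		*res = types.ResponseDeliverTx{
			Code: 1,
			Info: fmt.Sprintf("occ abort: %v", r),
		}
	}
}
```

Deferring the recovery first in `deliverTxFunc` lets the random sleeps and deliberately conflicting writes abort freely without tearing down the test binary.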