Skip to content

Commit

Permalink
add timer
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenlanders committed Mar 13, 2024
1 parent f7f7e2f commit 2b98f1f
Showing 1 changed file with 7 additions and 12 deletions.
19 changes: 7 additions & 12 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sort"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/store/multiversion"
store "github.com/cosmos/cosmos-sdk/store/types"
Expand Down Expand Up @@ -274,9 +275,6 @@ func (s *scheduler) emitMetrics() {
func (s *scheduler) reportAll() {
sm := make(map[status]int)
for _, t := range s.allTasks {
if t.Status == statusExecuted {
fmt.Println("Executed status TX", t.AbsoluteIndex, "Incarnation", t.Incarnation, "Dependencies", t.Dependencies)
}
if _, ok := sm[t.Status]; !ok {
sm[t.Status] = 0
}
Expand All @@ -286,6 +284,12 @@ func (s *scheduler) reportAll() {
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
startTime := time.Now()
var validationCycles int
defer func() {
fmt.Printf("Scheduler ProcessAll duration:%s, iterations=%d", time.Since(startTime), validationCycles)
}()

// initialize mutli-version stores if they haven't been initialized yet
s.tryInitMultiVersionStore(ctx)
// prefill estimates
Expand Down Expand Up @@ -313,10 +317,8 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
// validation tasks uses length of tasks to avoid blocking on validation
start(workerCtx, s.validateCh, len(tasks))

validationCycles := 0
toExecute := tasks
for !allValidated(tasks) {
fmt.Println("first report All")
s.reportAll()
// if the max incarnation >= 5, we should revert to synchronous
if validationCycles >= maximumIterations {
Expand Down Expand Up @@ -359,10 +361,6 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
// these are retries which apply to metrics
s.metrics.retries += len(toExecute)
validationCycles++
for _, t := range toExecute {
fmt.Println("ToExecute abs indices", t.AbsoluteIndex)
}
fmt.Println("last report All")
s.reportAll()
}

Expand All @@ -386,7 +384,6 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
// TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case
if valid, conflicts := s.findConflicts(task); !valid {
s.invalidateTask(task)
fmt.Println("task invalid", "index", task.AbsoluteIndex, "conflicts", conflicts)
task.AppendDependencies(conflicts)

// if the conflicts are now validated, then rerun this task
Expand Down Expand Up @@ -445,14 +442,12 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del
}

wg := &sync.WaitGroup{}
fmt.Println("Validating all with start idx (abs)", tasks[startIdx].AbsoluteIndex)
for i := startIdx; i < len(tasks); i++ {
wg.Add(1)
t := tasks[i]
s.DoValidate(func() {
defer wg.Done()
if !s.validateTask(ctx, t) {
fmt.Println("scheduler validation failed", "sync", s.synchronous, "task", t.AbsoluteIndex, "status", t.Status, "incarnation", t.Incarnation, "dependencies", t.Dependencies)
mx.Lock()
defer mx.Unlock()
t.Reset()
Expand Down

0 comments on commit 2b98f1f

Please sign in to comment.