Skip to content

Commit

Permalink
before starting local pipeline, load stores in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Feb 4, 2025
1 parent ad1f022 commit 475568f
Showing 1 changed file with 42 additions and 4 deletions.
46 changes: 42 additions & 4 deletions orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stage

import (
"context"
"errors"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -517,20 +518,57 @@ func (s *Stages) previousUnitComplete(u Unit) bool {
return state == UnitCompleted || state == UnitNoOp
}

type loadedStore struct {
name string
kv *store.FullKV
err error
}

func (s *Stages) FinalStoreMap(exclusiveEndBlock uint64) (store.Map, error) {
out := store.NewMap()

var storeModuleStates []*StoreModuleState
for _, stage := range s.stages {
if stage.kind != KindStore {
continue
}
for _, modState := range stage.storeModuleStates {
storeModuleStates = append(storeModuleStates, modState)
}
}

out := store.NewMap()
if len(storeModuleStates) == 0 {
return out, nil
}

loadingChan := make(chan loadedStore, len(storeModuleStates))
for _, modState := range storeModuleStates {
modState := modState
go func() {
fullKV, err := modState.getStore(s.ctx, exclusiveEndBlock)
if err != nil {
return nil, fmt.Errorf("stores didn't sync up properly, expected store %q to be at block %d but was at %d: %w", modState.name, exclusiveEndBlock, modState.lastBlockInStore, err)
loadingChan <- loadedStore{
name: modState.name,
kv: fullKV,
err: err,
}
out[modState.name] = fullKV
}()
}

var errs error
for loaded := range loadingChan {
if loaded.err != nil {
errs = errors.Join(errs, fmt.Errorf("while loading %s: %w", loaded.name, loaded.err))
continue
}
out[loaded.name] = loaded.kv
if len(out) == len(storeModuleStates) {
close(loadingChan)
}
}
if errs != nil {
return nil, errs
}

return out, nil
}

Expand Down

0 comments on commit 475568f

Please sign in to comment.