Skip to content

Commit

Permalink
Merge branch 'master' into illia-malachyn/6754-deprecate-legacy-websockets-stream-api
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue authored Jan 22, 2025
2 parents e1fe894 + 92474d3 commit 9cf78e9
Show file tree
Hide file tree
Showing 82 changed files with 2,090 additions and 2,066 deletions.
8 changes: 4 additions & 4 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ import (
storageerr "github.com/onflow/flow-go/storage"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/procedure"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
storagepebble "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
sutil "github.com/onflow/flow-go/storage/util"
)

Expand Down Expand Up @@ -734,10 +736,8 @@ func (exeNode *ExecutionNode) LoadExecutionState(
}
return nil
})
// chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache,
// chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache,
chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache,
pebbleimpl.ToDB(chunkDataPackDB), node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
Expand Down
27 changes: 24 additions & 3 deletions cmd/util/cmd/check-storage/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package check_storage
import (
"context"

"github.com/onflow/cadence/interpreter"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -322,17 +323,37 @@ func checkAccountStorageHealth(accountRegisters *registers.AccountRegisters, nWo
// Check atree storage health

ledger := &registers.ReadOnlyLedger{Registers: accountRegisters}
storage := runtime.NewStorage(ledger, nil)
storage := runtime.NewStorage(ledger, nil, runtime.StorageConfig{})

inter, err := interpreter.NewInterpreter(
nil,
nil,
&interpreter.Config{
Storage: storage,
},
)
if err != nil {
issues = append(
issues,
accountStorageIssue{
Address: address.Hex(),
Kind: storageErrorKindString[otherErrorKind],
Msg: err.Error(),
},
)
return issues
}

err = util.CheckStorageHealth(address, storage, accountRegisters, util.StorageMapDomains, nWorkers)
err = util.CheckStorageHealth(inter, address, storage, accountRegisters, common.AllStorageDomains, nWorkers)
if err != nil {
issues = append(
issues,
accountStorageIssue{
Address: address.Hex(),
Kind: storageErrorKindString[cadenceAtreeStorageErrorKind],
Msg: err.Error(),
})
},
)
}

// TODO: check health of non-atree registers
Expand Down
29 changes: 24 additions & 5 deletions cmd/util/cmd/check-storage/evm_account_storage_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"fmt"
"slices"

"github.com/onflow/cadence/interpreter"
"golang.org/x/exp/maps"

"github.com/onflow/atree"

"github.com/onflow/cadence/common"
"github.com/onflow/cadence/runtime"

"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
"github.com/onflow/flow-go/fvm/evm/emulator/state"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -70,16 +70,35 @@ func checkCadenceAtreeRegistersInEVMAccount(
) []accountStorageIssue {
var issues []accountStorageIssue

storage := runtime.NewStorage(ledger, nil)
storage := runtime.NewStorage(ledger, nil, runtime.StorageConfig{})

inter, err := interpreter.NewInterpreter(
nil,
nil,
&interpreter.Config{
Storage: storage,
},
)
if err != nil {
issues = append(
issues,
accountStorageIssue{
Address: address.Hex(),
Kind: storageErrorKindString[otherErrorKind],
Msg: fmt.Sprintf("failed to create interpreter for cadence registers: %s", err),
},
)
return issues
}

// Load Cadence domains storage map, so atree slab iterator can traverse connected slabs from loaded root slab.
// NOTE: don't preload all atree slabs in evm account because evm-atree registers require evm-atree decoder.

for _, domain := range util.StorageMapDomains {
_ = storage.GetStorageMap(address, domain, false)
for _, domain := range common.AllStorageDomains {
_ = storage.GetDomainStorageMap(inter, address, domain, false)
}

err := storage.CheckHealth()
err = storage.CheckHealth()
if err != nil {
issues = append(
issues,
Expand Down
15 changes: 10 additions & 5 deletions cmd/util/cmd/check-storage/evm_account_storage_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func createEVMStorage(t *testing.T, ledger atree.Ledger, address common.Address)
}

func createCadenceStorage(t *testing.T, ledger atree.Ledger, address common.Address) {
storage := runtime.NewStorage(ledger, nil)

storage := runtime.NewStorage(ledger, nil, runtime.StorageConfig{})

inter, err := interpreter.NewInterpreter(
nil,
Expand All @@ -123,13 +124,17 @@ func createCadenceStorage(t *testing.T, ledger atree.Ledger, address common.Addr
require.NoError(t, err)

// Create storage and public domains
for _, domain := range []string{"storage", "public"} {
storageDomain := storage.GetStorageMap(address, domain, true)
for _, domain := range []common.StorageDomain{
common.StorageDomainPathStorage,
common.StorageDomainPathPublic,
} {
storageDomain := storage.GetDomainStorageMap(inter, address, domain, true)

// Create large domain map so there are more than one atree registers under the hood.
for i := 0; i < 100; i++ {
key := interpreter.StringStorageMapKey(domain + "_key_" + strconv.Itoa(i))
value := interpreter.NewUnmeteredStringValue(domain + "_value_" + strconv.Itoa(i))
domainStr := domain.Identifier()
key := interpreter.StringStorageMapKey(domainStr + "_key_" + strconv.Itoa(i))
value := interpreter.NewUnmeteredStringValue(domainStr + "_value_" + strconv.Itoa(i))
storageDomain.SetValue(inter, key, value)
}
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/util/cmd/checkpoint-collect-stats/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"

"github.com/montanaflynn/stats"
"github.com/onflow/cadence/common"
"github.com/pkg/profile"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -460,7 +461,7 @@ func getTypeStats(t string, values []float64) RegisterStatsByTypes {
}

func getStats(valueSizesByType sizesByType) []RegisterStatsByTypes {
domainStats := make([]RegisterStatsByTypes, 0, len(util.StorageMapDomains))
domainStats := make([]RegisterStatsByTypes, 0, len(common.AllStorageDomains))
var allDomainSizes []float64

statsByTypes := make([]RegisterStatsByTypes, 0, len(valueSizesByType))
Expand Down Expand Up @@ -514,7 +515,7 @@ func getType(key ledger.Key) string {
return "atree slab"
}

isDomain := slices.Contains(util.StorageMapDomains, kstr)
_, isDomain := common.AllStorageDomainsByIdentifier[kstr]
if isDomain {
return domainTypePrefix + kstr
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/diff-states/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func diffAccount(
).DiffStates(
accountRegisters1,
accountRegisters2,
util.StorageMapDomains,
common.AllStorageDomains,
)
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/util/cmd/generate-authorization-fixes/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/onflow/cadence/common"
"github.com/onflow/cadence/interpreter"
"github.com/onflow/cadence/sema"
"github.com/onflow/cadence/stdlib"
"github.com/rs/zerolog/log"
"github.com/schollz/progressbar/v3"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -334,9 +333,10 @@ func (g *AuthorizationFixGenerator) generateFixesForAccount(address common.Addre
log.Fatal().Err(err)
}

capabilityControllerStorage := mr.Storage.GetStorageMap(
capabilityControllerStorage := mr.Storage.GetDomainStorageMap(
mr.Interpreter,
address,
stdlib.CapabilityControllerStorageDomain,
common.StorageDomainCapabilityController,
false,
)
if capabilityControllerStorage == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ import (
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
storagepebble "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
)

var (
flagHeight uint64
flagDataDir string
flagHeight uint64
flagDataDir string
flagChunkDataPackDir string
)

var Cmd = &cobra.Command{
Expand All @@ -36,11 +40,16 @@ func init() {
Cmd.Flags().StringVar(&flagDataDir, "datadir", "",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("datadir")

Cmd.Flags().StringVar(&flagChunkDataPackDir, "chunk_data_pack_dir", "/var/flow/data/chunk_data_pack",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("chunk_data_pack_dir")
}

func run(*cobra.Command, []string) {
log.Info().
Str("datadir", flagDataDir).
Str("chunk_data_pack_dir", flagChunkDataPackDir).
Uint64("height", flagHeight).
Msg("flags")

Expand All @@ -60,18 +69,28 @@ func run(*cobra.Command, []string) {
metrics := &metrics.NoopCollector{}
transactionResults := badger.NewTransactionResults(metrics, db, badger.DefaultCacheSize)
commits := badger.NewCommits(metrics, db)
chunkDataPacks := badger.NewChunkDataPacks(metrics, db, badger.NewCollections(db, badger.NewTransactions(metrics, db)), badger.DefaultCacheSize)
collections := badger.NewCollections(db, badger.NewTransactions(metrics, db))
results := badger.NewExecutionResults(metrics, db)
receipts := badger.NewExecutionReceipts(metrics, db, results, badger.DefaultCacheSize)
myReceipts := badger.NewMyExecutionReceipts(metrics, db, receipts)
headers := badger.NewHeaders(metrics, db)
events := badger.NewEvents(metrics, db)
serviceEvents := badger.NewServiceEvents(metrics, db)

// require the chunk data pack data must exist before returning the storage module
chunkDataPacksPebbleDB, err := storagepebble.MustOpenDefaultPebbleDB(flagChunkDataPackDir)
if err != nil {
log.Fatal().Err(err).Msgf("could not open chunk data pack DB at %v", flagChunkDataPackDir)
}
chunkDataPacksDB := pebbleimpl.ToDB(chunkDataPacksPebbleDB)
chunkDataPacks := store.NewChunkDataPacks(metrics, chunkDataPacksDB, collections, 1000)

writeBatch := badger.NewBatch(db)
chunkBatch := chunkDataPacksDB.NewBatch()

err = removeExecutionResultsFromHeight(
writeBatch,
chunkBatch,
state,
headers,
transactionResults,
Expand All @@ -86,6 +105,13 @@ func run(*cobra.Command, []string) {
if err != nil {
log.Fatal().Err(err).Msgf("could not remove result from height %v", flagHeight)
}

// remove chunk data packs first, because otherwise the index to find chunk data pack will be removed.
err = chunkBatch.Commit()
if err != nil {
log.Fatal().Err(err).Msgf("could not commit chunk batch at %v", flagHeight)
}

err = writeBatch.Flush()
if err != nil {
log.Fatal().Err(err).Msgf("could not flush write batch at %v", flagHeight)
Expand All @@ -109,11 +135,12 @@ func run(*cobra.Command, []string) {
// need to include the Remove methods
func removeExecutionResultsFromHeight(
writeBatch *badger.Batch,
chunkBatch storage.Batch,
protoState protocol.State,
headers *badger.Headers,
transactionResults *badger.TransactionResults,
commits *badger.Commits,
chunkDataPacks *badger.ChunkDataPacks,
chunkDataPacks storage.ChunkDataPacks,
results *badger.ExecutionResults,
myReceipts *badger.MyExecutionReceipts,
events *badger.Events,
Expand Down Expand Up @@ -148,7 +175,7 @@ func removeExecutionResultsFromHeight(

blockID := head.ID()

err = removeForBlockID(writeBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID)
err = removeForBlockID(writeBatch, chunkBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID)
if err != nil {
return fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err)
}
Expand All @@ -167,7 +194,7 @@ func removeExecutionResultsFromHeight(
total = len(pendings)

for _, pending := range pendings {
err = removeForBlockID(writeBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending)
err = removeForBlockID(writeBatch, chunkBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending)

if err != nil {
return fmt.Errorf("could not remove result for pending block %v: %w", pending, err)
Expand All @@ -188,11 +215,12 @@ func removeExecutionResultsFromHeight(
// It bubbles up any error encountered
func removeForBlockID(
writeBatch *badger.Batch,
chunkBatch storage.Batch,
headers *badger.Headers,
commits *badger.Commits,
transactionResults *badger.TransactionResults,
results *badger.ExecutionResults,
chunks *badger.ChunkDataPacks,
chunks storage.ChunkDataPacks,
myReceipts *badger.MyExecutionReceipts,
events *badger.Events,
serviceEvents *badger.ServiceEvents,
Expand All @@ -211,12 +239,7 @@ func removeForBlockID(
for _, chunk := range result.Chunks {
chunkID := chunk.ID()
// remove chunk data pack
err := chunks.BatchRemove(chunkID, writeBatch)
if errors.Is(err, storage.ErrNotFound) {
log.Warn().Msgf("chunk %v not found for block %v", chunkID, blockID)
continue
}

err := chunks.BatchRemove(chunkID, chunkBatch)
if err != nil {
return fmt.Errorf("could not remove chunk id %v for block id %v: %w", chunkID, blockID, err)
}
Expand Down
Loading

0 comments on commit 9cf78e9

Please sign in to comment.