From a9e30f776548445c6f938d2e1a6bd89876cd36dd Mon Sep 17 00:00:00 2001 From: Ferret-san Date: Tue, 4 Jun 2024 20:23:17 +0000 Subject: [PATCH] Update deps, correct fallback to DAS, improve gas logic --- arbnode/batch_poster.go | 61 ++++++++++++++------------ arbnode/node.go | 4 +- cmd/replay/main.go | 11 ++--- das/celestia/celestia.go | 93 ++++++++++++++++++++++++++++------------ go.mod | 4 +- go.sum | 10 +++++ 6 files changed, 118 insertions(+), 65 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index b281481784..60fd703019 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -1207,38 +1207,45 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) if config.DisableCelestiaFallbackStoreDataOnChain && config.DisableCelestiaFallbackStoreDataOnDAS { return false, errors.New("unable to post batch to Celestia and fallback storing data on chain and das is disabled") } - log.Warn("Falling back to storing data on chain", "err", err) - } else { - sequencerMsg = celestiaMsg - } - } + if config.DisableCelestiaFallbackStoreDataOnDAS { + log.Warn("Falling back to storing data on chain ", "err", err) + } else { + log.Warn("Falling back to storing data on DAC ", "err", err) - if b.daWriter != nil { - if b.celestiaWriter != nil && config.DisableCelestiaFallbackStoreDataOnDAS { - return false, errors.New("found Celestia DA enabled and DAS, but fallbacks to DAS aredisabled") - } - if !b.redisLock.AttemptLock(ctx) { - return false, errAttemptLockFailed - } + } - gotNonce, gotMeta, err := b.dataPoster.GetNextNonceAndMeta(ctx) - if err != nil { - return false, err - } - if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) { - return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce) - } + // We nest the anytrust logic here for now as using this fork liekly means your primary DA is Celestia + // and the Anytrust DAC is instead used as a fallback + if b.daWriter != nil { + if config.DisableCelestiaFallbackStoreDataOnDAS { + return false, errors.New("found Celestia DA enabled and DAS, but fallbacks to DAS are disabled") + } + if !b.redisLock.AttemptLock(ctx) { + return false, errAttemptLockFailed + } + + gotNonce, gotMeta, err := b.dataPoster.GetNextNonceAndMeta(ctx) + if err != nil { + return false, err + } + if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) { + return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce) + } - cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled - if errors.Is(err, das.BatchToDasFailed) { - if config.DisableDasFallbackStoreDataOnChain { - return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled") + cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled + if errors.Is(err, das.BatchToDasFailed) { + if config.DisableDasFallbackStoreDataOnChain { + return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled") + } + log.Warn("Falling back to storing data on chain", "err", err) + } else if err != nil { + return false, err + } else { + sequencerMsg = das.Serialize(cert) + } } - log.Warn("Falling back to storing data on chain", "err", err) - } else if err != nil { - return false, err } else { - sequencerMsg = das.Serialize(cert) + sequencerMsg = celestiaMsg } } diff --git a/arbnode/node.go b/arbnode/node.go index 3a27e1124f..3c3b1fcabd 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -538,7 +538,9 @@ func createNodeImpl( } } else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee { return nil, errors.New("a data availability service is required for this chain, but it was not configured") - } else if config.Celestia.Enable { + } + + if config.Celestia.Enable { celestiaService, err := celestia.NewCelestiaDA(&config.Celestia, nil) if err != nil { return nil, err diff --git a/cmd/replay/main.go b/cmd/replay/main.go index 9f72bae671..438b9793a8 100644 --- a/cmd/replay/main.go +++ b/cmd/replay/main.go @@ -319,17 +319,12 @@ func main() { panic(fmt.Sprintf("Error opening state db: %v", err.Error())) } - readMessage := func(arbChainParams params.ArbitrumChainParams) *arbostypes.MessageWithMetadata { + readMessage := func() *arbostypes.MessageWithMetadata { var delayedMessagesRead uint64 if lastBlockHeader != nil { delayedMessagesRead = lastBlockHeader.Nonce.Uint64() } - // TODO: consider removing this panic - if arbChainParams.DataAvailabilityCommittee && arbChainParams.CelestiaDA { - panic(fmt.Sprintf("Error Multiple DA providers enabled: DAC is %v and CelestiaDA is %v", arbChainParams.DataAvailabilityCommittee, arbChainParams.CelestiaDA)) - } - backend := WavmInbox{} var keysetValidationMode = arbstate.KeysetPanicIfInvalid if backend.GetPositionWithinMessage() > 0 { @@ -394,7 +389,7 @@ func main() { // need to add Celestia or just "ExternalDA" as an option to the ArbitrumChainParams // for now we hard code Cthis to treu and hardcode Celestia in `readMessage` // to test the integration - message := readMessage(chainConfig.ArbitrumChainParams) + message := readMessage() chainContext := WavmChainContext{} batchFetcher := func(batchNum uint64) ([]byte, error) { @@ -408,7 +403,7 @@ func main() { } else { // Initialize ArbOS with this init message and create the genesis block. - message := readMessage(params.ArbitrumChainParams{}) + message := readMessage() initMessage, err := message.Message.ParseInitMessage() if err != nil { diff --git a/das/celestia/celestia.go b/das/celestia/celestia.go index c13423f8c3..c6f95011ad 100644 --- a/das/celestia/celestia.go +++ b/das/celestia/celestia.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "math/big" + "strings" "time" "github.com/spf13/pflag" @@ -28,20 +29,29 @@ import ( type DAConfig struct { Enable bool `koanf:"enable"` - GasPrice float64 `koanf:"gas-price"` - Rpc string `koanf:"rpc"` - NamespaceId string `koanf:"namespace-id"` - AuthToken string `koanf:"auth-token"` + GasPrice float64 `koanf:"gas-price" reload:"hot"` + GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"` + Rpc string `koanf:"rpc" reload:"hot"` + NamespaceId string `koanf:"namespace-id" ` + AuthToken string `koanf:"auth-token" reload:"hot"` NoopWriter bool `koanf:"noop-writer" reload:"hot"` ValidatorConfig *ValidatorConfig `koanf:"validator-config"` } type ValidatorConfig struct { - TendermintRPC string `koanf:"tendermint-rpc"` - EthClient string `koanf:"eth-rpc"` + TendermintRPC string `koanf:"tendermint-rpc" reload:"hot"` + EthClient string `koanf:"eth-rpc" reload:"hot"` BlobstreamAddr string `koanf:"blobstream"` } +var ( + // ErrTxTimedout is the error message returned by the DA when mempool is congested + ErrTxTimedout = errors.New("timed out waiting for tx to be included in a block") + + // ErrTxAlreadyInMempool is the error message returned by the DA when tx is already in mempool + ErrTxAlreadyInMempool = errors.New("tx already in mempool") +) + // CelestiaMessageHeaderFlag indicates that this data is a Blob Pointer // which will be used to retrieve data from Celestia const CelestiaMessageHeaderFlag byte = 0x63 @@ -162,42 +172,69 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) return nil, err } - commitment, err := blob.CreateCommitment(dataBlob) - if err != nil { - log.Warn("Error creating commitment", "err", err) - return nil, err - } + height := uint64(0) + submitted := false + // this will trigger node to use the default gas price from celestia app + gasPrice := -1.0 + for !submitted { + height, err = c.Client.Blob.Submit(ctx, []*blob.Blob{dataBlob}, gasPrice) + if err != nil { + switch { + case strings.Contains(err.Error(), ErrTxTimedout.Error()), strings.Contains(err.Error(), ErrTxAlreadyInMempool.Error()): + log.Warn("Failed to submit blob, bumping gas price and retrying...", "err", err) + if gasPrice == -1.0 { + gasPrice = c.Cfg.GasPrice + } else { + gasPrice = gasPrice * c.Cfg.GasMultiplier + } + continue + default: + log.Warn("Blob Submission error", "err", err) + return nil, err + } + } - height, err := c.Client.Blob.Submit(ctx, []*blob.Blob{dataBlob}, openrpc.GasPrice(c.Cfg.GasPrice)) - if err != nil { - log.Warn("Blob Submission error", "err", err) - return nil, err - } - if height == 0 { - log.Warn("Unexpected height from blob response", "height", height) - return nil, errors.New("unexpected response code") + if height == 0 { + log.Warn("Unexpected height from blob response", "height", height) + return nil, errors.New("unexpected response code") + } + + submitted = true } - proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, commitment) + proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) if err != nil { log.Warn("Error retrieving proof", "err", err) return nil, err } - included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, commitment) + proofRetries := 0 + for proofs == nil { + log.Warn("Retrieved empty proof from GetProof, fetching again...", "proofRetries", proofRetries) + time.Sleep(time.Millisecond * 100) + proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) + if err != nil { + log.Warn("Error retrieving proof", "err", err) + return nil, err + } + proofRetries++ + } + + included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment) if err != nil || !included { log.Warn("Error checking for inclusion", "err", err, "proof", proofs) return nil, err } - log.Info("Succesfully posted blob", "height", height, "commitment", hex.EncodeToString(commitment)) + log.Info("Succesfully posted blob", "height", height, "commitment", hex.EncodeToString(dataBlob.Commitment)) // we fetch the blob so that we can get the correct start index in the square - blob, err := c.Client.Blob.Get(ctx, height, *c.Namespace, commitment) + dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment) if err != nil { return nil, err } - if blob.Index <= 0 { - log.Warn("Unexpected index from blob response", "index", blob.Index) + + if dataBlob.Index() <= 0 { + log.Warn("Unexpected index from blob response", "index", dataBlob.Index()) return nil, errors.New("unexpected response code") } @@ -213,7 +250,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) } txCommitment, dataRoot := [32]byte{}, [32]byte{} - copy(txCommitment[:], commitment) + copy(txCommitment[:], dataBlob.Commitment) copy(dataRoot[:], header.DataHash) @@ -222,12 +259,12 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) // ODS size odsSize := squareSize / 2 - blobIndex := uint64(blob.Index) + blobIndex := uint64(dataBlob.Index()) // startRow startRow := blobIndex / squareSize if odsSize*startRow > blobIndex { // return an empty batch - return nil, fmt.Errorf("storing Celestia information, odsSize*startRow=%v was larger than blobIndex=%v", odsSize*startRow, blob.Index) + return nil, fmt.Errorf("storing Celestia information, odsSize*startRow=%v was larger than blobIndex=%v", odsSize*startRow, dataBlob.Index()) } startIndexOds := blobIndex - odsSize*startRow blobPointer := types.BlobPointer{ diff --git a/go.mod b/go.mod index 22587ae188..5282a09861 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.10 github.com/aws/aws-sdk-go-v2/service/s3 v1.26.9 github.com/cavaliergopher/grab/v3 v3.0.1 - github.com/celestiaorg/celestia-openrpc v0.4.0-rc.1 + github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174346-256ddd020a0a github.com/celestiaorg/nmt v0.20.0 github.com/celestiaorg/rsmt2d v0.11.0 github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593 @@ -85,6 +85,8 @@ require ( github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect github.com/celestiaorg/go-fraud v0.2.0 // indirect github.com/celestiaorg/go-header v0.4.1 // indirect + github.com/celestiaorg/go-square v1.0.1 // indirect + github.com/celestiaorg/go-square/merkle v0.0.0-20240429192549-dea967e1533b // indirect github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/ceramicnetwork/go-dag-jose v0.1.0 // indirect diff --git a/go.sum b/go.sum index 2ff0462f8f..041c14d823 100644 --- a/go.sum +++ b/go.sum @@ -176,10 +176,20 @@ github.com/celestiaorg/celestia-core v1.29.0-tm-v0.34.29 h1:Fd7ymPUzExPGNl2gZw4i github.com/celestiaorg/celestia-core v1.29.0-tm-v0.34.29/go.mod h1:xrICN0PBhp3AdTaZ8q4wS5Jvi32V02HNjaC2EsWiEKk= github.com/celestiaorg/celestia-openrpc v0.4.0-rc.1 h1:CLhcfNP4496pg0aptcgHJubNXoY97PMHF0sDWx4HRrg= github.com/celestiaorg/celestia-openrpc v0.4.0-rc.1/go.mod h1:+2xwD+PBy76D2XOAwDbkuNVUSAvwUFV54cQqMFBA1s0= +github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240530213251-6ed9977848e1 h1:QTIYNnZfdh5yGFkmB+XNWtKy1q336iiYP3WDKZwkEe0= +github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240530213251-6ed9977848e1/go.mod h1:7kEhBB4KZh4vw3v5pMuMocNgYOk8uOpFZTo0cNpRjXc= +github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174244-9787dffd2ad6 h1:5fph1ybOZAFJqLvYmTayBY0EgvzerxgbmLHUpAu1z5I= +github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174244-9787dffd2ad6/go.mod h1:7kEhBB4KZh4vw3v5pMuMocNgYOk8uOpFZTo0cNpRjXc= +github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174346-256ddd020a0a h1:T9vsMQvvYvA/1FObT0eqL0J+XJJx/wG6Jzf+DH+IFxc= +github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174346-256ddd020a0a/go.mod h1:7kEhBB4KZh4vw3v5pMuMocNgYOk8uOpFZTo0cNpRjXc= github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I= github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= github.com/celestiaorg/go-header v0.4.1 h1:bjbUcKDnhrJJ9EoE7vtPpgleNLVjc2S+cB4/qe8nQmo= github.com/celestiaorg/go-header v0.4.1/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c= +github.com/celestiaorg/go-square v1.0.1 h1:LEG1zrw4i03VBMElQF8GAbKYgh1bT1uGzWxasU2ePuo= +github.com/celestiaorg/go-square v1.0.1/go.mod h1:XMv5SGCeGSkynW2OOsedugaW/rQlvzxGzWGxTKsyYOU= +github.com/celestiaorg/go-square/merkle v0.0.0-20240429192549-dea967e1533b h1:jo6M4RJnr33sQC/TTraP5gA6ZgbFO/QqzX8e/lIQC7Q= +github.com/celestiaorg/go-square/merkle v0.0.0-20240429192549-dea967e1533b/go.mod h1:86qIYnEhmn/hfW+xvw98NOI3zGaDEB3x8JGjYo2FqLs= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4/go.mod h1:fzuHnhzj1pUygGz+1ZkB3uQbEUL4htqCGJ4Qs2LwMZA= github.com/celestiaorg/nmt v0.20.0 h1:9i7ultZ8Wv5ytt8ZRaxKQ5KOOMo4A2K2T/aPGjIlSas=