From f101345daaf05ad5a1484675636a78528aff0d09 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 25 Jun 2024 18:20:15 -0400 Subject: [PATCH] Begin using CelestiaDASClient --- arbnode/node.go | 4 +- contracts | 2 +- das/celestia/celestia.go | 647 ------------------ das/celestia/celestiaDasRpcClient.go | 92 +++ das/celestia/proof.go | 110 --- das/celestia/types/da_interface.go | 9 + das/celestia/types/square_data.go | 10 - ...=> full_celestia_challenge_test.backup_go} | 2 +- 8 files changed, 105 insertions(+), 771 deletions(-) delete mode 100644 das/celestia/celestia.go create mode 100644 das/celestia/celestiaDasRpcClient.go delete mode 100644 das/celestia/proof.go delete mode 100644 das/celestia/types/square_data.go rename system_tests/{full_celestia_challenge_test.go => full_celestia_challenge_test.backup_go} (98%) diff --git a/arbnode/node.go b/arbnode/node.go index b28f002575..2e4a5c1506 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -97,7 +97,7 @@ type Config struct { ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` // SnapSyncConfig is only used for testing purposes, these should not be configured in production. SnapSyncTest SnapSyncConfig - Celestia celestia.DAConfig `koanf:"celestia-cfg"` + Celestia celestia.CelestiaConfig `koanf:"celestia-cfg"` } func (c *Config) Validate() error { @@ -555,7 +555,7 @@ func createNodeImpl( } if config.Celestia.Enable { - celestiaService, err := celestia.NewCelestiaDA(&config.Celestia, nil) + celestiaService, err := celestia.NewCelestiaDASRPCClient(config.Celestia.URL) if err != nil { return nil, err } diff --git a/contracts b/contracts index 0434f475bf..b6266ba420 160000 --- a/contracts +++ b/contracts @@ -1 +1 @@ -Subproject commit 0434f475bf38bf737b5b79868214ef4964f96de0 +Subproject commit b6266ba420db66723d2629d6d5757860d1d1bce7 diff --git a/das/celestia/celestia.go b/das/celestia/celestia.go deleted file mode 100644 index 1968f78a61..0000000000 --- a/das/celestia/celestia.go +++ /dev/null @@ -1,647 +0,0 @@ -package celestia - -import ( - "bytes" - "context" - "encoding/binary" - "encoding/hex" - "errors" - "fmt" - "math/big" - "strings" - "time" - - "github.com/spf13/pflag" - - openrpc "github.com/celestiaorg/celestia-openrpc" - "github.com/celestiaorg/celestia-openrpc/types/blob" - "github.com/celestiaorg/celestia-openrpc/types/share" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" - "github.com/offchainlabs/nitro/das/celestia/types" - "github.com/offchainlabs/nitro/solgen/go/celestiagen" - - blobstreamx "github.com/succinctlabs/blobstreamx/bindings" - "github.com/tendermint/tendermint/rpc/client/http" -) - -type DAConfig struct { - Enable bool `koanf:"enable"` - GasPrice float64 `koanf:"gas-price" reload:"hot"` - GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"` - Rpc string `koanf:"rpc" reload:"hot"` - NamespaceId string `koanf:"namespace-id" ` - AuthToken string `koanf:"auth-token" reload:"hot"` - NoopWriter bool `koanf:"noop-writer" reload:"hot"` - ValidatorConfig *ValidatorConfig `koanf:"validator-config"` -} - -type ValidatorConfig struct { - TendermintRPC string `koanf:"tendermint-rpc" reload:"hot"` - EthClient string `koanf:"eth-rpc" reload:"hot"` - BlobstreamAddr string `koanf:"blobstream"` -} - -var ( - celestiaDALastSuccesfulActionGauge = metrics.NewRegisteredGauge("celestia/action/last_success", nil) - celestiaLastNonDefaultGasprice = metrics.NewRegisteredGaugeFloat64("celestia/last_gas_price", nil) - celestiaSuccessCounter = metrics.NewRegisteredCounter("celestia/action/celestia_success", nil) - celestiaFailureCounter = metrics.NewRegisteredCounter("celestia/action/celestia_failure", nil) - celestiaGasRetries = metrics.NewRegisteredCounter("celestia/action/gas_retries", nil) - celestiaBlobInclusionRetries = metrics.NewRegisteredCounter("celestia/action/inclusion_retries", nil) - - celestiaValidationLastSuccesfulActionGauge = metrics.NewRegisteredGauge("celestia/validation/last_success", nil) - celestiaValidationSuccessCounter = metrics.NewRegisteredCounter("celestia/validation/blobstream_success", nil) - celestiaValidationFailureCounter = metrics.NewRegisteredCounter("celestia/validation/blobstream_failure", nil) -) - -var ( - // ErrTxTimedout is the error message returned by the DA when mempool is congested - ErrTxTimedout = errors.New("timed out waiting for tx to be included in a block") - - // ErrTxAlreadyInMempool is the error message returned by the DA when tx is already in mempool - ErrTxAlreadyInMempool = errors.New("tx already in mempool") - - // ErrTxIncorrectAccountSequence is the error message returned by the DA when tx has incorrect sequence - ErrTxIncorrectAccountSequence = errors.New("incorrect account sequence") -) - -type CelestiaDA struct { - Cfg *DAConfig - Client *openrpc.Client - Namespace *share.Namespace - Prover *CelestiaProver -} - -type CelestiaProver struct { - Trpc *http.HTTP - EthClient *ethclient.Client - BlobstreamX *blobstreamx.BlobstreamX -} - -func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) { - f.Bool(prefix+".enable", false, "Enable Celestia DA") - f.Float64(prefix+".gas-price", 0.01, "Gas for retrying Celestia transactions") - f.Float64(prefix+".gas-multiplier", 1.01, "Gas multiplier for Celestia transactions") - f.String(prefix+".rpc", "", "Rpc endpoint for celestia-node") - f.String(prefix+".namespace-id", "", "Celestia Namespace to post data to") - f.String(prefix+".auth-token", "", "Auth token for Celestia Node") - f.Bool(prefix+".noop-writer", false, "Noop writer (disable posting to celestia)") - f.String(prefix+".validator-config"+".tendermint-rpc", "", "Tendermint RPC endpoint, only used for validation") - f.String(prefix+".validator-config"+".eth-rpc", "", "L1 Websocket connection, only used for validation") - f.String(prefix+".validator-config"+".blobstream", "", "Blobstream address, only used for validation") -} - -// CelestiaMessageHeaderFlag indicates that this data is a Blob Pointer -// which will be used to retrieve data from Celestia -const CelestiaMessageHeaderFlag byte = 0x63 - -func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, error) { - if cfg == nil { - return nil, errors.New("celestia cfg cannot be blank") - } - daClient, err := openrpc.NewClient(context.Background(), cfg.Rpc, cfg.AuthToken) - if err != nil { - return nil, err - } - - if cfg.NamespaceId == "" { - return nil, errors.New("namespace id cannot be blank") - } - nsBytes, err := hex.DecodeString(cfg.NamespaceId) - if err != nil { - return nil, err - } - - namespace, err := share.NewBlobNamespaceV0(nsBytes) - if err != nil { - return nil, err - } - - if cfg.ValidatorConfig != nil { - trpc, err := http.New(cfg.ValidatorConfig.TendermintRPC, "/websocket") - if err != nil { - log.Error("Unable to establish connection with celestia-core tendermint rpc") - return nil, err - } - err = trpc.Start() - if err != nil { - return nil, err - } - - var ethRpc *ethclient.Client - if ethClient != nil { - ethRpc = ethClient - } else if len(cfg.ValidatorConfig.EthClient) > 0 { - ethRpc, err = ethclient.Dial(cfg.ValidatorConfig.EthClient) - if err != nil { - return nil, err - } - } - - blobstreamx, err := blobstreamx.NewBlobstreamX(common.HexToAddress(cfg.ValidatorConfig.BlobstreamAddr), ethClient) - if err != nil { - return nil, err - } - - return &CelestiaDA{ - Cfg: cfg, - Client: daClient, - Namespace: &namespace, - Prover: &CelestiaProver{ - Trpc: trpc, - EthClient: ethRpc, - BlobstreamX: blobstreamx, - }, - }, nil - - } - - return &CelestiaDA{ - Cfg: cfg, - Client: daClient, - Namespace: &namespace, - }, nil -} - -func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) { - if c.Cfg.NoopWriter { - log.Warn("NoopWriter enabled, falling back", "c.Cfg.NoopWriter", c.Cfg.NoopWriter) - celestiaFailureCounter.Inc(1) - return nil, errors.New("NoopWriter enabled") - } - // set a 5 minute timeout context on submissions - // if it takes longer than that to succesfully submit and verify a blob, - // then there's an issue with the connection to the celestia node - ctx, cancel := context.WithTimeout(ctx, time.Duration(time.Minute*5)) - defer cancel() - dataBlob, err := blob.NewBlobV0(*c.Namespace, message) - if err != nil { - celestiaFailureCounter.Inc(1) - log.Warn("Error creating blob", "err", err) - return nil, err - } - - height := uint64(0) - submitted := false - // this will trigger node to use the default gas price from celestia app - gasPrice := -1.0 - for !submitted { - height, err = c.Client.Blob.Submit(ctx, []*blob.Blob{dataBlob}, gasPrice) - if err != nil { - switch { - case strings.Contains(err.Error(), ErrTxTimedout.Error()), strings.Contains(err.Error(), ErrTxAlreadyInMempool.Error()), strings.Contains(err.Error(), ErrTxIncorrectAccountSequence.Error()): - log.Warn("Failed to submit blob, bumping gas price and retrying...", "err", err) - if gasPrice == -1.0 { - gasPrice = c.Cfg.GasPrice - } else { - gasPrice = gasPrice * c.Cfg.GasMultiplier - } - - celestiaGasRetries.Inc(1) - continue - default: - celestiaFailureCounter.Inc(1) - log.Warn("Blob Submission error", "err", err) - return nil, err - } - } - - if height == 0 { - celestiaFailureCounter.Inc(1) - log.Warn("Unexpected height from blob response", "height", height) - return nil, errors.New("unexpected response code") - } - - submitted = true - - celestiaLastNonDefaultGasprice.Update(gasPrice) - } - - proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) - if err != nil { - celestiaFailureCounter.Inc(1) - log.Warn("Error retrieving proof", "err", err) - return nil, err - } - - proofRetries := 0 - for proofs == nil { - log.Warn("Retrieved empty proof from GetProof, fetching again...", "proofRetries", proofRetries) - time.Sleep(time.Millisecond * 100) - proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) - if err != nil { - celestiaFailureCounter.Inc(1) - log.Warn("Error retrieving proof", "err", err) - return nil, err - } - proofRetries++ - celestiaBlobInclusionRetries.Inc(1) - } - - included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment) - if err != nil || !included { - celestiaFailureCounter.Inc(1) - log.Warn("Error checking for inclusion", "err", err, "proof", proofs) - return nil, err - } - log.Info("Succesfully posted blob", "height", height, "commitment", hex.EncodeToString(dataBlob.Commitment)) - - // we fetch the blob so that we can get the correct start index in the square - dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment) - if err != nil { - celestiaFailureCounter.Inc(1) - return nil, err - } - - if dataBlob.Index() <= 0 { - celestiaFailureCounter.Inc(1) - log.Warn("Unexpected index from blob response", "index", dataBlob.Index()) - return nil, errors.New("unexpected response code") - } - - header, err := c.Client.Header.GetByHeight(ctx, height) - if err != nil { - celestiaFailureCounter.Inc(1) - log.Warn("Header retrieval error", "err", err) - return nil, err - } - - sharesLength := uint64(0) - for _, proof := range *proofs { - sharesLength += uint64(proof.End()) - uint64(proof.Start()) - } - - txCommitment, dataRoot := [32]byte{}, [32]byte{} - copy(txCommitment[:], dataBlob.Commitment) - - copy(dataRoot[:], header.DataHash) - - // Row roots give us the length of the EDS - squareSize := uint64(len(header.DAH.RowRoots)) - // ODS size - odsSize := squareSize / 2 - - blobIndex := uint64(dataBlob.Index()) - // startRow - startRow := blobIndex / squareSize - if odsSize*startRow > blobIndex { - celestiaFailureCounter.Inc(1) - // return an empty batch - return nil, fmt.Errorf("storing Celestia information, odsSize*startRow=%v was larger than blobIndex=%v", odsSize*startRow, dataBlob.Index()) - } - startIndexOds := blobIndex - odsSize*startRow - blobPointer := types.BlobPointer{ - BlockHeight: height, - Start: startIndexOds, - SharesLength: sharesLength, - TxCommitment: txCommitment, - DataRoot: dataRoot, - } - log.Info("Posted blob to height and dataRoot", "height", blobPointer.BlockHeight, "dataRoot", hex.EncodeToString(blobPointer.DataRoot[:])) - - blobPointerData, err := blobPointer.MarshalBinary() - if err != nil { - celestiaFailureCounter.Inc(1) - log.Warn("BlobPointer MashalBinary error", "err", err) - return nil, err - } - - buf := new(bytes.Buffer) - err = binary.Write(buf, binary.BigEndian, CelestiaMessageHeaderFlag) - if err != nil { - celestiaFailureCounter.Inc(1) - log.Warn("batch type byte serialization failed", "err", err) - return nil, err - } - - err = binary.Write(buf, binary.BigEndian, blobPointerData) - if err != nil { - celestiaFailureCounter.Inc(1) - log.Warn("blob pointer data serialization failed", "err", err) - return nil, err - } - - serializedBlobPointerData := buf.Bytes() - - celestiaSuccessCounter.Inc(1) - celestiaDALastSuccesfulActionGauge.Update(time.Now().Unix()) - - return serializedBlobPointerData, nil -} - -func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) ([]byte, *types.SquareData, error) { - // Wait until our client is synced - err := c.Client.Header.SyncWait(ctx) - if err != nil { - return nil, nil, err - } - - header, err := c.Client.Header.GetByHeight(ctx, blobPointer.BlockHeight) - if err != nil { - return nil, nil, err - } - - headerDataHash := [32]byte{} - copy(headerDataHash[:], header.DataHash) - if headerDataHash != blobPointer.DataRoot { - log.Error("Data Root mismatch", " header.DataHash", header.DataHash, "blobPointer.DataRoot", hex.EncodeToString(blobPointer.DataRoot[:])) - return []byte{}, nil, nil - } - - proofs, err := c.Client.Blob.GetProof(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:]) - if err != nil { - log.Error("Error retrieving proof", "err", err) - return []byte{}, nil, nil - } - - sharesLength := uint64(0) - for _, proof := range *proofs { - sharesLength += uint64(proof.End()) - uint64(proof.Start()) - } - - if sharesLength != blobPointer.SharesLength { - log.Error("Share length mismatch", "sharesLength", sharesLength, "blobPointer.SharesLength", blobPointer.SharesLength) - return []byte{}, nil, nil - } - - blob, err := c.Client.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:]) - if err != nil { - // return an empty batch of data because we could not find the blob from the sequencer message - log.Error("Failed to get blob", "height", blobPointer.BlockHeight, "commitment", hex.EncodeToString(blobPointer.TxCommitment[:])) - return []byte{}, nil, nil - } - - eds, err := c.Client.Share.GetEDS(ctx, header) - if err != nil { - log.Error("Failed to get EDS", "height", blobPointer.BlockHeight) - return []byte{}, nil, nil - } - - squareSize := uint64(eds.Width()) - odsSize := squareSize / 2 - - startRow := blobPointer.Start / odsSize - - if blobPointer.Start >= odsSize*odsSize { - log.Error("startIndexOds >= odsSize*odsSize", "startIndexOds", blobPointer.Start, "odsSize*odsSize", odsSize*odsSize) - return []byte{}, nil, nil - } - - if blobPointer.Start+blobPointer.SharesLength < 1 { - log.Error("startIndexOds+blobPointer.SharesLength < 1", "startIndexOds+blobPointer.SharesLength", blobPointer.Start+blobPointer.SharesLength) - return []byte{}, nil, nil - } - - endIndexOds := blobPointer.Start + blobPointer.SharesLength - 1 - if endIndexOds >= odsSize*odsSize { - log.Error("endIndexOds >= odsSize*odsSize", "endIndexOds", endIndexOds, "odsSize*odsSize", odsSize*odsSize) - return []byte{}, nil, nil - } - - endRow := endIndexOds / odsSize - - if endRow >= odsSize || startRow >= odsSize { - log.Error("endRow >= odsSize || startRow >= odsSize", "endRow", endRow, "startRow", startRow, "odsSize", odsSize) - return []byte{}, nil, nil - } - - startColumn := blobPointer.Start % odsSize - endColumn := endIndexOds % odsSize - - if startRow == endRow && startColumn > endColumn { - log.Error("start and end row are the same and startColumn >= endColumn", "startColumn", startColumn, "endColumn+1 ", endColumn+1) - return []byte{}, nil, nil - } - - rows := [][][]byte{} - for i := startRow; i <= endRow; i++ { - rows = append(rows, eds.Row(uint(i))) - } - - squareData := types.SquareData{ - RowRoots: header.DAH.RowRoots, - ColumnRoots: header.DAH.ColumnRoots, - Rows: rows, - SquareSize: squareSize, - StartRow: startRow, - EndRow: endRow, - } - - return blob.Data, &squareData, nil -} - -func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { - if c.Prover == nil { - celestiaValidationFailureCounter.Inc(1) - return nil, fmt.Errorf("no celestia prover config found") - } - - fmt.Printf("Inbox Message: %v\n", msg) - buf := bytes.NewBuffer(msg) - // msgLength := uint32(len(msg) + 1) - blobPointer := types.BlobPointer{} - blobBytes := buf.Bytes() - err := blobPointer.UnmarshalBinary(blobBytes) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - log.Error("Couldn't unmarshal Celestia blob pointer", "err", err) - return nil, nil - } - - // Get data root from a celestia node - header, err := c.Client.Header.GetByHeight(ctx, blobPointer.BlockHeight) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - log.Warn("Header retrieval error", "err", err) - return nil, err - } - - latestBlockNumber, err := c.Prover.EthClient.BlockNumber(context.Background()) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - return nil, err - } - - // check the latest celestia block on the Blobstream contract - latestCelestiaBlock, err := c.Prover.BlobstreamX.LatestBlock(&bind.CallOpts{ - Pending: false, - BlockNumber: big.NewInt(int64(latestBlockNumber)), - Context: ctx, - }) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - return nil, err - } - - fmt.Printf("Blob Pointer Height: %v\n", blobPointer.BlockHeight) - fmt.Printf("Latest Blobstream Height: %v\n", latestCelestiaBlock) - - var backwards bool - if blobPointer.BlockHeight < latestCelestiaBlock { - backwards = true - } else { - backwards = false - } - - var event *blobstreamx.BlobstreamXDataCommitmentStored - - event, err = c.filter(ctx, latestBlockNumber, blobPointer.BlockHeight, backwards) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - return nil, err - } - - // get the block data root inclusion proof to the data root tuple root - dataRootProof, err := c.Prover.Trpc.DataRootInclusionProof(ctx, blobPointer.BlockHeight, event.StartBlock, event.EndBlock) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - return nil, err - } - - // verify that the data root was committed to by the BlobstreamX contract - sideNodes := make([][32]byte, len(dataRootProof.Proof.Aunts)) - for i, aunt := range dataRootProof.Proof.Aunts { - sideNodes[i] = *(*[32]byte)(aunt) - } - - tuple := blobstreamx.DataRootTuple{ - Height: big.NewInt(int64(blobPointer.BlockHeight)), - DataRoot: [32]byte(header.DataHash), - } - - proof := blobstreamx.BinaryMerkleProof{ - SideNodes: sideNodes, - Key: big.NewInt(dataRootProof.Proof.Index), - NumLeaves: big.NewInt(dataRootProof.Proof.Total), - } - - valid, err := c.Prover.BlobstreamX.VerifyAttestation( - &bind.CallOpts{}, - event.ProofNonce, - tuple, - proof, - ) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - return nil, err - } - - log.Info("Verified Celestia Attestation", "height", blobPointer.BlockHeight, "valid", valid) - - if valid { - sharesProof, err := c.Prover.Trpc.ProveShares(ctx, blobPointer.BlockHeight, blobPointer.Start, blobPointer.Start+blobPointer.SharesLength) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - log.Error("Unable to get ShareProof", "err", err) - return nil, err - } - - namespaceNode := toNamespaceNode(sharesProof.RowProof.RowRoots[0]) - rowProof := toRowProofs((sharesProof.RowProof.Proofs[0])) - attestationProof := toAttestationProof(event.ProofNonce.Uint64(), blobPointer.BlockHeight, blobPointer.DataRoot, dataRootProof) - - celestiaVerifierAbi, err := celestiagen.CelestiaBatchVerifierMetaData.GetAbi() - if err != nil { - celestiaValidationFailureCounter.Inc(1) - log.Error("Could not get ABI for Celestia Batch Verifier", "err", err) - return nil, err - } - - verifyProofABI := celestiaVerifierAbi.Methods["verifyProof"] - - proofData, err := verifyProofABI.Inputs.Pack( - common.HexToAddress(c.Cfg.ValidatorConfig.BlobstreamAddr), namespaceNode, rowProof, attestationProof, - ) - if err != nil { - celestiaValidationFailureCounter.Inc(1) - log.Error("Could not pack structs into ABI", "err", err) - return nil, err - } - - celestiaValidationSuccessCounter.Inc(1) - celestiaValidationLastSuccesfulActionGauge.Update(time.Now().Unix()) - return proofData, nil - } - - celestiaValidationFailureCounter.Inc(1) - return nil, err -} - -func (c *CelestiaDA) filter(ctx context.Context, latestBlock uint64, celestiaHeight uint64, backwards bool) (*blobstreamx.BlobstreamXDataCommitmentStored, error) { - // Geth has a default of 5000 block limit for filters - start := uint64(0) - if latestBlock > 5000 { - start = latestBlock - 5000 - } - end := latestBlock - - for attempt := 0; attempt < 11; attempt++ { - eventsIterator, err := c.Prover.BlobstreamX.FilterDataCommitmentStored( - &bind.FilterOpts{ - Context: ctx, - Start: start, - End: &end, - }, - nil, - nil, - nil, - ) - if err != nil { - log.Error("Error creating event iterator", "err", err) - return nil, err - } - - var event *blobstreamx.BlobstreamXDataCommitmentStored - for eventsIterator.Next() { - e := eventsIterator.Event - if e.StartBlock <= celestiaHeight && celestiaHeight < e.EndBlock { - event = &blobstreamx.BlobstreamXDataCommitmentStored{ - ProofNonce: e.ProofNonce, - StartBlock: e.StartBlock, - EndBlock: e.EndBlock, - DataCommitment: e.DataCommitment, - } - break - } - } - if err := eventsIterator.Error(); err != nil { - return nil, err - } - err = eventsIterator.Close() - if err != nil { - return nil, err - } - if event != nil { - log.Info("Found Data Root submission event", "proof_nonce", event.ProofNonce, "start", event.StartBlock, "end", event.EndBlock) - return event, nil - } - - if backwards { - if start >= 5000 { - start -= 5000 - } else { - start = 0 - } - if end < 5000 { - end = start + 1000 - } else { - end -= 5000 - } - } else { - time.Sleep(time.Second * 3600) - latestBlockNumber, err := c.Prover.EthClient.BlockNumber(context.Background()) - if err != nil { - return nil, err - } - - start = end - end = latestBlockNumber - } - } - - return nil, fmt.Errorf("unable to find Data Commitment Stored event in Blobstream") -} diff --git a/das/celestia/celestiaDasRpcClient.go b/das/celestia/celestiaDasRpcClient.go new file mode 100644 index 0000000000..238e67ebbc --- /dev/null +++ b/das/celestia/celestiaDasRpcClient.go @@ -0,0 +1,92 @@ +package celestia + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" + "github.com/spf13/pflag" + + "github.com/ethereum/go-ethereum/rpc" + celestiaTypes "github.com/offchainlabs/nitro/das/celestia/types" + "github.com/offchainlabs/nitro/util/pretty" +) + +type CelestiaConfig struct { + Enable bool `koanf:"enable"` + URL string `koanf:"url"` +} + +type CelestiaDASClient struct { + clnt *rpc.Client + url string +} + +func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) { + f.Bool(prefix+".enable", false, "Enable Celestia DA") + f.String(prefix+".url", "http://localhost:9876", "address to use against Celestia DA RPC service") +} + +func NewCelestiaDASRPCClient(target string) (*CelestiaDASClient, error) { + clnt, err := rpc.Dial(target) + if err != nil { + return nil, err + } + return &CelestiaDASClient{ + clnt: clnt, + url: target, + }, nil +} + +func (c *CelestiaDASClient) Store(ctx context.Context, message []byte) ([]byte, error) { + log.Trace("celestia.CelestiaDASClient.Store(...)", "message", pretty.FirstFewBytes(message)) + ret := []byte{} + if err := c.clnt.CallContext(ctx, &ret, "celestia_store", hexutil.Bytes(message)); err != nil { + return nil, err + } + log.Info("Got result from Celestia DAS", "result", ret) + return ret, nil +} + +func (c *CelestiaDASClient) String() string { + return fmt.Sprintf("CelestiaDASClient{url:%s}", c.url) +} + +type ReadResult struct { + Message []byte `json:"message"` + RowRoots [][]byte `json:"row_roots"` + ColumnRoots [][]byte `json:"column_roots"` + Rows [][][]byte `json:"rows"` + SquareSize uint64 `json:"square_size"` // Refers to original data square size + StartRow uint64 `json:"start_row"` + EndRow uint64 `json:"end_row"` +} + +func (c *CelestiaDASClient) Read(ctx context.Context, blobPointer *celestiaTypes.BlobPointer) ([]byte, *celestiaTypes.SquareData, error) { + log.Trace("celestia.CelestiaDASClient.Read(...)", "blobPointer", blobPointer) + var ret ReadResult + if err := c.clnt.CallContext(ctx, &ret, "celestia_read", blobPointer); err != nil { + return nil, nil, err + } + + squareData := celestiaTypes.SquareData{ + RowRoots: ret.RowRoots, + ColumnRoots: ret.ColumnRoots, + Rows: ret.Rows, + SquareSize: ret.SquareSize, + StartRow: ret.StartRow, + EndRow: ret.EndRow, + } + + return ret.Message, &squareData, nil +} + +func (c *CelestiaDASClient) GetProof(ctx context.Context, msg []byte) ([]byte, error) { + res := []byte{} + err := c.clnt.CallContext(ctx, &res, "celestia_getProof", msg) + if err != nil { + return nil, err + } + return res, nil +} diff --git a/das/celestia/proof.go b/das/celestia/proof.go deleted file mode 100644 index 144c73e21c..0000000000 --- a/das/celestia/proof.go +++ /dev/null @@ -1,110 +0,0 @@ -package celestia - -import ( - "math/big" - - "github.com/tendermint/tendermint/crypto/merkle" - coretypes "github.com/tendermint/tendermint/rpc/core/types" -) - -type Namespace struct { - Version [1]byte - Id [28]byte -} - -type NamespaceNode struct { - Min Namespace - Max Namespace - Digest [32]byte -} - -type BinaryMerkleProof struct { - SideNodes [][32]byte - Key *big.Int - NumLeaves *big.Int -} - -type DataRootTuple struct { - Height *big.Int - DataRoot [32]byte -} - -type AttestationProof struct { - TupleRootNonce *big.Int - Tuple DataRootTuple - Proof BinaryMerkleProof -} - -func minNamespace(innerNode []byte) Namespace { - version := innerNode[0] - var id [28]byte - copy(id[:], innerNode[1:29]) - return Namespace{ - Version: [1]byte{version}, - Id: id, - } -} - -func maxNamespace(innerNode []byte) Namespace { - version := innerNode[29] - var id [28]byte - copy(id[:], innerNode[30:58]) - return Namespace{ - Version: [1]byte{version}, - Id: id, - } -} - -func toNamespaceNode(node []byte) NamespaceNode { - minNs := minNamespace(node) - maxNs := maxNamespace(node) - var digest [32]byte - copy(digest[:], node[58:]) - return NamespaceNode{ - Min: minNs, - Max: maxNs, - Digest: digest, - } -} - -func toRowProofs(proof *merkle.Proof) BinaryMerkleProof { - sideNodes := make([][32]byte, len(proof.Aunts)) - for j, sideNode := range proof.Aunts { - var bzSideNode [32]byte - copy(bzSideNode[:], sideNode) - sideNodes[j] = bzSideNode - } - rowProof := BinaryMerkleProof{ - SideNodes: sideNodes, - Key: big.NewInt(proof.Index), - NumLeaves: big.NewInt(proof.Total), - } - return rowProof -} - -func toAttestationProof( - nonce uint64, - height uint64, - blockDataRoot [32]byte, - dataRootInclusionProof *coretypes.ResultDataRootInclusionProof, -) AttestationProof { - sideNodes := make([][32]byte, len(dataRootInclusionProof.Proof.Aunts)) - for i, sideNode := range dataRootInclusionProof.Proof.Aunts { - var bzSideNode [32]byte - copy(bzSideNode[:], sideNode) - sideNodes[i] = bzSideNode - } - - return AttestationProof{ - TupleRootNonce: big.NewInt(int64(nonce)), - Tuple: DataRootTuple{ - Height: big.NewInt(int64(height)), - DataRoot: blockDataRoot, - }, - Proof: BinaryMerkleProof{ - SideNodes: sideNodes, - Key: big.NewInt(dataRootInclusionProof.Proof.Index), - NumLeaves: big.NewInt(dataRootInclusionProof.Proof.Total), - }, - } -} diff --git a/das/celestia/types/da_interface.go b/das/celestia/types/da_interface.go index 7d2c395523..e6483d740a 100644 --- a/das/celestia/types/da_interface.go +++ b/das/celestia/types/da_interface.go @@ -8,6 +8,15 @@ type CelestiaWriter interface { Store(context.Context, []byte) ([]byte, error) } +type SquareData struct { + RowRoots [][]byte `json:"row_roots"` + ColumnRoots [][]byte `json:"column_roots"` + Rows [][][]byte `json:"rows"` + SquareSize uint64 `json:"square_size"` // Refers to original data square size + StartRow uint64 `json:"start_row"` + EndRow uint64 `json:"end_row"` +} + type CelestiaReader interface { Read(context.Context, *BlobPointer) ([]byte, *SquareData, error) GetProof(ctx context.Context, msg []byte) ([]byte, error) diff --git a/das/celestia/types/square_data.go b/das/celestia/types/square_data.go deleted file mode 100644 index fc8dcdc886..0000000000 --- a/das/celestia/types/square_data.go +++ /dev/null @@ -1,10 +0,0 @@ -package types - -type SquareData struct { - RowRoots [][]byte - ColumnRoots [][]byte - Rows [][][]byte - SquareSize uint64 // Refers to original data square size - StartRow uint64 - EndRow uint64 -} diff --git a/system_tests/full_celestia_challenge_test.go b/system_tests/full_celestia_challenge_test.backup_go similarity index 98% rename from system_tests/full_celestia_challenge_test.go rename to system_tests/full_celestia_challenge_test.backup_go index 35f3bf3682..3ff9d62793 100644 --- a/system_tests/full_celestia_challenge_test.go +++ b/system_tests/full_celestia_challenge_test.backup_go @@ -95,7 +95,7 @@ func writeTxToCelestiaBatch(writer io.Writer, tx *types.Transaction) error { return err } -func makeCelestiaBatch(t *testing.T, l2Node *arbnode.Node, celestiaDA *celestia.CelestiaDA, undecided bool, counterfactual bool, mockStream *mocksgen.Mockstream, deployer *bind.TransactOpts, l2Info *BlockchainTestInfo, backend *ethclient.Client, sequencer *bind.TransactOpts, seqInbox *mocksgen.SequencerInboxStub, seqInboxAddr common.Address, modStep int64) { +func makeCelestiaBatch(t *testing.T, l2Node *arbnode.Node, celestiaDA *celestia.CelestiaDASClient, undecided bool, counterfactual bool, mockStream *mocksgen.Mockstream, deployer *bind.TransactOpts, l2Info *BlockchainTestInfo, backend *ethclient.Client, sequencer *bind.TransactOpts, seqInbox *mocksgen.SequencerInboxStub, seqInboxAddr common.Address, modStep int64) { ctx := context.Background() batchBuffer := bytes.NewBuffer([]byte{})