feat(nitro): celestia, reorg on read failure only if explicitly set via dangerous-reorg-on-read-failure flag
emilianobonassi committed Jul 24, 2024
1 parent 055e71a commit c5b9429
Showing 1 changed file with 33 additions and 30 deletions.
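
Not part of the commit: a minimal sketch of how the dangerous-reorg-on-read-failure option added below could be enabled through pflag, assuming the helper sits alongside CelestiaDAConfigAddOptions in the das/celestia package and uses a hypothetical "celestia" prefix (in nitro itself the koanf tags on DAConfig suggest the value is bound through koanf rather than read back from pflag directly).

// Sketch only: register the Celestia DA flags and opt into the dangerous
// reorg-on-read-failure behavior. The "celestia" prefix is an assumption.
func exampleEnableDangerousReorg() (bool, error) {
	f := pflag.NewFlagSet("da", pflag.ContinueOnError)
	CelestiaDAConfigAddOptions("celestia", f)
	if err := f.Parse([]string{"--celestia.dangerous-reorg-on-read-failure=true"}); err != nil {
		return false, err
	}
	return f.GetBool("celestia.dangerous-reorg-on-read-failure")
}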
das/celestia/celestia.go: 63 changes (33 additions, 30 deletions)
@@ -29,16 +29,17 @@ import (
)

type DAConfig struct {
- Enable bool `koanf:"enable"`
- GasPrice float64 `koanf:"gas-price" reload:"hot"`
- GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
- Rpc string `koanf:"rpc" reload:"hot"`
- ReadRpc string `koanf:"read-rpc" reload:"hot"`
- NamespaceId string `koanf:"namespace-id" `
- AuthToken string `koanf:"auth-token" reload:"hot"`
- ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
- NoopWriter bool `koanf:"noop-writer" reload:"hot"`
- ValidatorConfig *ValidatorConfig `koanf:"validator-config"`
+ Enable bool `koanf:"enable"`
+ GasPrice float64 `koanf:"gas-price" reload:"hot"`
+ GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
+ Rpc string `koanf:"rpc" reload:"hot"`
+ ReadRpc string `koanf:"read-rpc" reload:"hot"`
+ NamespaceId string `koanf:"namespace-id" `
+ AuthToken string `koanf:"auth-token" reload:"hot"`
+ ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
+ NoopWriter bool `koanf:"noop-writer" reload:"hot"`
+ ValidatorConfig *ValidatorConfig `koanf:"validator-config"`
+ ReorgOnReadFailure bool `koanf:"dangerous-reorg-on-read-failure"`
}

type ValidatorConfig struct {
@@ -110,6 +111,7 @@ func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".validator-config"+".tendermint-rpc", "", "Tendermint RPC endpoint, only used for validation")
f.String(prefix+".validator-config"+".eth-rpc", "", "L1 Websocket connection, only used for validation")
f.String(prefix+".validator-config"+".blobstream", "", "Blobstream address, only used for validation")
+ f.Bool(prefix+".dangerous-reorg-on-read-failure", false, "DANGEROUS: reorg if any error during reads from celestia node")
}

func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, error) {
@@ -380,14 +382,12 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
headerDataHash := [32]byte{}
copy(headerDataHash[:], header.DataHash)
if headerDataHash != blobPointer.DataRoot {
log.Error("Data Root mismatch", " header.DataHash", header.DataHash, "blobPointer.DataRoot", hex.EncodeToString(blobPointer.DataRoot[:]))
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Data Root mismatch, header.DataHash=%v, blobPointer.DataRoot=%v", header.DataHash, hex.EncodeToString(blobPointer.DataRoot[:])))
}

proofs, err := celestiaClient.Blob.GetProof(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
if err != nil {
log.Error("Error retrieving proof", "err", err)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Error retrieving proof, err=%v", err))
}

sharesLength := uint64(0)
@@ -396,21 +396,19 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
}

if sharesLength != blobPointer.SharesLength {
log.Error("Share length mismatch", "sharesLength", sharesLength, "blobPointer.SharesLength", blobPointer.SharesLength)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Share length mismatch, sharesLength=%v, blobPointer.SharesLength=%v", sharesLength, blobPointer.SharesLength))
}

blob, err := celestiaClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
if err != nil {
// return an empty batch of data because we could not find the blob from the sequencer message
log.Error("Failed to get blob", "height", blobPointer.BlockHeight, "commitment", hex.EncodeToString(blobPointer.TxCommitment[:]), "err", err)
return []byte{}, nil, nil
// we eventually manually reorg, setting ReorgOnReadFailure=true
return c.returnErrorHelper(fmt.Errorf("Failed to get blob, height=%v, commitment=%v, err=%v", blobPointer.BlockHeight, hex.EncodeToString(blobPointer.TxCommitment[:]), err))
}

eds, err := celestiaClient.Share.GetEDS(ctx, header)
if err != nil {
log.Error("Failed to get EDS", "height", blobPointer.BlockHeight, "err", err)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Failed to get EDS, height=%v, err=%v", blobPointer.BlockHeight, err))
}

squareSize := uint64(eds.Width())
@@ -419,34 +417,29 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
startRow := blobPointer.Start / odsSize

if blobPointer.Start >= odsSize*odsSize {
log.Error("startIndexOds >= odsSize*odsSize", "startIndexOds", blobPointer.Start, "odsSize*odsSize", odsSize*odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("startIndexOds >= odsSize*odsSize, startIndexOds=%v, odsSize*odsSize=%v", blobPointer.Start, odsSize*odsSize))
}

if blobPointer.Start+blobPointer.SharesLength < 1 {
log.Error("startIndexOds+blobPointer.SharesLength < 1", "startIndexOds+blobPointer.SharesLength", blobPointer.Start+blobPointer.SharesLength)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("startIndexOds+blobPointer.SharesLength < 1, startIndexOds+blobPointer.SharesLength=%v", blobPointer.Start+blobPointer.SharesLength))
}

endIndexOds := blobPointer.Start + blobPointer.SharesLength - 1
if endIndexOds >= odsSize*odsSize {
log.Error("endIndexOds >= odsSize*odsSize", "endIndexOds", endIndexOds, "odsSize*odsSize", odsSize*odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("endIndexOds >= odsSize*odsSize, endIndexOds=%v, odsSize*odsSize=%v", endIndexOds, odsSize*odsSize))
}

endRow := endIndexOds / odsSize

if endRow >= odsSize || startRow >= odsSize {
log.Error("endRow >= odsSize || startRow >= odsSize", "endRow", endRow, "startRow", startRow, "odsSize", odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("endRow >= odsSize || startRow >= odsSize, endRow=%v, startRow=%v, odsSize=%v", endRow, startRow, odsSize))
}

startColumn := blobPointer.Start % odsSize
endColumn := endIndexOds % odsSize

if startRow == endRow && startColumn > endColumn {
log.Error("start and end row are the same and startColumn >= endColumn", "startColumn", startColumn, "endColumn+1 ", endColumn+1)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("start and end row are the same and startColumn >= endColumn, startColumn=%v, endColumn+1=%v", startColumn, endColumn+1))
}

rows := [][][]byte{}
@@ -689,3 +682,13 @@ func (c *CelestiaDA) filter(ctx context.Context, latestBlock uint64, celestiaHei

return nil, fmt.Errorf("unable to find Data Commitment Stored event in Blobstream")
}

+ func (c *CelestiaDA) returnErrorHelper(err error) ([]byte, *types.SquareData, error) {
+ log.Error(err.Error())
+
+ if c.Cfg.ReorgOnReadFailure {
+ return []byte{}, nil, nil
+ }
+
+ return nil, nil, err
+ }
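
A small test sketch, not part of the commit, exercising both paths of the new returnErrorHelper. It assumes a _test.go file in the same package with the testing import, and that a CelestiaDA value can be constructed with only its Cfg field set as a *DAConfig; adjust if the real struct layout differs.

func TestReturnErrorHelperSketch(t *testing.T) {
	readErr := fmt.Errorf("celestia read failed")

	// Default: the read error is now propagated instead of being swallowed.
	c := &CelestiaDA{Cfg: &DAConfig{ReorgOnReadFailure: false}}
	if _, _, err := c.returnErrorHelper(readErr); err == nil {
		t.Fatal("expected the read error to be returned")
	}

	// Dangerous flag set: keep the pre-commit behavior of an empty batch with
	// a nil error, which is what lets the node reorg past the failed read.
	c = &CelestiaDA{Cfg: &DAConfig{ReorgOnReadFailure: true}}
	data, square, err := c.returnErrorHelper(readErr)
	if err != nil || square != nil || len(data) != 0 {
		t.Fatalf("expected empty batch and nil error, got data=%v square=%v err=%v", data, square, err)
	}
}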
