Skip to content

Commit

Permalink
add CommitBatchDAV7 and handle multiple commit events submitted in a …
Browse files Browse the repository at this point in the history
…single transactions
  • Loading branch information
jonastheis committed Jan 22, 2025
1 parent fb4fe7c commit 9bf2f25
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 45 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/prometheus/tsdb v0.7.1
github.com/rjeczalik/notify v0.9.1
github.com/rs/cors v1.7.0
github.com/scroll-tech/da-codec v0.1.3-0.20241218102542-9852fa4e1be5
github.com/scroll-tech/da-codec v0.1.3-0.20250122041800-4ef7bfc6b634
github.com/scroll-tech/zktrie v0.8.4
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/sourcegraph/conc v0.3.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/scroll-tech/da-codec v0.1.3-0.20241218102542-9852fa4e1be5 h1:vZ75srkZCStjDWq/kqZGLoucf7Y7qXC13nKjQVZ0zp8=
github.com/scroll-tech/da-codec v0.1.3-0.20241218102542-9852fa4e1be5/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs=
github.com/scroll-tech/da-codec v0.1.3-0.20250121050419-8c2a5ccc1b2e h1:Sp1RjVsK9PLVW5zMlwMUNegsDpYmVN8noT/C4Bjro0U=
github.com/scroll-tech/da-codec v0.1.3-0.20250121050419-8c2a5ccc1b2e/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs=
github.com/scroll-tech/da-codec v0.1.3-0.20250122003441-91171709155b h1:DWiVtzXK/3lXK3+/aaAeorurjj88ITO17hhLECIS/0g=
github.com/scroll-tech/da-codec v0.1.3-0.20250122003441-91171709155b/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs=
github.com/scroll-tech/da-codec v0.1.3-0.20250122041800-4ef7bfc6b634 h1:YtD7XjP1F7GzL9nxj1lq88m1/bwSroVSGVR050i05yY=
github.com/scroll-tech/da-codec v0.1.3-0.20250122041800-4ef7bfc6b634/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs=
github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE=
github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
Expand Down
97 changes: 61 additions & 36 deletions rollup/da_syncer/da/calldata_blob_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,30 @@ func (ds *CalldataBlobSource) L1Finalized() uint64 {

func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEvents) (Entries, error) {
var entries Entries
var entry Entry
var err error

var emptyHash common.Hash
// we keep track of the last commit transaction hash, so we can process all events created in the same tx together.
// if we have a different commit transaction, we need to create a new commit batch DA.
var lastCommitTransactionHash common.Hash
// we keep track of the commit events created in the same tx, so we can process them together.
var lastCommitEvents []*l1.CommitBatchEvent

// getAndAppendCommitBatchDA is a helper function that gets the commit batch DA for the last commit events and appends it to the entries list.
// It also resets the last commit events and last commit transaction hash.
getAndAppendCommitBatchDA := func() error {
commitBatchDAEntries, err := ds.getCommitBatchDA(lastCommitEvents)
if err != nil {
return fmt.Errorf("failed to get commit batch da: %v, err: %w", lastCommitEvents[0].BatchIndex().Uint64(), err)
}

entries = append(entries, commitBatchDAEntries...)
lastCommitEvents = nil
lastCommitTransactionHash = emptyHash

return nil
}

var entry Entry
var err error
for _, rollupEvent := range rollupEvents {
switch rollupEvent.Type() {
case l1.CommitEventType:
Expand All @@ -113,13 +131,9 @@ func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEven

// if this is a different commit transaction, we need to create a new DA
if lastCommitTransactionHash != commitEvent.TxHash() {
entry, err = ds.getCommitBatchDA(lastCommitEvents)
if err != nil {
return nil, fmt.Errorf("failed to get commit batch da: %v, err: %w", rollupEvent.BatchIndex().Uint64(), err)
if err = getAndAppendCommitBatchDA(); err != nil {
return nil, fmt.Errorf("failed to get and append commit batch DA: %w", err)
}
entries = append(entries, entry)
lastCommitEvents = nil
lastCommitTransactionHash = emptyHash
}

// add commit event to the list of previous commit events, so we can process events created in the same tx together
Expand All @@ -128,13 +142,9 @@ func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEven
case l1.RevertEventType:
// if we have any previous commit events, we need to create a new DA before processing the revert event
if len(lastCommitEvents) > 0 {
entry, err = ds.getCommitBatchDA(lastCommitEvents)
if err != nil {
return nil, fmt.Errorf("failed to get commit batch da: %v, err: %w", rollupEvent.BatchIndex().Uint64(), err)
if err = getAndAppendCommitBatchDA(); err != nil {
return nil, fmt.Errorf("failed to get and append commit batch DA: %w", err)
}
entries = append(entries, entry)
lastCommitEvents = nil
lastCommitTransactionHash = emptyHash
}

revertEvent, ok := rollupEvent.(*l1.RevertBatchEvent)
Expand All @@ -148,13 +158,9 @@ func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEven
case l1.FinalizeEventType:
// if we have any previous commit events, we need to create a new DA before processing the finalized event
if len(lastCommitEvents) > 0 {
entry, err = ds.getCommitBatchDA(lastCommitEvents)
if err != nil {
return nil, fmt.Errorf("failed to get commit batch da: %v, err: %w", rollupEvent.BatchIndex().Uint64(), err)
if err = getAndAppendCommitBatchDA(); err != nil {
return nil, fmt.Errorf("failed to get and append commit batch DA: %w", err)
}
entries = append(entries, entry)
lastCommitEvents = nil
lastCommitTransactionHash = emptyHash
}

finalizeEvent, ok := rollupEvent.(*l1.FinalizeBatchEvent)
Expand All @@ -172,11 +178,9 @@ func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEven

// if we have any previous commit events, we need to process them before returning
if len(lastCommitEvents) > 0 {
entry, err = ds.getCommitBatchDA(lastCommitEvents)
if err != nil {
return nil, fmt.Errorf("failed to get commit batch da: %v, err: %w", lastCommitEvents[0].BatchIndex().Uint64(), err)
if err = getAndAppendCommitBatchDA(); err != nil {
return nil, fmt.Errorf("failed to get and append commit batch DA: %w", err)
}
entries = append(entries, entry)
}

return entries, nil
Expand All @@ -197,6 +201,11 @@ func (ds *CalldataBlobSource) getCommitBatchDA(commitEvents []*l1.CommitBatchEve
return nil, fmt.Errorf("failed to fetch commit tx data of batch %d, tx hash: %v, err: %w", firstCommitEvent.BatchIndex().Uint64(), firstCommitEvent.TxHash().Hex(), err)
}

blockHeader, err := ds.l1Reader.FetchBlockHeaderByNumber(firstCommitEvent.BlockNumber())
if err != nil {
return nil, fmt.Errorf("failed to get header by number, err: %w", err)
}

codec, err := encoding.CodecFromVersion(encoding.CodecVersion(args.Version))
if err != nil {
return nil, fmt.Errorf("unsupported codec version: %v, batch index: %v, err: %w", args.Version, firstCommitEvent.BatchIndex().Uint64(), err)
Expand All @@ -205,35 +214,51 @@ func (ds *CalldataBlobSource) getCommitBatchDA(commitEvents []*l1.CommitBatchEve
var entries Entries
var entry Entry
var previousEvent *l1.CommitBatchEvent
for _, commitEvent := range commitEvents {
// sanity check events
for i, commitEvent := range commitEvents {
// sanity check commit events from batches submitted in the same L1 transaction
if commitEvent.TxHash() != firstCommitEvent.TxHash() {
return nil, fmt.Errorf("commit events have different tx hashes, batch index: %d, tx: %s - batch index: %d, tx: %s", firstCommitEvent.BatchIndex().Uint64(), firstCommitEvent.TxHash().Hex(), commitEvent.BatchIndex().Uint64(), commitEvent.TxHash().Hex())
}
if commitEvent.BlockNumber() != firstCommitEvent.BlockNumber() {
return nil, fmt.Errorf("commit events have different block numbers, batch index: %d, block number: %d - batch index: %d, block number: %d", firstCommitEvent.BatchIndex().Uint64(), firstCommitEvent.BlockNumber(), commitEvent.BatchIndex().Uint64(), commitEvent.BlockNumber())
}
if commitEvent.BlockHash() != firstCommitEvent.BlockHash() {
return nil, fmt.Errorf("commit events have different block hashes, batch index: %d, hash: %s - batch index: %d, hash: %s", firstCommitEvent.BatchIndex().Uint64(), firstCommitEvent.BlockHash().Hex(), commitEvent.BatchIndex().Uint64(), commitEvent.BlockHash().Hex())
}
if previousEvent != nil && commitEvent.BatchIndex().Uint64() != previousEvent.BatchIndex().Uint64()+1 {
return nil, fmt.Errorf("commit events are not in sequence, batch index: %d, hash: %s - previous batch index: %d, hash: %s", commitEvent.BatchIndex().Uint64(), commitEvent.BatchHash().Hex(), previousEvent.BatchIndex().Uint64(), previousEvent.BatchHash().Hex())
}
previousEvent = commitEvent

switch codec.Version() {
case 0:
if entry, err = NewCommitBatchDAV0(ds.db, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap); err != nil {
return nil, fmt.Errorf("failed to decode DA, batch index: %d, err: %w", commitEvent.BatchIndex().Uint64(), err)
}
case 1, 2, 3, 4:
if entry, err = NewCommitBatchDAWithBlob(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, args.BlobHashes); err != nil {
case 1, 2, 3, 4, 5, 6:
if entry, err = NewCommitBatchDAV1(ds.ctx, ds.db, ds.blobClient, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, args.BlobHashes, blockHeader.Time); err != nil {
return nil, fmt.Errorf("failed to decode DA, batch index: %d, err: %w", commitEvent.BatchIndex().Uint64(), err)
}
case 7:
if i >= len(args.BlobHashes) {
return nil, fmt.Errorf("not enough blob hashes for commit transaction: %s, index in tx: %d, batch index: %d, hash: %s", firstCommitEvent.TxHash(), i, commitEvent.BatchIndex().Uint64(), commitEvent.BatchHash().Hex())
}
blobHash := args.BlobHashes[i]

var parentBatchHash common.Hash
if previousEvent == nil {
parentBatchHash = common.BytesToHash(args.ParentBatchHeader)
} else {
parentBatchHash = previousEvent.BatchHash()
}

if entry, err = NewCommitBatchDAV7(ds.ctx, ds.db, ds.blobClient, codec, commitEvent, blobHash, parentBatchHash, blockHeader.Time); err != nil {
return nil, fmt.Errorf("failed to decode DA, batch index: %d, err: %w", commitEvent.BatchIndex().Uint64(), err)
}
case 6:
// TODO: implement codec version 6
// - there shouldn't be any need for args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap
// - get blob hash from args for this commit event
// - sanity check somehow that this is the correct blob hash -> compute batch hash?
return nil, nil
default:
return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version)
}

previousEvent = commitEvent
entries = append(entries, entry)
}

Expand Down
10 changes: 3 additions & 7 deletions rollup/da_syncer/da/commitV1.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ type CommitBatchDAV1 struct {
versionedHashes []common.Hash
}

func NewCommitBatchDAWithBlob(ctx context.Context, db ethdb.Database,
l1Reader *l1.Reader,
func NewCommitBatchDAV1(ctx context.Context, db ethdb.Database,
blobClient blob_client.BlobClient,
codec encoding.Codec,
commitEvent *l1.CommitBatchEvent,
parentBatchHeader []byte,
chunks [][]byte,
skippedL1MessageBitmap []byte,
versionedHashes []common.Hash,
l1BlockTime uint64,
) (*CommitBatchDAV1, error) {
decodedChunks, err := codec.DecodeDAChunksRawTx(chunks)
if err != nil {
Expand All @@ -42,11 +42,7 @@ func NewCommitBatchDAWithBlob(ctx context.Context, db ethdb.Database,
}
versionedHash := versionedHashes[0]

header, err := l1Reader.FetchBlockHeaderByNumber(commitEvent.BlockNumber())
if err != nil {
return nil, fmt.Errorf("failed to get header by number, err: %w", err)
}
blob, err := blobClient.GetBlobByVersionedHashAndBlockTime(ctx, versionedHash, header.Time)
blob, err := blobClient.GetBlobByVersionedHashAndBlockTime(ctx, versionedHash, l1BlockTime)
if err != nil {
return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err)
}
Expand Down
Loading

0 comments on commit 9bf2f25

Please sign in to comment.