-
Notifications
You must be signed in to change notification settings - Fork 619
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(relayer): adapt to CodecV7 for EuclidV2 #1583
base: omerfirmak/mpt
Are you sure you want to change the base?
Changes from 4 commits
41606fe
0c0c417
78c9963
940fde0
b460d4a
182f8e3
783b965
5a479c3
310abdd
2efbbd7
e713424
f4e17bc
99c0a9f
8db5339
69a80d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -373,11 +373,178 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() { | |
// ProcessPendingBatches processes the pending batches by sending commitBatch transactions to layer 1. | ||
func (r *Layer2Relayer) ProcessPendingBatches() { | ||
// get pending batches from database in ascending order by their index. | ||
dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, 5) | ||
dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, max(5, r.cfg.SenderConfig.BatchSubmission.MaxBatches)) | ||
if err != nil { | ||
log.Error("Failed to fetch pending L2 batches", "err", err) | ||
return | ||
} | ||
|
||
var batchesToSubmit []*dbBatchWithChunksAndParent | ||
var forceSubmit bool | ||
for i, dbBatch := range dbBatches { | ||
if i == 0 && encoding.CodecVersion(dbBatch.CodecVersion) < encoding.CodecV6 { | ||
// if the first batch is not >= V6 then we need to submit batches one by one | ||
colinlyguo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
r.processPendingBatchesV4(dbBatches) | ||
return | ||
} | ||
|
||
batchesToSubmitLen := len(batchesToSubmit) | ||
var dbChunks []*orm.Chunk | ||
var dbParentBatch *orm.Batch | ||
|
||
// Verify batches compatibility | ||
{ | ||
dbChunks, err = r.chunkOrm.GetChunksInRange(r.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex) | ||
if err != nil { | ||
log.Error("failed to get chunks in range", "err", err) | ||
return | ||
} | ||
|
||
// check codec version | ||
for _, dbChunk := range dbChunks { | ||
if dbBatch.CodecVersion != dbChunk.CodecVersion { | ||
log.Error("batch codec version is different from chunk codec version", "batch index", dbBatch.Index, "chunk index", dbChunk.Index, "batch codec version", dbBatch.CodecVersion, "chunk codec version", dbChunk.CodecVersion) | ||
return | ||
} | ||
} | ||
|
||
if dbBatch.Index == 0 { | ||
log.Error("invalid args: batch index is 0, should only happen in committing genesis batch") | ||
return | ||
} | ||
|
||
dbParentBatch, err = r.batchOrm.GetBatchByIndex(r.ctx, dbBatch.Index-1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most of these batches are already in memory ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed to use |
||
if err != nil { | ||
log.Error("failed to get parent batch header", "err", err) | ||
return | ||
} | ||
|
||
if dbParentBatch.CodecVersion > dbBatch.CodecVersion { | ||
log.Error("parent batch codec version is greater than current batch codec version", "index", dbBatch.Index, "hash", dbBatch.Hash, "parent codec version", dbParentBatch.CodecVersion, "current codec version", dbBatch.CodecVersion) | ||
return | ||
} | ||
|
||
// make sure we commit batches of the same codec version together. | ||
// If we encounter a batch with a different codec version, we stop here and will commit the batches we have so far. | ||
// The next call of ProcessPendingBatches will then start with the batch with the different codec version. | ||
if batchesToSubmitLen > 0 && batchesToSubmit[batchesToSubmitLen-1].Batch.CodecVersion != dbBatch.CodecVersion { | ||
break | ||
} | ||
} | ||
|
||
// if one of the batches is too old, we force submit all batches that we have so far in the next step | ||
if !forceSubmit && time.Since(dbBatch.CreatedAt) > time.Duration(r.cfg.SenderConfig.BatchSubmission.TimeoutSec)*time.Second { | ||
forceSubmit = true | ||
} | ||
|
||
if batchesToSubmitLen <= r.cfg.SenderConfig.BatchSubmission.MaxBatches { | ||
batchesToSubmit = append(batchesToSubmit, &dbBatchWithChunksAndParent{ | ||
Batch: dbBatch, | ||
Chunks: dbChunks, | ||
ParentBatch: dbParentBatch, | ||
}) | ||
} | ||
} | ||
|
||
if !forceSubmit && len(batchesToSubmit) < r.cfg.SenderConfig.BatchSubmission.MinBatches { | ||
log.Debug("Not enough batches to submit", "count", len(batchesToSubmit), "minBatches", r.cfg.SenderConfig.BatchSubmission.MinBatches, "maxBatches", r.cfg.SenderConfig.BatchSubmission.MaxBatches) | ||
return | ||
} | ||
|
||
if forceSubmit { | ||
log.Info("Forcing submission of batches due to timeout", "batch index", batchesToSubmit[0].Batch.Index, "created at", batchesToSubmit[0].Batch.CreatedAt) | ||
} | ||
|
||
// We have at least 1 batch to commit | ||
firstBatch := batchesToSubmit[0].Batch | ||
lastBatch := batchesToSubmit[len(batchesToSubmit)-1].Batch | ||
|
||
var calldata []byte | ||
var blobs []*kzg4844.Blob | ||
var maxBlockHeight uint64 | ||
var totalGasUsed uint64 | ||
|
||
codecVersion := encoding.CodecVersion(firstBatch.CodecVersion) | ||
switch codecVersion { | ||
case encoding.CodecV6: | ||
calldata, blobs, maxBlockHeight, totalGasUsed, err = r.constructCommitBatchPayloadCodecV6(batchesToSubmit) | ||
if err != nil { | ||
log.Error("failed to construct commitBatchWithBlobProof payload for V6", "codecVersion", codecVersion, "start index", firstBatch.Index, "end index", lastBatch.Index, "err", err) | ||
return | ||
} | ||
default: | ||
log.Error("unsupported codec version in ProcessPendingBatches", "codecVersion", codecVersion, "start index", firstBatch, "end index", lastBatch.Index) | ||
return | ||
} | ||
|
||
txHash, err := r.commitSender.SendTransaction(r.contextIDFromBatches(batchesToSubmit), &r.cfg.RollupContractAddress, calldata, blobs, 0) | ||
if err != nil { | ||
if errors.Is(err, sender.ErrTooManyPendingBlobTxs) { | ||
r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc() | ||
log.Debug( | ||
"Skipped sending commitBatch tx to L1: too many pending blob txs", | ||
"maxPending", r.cfg.SenderConfig.MaxPendingBlobTxs, | ||
"err", err, | ||
) | ||
return | ||
} | ||
log.Error( | ||
"Failed to send commitBatch tx to layer1", | ||
"start index", firstBatch.Index, | ||
"start hash", firstBatch.Hash, | ||
"end index", lastBatch.Index, | ||
"end hash", lastBatch.Hash, | ||
"RollupContractAddress", r.cfg.RollupContractAddress, | ||
"err", err, | ||
"calldata", common.Bytes2Hex(calldata), | ||
) | ||
return | ||
} | ||
|
||
if err = r.db.Transaction(func(dbTX *gorm.DB) error { | ||
for _, dbBatch := range batchesToSubmit { | ||
if err = r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, dbBatch.Batch.Hash, txHash.String(), types.RollupCommitting, dbTX); err != nil { | ||
return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed for batch %d: %s, err %v", dbBatch.Batch.Index, dbBatch.Batch.Hash, err) | ||
} | ||
} | ||
|
||
return nil | ||
}); err != nil { | ||
log.Error("failed to update status for batches to RollupCommitting", "err", err) | ||
} | ||
|
||
r.metrics.rollupL2RelayerCommitBlockHeight.Set(float64(maxBlockHeight)) | ||
r.metrics.rollupL2RelayerCommitThroughput.Add(float64(totalGasUsed)) | ||
r.metrics.rollupL2RelayerProcessPendingBatchSuccessTotal.Add(float64(len(batchesToSubmit))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might also want to monitor the time series of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in 5a479c3 |
||
|
||
log.Info("Sent the commitBatches tx to layer1", "batches count", len(batchesToSubmit), "start index", firstBatch.Index, "start hash", firstBatch.Hash, "end index", lastBatch.Index, "end hash", lastBatch.Hash, "tx hash", txHash.String()) | ||
} | ||
|
||
func (r *Layer2Relayer) contextIDFromBatches(batches []*dbBatchWithChunksAndParent) string { | ||
contextIDs := []string{"v6"} | ||
|
||
for _, batch := range batches { | ||
contextIDs = append(contextIDs, batch.Batch.Hash) | ||
} | ||
|
||
return strings.Join(contextIDs, "-") | ||
} | ||
|
||
func (r *Layer2Relayer) batchHashesFromContextID(contextID string) []string { | ||
if strings.HasPrefix(contextID, "v6-") { | ||
return strings.Split(contextID, "-")[1:] | ||
} | ||
|
||
return []string{contextID} | ||
} | ||
|
||
type dbBatchWithChunksAndParent struct { | ||
Batch *orm.Batch | ||
Chunks []*orm.Chunk | ||
ParentBatch *orm.Batch | ||
} | ||
|
||
func (r *Layer2Relayer) processPendingBatchesV4(dbBatches []*orm.Batch) { | ||
for _, dbBatch := range dbBatches { | ||
r.metrics.rollupL2RelayerProcessPendingBatchTotal.Inc() | ||
|
||
|
@@ -432,7 +599,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() { | |
return | ||
} | ||
default: | ||
log.Error("unsupported codec version", "codecVersion", codecVersion) | ||
log.Error("unsupported codec version in processPendingBatchesV4", "codecVersion", codecVersion) | ||
return | ||
} | ||
|
||
|
@@ -444,7 +611,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() { | |
log.Warn("Batch commit previously failed, using eth_estimateGas for the re-submission", "hash", dbBatch.Hash) | ||
} | ||
|
||
txHash, err := r.commitSender.SendTransaction(dbBatch.Hash, &r.cfg.RollupContractAddress, calldata, blob, fallbackGasLimit) | ||
txHash, err := r.commitSender.SendTransaction(dbBatch.Hash, &r.cfg.RollupContractAddress, calldata, []*kzg4844.Blob{blob}, fallbackGasLimit) | ||
if err != nil { | ||
if errors.Is(err, sender.ErrTooManyPendingBlobTxs) { | ||
r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc() | ||
|
@@ -730,9 +897,17 @@ func (r *Layer2Relayer) handleConfirmation(cfm *sender.Confirmation) { | |
log.Warn("CommitBatchTxType transaction confirmed but failed in layer1", "confirmation", cfm) | ||
} | ||
|
||
err := r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, cfm.ContextID, cfm.TxHash.String(), status) | ||
if err != nil { | ||
log.Warn("UpdateCommitTxHashAndRollupStatus failed", "confirmation", cfm, "err", err) | ||
batchHashes := r.batchHashesFromContextID(cfm.ContextID) | ||
if err := r.db.Transaction(func(dbTX *gorm.DB) error { | ||
for _, batchHash := range batchHashes { | ||
if err := r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHash, cfm.TxHash.String(), status, dbTX); err != nil { | ||
return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed, batchHash: %s, txHash: %s, status: %s, err: %w", batchHash, cfm.TxHash.String(), status, err) | ||
} | ||
} | ||
|
||
return nil | ||
}); err != nil { | ||
log.Warn("failed to update confirmation status for batches", "confirmation", cfm, "err", err) | ||
} | ||
case types.SenderTypeFinalizeBatch: | ||
if strings.HasPrefix(cfm.ContextID, "finalizeBundle-") { | ||
|
@@ -868,6 +1043,71 @@ func (r *Layer2Relayer) constructCommitBatchPayloadCodecV4(dbBatch *orm.Batch, d | |
return calldata, daBatch.Blob(), nil | ||
} | ||
|
||
func (r *Layer2Relayer) constructCommitBatchPayloadCodecV6(batchesToSubmit []*dbBatchWithChunksAndParent) ([]byte, []*kzg4844.Blob, uint64, uint64, error) { | ||
var maxBlockHeight uint64 | ||
var totalGasUsed uint64 | ||
blobs := make([]*kzg4844.Blob, len(batchesToSubmit)) | ||
|
||
version := encoding.CodecVersion(batchesToSubmit[0].Batch.CodecVersion) | ||
var firstParentBatch *orm.Batch | ||
// construct blobs | ||
for _, b := range batchesToSubmit { | ||
// double check that all batches have the same version | ||
batchVersion := encoding.CodecVersion(b.Batch.CodecVersion) | ||
if batchVersion != version { | ||
return nil, nil, 0, 0, fmt.Errorf("codec version mismatch, expected: %d, got: %d for batches %d and %d", version, batchVersion, batchesToSubmit[0].Batch.Index, b.Batch.Index) | ||
} | ||
|
||
if firstParentBatch == nil { | ||
firstParentBatch = b.ParentBatch | ||
} | ||
|
||
chunks := make([]*encoding.Chunk, len(b.Chunks)) | ||
for i, c := range b.Chunks { | ||
blocks, err := r.l2BlockOrm.GetL2BlocksInRange(r.ctx, c.StartBlockNumber, c.EndBlockNumber) | ||
if err != nil { | ||
return nil, nil, 0, 0, fmt.Errorf("failed to get blocks in range for batch %d: %w", b.Batch.Index, err) | ||
} | ||
chunks[i] = &encoding.Chunk{Blocks: blocks} | ||
|
||
if c.EndBlockNumber > maxBlockHeight { | ||
maxBlockHeight = c.EndBlockNumber | ||
} | ||
totalGasUsed += c.TotalL2TxGas | ||
} | ||
|
||
encodingBatch := &encoding.Batch{ | ||
Index: b.Batch.Index, | ||
TotalL1MessagePoppedBefore: b.Chunks[0].TotalL1MessagesPoppedBefore, | ||
ParentBatchHash: common.HexToHash(b.ParentBatch.Hash), | ||
Chunks: chunks, | ||
} | ||
|
||
codec, err := encoding.CodecFromVersion(version) | ||
if err != nil { | ||
return nil, nil, 0, 0, fmt.Errorf("failed to get codec from version %d, err: %w", b.Batch.CodecVersion, err) | ||
} | ||
|
||
daBatch, err := codec.NewDABatch(encodingBatch) | ||
if err != nil { | ||
return nil, nil, 0, 0, fmt.Errorf("failed to create DA batch: %w", err) | ||
} | ||
|
||
blobs = append(blobs, daBatch.Blob()) | ||
} | ||
|
||
if firstParentBatch == nil { | ||
return nil, nil, 0, 0, fmt.Errorf("firstParentBatch is nil") | ||
} | ||
|
||
// TODO: this needs to be updated once the contract interface is finalized | ||
calldata, err := r.l1RollupABI.Pack("commitBatches", version, firstParentBatch.BatchHeader) | ||
if err != nil { | ||
return nil, nil, 0, 0, fmt.Errorf("failed to pack commitBatchWithBlobProof: %w", err) | ||
jonastheis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return calldata, blobs, maxBlockHeight, totalGasUsed, nil | ||
} | ||
|
||
func (r *Layer2Relayer) constructFinalizeBundlePayloadCodecV4(dbBatch *orm.Batch, aggProof *message.BundleProof) ([]byte, error) { | ||
if aggProof != nil { // finalizeBundle with proof. | ||
calldata, packErr := r.l1RollupABI.Pack( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove 5, and use config value only?