Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NH M1.1 #2

Merged
merged 9 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ FROM golang:1.21 AS build
# INSTALL DEPENDENCIES
RUN go install github.com/gobuffalo/packr/v2/[email protected]
COPY go.mod go.sum /src/
COPY . /src
RUN cd /src && go mod download

# BUILD BINARY
COPY . /src
#COPY . /src
RUN cd /src/db && packr2
RUN cd /src && make build
RUN cd /src && go mod tidy && make build

# CONTAINER FOR RUNNING BINARY
FROM alpine:3.18.0
COPY --from=build /src/dist/zkevm-node /app/zkevm-node
COPY --from=build /src/config/environments/testnet/node.config.toml /app/example.config.toml
RUN apk update && apk add postgresql15-client
RUN apk update && apk add postgresql15-client && cd /app/ && mkdir logs
EXPOSE 8123
CMD ["/bin/sh", "-c", "/app/zkevm-node run"]
136 changes: 122 additions & 14 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/nhconnector"
"github.com/0xPolygonHermez/zkevm-node/state"
substrateTypes "github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
"google.golang.org/grpc"
Expand All @@ -43,6 +45,12 @@ type finalProofMsg struct {
finalProof *prover.FinalProof
}

type finalProofElement struct {
finalProof finalProofMsg
attestationId substrateTypes.U64
proofValue string
}

// Aggregator represents an aggregator
type Aggregator struct {
prover.UnimplementedAggregatorServiceServer
Expand All @@ -61,9 +69,11 @@ type Aggregator struct {
finalProof chan finalProofMsg
verifyingProof bool

srv *grpc.Server
ctx context.Context
exit context.CancelFunc
srv *grpc.Server
ctx context.Context
exit context.CancelFunc
nhConnector nhconnector.NHConnector
finalProofQueue FinalProofsQueue
}

// New creates a new aggregator.
Expand All @@ -72,6 +82,7 @@ func New(
stateInterface stateInterface,
ethTxManager ethTxManager,
etherman etherman,
nhConnector nhconnector.NHConnector,
) (Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker
switch cfg.TxProfitabilityCheckerType {
Expand All @@ -92,7 +103,9 @@ func New(
TimeSendFinalProofMutex: &sync.RWMutex{},
TimeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,

finalProof: make(chan finalProofMsg),
finalProof: make(chan finalProofMsg),
nhConnector: nhConnector,
finalProofQueue: FinalProofsQueue{},
}

return a, nil
Expand Down Expand Up @@ -144,7 +157,7 @@ func (a *Aggregator) Start(ctx context.Context) error {
a.resetVerifyProofTime()

go a.cleanupLockedProofs()
go a.sendFinalProof()
go a.processVerifiedProof()

<-ctx.Done()
return ctx.Err()
Expand Down Expand Up @@ -232,6 +245,89 @@ func (a *Aggregator) Channel(stream prover.AggregatorService_ChannelServer) erro
}
}

func (a *Aggregator) processVerifiedProof() {

for {
log.Debug("ProcessVerifiedProof...")
if !a.finalProofQueue.IsEmpty() {
proofToCheck, err := a.finalProofQueue.Peek()
log.Debug("Found proof in queue with attestation id: ", proofToCheck.attestationId)
if err != nil {
log.Errorf("Failed to retrieve the finalProofQueue peek element")
}
isProofValidated, err := a.State.IsAttestationPublishedOnL1(a.ctx, proofToCheck.attestationId, nil)
if isProofValidated {
log.Debug("Proof already validated, move on with sending it!")
a.SendFinalProofv2(proofToCheck)
}

}
time.Sleep(a.cfg.RetryTime.Duration)
}
}

func (a *Aggregator) SendFinalProofv2(finalProofElement finalProofElement) {
ctx := a.ctx
proof := finalProofElement.finalProof.recursiveProof

proofMerklePath := a.nhConnector.GetProofMerklePath(finalProofElement.attestationId, finalProofElement.proofValue)
log.Debug("Proof Merkle Path: ", proofMerklePath)

log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
log.Info("Verifying final proof with ethereum smart contract")

a.startProofVerification()

finalBatch, err := a.State.GetBatchByNumber(ctx, proof.BatchNumberFinal, nil)
if err != nil {
log.Errorf("Failed to retrieve batch with number [%d]: %v", proof.BatchNumberFinal, err)
a.endProofVerification()
return
}

inputs := ethmanTypes.FinalProofInputs{
FinalProof: finalProofElement.finalProof.finalProof,
NewLocalExitRoot: finalBatch.LocalExitRoot.Bytes(),
NewStateRoot: finalBatch.StateRoot.Bytes(),
AttestationId: uint64(finalProofElement.attestationId),
LeafCount: uint64(proofMerklePath.NumberOfLeaves),
LeafIndex: uint64(proofMerklePath.LeafIndex),
MerklePath: proofMerklePath.Proof,
}

log.Infof("Final proof inputs: NewLocalExitRoot [%#x], NewStateRoot [%#x]", inputs.NewLocalExitRoot, inputs.NewStateRoot)

// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
return
}
monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(ctx, ethTxManagerOwner, monitoredTxID, sender, to, nil, data, a.cfg.GasOffset, nil)
if err != nil {
log := log.WithFields("tx", monitoredTxID)
log.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
return
}

// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
}, nil)

a.resetVerifyProofTime()
a.endProofVerification()
proofToDelete, err := a.finalProofQueue.Dequeue()
if err != nil {
log.Errorf("Error in removing proof element from the FinalProofQueue %v", err)
}
a.State.DeletePublishedAttestationIds(ctx, proofToDelete.attestationId, nil)
}

// This function waits to receive a final proof from a prover. Once it receives
// the proof, it performs these steps in order:
// - send the final proof to L1
Expand Down Expand Up @@ -304,8 +400,16 @@ func (a *Aggregator) handleFailureToAddVerifyBatchToBeMonitored(ctx context.Cont
a.endProofVerification()
}

func (a *Aggregator) sendProofToNH(finalProof *prover.FinalProof) nhconnector.PoeNewElement {
proofVerifiedEvent, err := a.nhConnector.SendProofToNH(finalProof)
if err != nil {
log.Errorf("Error in sending proof to NH: %v", err)
}
return proofVerifiedEvent
}

// buildFinalProof builds and return the final proof for an aggregated/batch proof.
func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface, proof *state.Proof) (*prover.FinalProof, error) {
func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface, proof *state.Proof) (*prover.FinalProof, nhconnector.PoeNewElement, error) {
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
Expand All @@ -317,7 +421,7 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface

finalProofID, err := prover.FinalProof(proof.Proof, a.cfg.SenderAddress)
if err != nil {
return nil, fmt.Errorf("failed to get final proof id: %w", err)
return nil, nhconnector.PoeNewElement{}, fmt.Errorf("failed to get final proof id: %w", err)
}
proof.ProofID = finalProofID

Expand All @@ -326,26 +430,27 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface

finalProof, err := prover.WaitFinalProof(ctx, *proof.ProofID)
if err != nil {
return nil, fmt.Errorf("failed to get final proof from prover: %w", err)
return nil, nhconnector.PoeNewElement{}, fmt.Errorf("failed to get final proof from prover: %w", err)
}

log.Info("Final proof generated")
proofVerifiedEvent := a.sendProofToNH(finalProof)

// mock prover sanity check
if string(finalProof.Public.NewStateRoot) == mockedStateRoot && string(finalProof.Public.NewLocalExitRoot) == mockedLocalExitRoot {
// This local exit root and state root come from the mock
// prover, use the one captured by the executor instead
finalBatch, err := a.State.GetBatchByNumber(ctx, proof.BatchNumberFinal, nil)
if err != nil {
return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
return nil, nhconnector.PoeNewElement{}, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
}
log.Warnf("NewLocalExitRoot and NewStateRoot look like a mock values, using values from executor instead: LER: %v, SR: %v",
finalBatch.LocalExitRoot.TerminalString(), finalBatch.StateRoot.TerminalString())
finalProof.Public.NewStateRoot = finalBatch.StateRoot.Bytes()
finalProof.Public.NewLocalExitRoot = finalBatch.LocalExitRoot.Bytes()
}

return finalProof, nil
return finalProof, proofVerifiedEvent, nil
}

// tryBuildFinalProof checks if the provided proof is eligible to be used to
Expand Down Expand Up @@ -427,8 +532,8 @@ func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterf
)

// at this point we have an eligible proof, build the final one using it
finalProof, err := a.buildFinalProof(ctx, prover, proof)
if err != nil {
finalProof, proofVerifiedEvent, err := a.buildFinalProof(ctx, prover, proof)
if err != nil || proofVerifiedEvent.AttestationId == 0 {
err = fmt.Errorf("failed to build final proof, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
Expand All @@ -440,12 +545,13 @@ func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterf
recursiveProof: proof,
finalProof: finalProof,
}
a.finalProofQueue.Enqueue(finalProofElement{finalProof: msg, attestationId: proofVerifiedEvent.AttestationId, proofValue: proofVerifiedEvent.Value.Hex()})

select {
/*select {
case <-a.ctx.Done():
return false, a.ctx.Err()
case a.finalProof <- msg:
}
}*/

log.Debug("tryBuildFinalProof end")
return true, nil
Expand Down Expand Up @@ -920,13 +1026,15 @@ func (a *Aggregator) startProofVerification() {

// endProofVerification set verifyingProof to false to indicate that there is not proof verification in progress
func (a *Aggregator) endProofVerification() {
log.Debug("EndProofVerification...")
a.TimeSendFinalProofMutex.Lock()
defer a.TimeSendFinalProofMutex.Unlock()
a.verifyingProof = false
}

// resetVerifyProofTime updates the timeout to verify a proof.
func (a *Aggregator) resetVerifyProofTime() {
log.Debug("ResetVerifyProofTime...")
a.TimeSendFinalProofMutex.Lock()
defer a.TimeSendFinalProofMutex.Unlock()
a.TimeSendFinalProof = time.Now().Add(a.cfg.VerifyProofInterval.Duration)
Expand Down
11 changes: 6 additions & 5 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
configTypes "github.com/0xPolygonHermez/zkevm-node/config/types"
ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/nhconnector"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/test/testutils"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestSendFinalProof(t *testing.T) {
stateMock := mocks.NewStateMock(t)
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
a.ctx, a.exit = context.WithCancel(context.Background())
m := mox{
Expand Down Expand Up @@ -685,7 +686,7 @@ func TestTryAggregateProofs(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -958,7 +959,7 @@ func TestTryGenerateBatchProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1235,7 +1236,7 @@ func TestTryBuildFinalProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1365,7 +1366,7 @@ func TestIsSynced(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down
3 changes: 3 additions & 0 deletions aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/state"
substrateTypes "github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
)
Expand Down Expand Up @@ -62,4 +63,6 @@ type stateInterface interface {
DeleteUngeneratedProofs(ctx context.Context, dbTx pgx.Tx) error
CleanupGeneratedProofs(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
CleanupLockedProofs(ctx context.Context, duration string, dbTx pgx.Tx) (int64, error)
IsAttestationPublishedOnL1(ctx context.Context, attestationId substrateTypes.U64, dbTx pgx.Tx) (bool, error)
DeletePublishedAttestationIds(ctx context.Context, attestationId substrateTypes.U64, dbTx pgx.Tx) error
}
11 changes: 6 additions & 5 deletions aggregator/mocks/mock_dbtx.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions aggregator/mocks/mock_etherman.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading