Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
feat: use historical data to query attestations
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Oct 31, 2023
1 parent adf739b commit c2f01ef
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 20 deletions.
56 changes: 44 additions & 12 deletions cmd/blobstream/deploy/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,31 @@ func Command() *cobra.Command {

encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)

querier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg)
err = querier.Start()
appQuerier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg)
err = appQuerier.Start()
if err != nil {
return err
}
defer func() {
err := querier.Stop()
err := appQuerier.Stop()
if err != nil {
logger.Error(err.Error())
}
}()

vs, err := getStartingValset(cmd.Context(), querier, config.startingNonce)
tmQuerier := rpc.NewTmQuerier(config.coreRPC, logger)
err = tmQuerier.Start()
if err != nil {
return err
}
defer func(tmQuerier *rpc.TmQuerier) {
err := tmQuerier.Stop()
if err != nil {
logger.Error(err.Error())
}
}(tmQuerier)

vs, err := getStartingValset(cmd.Context(), *tmQuerier, appQuerier, config.startingNonce)
if err != nil {
logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract")
return errors.Wrap(
Expand Down Expand Up @@ -130,15 +142,29 @@ func Command() *cobra.Command {
}

// getStartingValset get the valset that will be used to init the bridge contract.
func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) {
func getStartingValset(ctx context.Context, tmQuerier rpc.TmQuerier, appQuerier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) {
switch startingNonce {
case "latest":
return querier.QueryLatestValset(ctx)
vs, err := appQuerier.QueryLatestValset(ctx)
if err != nil {
appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error())
currentHeight, err := tmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
return appQuerier.QueryRecursiveLatestValset(ctx, uint64(currentHeight))
}
return vs, nil
case "earliest":
// TODO make the first nonce 1 a const
att, err := querier.QueryAttestationByNonce(ctx, 1)
att, err := appQuerier.QueryAttestationByNonce(ctx, 1)
if err != nil {
return nil, err
appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error())
historicalAtt, err := appQuerier.QueryHistoricalAttestationByNonce(ctx, 1, 1)
if err != nil {
return nil, err
}
att = historicalAtt
}
vs, ok := att.(*types.Valset)
if !ok {
Expand All @@ -150,17 +176,23 @@ func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNon
if err != nil {
return nil, err
}
attestation, err := querier.QueryAttestationByNonce(ctx, nonce)
currentHeight, err := tmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
attestation, err := appQuerier.QueryRecursiveHistoricalAttestationByNonce(ctx, nonce, uint64(currentHeight))
if err != nil {
return nil, err
}
if attestation == nil {
return nil, types.ErrNilAttestation
}
value, ok := attestation.(*types.Valset)
if ok {
switch value := attestation.(type) {
case *types.Valset:
return value, nil
case *types.DataCommitment:
return appQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, nonce, value.EndBlock)
}
return querier.QueryLastValsetBeforeNonce(ctx, nonce)
}
return nil, ErrNotFound
}
21 changes: 16 additions & 5 deletions cmd/blobstream/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (
FlagEVMGasLimit = "evm.gas-limit"
FlagCoreGRPCHost = "core.grpc.host"
FlagCoreGRPCPort = "core.grpc.port"
FlagCoreRPCHost = "core.rpc.host"
FlagCoreRPCPort = "core.rpc.port"
FlagStartingNonce = "starting-nonce"
ServiceNameDeployer = "deployer"
)
Expand All @@ -26,6 +28,8 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command {
cmd.Flags().Uint64(FlagEVMChainID, 5, "Specify the evm chain id")
cmd.Flags().String(FlagCoreGRPCHost, "localhost", "Specify the grpc address host")
cmd.Flags().Uint(FlagCoreGRPCPort, 9090, "Specify the grpc address port")
cmd.Flags().String(FlagCoreRPCHost, "localhost", "Specify the rpc address host")
cmd.Flags().Uint(FlagCoreRPCPort, 26657, "Specify the rpc address port")
cmd.Flags().String(FlagEVMRPC, "http://localhost:8545", "Specify the ethereum rpc address")
cmd.Flags().String(
FlagStartingNonce,
Expand All @@ -48,11 +52,12 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command {

type deployConfig struct {
*base.Config
evmRPC, coreGRPC string
evmChainID uint64
evmAccAddress string
startingNonce string
evmGasLimit uint64
evmRPC string
coreRPC, coreGRPC string
evmChainID uint64
evmAccAddress string
startingNonce string
evmGasLimit uint64
}

func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
Expand All @@ -72,6 +77,11 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
return deployConfig{}, err
}
coreGRPCPort, err := cmd.Flags().GetUint(FlagCoreGRPCPort)

Check failure on line 79 in cmd/blobstream/deploy/config.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ineffectual assignment to err (ineffassign)

Check failure on line 79 in cmd/blobstream/deploy/config.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

ineffectual assignment to err (ineffassign)
coreRPCHost, err := cmd.Flags().GetString(FlagCoreRPCHost)
if err != nil {
return deployConfig{}, err
}
coreRPCPort, err := cmd.Flags().GetUint(FlagCoreRPCPort)
if err != nil {
return deployConfig{}, err
}
Expand Down Expand Up @@ -107,6 +117,7 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
evmAccAddress: evmAccAddr,
evmChainID: evmChainID,
coreGRPC: fmt.Sprintf("%s:%d", coreGRPCHost, coreGRPCPort),
coreRPC: fmt.Sprintf("tcp://%s:%d", coreRPCHost, coreRPCPort),
evmRPC: evmRPC,
startingNonce: startingNonce,
evmGasLimit: evmGasLimit,
Expand Down
5 changes: 4 additions & 1 deletion cmd/blobstream/deploy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ package deploy

import "errors"

var ErrUnmarshallValset = errors.New("couldn't unmarsall valset")
var (
ErrUnmarshallValset = errors.New("couldn't unmarshall valset")
ErrNotFound = errors.New("not found")
)
2 changes: 2 additions & 0 deletions e2e/scripts/deploy_blobstream_contract.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ echo "deploying Blobstream contract..."
--evm.account "${EVM_ACCOUNT}" \
--core.grpc.host "${CORE_GRPC_HOST}" \
--core.grpc.port "${CORE_GRPC_PORT}" \
--core.rpc.host "${CORE_RPC_HOST}" \
--core.rpc.port "${CORE_RPC_PORT}" \
--starting-nonce "${STARTING_NONCE}" \
--evm.rpc "${EVM_ENDPOINT}" \
--evm.passphrase=123 > /opt/output
Expand Down
10 changes: 9 additions & 1 deletion relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,15 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt
r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error())
previousValset, err = r.QueryValsetFromP2PNetworkAndValidateIt(ctx)
if err != nil {
return nil, err
r.logger.Error("failed to query the last valset before nonce from p2p network. trying using an archive node", "err", err.Error())
currentHeight, err := r.TmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
previousValset, err = r.AppQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, attI.GetNonce(), uint64(currentHeight))
if err != nil {
return nil, err
}
}
}
switch att := attI.(type) {
Expand Down
154 changes: 154 additions & 0 deletions rpc/app_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package rpc

import (
"context"
"strconv"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc"
"google.golang.org/grpc/metadata"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -14,6 +19,8 @@ import (
tmlog "github.com/tendermint/tendermint/libs/log"
)

var BlocksIn20DaysPeriod = 20 * 24 * 60 * 60 / appconsts.TimeoutCommit.Seconds()

// AppQuerier queries the application for attestations and unbonding periods.
type AppQuerier struct {
blobStreamRPC string
Expand Down Expand Up @@ -63,6 +70,57 @@ func (aq *AppQuerier) QueryAttestationByNonce(ctx context.Context, nonce uint64)
return unmarshalledAttestation, nil
}

// QueryHistoricalAttestationByNonce query an attestation by nonce from the state machine at a certain height.
func (aq *AppQuerier) QueryHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

var header metadata.MD
atResp, err := queryClient.AttestationRequestByNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), // Add metadata to request
&celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce},
grpc.Header(&header), // Retrieve header from response
)
if err != nil {
return nil, err
}
if atResp.Attestation == nil {
return nil, nil
}

unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation)
if err != nil {
return nil, err
}

return unmarshalledAttestation, nil
}

// QueryRecursiveHistoricalAttestationByNonce query an attestation by nonce from the state machine
// via going over the history step by step starting from height.
func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

currentHeight := height
for currentHeight >= 1 {
var header metadata.MD
atResp, err := queryClient.AttestationRequestByNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request
&celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce},
grpc.Header(&header), // Retrieve header from response
)
if err == nil {
unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation)
if err != nil {
return nil, err
}
return unmarshalledAttestation, nil
}
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
return nil, ErrNotFound
}

// QueryLatestAttestationNonce query the latest attestation nonce from the state machine.
func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)
Expand All @@ -78,6 +136,23 @@ func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64,
return resp.Nonce, nil
}

// QueryHistoricalLatestAttestationNonce query the historical latest attestation nonce from the state machine at a certain nonce.
func (aq *AppQuerier) QueryHistoricalLatestAttestationNonce(ctx context.Context, height uint64) (uint64, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

var header metadata.MD
resp, err := queryClient.LatestAttestationNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)),
&celestiatypes.QueryLatestAttestationNonceRequest{},
grpc.Header(&header),
)
if err != nil {
return 0, err
}

return resp.Nonce, nil
}

// QueryDataCommitmentByNonce query a data commitment by its nonce.
func (aq *AppQuerier) QueryDataCommitmentByNonce(ctx context.Context, nonce uint64) (*celestiatypes.DataCommitment, error) {
attestation, err := aq.QueryAttestationByNonce(ctx, nonce)
Expand Down Expand Up @@ -134,6 +209,24 @@ func (aq *AppQuerier) QueryValsetByNonce(ctx context.Context, nonce uint64) (*ce
return value, nil
}

// QueryHistoricalValsetByNonce query a historical valset by nonce.
func (aq *AppQuerier) QueryHistoricalValsetByNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) {
attestation, err := aq.QueryHistoricalAttestationByNonce(ctx, nonce, height)
if err != nil {
return nil, err
}
if attestation == nil {
return nil, types.ErrAttestationNotFound
}

value, ok := attestation.(*celestiatypes.Valset)
if !ok {
return nil, types.ErrUnmarshalValset
}

return value, nil
}

// QueryLatestValset query the latest recorded valset in the state machine.
func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Valset, error) {
latestNonce, err := aq.QueryLatestAttestationNonce(ctx)
Expand All @@ -153,6 +246,30 @@ func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Val
return latestValset, nil
}

// QueryRecursiveLatestValset query the latest recorded valset in the state machine in history.
func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uint64) (*celestiatypes.Valset, error) {
currentHeight := height
for currentHeight >= 1 {
latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight)
if err != nil {
return nil, err
}

var latestValset *celestiatypes.Valset
if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil {

Check failure on line 259 in rpc/app_querier.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA4006: this value of `vs` is never used (staticcheck)

Check failure on line 259 in rpc/app_querier.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

SA4006: this value of `vs` is never used (staticcheck)
latestValset = vs

Check failure on line 260 in rpc/app_querier.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA4006: this value of `latestValset` is never used (staticcheck)

Check failure on line 260 in rpc/app_querier.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

SA4006: this value of `latestValset` is never used (staticcheck)
} else {
latestValset, err = aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight)
if err == nil {
return latestValset, nil
}
}
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
return nil, ErrNotFound
}

// QueryLastValsetBeforeNonce returns the last valset before nonce.
// This will be needed when signing to know the validator set at that particular nonce.
// the provided `nonce` can be a valset, but this will return the valset before it.
Expand All @@ -170,6 +287,43 @@ func (aq *AppQuerier) QueryLastValsetBeforeNonce(ctx context.Context, nonce uint
return resp.Valset, nil
}

// QueryHistoricalLastValsetBeforeNonce returns the last historical valset before nonce for a certain height.
func (aq *AppQuerier) QueryHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)
var header metadata.MD
resp, err := queryClient.LatestValsetRequestBeforeNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)),
&celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce},
grpc.Header(&header),
)
if err != nil {
return nil, err
}

return resp.Valset, nil
}

// QueryRecursiveHistoricalLastValsetBeforeNonce recursively looks for the last historical valset before nonce for a certain height until genesis.
func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

currentHeight := height
for currentHeight >= 1 {
var header metadata.MD
resp, err := queryClient.LatestValsetRequestBeforeNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)),
&celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce},
grpc.Header(&header),
)
if err == nil {
return resp.Valset, err
}
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
return nil, ErrNotFound
}

// QueryLastUnbondingHeight query the last unbonding height from state machine.
func (aq *AppQuerier) QueryLastUnbondingHeight(ctx context.Context) (int64, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)
Expand Down
Loading

0 comments on commit c2f01ef

Please sign in to comment.