diff --git a/cmd/blobstream/deploy/cmd.go b/cmd/blobstream/deploy/cmd.go index 211bbfa7..8be8f56d 100644 --- a/cmd/blobstream/deploy/cmd.go +++ b/cmd/blobstream/deploy/cmd.go @@ -43,19 +43,31 @@ func Command() *cobra.Command { encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) - querier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg) - err = querier.Start() + appQuerier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg) + err = appQuerier.Start() if err != nil { return err } defer func() { - err := querier.Stop() + err := appQuerier.Stop() if err != nil { logger.Error(err.Error()) } }() - vs, err := getStartingValset(cmd.Context(), querier, config.startingNonce) + tmQuerier := rpc.NewTmQuerier(config.coreRPC, logger) + err = tmQuerier.Start() + if err != nil { + return err + } + defer func(tmQuerier *rpc.TmQuerier) { + err := tmQuerier.Stop() + if err != nil { + logger.Error(err.Error()) + } + }(tmQuerier) + + vs, err := getStartingValset(cmd.Context(), *tmQuerier, appQuerier, config.startingNonce) if err != nil { logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract") return errors.Wrap( @@ -130,15 +142,29 @@ func Command() *cobra.Command { } // getStartingValset get the valset that will be used to init the bridge contract. -func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) { +func getStartingValset(ctx context.Context, tmQuerier rpc.TmQuerier, appQuerier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) { switch startingNonce { case "latest": - return querier.QueryLatestValset(ctx) + vs, err := appQuerier.QueryLatestValset(ctx) + if err != nil { + appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error()) + currentHeight, err := tmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + return appQuerier.QueryRecursiveLatestValset(ctx, uint64(currentHeight)) + } + return vs, nil case "earliest": // TODO make the first nonce 1 a const - att, err := querier.QueryAttestationByNonce(ctx, 1) + att, err := appQuerier.QueryAttestationByNonce(ctx, 1) if err != nil { - return nil, err + appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error()) + historicalAtt, err := appQuerier.QueryHistoricalAttestationByNonce(ctx, 1, 1) + if err != nil { + return nil, err + } + att = historicalAtt } vs, ok := att.(*types.Valset) if !ok { @@ -150,17 +176,23 @@ func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNon if err != nil { return nil, err } - attestation, err := querier.QueryAttestationByNonce(ctx, nonce) + currentHeight, err := tmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + attestation, err := appQuerier.QueryRecursiveHistoricalAttestationByNonce(ctx, nonce, uint64(currentHeight)) if err != nil { return nil, err } if attestation == nil { return nil, types.ErrNilAttestation } - value, ok := attestation.(*types.Valset) - if ok { + switch value := attestation.(type) { + case *types.Valset: return value, nil + case *types.DataCommitment: + return appQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, nonce, value.EndBlock) } - return querier.QueryLastValsetBeforeNonce(ctx, nonce) } + return nil, ErrNotFound } diff --git a/cmd/blobstream/deploy/config.go b/cmd/blobstream/deploy/config.go index f84cdc3b..47b479e6 100644 --- a/cmd/blobstream/deploy/config.go +++ b/cmd/blobstream/deploy/config.go @@ -17,6 +17,8 @@ const ( FlagEVMGasLimit = "evm.gas-limit" FlagCoreGRPCHost = "core.grpc.host" FlagCoreGRPCPort = "core.grpc.port" + FlagCoreRPCHost = "core.rpc.host" + FlagCoreRPCPort = "core.rpc.port" FlagStartingNonce = "starting-nonce" ServiceNameDeployer = "deployer" ) @@ -26,6 +28,8 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Uint64(FlagEVMChainID, 5, "Specify the evm chain id") cmd.Flags().String(FlagCoreGRPCHost, "localhost", "Specify the grpc address host") cmd.Flags().Uint(FlagCoreGRPCPort, 9090, "Specify the grpc address port") + cmd.Flags().String(FlagCoreRPCHost, "localhost", "Specify the rpc address host") + cmd.Flags().Uint(FlagCoreRPCPort, 26657, "Specify the rpc address port") cmd.Flags().String(FlagEVMRPC, "http://localhost:8545", "Specify the ethereum rpc address") cmd.Flags().String( FlagStartingNonce, @@ -48,11 +52,12 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command { type deployConfig struct { *base.Config - evmRPC, coreGRPC string - evmChainID uint64 - evmAccAddress string - startingNonce string - evmGasLimit uint64 + evmRPC string + coreRPC, coreGRPC string + evmChainID uint64 + evmAccAddress string + startingNonce string + evmGasLimit uint64 } func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { @@ -72,6 +77,11 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { return deployConfig{}, err } coreGRPCPort, err := cmd.Flags().GetUint(FlagCoreGRPCPort) + coreRPCHost, err := cmd.Flags().GetString(FlagCoreRPCHost) + if err != nil { + return deployConfig{}, err + } + coreRPCPort, err := cmd.Flags().GetUint(FlagCoreRPCPort) if err != nil { return deployConfig{}, err } @@ -107,6 +117,7 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { evmAccAddress: evmAccAddr, evmChainID: evmChainID, coreGRPC: fmt.Sprintf("%s:%d", coreGRPCHost, coreGRPCPort), + coreRPC: fmt.Sprintf("tcp://%s:%d", coreRPCHost, coreRPCPort), evmRPC: evmRPC, startingNonce: startingNonce, evmGasLimit: evmGasLimit, diff --git a/cmd/blobstream/deploy/errors.go b/cmd/blobstream/deploy/errors.go index f5d814b0..d5149d7e 100644 --- a/cmd/blobstream/deploy/errors.go +++ b/cmd/blobstream/deploy/errors.go @@ -2,4 +2,7 @@ package deploy import "errors" -var ErrUnmarshallValset = errors.New("couldn't unmarsall valset") +var ( + ErrUnmarshallValset = errors.New("couldn't unmarshall valset") + ErrNotFound = errors.New("not found") +) diff --git a/e2e/scripts/deploy_blobstream_contract.sh b/e2e/scripts/deploy_blobstream_contract.sh index b6270353..077ca8a9 100644 --- a/e2e/scripts/deploy_blobstream_contract.sh +++ b/e2e/scripts/deploy_blobstream_contract.sh @@ -68,6 +68,8 @@ echo "deploying Blobstream contract..." --evm.account "${EVM_ACCOUNT}" \ --core.grpc.host "${CORE_GRPC_HOST}" \ --core.grpc.port "${CORE_GRPC_PORT}" \ + --core.rpc.host "${CORE_RPC_HOST}" \ + --core.rpc.port "${CORE_RPC_PORT}" \ --starting-nonce "${STARTING_NONCE}" \ --evm.rpc "${EVM_ENDPOINT}" \ --evm.passphrase=123 > /opt/output diff --git a/relayer/relayer.go b/relayer/relayer.go index 79c7cd78..f77a0593 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -148,7 +148,15 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error()) previousValset, err = r.QueryValsetFromP2PNetworkAndValidateIt(ctx) if err != nil { - return nil, err + r.logger.Error("failed to query the last valset before nonce from p2p network. trying using an archive node", "err", err.Error()) + currentHeight, err := r.TmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + previousValset, err = r.AppQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, attI.GetNonce(), uint64(currentHeight)) + if err != nil { + return nil, err + } } } switch att := attI.(type) { diff --git a/rpc/app_querier.go b/rpc/app_querier.go index 1a4d873d..366a9e55 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -2,6 +2,11 @@ package rpc import ( "context" + "strconv" + + "github.com/celestiaorg/celestia-app/pkg/appconsts" + cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc" + "google.golang.org/grpc/metadata" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -14,6 +19,8 @@ import ( tmlog "github.com/tendermint/tendermint/libs/log" ) +var BlocksIn20DaysPeriod = 20 * 24 * 60 * 60 / appconsts.TimeoutCommit.Seconds() + // AppQuerier queries the application for attestations and unbonding periods. type AppQuerier struct { blobStreamRPC string @@ -63,6 +70,57 @@ func (aq *AppQuerier) QueryAttestationByNonce(ctx context.Context, nonce uint64) return unmarshalledAttestation, nil } +// QueryHistoricalAttestationByNonce query an attestation by nonce from the state machine at a certain height. +func (aq *AppQuerier) QueryHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + var header metadata.MD + atResp, err := queryClient.AttestationRequestByNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), // Add metadata to request + &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, + grpc.Header(&header), // Retrieve header from response + ) + if err != nil { + return nil, err + } + if atResp.Attestation == nil { + return nil, nil + } + + unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) + if err != nil { + return nil, err + } + + return unmarshalledAttestation, nil +} + +// QueryRecursiveHistoricalAttestationByNonce query an attestation by nonce from the state machine +// via going over the history step by step starting from height. +func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + currentHeight := height + for currentHeight >= 1 { + var header metadata.MD + atResp, err := queryClient.AttestationRequestByNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request + &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, + grpc.Header(&header), // Retrieve header from response + ) + if err == nil { + unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) + if err != nil { + return nil, err + } + return unmarshalledAttestation, nil + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) + } + return nil, ErrNotFound +} + // QueryLatestAttestationNonce query the latest attestation nonce from the state machine. func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64, error) { queryClient := celestiatypes.NewQueryClient(aq.clientConn) @@ -78,6 +136,23 @@ func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64, return resp.Nonce, nil } +// QueryHistoricalLatestAttestationNonce query the historical latest attestation nonce from the state machine at a certain nonce. +func (aq *AppQuerier) QueryHistoricalLatestAttestationNonce(ctx context.Context, height uint64) (uint64, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + var header metadata.MD + resp, err := queryClient.LatestAttestationNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestAttestationNonceRequest{}, + grpc.Header(&header), + ) + if err != nil { + return 0, err + } + + return resp.Nonce, nil +} + // QueryDataCommitmentByNonce query a data commitment by its nonce. func (aq *AppQuerier) QueryDataCommitmentByNonce(ctx context.Context, nonce uint64) (*celestiatypes.DataCommitment, error) { attestation, err := aq.QueryAttestationByNonce(ctx, nonce) @@ -134,6 +209,24 @@ func (aq *AppQuerier) QueryValsetByNonce(ctx context.Context, nonce uint64) (*ce return value, nil } +// QueryHistoricalValsetByNonce query a historical valset by nonce. +func (aq *AppQuerier) QueryHistoricalValsetByNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + attestation, err := aq.QueryHistoricalAttestationByNonce(ctx, nonce, height) + if err != nil { + return nil, err + } + if attestation == nil { + return nil, types.ErrAttestationNotFound + } + + value, ok := attestation.(*celestiatypes.Valset) + if !ok { + return nil, types.ErrUnmarshalValset + } + + return value, nil +} + // QueryLatestValset query the latest recorded valset in the state machine. func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Valset, error) { latestNonce, err := aq.QueryLatestAttestationNonce(ctx) @@ -153,6 +246,30 @@ func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Val return latestValset, nil } +// QueryRecursiveLatestValset query the latest recorded valset in the state machine in history. +func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uint64) (*celestiatypes.Valset, error) { + currentHeight := height + for currentHeight >= 1 { + latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight) + if err != nil { + return nil, err + } + + var latestValset *celestiatypes.Valset + if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { + latestValset = vs + } else { + latestValset, err = aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) + if err == nil { + return latestValset, nil + } + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) + } + return nil, ErrNotFound +} + // QueryLastValsetBeforeNonce returns the last valset before nonce. // This will be needed when signing to know the validator set at that particular nonce. // the provided `nonce` can be a valset, but this will return the valset before it. @@ -170,6 +287,43 @@ func (aq *AppQuerier) QueryLastValsetBeforeNonce(ctx context.Context, nonce uint return resp.Valset, nil } +// QueryHistoricalLastValsetBeforeNonce returns the last historical valset before nonce for a certain height. +func (aq *AppQuerier) QueryHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + var header metadata.MD + resp, err := queryClient.LatestValsetRequestBeforeNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, + grpc.Header(&header), + ) + if err != nil { + return nil, err + } + + return resp.Valset, nil +} + +// QueryRecursiveHistoricalLastValsetBeforeNonce recursively looks for the last historical valset before nonce for a certain height until genesis. +func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + currentHeight := height + for currentHeight >= 1 { + var header metadata.MD + resp, err := queryClient.LatestValsetRequestBeforeNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, + grpc.Header(&header), + ) + if err == nil { + return resp.Valset, err + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) + } + return nil, ErrNotFound +} + // QueryLastUnbondingHeight query the last unbonding height from state machine. func (aq *AppQuerier) QueryLastUnbondingHeight(ctx context.Context) (int64, error) { queryClient := celestiatypes.NewQueryClient(aq.clientConn) diff --git a/rpc/errors.go b/rpc/errors.go index eabcf003..b91e0f6b 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -2,4 +2,7 @@ package rpc import "errors" -var ErrCouldntReachSpecifiedHeight = errors.New("couldn't reach specified height") +var ( + ErrCouldntReachSpecifiedHeight = errors.New("couldn't reach specified height") + ErrNotFound = errors.New("not found") +)