diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b5c59ce3e..4741a901e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,15 +3,21 @@ ## Unreleased Changes ### FEATURES - +- [relayer] + - Implement relaying for recv_packet ([#379]) + +- [relayer-cli] + - Packet CLIs for recv_packet ([#443]) + ### IMPROVEMENTS - -- Mock chain (implementing IBC handlers) and integration against CLI ([#158]) -- Relayer tests for client update (ping pong) against MockChain ([#381]) - +- [relayer] + - Mock chain (implementing IBC handlers) and integration against CLI ([#158]) + - Relayer tests for client update (ping pong) against MockChain ([#381]) [#158]: https://github.com/informalsystems/ibc-rs/issues/158 +[#379]: https://github.com/informalsystems/ibc-rs/issues/379 [#381]: https://github.com/informalsystems/ibc-rs/issues/381 +[#443]: https://github.com/informalsystems/ibc-rs/issues/443 ## v0.0.5 *December 2, 2020* diff --git a/modules/src/events.rs b/modules/src/events.rs index b025ba34b1..6b746ecc31 100644 --- a/modules/src/events.rs +++ b/modules/src/events.rs @@ -3,6 +3,7 @@ use crate::ics02_client::events::NewBlock; use crate::ics03_connection::events as ConnectionEvents; use crate::ics04_channel::events as ChannelEvents; use crate::ics20_fungible_token_transfer::events as TransferEvents; +use crate::Height as ICSHeight; use tendermint_rpc::event::{Event as RpcEvent, EventData as RpcEventData}; @@ -14,6 +15,22 @@ use tendermint::block::Height; use tracing::warn; +/// Events types +#[derive(Debug, Clone, Deserialize, Serialize)] +pub enum IBCEventType { + SendPacket, + RecvPacket, +} + +impl IBCEventType { + pub fn as_str(&self) -> &'static str { + match *self { + IBCEventType::SendPacket => "send_packet", + _ => "unhandled", + } + } +} + /// Events created by the IBC component of a chain, destined for a relayer. #[derive(Debug, Clone, Deserialize, Serialize)] pub enum IBCEvent { @@ -50,6 +67,27 @@ impl IBCEvent { pub fn to_json(&self) -> String { serde_json::to_string(self).unwrap() } + pub fn height(&self) -> Height { + match self { + IBCEvent::NewBlock(bl) => bl.height, + IBCEvent::UpdateClient(uc) => uc.height, + IBCEvent::SendPacketChannel(ev) => ev.envelope.height, + IBCEvent::ReceivePacketChannel(ev) => ev.height, + _ => { + unimplemented!() + } + } + } + pub fn set_height(&mut self, height: ICSHeight) { + match self { + IBCEvent::SendPacketChannel(ev) => { + ev.envelope.height = Height::try_from(height.version_height).unwrap() + } + _ => { + unimplemented!() + } + } + } } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/modules/src/ics02_client/height.rs b/modules/src/ics02_client/height.rs index a4e5291f17..114db36e75 100644 --- a/modules/src/ics02_client/height.rs +++ b/modules/src/ics02_client/height.rs @@ -4,8 +4,9 @@ use tendermint_proto::Protobuf; use crate::ics02_client::error::{Error, Kind}; use ibc_proto::ibc::core::client::v1::Height as RawHeight; +use serde_derive::{Deserialize, Serialize}; -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Height { /// Previously known as "epoch", and will be renamed to "revision" soon pub version_number: u64, @@ -121,10 +122,18 @@ impl From for RawHeight { impl std::fmt::Display for Height { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - write!( - f, - "epoch: {}, height: {}", - self.version_number, self.version_height - ) + write!(f, "{}-{}", self.version_number, self.version_height) + } +} + +impl TryFrom for Height { + type Error = anomaly::Error; + + fn try_from(value: String) -> Result { + let split: Vec<&str> = value.split('-').collect(); + Ok(Height { + version_number: split[0].parse::().unwrap(), + version_height: split[1].parse::().unwrap(), + }) } } diff --git a/modules/src/ics04_channel/channel.rs b/modules/src/ics04_channel/channel.rs index b8607057d4..1ddfc77add 100644 --- a/modules/src/ics04_channel/channel.rs +++ b/modules/src/ics04_channel/channel.rs @@ -6,6 +6,8 @@ use ibc_proto::ibc::core::channel::v1::Counterparty as RawCounterparty; use tendermint_proto::Protobuf; +use crate::events::IBCEventType; +use crate::ics02_client::height::Height; use anomaly::fail; use std::convert::{TryFrom, TryInto}; use std::str::FromStr; @@ -270,6 +272,17 @@ impl State { } } +/// Used for queries and not yet standardized in channel's query.proto +// todo -move to channel.rs? +#[derive(Clone, Debug)] +pub struct QueryPacketEventDataRequest { + pub event_id: IBCEventType, + pub channel_id: String, + pub port_id: String, + pub sequences: Vec, + pub height: Height, +} + /// Version validation, specific for channel (ICS4) opening handshake protocol. /// This field is supposed to be opaque to the core IBC protocol. No explicit validation necessary, /// and empty version is currently allowed by the specification (cf. ICS 004, v1). diff --git a/modules/src/ics04_channel/events.rs b/modules/src/ics04_channel/events.rs index 59627f3122..760fed0682 100644 --- a/modules/src/ics04_channel/events.rs +++ b/modules/src/ics04_channel/events.rs @@ -1,10 +1,11 @@ //! Types for the IBC events emitted from Tendermint Websocket by the channels module. use crate::attribute; use crate::events::{IBCEvent, RawObject}; +use crate::ics02_client::height::Height; use crate::ics24_host::identifier::{ChannelId, ConnectionId, PortId}; use anomaly::BoxError; use serde_derive::{Deserialize, Serialize}; -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; use tendermint::block; #[derive(Debug, Deserialize, Serialize, Clone)] @@ -164,33 +165,51 @@ impl From for IBCEvent { } #[derive(Debug, Deserialize, Serialize, Clone)] -pub struct SendPacket { +pub struct PacketEnvelope { pub height: block::Height, pub packet_src_port: PortId, pub packet_src_channel: ChannelId, pub packet_dst_port: PortId, pub packet_dst_channel: ChannelId, pub packet_sequence: u64, - pub packet_timeout_height: u64, + pub packet_timeout_height: Height, pub packet_timeout_stamp: u64, } -impl TryFrom for SendPacket { +impl TryFrom for PacketEnvelope { type Error = BoxError; fn try_from(obj: RawObject) -> Result { - Ok(SendPacket { + let height_str: String = attribute!(obj, "send_packet.packet_timeout_height"); + Ok(PacketEnvelope { height: obj.height, packet_src_port: attribute!(obj, "send_packet.packet_src_port"), packet_src_channel: attribute!(obj, "send_packet.packet_src_channel"), packet_dst_port: attribute!(obj, "send_packet.packet_dst_port"), packet_dst_channel: attribute!(obj, "send_packet.packet_dst_channel"), packet_sequence: attribute!(obj, "send_packet.packet_sequence"), - packet_timeout_height: attribute!(obj, "send_packet.packet_timeout_height"), + packet_timeout_height: height_str.try_into()?, packet_timeout_stamp: attribute!(obj, "send_packet.packet_timeout_timestamp"), }) } } +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct SendPacket { + pub envelope: PacketEnvelope, + pub data: Vec, +} + +impl TryFrom for SendPacket { + type Error = BoxError; + fn try_from(obj: RawObject) -> Result { + let data_str: String = attribute!(obj, "send_packet.packet_data"); + Ok(SendPacket { + envelope: PacketEnvelope::try_from(obj)?, + data: Vec::from(data_str.as_str().as_bytes()), + }) + } +} + impl From for IBCEvent { fn from(v: SendPacket) -> Self { IBCEvent::SendPacketChannel(v) @@ -205,9 +224,8 @@ pub struct ReceivePacket { pub packet_dst_port: PortId, pub packet_dst_channel: ChannelId, pub packet_sequence: u64, - pub packet_timeout_height: u64, + pub packet_timeout_height: String, pub packet_timeout_stamp: u64, - pub packet_ack: String, } impl TryFrom for ReceivePacket { @@ -222,7 +240,6 @@ impl TryFrom for ReceivePacket { packet_sequence: attribute!(obj, "recv_packet.packet_sequence"), packet_timeout_height: attribute!(obj, "recv_packet.packet_timeout_height"), packet_timeout_stamp: attribute!(obj, "recv_packet.packet_timeout_timestamp"), - packet_ack: attribute!(obj, "recv_packet.packet_ack"), }) } } diff --git a/modules/src/ics04_channel/msgs/recv_packet.rs b/modules/src/ics04_channel/msgs/recv_packet.rs index 47bbc7b13c..0b7dee029f 100644 --- a/modules/src/ics04_channel/msgs/recv_packet.rs +++ b/modules/src/ics04_channel/msgs/recv_packet.rs @@ -22,16 +22,15 @@ pub struct MsgRecvPacket { } impl MsgRecvPacket { - // todo: Constructor not used yet. #[allow(dead_code, unreachable_code, unused_variables)] - fn new( + pub fn new( packet: Packet, proof: CommitmentProof, proof_height: Height, signer: AccountId, ) -> Result { Ok(Self { - packet: todo!(), + packet, proofs: Proofs::new(proof, None, None, proof_height) .map_err(|e| Kind::InvalidProof.context(e))?, signer, @@ -58,6 +57,10 @@ impl Msg for MsgRecvPacket { Ok(()) } + fn type_url(&self) -> String { + "/ibc.core.channel.v1.MsgRecvPacket".to_string() + } + fn get_signers(&self) -> Vec { vec![self.signer] } diff --git a/modules/src/ics04_channel/packet.rs b/modules/src/ics04_channel/packet.rs index 291ec8686b..a8832d77de 100644 --- a/modules/src/ics04_channel/packet.rs +++ b/modules/src/ics04_channel/packet.rs @@ -24,14 +24,14 @@ impl From for u64 { #[derive(Clone, Debug, PartialEq)] pub struct Packet { - sequence: Sequence, - source_port: PortId, - source_channel: ChannelId, - destination_port: PortId, - destination_channel: ChannelId, - data: Vec, - timeout_height: Height, - timeout_timestamp: u64, + pub sequence: Sequence, + pub source_port: PortId, + pub source_channel: ChannelId, + pub destination_port: PortId, + pub destination_channel: ChannelId, + pub data: Vec, + pub timeout_height: Height, + pub timeout_timestamp: u64, } impl TryFrom for Packet { diff --git a/modules/src/ics24_host/identifier.rs b/modules/src/ics24_host/identifier.rs index b519ddf5cb..03101f19b9 100644 --- a/modules/src/ics24_host/identifier.rs +++ b/modules/src/ics24_host/identifier.rs @@ -107,6 +107,12 @@ impl From for ChainId { } } +impl Default for ChainId { + fn default() -> Self { + "defaultChainId".to_string().parse().unwrap() + } +} + #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct ClientId(String); diff --git a/proto-compiler/README.md b/proto-compiler/README.md index 068f2cfbbc..d1276558ff 100644 --- a/proto-compiler/README.md +++ b/proto-compiler/README.md @@ -9,7 +9,7 @@ The `ibc-proto-compiler` is a simple command-line tool to automate the compilati From within the `proto-compiler` directory, run the following command to clone the Cosmos SDK repository, and check out a specific commit: ```bash -$ cargo run -- clone-sdk --out /tmp/sdk --commit ce3994020a0d5c246016c8832ba4a668e8b7c77b +$ cargo run -- clone-sdk --out /tmp/sdk --commit 15324920548c2629e51d837bcefc1cbc40797c5d ``` Note: the full commit hash must be specified. diff --git a/proto-compiler/src/cmd/compile.rs b/proto-compiler/src/cmd/compile.rs index 13feae16dc..a426bcb7a1 100644 --- a/proto-compiler/src/cmd/compile.rs +++ b/proto-compiler/src/cmd/compile.rs @@ -105,6 +105,7 @@ impl CompileCmd { let proto_services_path = [ sdk_dir.join("proto/cosmos/auth/v1beta1/query.proto"), sdk_dir.join("proto/cosmos/staking/v1beta1/query.proto"), + sdk_dir.join("proto/ibc/core/channel/v1/query.proto"), ]; // List available paths for dependencies diff --git a/proto/src/prost/ibc.core.channel.v1.rs b/proto/src/prost/ibc.core.channel.v1.rs index 38bd149ba8..2e6f1410df 100644 --- a/proto/src/prost/ibc.core.channel.v1.rs +++ b/proto/src/prost/ibc.core.channel.v1.rs @@ -684,3 +684,4 @@ pub struct QueryNextSequenceReceiveResponse { #[prost(message, optional, tag = "4")] pub proof_height: ::std::option::Option, } +# [doc = r" Generated client implementations."] pub mod query_client { # ! [allow (unused_variables , dead_code , missing_docs)] use tonic :: codegen :: * ; # [doc = " Query provides defines the gRPC querier service"] pub struct QueryClient < T > { inner : tonic :: client :: Grpc < T > , } impl QueryClient < tonic :: transport :: Channel > { # [doc = r" Attempt to create a new client by connecting to a given endpoint."] pub async fn connect < D > (dst : D) -> Result < Self , tonic :: transport :: Error > where D : std :: convert :: TryInto < tonic :: transport :: Endpoint > , D :: Error : Into < StdError > , { let conn = tonic :: transport :: Endpoint :: new (dst) ? . connect () . await ? ; Ok (Self :: new (conn)) } } impl < T > QueryClient < T > where T : tonic :: client :: GrpcService < tonic :: body :: BoxBody > , T :: ResponseBody : Body + HttpBody + Send + 'static , T :: Error : Into < StdError > , < T :: ResponseBody as HttpBody > :: Error : Into < StdError > + Send , { pub fn new (inner : T) -> Self { let inner = tonic :: client :: Grpc :: new (inner) ; Self { inner } } pub fn with_interceptor (inner : T , interceptor : impl Into < tonic :: Interceptor >) -> Self { let inner = tonic :: client :: Grpc :: with_interceptor (inner , interceptor) ; Self { inner } } # [doc = " Channel queries an IBC Channel."] pub async fn channel (& mut self , request : impl tonic :: IntoRequest < super :: QueryChannelRequest > ,) -> Result < tonic :: Response < super :: QueryChannelResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/Channel") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " Channels queries all the IBC channels of a chain."] pub async fn channels (& mut self , request : impl tonic :: IntoRequest < super :: QueryChannelsRequest > ,) -> Result < tonic :: Response < super :: QueryChannelsResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/Channels") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " ConnectionChannels queries all the channels associated with a connection"] # [doc = " end."] pub async fn connection_channels (& mut self , request : impl tonic :: IntoRequest < super :: QueryConnectionChannelsRequest > ,) -> Result < tonic :: Response < super :: QueryConnectionChannelsResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/ConnectionChannels") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " ChannelClientState queries for the client state for the channel associated"] # [doc = " with the provided channel identifiers."] pub async fn channel_client_state (& mut self , request : impl tonic :: IntoRequest < super :: QueryChannelClientStateRequest > ,) -> Result < tonic :: Response < super :: QueryChannelClientStateResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/ChannelClientState") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " ChannelConsensusState queries for the consensus state for the channel"] # [doc = " associated with the provided channel identifiers."] pub async fn channel_consensus_state (& mut self , request : impl tonic :: IntoRequest < super :: QueryChannelConsensusStateRequest > ,) -> Result < tonic :: Response < super :: QueryChannelConsensusStateResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/ChannelConsensusState") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " PacketCommitment queries a stored packet commitment hash."] pub async fn packet_commitment (& mut self , request : impl tonic :: IntoRequest < super :: QueryPacketCommitmentRequest > ,) -> Result < tonic :: Response < super :: QueryPacketCommitmentResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/PacketCommitment") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " PacketCommitments returns the all the packet commitments hashes associated"] # [doc = " with a channel."] pub async fn packet_commitments (& mut self , request : impl tonic :: IntoRequest < super :: QueryPacketCommitmentsRequest > ,) -> Result < tonic :: Response < super :: QueryPacketCommitmentsResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/PacketCommitments") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " PacketAcknowledgement queries a stored packet acknowledgement hash."] pub async fn packet_acknowledgement (& mut self , request : impl tonic :: IntoRequest < super :: QueryPacketAcknowledgementRequest > ,) -> Result < tonic :: Response < super :: QueryPacketAcknowledgementResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/PacketAcknowledgement") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " UnreceivedPackets returns all the unrelayed IBC packets associated with a"] # [doc = " channel and sequences."] pub async fn unreceived_packets (& mut self , request : impl tonic :: IntoRequest < super :: QueryUnreceivedPacketsRequest > ,) -> Result < tonic :: Response < super :: QueryUnreceivedPacketsResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/UnreceivedPackets") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " UnrelayedAcks returns all the unrelayed IBC acknowledgements associated with a"] # [doc = " channel and sequences."] pub async fn unrelayed_acks (& mut self , request : impl tonic :: IntoRequest < super :: QueryUnrelayedAcksRequest > ,) -> Result < tonic :: Response < super :: QueryUnrelayedAcksResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/UnrelayedAcks") ; self . inner . unary (request . into_request () , path , codec) . await } # [doc = " NextSequenceReceive returns the next receive sequence for a given channel."] pub async fn next_sequence_receive (& mut self , request : impl tonic :: IntoRequest < super :: QueryNextSequenceReceiveRequest > ,) -> Result < tonic :: Response < super :: QueryNextSequenceReceiveResponse > , tonic :: Status > { self . inner . ready () . await . map_err (| e | { tonic :: Status :: new (tonic :: Code :: Unknown , format ! ("Service was not ready: {}" , e . into ())) }) ? ; let codec = tonic :: codec :: ProstCodec :: default () ; let path = http :: uri :: PathAndQuery :: from_static ("/ibc.core.channel.v1.Query/NextSequenceReceive") ; self . inner . unary (request . into_request () , path , codec) . await } } impl < T : Clone > Clone for QueryClient < T > { fn clone (& self) -> Self { Self { inner : self . inner . clone () , } } } impl < T > std :: fmt :: Debug for QueryClient < T > { fn fmt (& self , f : & mut std :: fmt :: Formatter < '_ >) -> std :: fmt :: Result { write ! (f , "QueryClient {{ ... }}") } } } \ No newline at end of file diff --git a/relayer-cli/Cargo.toml b/relayer-cli/Cargo.toml index ba4fd6ea3d..1d9796d2ad 100644 --- a/relayer-cli/Cargo.toml +++ b/relayer-cli/Cargo.toml @@ -9,6 +9,7 @@ authors = [ [dependencies] relayer = { path = "../relayer" } ibc = { path = "../modules" } +ibc-proto = { version = "0.5.0", path = "../proto" } anomaly = "0.2.0" gumdrop = "0.7" serde = { version = "1", features = ["serde_derive"] } diff --git a/relayer-cli/src/commands/query.rs b/relayer-cli/src/commands/query.rs index 9157ab7ef9..357dc6a900 100644 --- a/relayer-cli/src/commands/query.rs +++ b/relayer-cli/src/commands/query.rs @@ -5,6 +5,7 @@ use abscissa_core::{Command, Options, Runnable}; mod channel; mod client; mod connection; +mod packet; /// `query` subcommand #[derive(Command, Debug, Options, Runnable)] @@ -20,6 +21,10 @@ pub enum QueryCmd { /// The `query channel` subcommand #[options(help = "query channel")] Channel(QueryChannelCmds), + + /// The `query packet` subcommand + #[options(help = "query packets")] + Packet(QueryPacketCmds), } #[derive(Command, Debug, Options, Runnable)] @@ -50,3 +55,18 @@ pub enum QueryChannelCmds { #[options(help = "query channel end")] End(channel::QueryChannelEndCmd), } + +#[derive(Command, Debug, Options, Runnable)] +pub enum QueryPacketCmds { + /// The `query packet commitments` subcommand + #[options(help = "query packet commitments")] + Commitments(packet::QueryPacketCommitmentsCmd), + + /// The `query packet commitment` subcommand + #[options(help = "query packet commitment")] + Commitment(packet::QueryPacketCommitmentCmd), + + /// The `query unreceived packets` subcommand + #[options(help = "query unreceived packets")] + UnreceivedPackets(packet::QueryUnreceivedPacketsCmd), +} diff --git a/relayer-cli/src/commands/query/packet.rs b/relayer-cli/src/commands/query/packet.rs new file mode 100644 index 0000000000..26bdd2d925 --- /dev/null +++ b/relayer-cli/src/commands/query/packet.rs @@ -0,0 +1,273 @@ +use std::sync::{Arc, Mutex}; +use tokio::runtime::Runtime as TokioRuntime; + +use abscissa_core::{Command, Options, Runnable}; + +use ibc_proto::ibc::core::channel::v1::{ + PacketAckCommitment, QueryPacketCommitmentsRequest, QueryUnreceivedPacketsRequest, +}; + +use ibc::ics24_host::identifier::{ChannelId, PortId}; +use ibc::Height; + +use relayer::chain::{Chain, CosmosSDKChain, QueryPacketOptions}; +use relayer::config::{ChainConfig, Config}; + +use crate::error::{Error, Kind}; +use crate::prelude::*; + +#[derive(Clone, Command, Debug, Options)] +pub struct QueryPacketCommitmentsCmd { + #[options(free, help = "identifier of the chain to query")] + chain_id: String, + + #[options(free, help = "identifier of the port to query")] + port_id: PortId, + + #[options(free, help = "identifier of the channel to query")] + channel_id: ChannelId, + + #[options(help = "height of the state to query", short = "h")] + height: Option, +} + +impl QueryPacketCommitmentsCmd { + fn validate_options( + &self, + config: &Config, + ) -> Result<(ChainConfig, QueryPacketOptions), String> { + let dest_chain_config = config + .chains + .iter() + .find(|c| c.id == self.chain_id.parse().unwrap()) + .ok_or_else(|| "missing destination chain configuration".to_string())?; + + let opts = QueryPacketOptions { + port_id: self.port_id.clone(), + channel_id: self.channel_id.clone(), + height: self.height.unwrap_or(0_u64), + }; + + Ok((dest_chain_config.clone(), opts)) + } +} + +// cargo run --bin relayer -- -c relayer/tests/config/fixtures/simple_config.toml query packet commitments ibc-0 transfer ibconexfer --height 3 +impl Runnable for QueryPacketCommitmentsCmd { + fn run(&self) { + let config = app_config(); + + let (chain_config, opts) = match self.validate_options(&config) { + Err(err) => { + status_err!("invalid options: {}", err); + return; + } + Ok(result) => result, + }; + status_info!("Options", "{:?}", opts); + + let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap())); + let chain = CosmosSDKChain::bootstrap(chain_config, rt).unwrap(); + + let grpc_request = QueryPacketCommitmentsRequest { + port_id: opts.port_id.to_string(), + channel_id: opts.channel_id.to_string(), + pagination: None, + }; + + let res: Result<(Vec, Height), Error> = chain + .query_packet_commitments(grpc_request) + .map_err(|e| Kind::Query.context(e).into()); + + match res { + Ok(cs) => status_info!( + "Result for packet commitments query at height", + "{:?} {:#?}", + cs.0, + cs.1 + ), + Err(e) => status_info!("Error encountered on packet commitments query:", "{}", e), + } + } +} + +#[derive(Clone, Command, Debug, Options)] +pub struct QueryPacketCommitmentCmd { + #[options(free, help = "identifier of the chain to query")] + chain_id: String, + + #[options(free, help = "identifier of the port to query")] + port_id: PortId, + + #[options(free, help = "identifier of the channel to query")] + channel_id: ChannelId, + + #[options(free, help = "sequence of packet to query")] + sequence: u64, + + #[options(help = "height of the state to query", short = "h")] + height: Option, +} + +impl QueryPacketCommitmentCmd { + fn validate_options( + &self, + config: &Config, + ) -> Result<(ChainConfig, QueryPacketOptions, u64), String> { + let dest_chain_config = config + .chains + .iter() + .find(|c| c.id == self.chain_id.parse().unwrap()) + .ok_or_else(|| "missing destination chain configuration".to_string())?; + + let opts = QueryPacketOptions { + port_id: self.port_id.clone(), + channel_id: self.channel_id.clone(), + height: self.height.unwrap_or(0_u64), + }; + + Ok((dest_chain_config.clone(), opts, self.sequence)) + } +} + +impl Runnable for QueryPacketCommitmentCmd { + fn run(&self) { + let config = app_config(); + + let (chain_config, opts, sequence) = match self.validate_options(&config) { + Err(err) => { + status_err!("invalid options: {}", err); + return; + } + Ok(result) => result, + }; + status_info!("Options", "{:?}", opts); + + // run without proof: + // cargo run --bin relayer -- -c relayer/tests/config/fixtures/simple_config.toml query packet commitments ibc-0 transfer ibconexfer --height 3 + let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap())); + let chain = CosmosSDKChain::bootstrap(chain_config, rt).unwrap(); + + let res = chain.proven_packet_commitment( + &opts.port_id, + &opts.channel_id, + sequence, + Height::new(0, opts.height), + ); + + match res { + Ok(cs) => status_info!( + "Result for packet commitments query at height", + "{:?} {:#?}", + cs.0, + cs.1 + ), + Err(e) => status_info!("Error encountered on packet commitments query:", "{}", e), + } + } +} + +/// This command does the following: +/// 1. queries the source chain to get the counterparty channel and port identifiers (needed in 3) +/// 2. queries the source chain for all packet commitmments/ sequences for a given port and channel +/// 3. queries the destination chain for the unreceived sequences out of the list obtained in 2. +#[derive(Clone, Command, Debug, Options)] +pub struct QueryUnreceivedPacketsCmd { + #[options( + free, + help = "identifier of the chain to query the unreceived sequences" + )] + dst_chain_id: String, + + #[options( + free, + help = "identifier of the chain where sent sequences are queried" + )] + src_chain_id: String, + + #[options(free, help = "identifier of the port to query on source chain")] + port_id: PortId, + + #[options(free, help = "identifier of the channel to query on source chain")] + channel_id: ChannelId, +} + +impl QueryUnreceivedPacketsCmd { + fn validate_options( + &self, + config: &Config, + ) -> Result<(ChainConfig, ChainConfig, QueryPacketOptions), String> { + let src_chain_config = config + .chains + .iter() + .find(|c| c.id == self.src_chain_id.parse().unwrap()) + .ok_or_else(|| "missing destination chain configuration".to_string())?; + + let dst_chain_config = config + .chains + .iter() + .find(|c| c.id == self.dst_chain_id.parse().unwrap()) + .ok_or_else(|| "missing destination chain configuration".to_string())?; + + let opts = QueryPacketOptions { + port_id: self.port_id.clone(), + channel_id: self.channel_id.clone(), + height: 0_u64, + }; + + Ok((dst_chain_config.clone(), src_chain_config.clone(), opts)) + } +} + +impl Runnable for QueryUnreceivedPacketsCmd { + fn run(&self) { + let config = app_config(); + + let (dst_chain_config, src_chain_config, opts) = match self.validate_options(&config) { + Err(err) => { + status_err!("invalid options: {}", err); + return; + } + Ok(result) => result, + }; + status_info!("Options", "{:?}", opts); + + let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap())); + let src_chain = CosmosSDKChain::bootstrap(src_chain_config, rt.clone()).unwrap(); + let dst_chain = CosmosSDKChain::bootstrap(dst_chain_config, rt).unwrap(); + + // get the channel information from source chain + let channel = src_chain + .query_channel(&opts.port_id, &opts.channel_id, Height::zero()) + .unwrap(); + + // get the packet commitments on source chain + let commitments_request = QueryPacketCommitmentsRequest { + port_id: opts.port_id.to_string(), + channel_id: opts.channel_id.to_string(), + pagination: None, + }; + + // extract the sequences + let sequences: Vec = src_chain + .query_packet_commitments(commitments_request) + .unwrap() + .0 + .into_iter() + .map(|v| v.sequence) + .collect(); + + let request = QueryUnreceivedPacketsRequest { + port_id: channel.counterparty().port_id().to_string(), + channel_id: channel.counterparty().channel_id().unwrap().to_string(), + packet_commitment_sequences: sequences, + }; + + let res = dst_chain.query_unreceived_packets(request); + + match res { + Ok(cs) => status_info!("Result for unreceived packets query", "{:?}", cs), + Err(e) => status_info!("Error encountered on packet commitments query:", "{}", e), + } + } +} diff --git a/relayer-cli/src/commands/tx.rs b/relayer-cli/src/commands/tx.rs index 8fbafc40c8..1cd9c589a9 100644 --- a/relayer-cli/src/commands/tx.rs +++ b/relayer-cli/src/commands/tx.rs @@ -6,6 +6,7 @@ use crate::commands::tx::client::{TxCreateClientCmd, TxUpdateClientCmd}; mod channel; mod client; mod connection; +mod packet; /// `tx` subcommand #[derive(Command, Debug, Options, Runnable)] @@ -64,4 +65,8 @@ pub enum TxRawCommands { /// The `tx raw chan-confirm` subcommand #[options(help = "tx raw chan-confirm")] ChanConfirm(channel::TxRawChanConfirmCmd), + + /// The `tx raw packet-recv` subcommand + #[options(help = "tx raw packet-recv")] + PacketRecv(packet::TxRawPacketRecvCmd), } diff --git a/relayer-cli/src/commands/tx/channel.rs b/relayer-cli/src/commands/tx/channel.rs index eed31d1440..0eccd99a9f 100644 --- a/relayer-cli/src/commands/tx/channel.rs +++ b/relayer-cli/src/commands/tx/channel.rs @@ -92,7 +92,7 @@ macro_rules! chan_open_cmd { let (dst_chain, _) = ChainRuntime::::spawn(dst_chain_config.clone()).unwrap(); - let res: Result = + let res: Result, Error> = $func(dst_chain, src_chain, &opts).map_err(|e| Kind::Tx.context(e).into()); match res { diff --git a/relayer-cli/src/commands/tx/client.rs b/relayer-cli/src/commands/tx/client.rs index df4dcf8abd..35acaeec96 100644 --- a/relayer-cli/src/commands/tx/client.rs +++ b/relayer-cli/src/commands/tx/client.rs @@ -52,7 +52,7 @@ impl Runnable for TxCreateClientCmd { let (src_chain, _) = ChainRuntime::::spawn(src_chain_config).unwrap(); let (dst_chain, _) = ChainRuntime::::spawn(dst_chain_config).unwrap(); - let res: Result = + let res: Result, Error> = build_create_client_and_send(&dst_chain, &src_chain, &opts) .map_err(|e| Kind::Tx.context(e).into()); @@ -102,12 +102,12 @@ impl Runnable for TxUpdateClientCmd { let (src_chain, _) = ChainRuntime::::spawn(src_chain_config).unwrap(); let (dst_chain, _) = ChainRuntime::::spawn(dst_chain_config).unwrap(); - let res: Result = + let res: Result, Error> = build_update_client_and_send(&dst_chain, &src_chain, &opts) .map_err(|e| Kind::Tx.context(e).into()); match res { - Ok(receipt) => status_ok!("Success client updated: {:?}", receipt), + Ok(receipt) => status_ok!("Success client updated: {:?}", &receipt[0]), Err(e) => status_err!("client update failed: {}", e), } } diff --git a/relayer-cli/src/commands/tx/connection.rs b/relayer-cli/src/commands/tx/connection.rs index ce9043380f..40a8f0301e 100644 --- a/relayer-cli/src/commands/tx/connection.rs +++ b/relayer-cli/src/commands/tx/connection.rs @@ -81,11 +81,11 @@ macro_rules! conn_open_cmd { let (dst_chain, _) = ChainRuntime::::spawn(dst_chain_config.clone()).unwrap(); - let res: Result = + let res: Result, Error> = $func(dst_chain, src_chain, &opts).map_err(|e| Kind::Tx.context(e).into()); match res { - Ok(receipt) => status_ok!("Result: ", "{:?} - {:?}", $dbg_string, receipt), + Ok(receipt) => status_ok!("Result: ", "{:?} - {:?}", $dbg_string, receipt[0]), Err(e) => status_err!("Failed with Error: {:?} - {:?}", $dbg_string, e), } } diff --git a/relayer-cli/src/commands/tx/packet.rs b/relayer-cli/src/commands/tx/packet.rs new file mode 100644 index 0000000000..7d8634cb77 --- /dev/null +++ b/relayer-cli/src/commands/tx/packet.rs @@ -0,0 +1,83 @@ +use crate::prelude::*; + +use abscissa_core::{Command, Options, Runnable}; +use relayer::config::Config; + +use crate::error::{Error, Kind}; +use ibc::ics24_host::identifier::{ChannelId, ClientId, PortId}; +use relayer::chain::runtime::ChainRuntime; +use relayer::chain::CosmosSDKChain; +use relayer::link::{build_and_send_recv_packet_messages, PacketOptions}; + +#[derive(Clone, Command, Debug, Options)] +pub struct TxRawPacketRecvCmd { + #[options(free, help = "identifier of the destination chain")] + dest_chain_id: String, + + #[options(free, help = "identifier of the destination client")] + dest_client_id: ClientId, + + #[options(free, help = "identifier of the source chain")] + src_chain_id: String, + + #[options(free, help = "identifier of the source port")] + src_port_id: PortId, + + #[options(free, help = "identifier of the source channel")] + src_channel_id: ChannelId, +} + +impl TxRawPacketRecvCmd { + fn validate_options(&self, config: &Config) -> Result { + let dest_chain_config = config + .chains + .iter() + .find(|c| c.id == self.dest_chain_id.parse().unwrap()) + .ok_or_else(|| "missing destination chain configuration".to_string())?; + + let src_chain_config = config + .chains + .iter() + .find(|c| c.id == self.src_chain_id.parse().unwrap()) + .ok_or_else(|| "missing src chain configuration".to_string())?; + + let opts = PacketOptions { + dst_chain_config: dest_chain_config.clone(), + src_chain_config: src_chain_config.clone(), + dst_client_id: self.dest_client_id.clone(), + src_port_id: self.src_port_id.clone(), + src_channel_id: self.src_channel_id.clone(), + }; + + Ok(opts) + } +} + +impl Runnable for TxRawPacketRecvCmd { + fn run(&self) { + let config = app_config(); + + let opts = match self.validate_options(&config) { + Err(err) => { + status_err!("invalid options: {}", err); + return; + } + Ok(result) => result, + }; + status_info!("Message", "{:?}", opts); + + let (src_chain, _) = + ChainRuntime::::spawn(opts.src_chain_config.clone()).unwrap(); + let (dst_chain, _) = + ChainRuntime::::spawn(opts.dst_chain_config.clone()).unwrap(); + + let res: Result, Error> = + build_and_send_recv_packet_messages(&dst_chain, &src_chain, &opts) + .map_err(|e| Kind::Tx.context(e).into()); + + match res { + Ok(receipt) => status_info!("packet recv, result: ", "{:#?}", receipt), + Err(e) => status_info!("packet recv failed, error: ", "{}", e), + } + } +} diff --git a/relayer-cli/tests/integration.rs b/relayer-cli/tests/integration.rs index f08c6a50e0..3631430f71 100644 --- a/relayer-cli/tests/integration.rs +++ b/relayer-cli/tests/integration.rs @@ -34,7 +34,9 @@ fn simd_config() -> Config { key_name: "testkey".to_string(), store_prefix: "ibc".to_string(), client_ids: vec!["ethbridge".to_string()], - gas: 200000, + gas: Some(200000), + max_msg_num: None, + max_tx_size: None, trust_threshold: Default::default(), trusting_period: default::trusting_period(), clock_drift: default::clock_drift(), diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index 6781a05bb5..ed726a0b4e 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -7,6 +7,7 @@ authors = [ ] [dependencies] +subtle-encoding = "0.5" ibc = { path = "../modules" } ibc-proto = { version = "0.5.0", path = "../proto" } anomaly = "0.2.0" diff --git a/relayer/src/chain.rs b/relayer/src/chain.rs index c204b7f51c..313df4bd9f 100644 --- a/relayer/src/chain.rs +++ b/relayer/src/chain.rs @@ -22,20 +22,20 @@ use tendermint_proto::Protobuf; use tendermint::account::Id as AccountId; use tendermint::block::Height; -use ibc::ics02_client::header::Header; +use ibc_proto::ibc::core::channel::v1::{ + PacketAckCommitment, QueryPacketCommitmentsRequest, QueryUnreceivedPacketsRequest, +}; +use ibc::ics02_client::header::Header; use ibc::ics02_client::state::{ClientState, ConsensusState}; use ibc::ics03_connection::connection::ConnectionEnd; - use ibc::ics03_connection::version::get_compatible_versions; +use ibc::ics04_channel::channel::{ChannelEnd, QueryPacketEventDataRequest}; use ibc::ics23_commitment::commitment::{CommitmentPrefix, CommitmentProof}; +use ibc::ics23_commitment::merkle::MerkleProof; use ibc::ics24_host::identifier::{ChainId, ChannelId, ClientId, ConnectionId, PortId}; use ibc::ics24_host::Path; - use ibc::proofs::{ConsensusProof, Proofs}; - -use ibc::ics04_channel::channel::ChannelEnd; -use ibc::ics23_commitment::merkle::MerkleProof; use ibc::Height as ICSHeight; use crate::config::ChainConfig; @@ -44,6 +44,7 @@ use crate::error::{Error, Kind}; use crate::event::monitor::EventBatch; use crate::keyring::store::{KeyEntry, KeyRing}; use crate::light_client::LightClient; +use ibc::events::IBCEvent; /// Generic query response type /// TODO - will slowly move to GRPC protobuf specs for queries @@ -54,6 +55,14 @@ pub struct QueryResponse { pub height: Height, } +/// Packet query options +#[derive(Debug)] +pub struct QueryPacketOptions { + pub port_id: PortId, + pub channel_id: ChannelId, + pub height: u64, +} + /// Defines a blockchain as understood by the relayer pub trait Chain: Sized { /// Type of light blocks for this chain @@ -98,8 +107,8 @@ pub trait Chain: Sized { /// Perform a generic ICS `query`, and return the corresponding response data. fn query(&self, data: Path, height: ICSHeight, prove: bool) -> Result; - /// Send a transaction with `msgs` to chain. - fn send_tx(&mut self, proto_msgs: Vec) -> Result; + /// Sends one or more transactions with `msgs` to chain. + fn send_msgs(&mut self, proto_msgs: Vec) -> Result, Error>; fn get_signer(&mut self) -> Result; @@ -301,4 +310,38 @@ pub trait Chain: Sized { Ok(Proofs::new(channel_proof, None, None, height).map_err(|_| Kind::MalformedProof)?) } + + fn proven_packet_commitment( + &self, + port_id: &PortId, + channel_id: &ChannelId, + sequence: u64, + height: ICSHeight, + ) -> Result<(Vec, MerkleProof), Error> { + let res = self + .query( + Path::Commitments { + port_id: port_id.clone(), + channel_id: channel_id.clone(), + sequence, + }, + height, + true, + ) + .map_err(|e| Kind::Query.context(e))?; + + Ok((res.value, res.proof)) + } + + fn query_packet_commitments( + &self, + request: QueryPacketCommitmentsRequest, + ) -> Result<(Vec, ICSHeight), Error>; + + fn query_unreceived_packets( + &self, + request: QueryUnreceivedPacketsRequest, + ) -> Result, Error>; + + fn query_txs(&self, request: QueryPacketEventDataRequest) -> Result, Error>; } diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 93abd04cd2..4dba252983 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -1,5 +1,6 @@ use std::{ convert::TryFrom, + convert::TryInto, future::Future, str::FromStr, sync::{Arc, Mutex}, @@ -7,6 +8,8 @@ use std::{ time::Duration, }; +use subtle_encoding::base64; + use anomaly::fail; use bitcoin::hashes::hex::ToHex; @@ -14,29 +17,38 @@ use crossbeam_channel as channel; use prost::Message; use prost_types::Any; use tokio::runtime::Runtime as TokioRuntime; +use tonic::codegen::http::Uri; use tendermint_proto::crypto::ProofOps; use tendermint_proto::Protobuf; +use tendermint_rpc::query::Query; + use tendermint::abci::Path as TendermintABCIPath; use tendermint::account::Id as AccountId; use tendermint::block::Height; use tendermint::consensus::Params; use tendermint_light_client::types::LightBlock as TMLightBlock; -use tendermint_rpc::Client; -use tendermint_rpc::HttpClient; +use tendermint_rpc::{Client, HttpClient, Order}; use ibc_proto::cosmos::base::v1beta1::Coin; use ibc_proto::cosmos::tx::v1beta1::mode_info::{Single, Sum}; use ibc_proto::cosmos::tx::v1beta1::{AuthInfo, Fee, ModeInfo, SignDoc, SignerInfo, TxBody, TxRaw}; +// Support for GRPC +use ibc_proto::cosmos::auth::v1beta1::{BaseAccount, QueryAccountRequest}; +use ibc_proto::ibc::core::channel::v1::{ + PacketAckCommitment, QueryPacketCommitmentsRequest, QueryUnreceivedPacketsRequest, +}; use ibc::downcast; +use ibc::events::{IBCEvent, IBCEventType}; use ibc::ics02_client::client_def::{AnyClientState, AnyConsensusState}; +use ibc::ics04_channel::channel::QueryPacketEventDataRequest; +use ibc::ics04_channel::events::{PacketEnvelope, SendPacket}; use ibc::ics07_tendermint::client_state::ClientState; use ibc::ics07_tendermint::consensus_state::ConsensusState as TMConsensusState; -use ibc::ics07_tendermint::consensus_state::ConsensusState; use ibc::ics07_tendermint::header::Header as TMHeader; use ibc::ics23_commitment::commitment::CommitmentPrefix; @@ -58,9 +70,10 @@ use crate::keyring::store::{KeyEntry, KeyRing, KeyRingOperations, StoreBackend}; use crate::light_client::tendermint::LightClient as TMLightClient; use crate::light_client::LightClient; -// Support for GRPC -use ibc_proto::cosmos::auth::v1beta1::{BaseAccount, QueryAccountRequest}; -use tonic::codegen::http::Uri; +// TODO size this properly +const DEFAULT_MAX_GAS: u64 = 300000; +const DEFAULT_MAX_MSG_NUM: usize = 30; +const DEFAULT_MAX_TX_SIZE: usize = 2 * 1048576; // 2 MBytes pub struct CosmosSDKChain { config: ChainConfig, @@ -120,93 +133,8 @@ impl CosmosSDKChain { fn block_on(&self, f: F) -> Result { Ok(self.rt.lock().map_err(|_| Kind::PoisonedMutex)?.block_on(f)) } -} - -impl Chain for CosmosSDKChain { - type LightBlock = TMLightBlock; - type Header = TMHeader; - type ConsensusState = ConsensusState; - type ClientState = ClientState; - - fn bootstrap(config: ChainConfig, rt: Arc>) -> Result { - let rpc_client = - HttpClient::new(config.rpc_addr.clone()).map_err(|e| Kind::Rpc.context(e))?; - - // Initialize key store and load key - let key_store = KeyRing::init(StoreBackend::Test, config.clone()) - .map_err(|e| Kind::KeyBase.context(e))?; - - Ok(Self { - rt, - config, - keybase: key_store, - rpc_client, - }) - } - - // TODO use a simpler approach to create the light client - #[allow(clippy::type_complexity)] - fn init_light_client( - &self, - ) -> Result<(Box>, Option>), Error> { - let (lc, supervisor) = TMLightClient::from_config(&self.config, true)?; - - let supervisor_thread = thread::spawn(move || supervisor.run().unwrap()); - - Ok((Box::new(lc), Some(supervisor_thread))) - } - - fn init_event_monitor( - &self, - rt: Arc>, - ) -> Result< - ( - channel::Receiver, - Option>, - ), - Error, - > { - let (mut event_monitor, event_receiver) = - EventMonitor::new(self.config.id.clone(), self.config.rpc_addr.clone(), rt)?; - - event_monitor.subscribe().unwrap(); - let monitor_thread = thread::spawn(move || event_monitor.run()); - - Ok((event_receiver, Some(monitor_thread))) - } - - fn id(&self) -> &ChainId { - &self.config().id - } - - fn keybase(&self) -> &KeyRing { - &self.keybase - } - - fn query(&self, data: Path, height: ICSHeight, prove: bool) -> Result { - let path = TendermintABCIPath::from_str(IBC_QUERY_PATH).unwrap(); - - let height = - Height::try_from(height.version_height).map_err(|e| Kind::InvalidHeight.context(e))?; - - if !data.is_provable() & prove { - return Err(Kind::Store - .context("requested proof for a path in the privateStore") - .into()); - } - - let response = - self.block_on(abci_query(&self, path, data.to_string(), height, prove))??; - - // TODO - Verify response proof, if requested. - if prove {} - - Ok(response) - } - /// Send a transaction that includes the specified messages - /// TODO - split the messages in multiple Tx-es such that they don't exceed some max size - fn send_tx(&mut self, proto_msgs: Vec) -> Result { + fn send_tx(&self, proto_msgs: Vec) -> Result { let key = self .keybase() .get_key() @@ -254,7 +182,7 @@ impl Chain for CosmosSDKChain { let fee = Some(Fee { amount: vec![coin], - gas_limit: 150000, + gas_limit: self.gas(), payer: "".to_string(), granter: "".to_string(), }); @@ -298,72 +226,130 @@ impl Chain for CosmosSDKChain { Ok(response) } - /// Get the account for the signer - fn get_signer(&mut self) -> Result { - // Get the key from key seed file - let key = self - .keybase() - .get_key() - .map_err(|e| Kind::KeyBase.context(e))?; + fn gas(&self) -> u64 { + self.config.gas.unwrap_or(DEFAULT_MAX_GAS) + } - let signer: AccountId = - AccountId::from_str(&key.address.to_hex()).map_err(|e| Kind::KeyBase.context(e))?; + fn max_msg_num(&self) -> usize { + self.config.max_msg_num.unwrap_or(DEFAULT_MAX_MSG_NUM) + } - Ok(signer) + fn max_tx_size(&self) -> usize { + self.config.max_tx_size.unwrap_or(DEFAULT_MAX_TX_SIZE) } +} - /// Get the signing key - fn get_key(&mut self) -> Result { - // Get the key from key seed file - let key = self - .keybase() - .get_key() +impl Chain for CosmosSDKChain { + type LightBlock = TMLightBlock; + type Header = TMHeader; + type ConsensusState = TMConsensusState; + type ClientState = ClientState; + + fn bootstrap(config: ChainConfig, rt: Arc>) -> Result { + let rpc_client = + HttpClient::new(config.rpc_addr.clone()).map_err(|e| Kind::Rpc.context(e))?; + + // Initialize key store and load key + let key_store = KeyRing::init(StoreBackend::Test, config.clone()) .map_err(|e| Kind::KeyBase.context(e))?; - Ok(key) + Ok(Self { + rt, + config, + keybase: key_store, + rpc_client, + }) } - fn build_client_state(&self, height: ICSHeight) -> Result { - // Build the client state. - let client_state = ibc::ics07_tendermint::client_state::ClientState::new( - self.id().to_string(), - self.config.trust_threshold, - self.config.trusting_period, - self.unbonding_period()?, - Duration::from_millis(3000), // TODO - get it from src config when avail - height, - ICSHeight::zero(), - self.query_consensus_params()?, - "upgrade/upgradedClient".to_string(), - false, - false, - ) - .map_err(|e| Kind::BuildClientStateFailure.context(e))?; + // TODO use a simpler approach to create the light client + #[allow(clippy::type_complexity)] + fn init_light_client( + &self, + ) -> Result<(Box>, Option>), Error> { + let (lc, supervisor) = TMLightClient::from_config(&self.config, true)?; - Ok(client_state) + let supervisor_thread = thread::spawn(move || supervisor.run().unwrap()); + + Ok((Box::new(lc), Some(supervisor_thread))) } - fn build_consensus_state( + fn init_event_monitor( &self, - light_block: Self::LightBlock, - ) -> Result { - Ok(TMConsensusState::from(light_block.signed_header.header)) + rt: Arc>, + ) -> Result< + ( + channel::Receiver, + Option>, + ), + Error, + > { + let (mut event_monitor, event_receiver) = + EventMonitor::new(self.config.id.clone(), self.config.rpc_addr.clone(), rt)?; + + event_monitor.subscribe().unwrap(); + let monitor_thread = thread::spawn(move || event_monitor.run()); + + Ok((event_receiver, Some(monitor_thread))) } - fn build_header( - &self, - trusted_light_block: Self::LightBlock, - target_light_block: Self::LightBlock, - ) -> Result { - let trusted_height = - ICSHeight::new(self.id().version(), trusted_light_block.height().into()); + fn id(&self) -> &ChainId { + &self.config().id + } - Ok(TMHeader { - trusted_height, - signed_header: target_light_block.signed_header.clone(), - validator_set: target_light_block.validators, - trusted_validator_set: trusted_light_block.validators, - }) + fn keybase(&self) -> &KeyRing { + &self.keybase + } + + fn query(&self, data: Path, height: ICSHeight, prove: bool) -> Result { + let path = TendermintABCIPath::from_str(IBC_QUERY_PATH).unwrap(); + + let height = + Height::try_from(height.version_height).map_err(|e| Kind::InvalidHeight.context(e))?; + + if !data.is_provable() & prove { + return Err(Kind::Store + .context("requested proof for a path in the privateStore") + .into()); + } + + let response = + self.block_on(abci_query(&self, path, data.to_string(), height, prove))??; + + // TODO - Verify response proof, if requested. + if prove {} + + Ok(response) + } + + /// Send one or more transactions that include all the specified messages + fn send_msgs(&mut self, proto_msgs: Vec) -> Result, Error> { + if proto_msgs.is_empty() { + return Ok(vec!["No messages to send".to_string()]); + } + let mut res = vec![]; + + let mut n = 0; + let mut size = 0; + let mut msg_batch = vec![]; + for msg in proto_msgs.iter() { + msg_batch.append(&mut vec![msg.clone()]); + let mut buf = Vec::new(); + prost::Message::encode(msg, &mut buf).unwrap(); + n += 1; + size += buf.len(); + if n >= self.max_msg_num() || size >= self.max_tx_size() { + let result = self.send_tx(msg_batch)?; + res.append(&mut vec![result]); + n = 0; + size = 0; + msg_batch = vec![]; + } + } + if !msg_batch.is_empty() { + let result = self.send_tx(msg_batch)?; + res.append(&mut vec![result]); + } + Ok(res) } /// Query the latest height the chain is at via a RPC query @@ -454,6 +440,263 @@ impl Chain for CosmosSDKChain { Ok((consensus_state, res.proof)) } + + fn build_client_state(&self, height: ICSHeight) -> Result { + // Build the client state. + let client_state = ibc::ics07_tendermint::client_state::ClientState::new( + self.id().to_string(), + self.config.trust_threshold, + self.config.trusting_period, + self.unbonding_period()?, + Duration::from_millis(3000), // TODO - get it from src config when avail + height, + ICSHeight::zero(), + self.query_consensus_params()?, + "upgrade/upgradedClient".to_string(), + false, + false, + ) + .map_err(|e| Kind::BuildClientStateFailure.context(e))?; + + Ok(client_state) + } + + fn build_consensus_state( + &self, + light_block: Self::LightBlock, + ) -> Result { + Ok(TMConsensusState::from(light_block.signed_header.header)) + } + + fn build_header( + &self, + trusted_light_block: Self::LightBlock, + target_light_block: Self::LightBlock, + ) -> Result { + let trusted_height = + ICSHeight::new(self.id().version(), trusted_light_block.height().into()); + + Ok(TMHeader { + trusted_height, + signed_header: target_light_block.signed_header.clone(), + validator_set: target_light_block.validators, + trusted_validator_set: trusted_light_block.validators, + }) + } + + /// Get the account for the signer + fn get_signer(&mut self) -> Result { + // Get the key from key seed file + let key = self + .keybase() + .get_key() + .map_err(|e| Kind::KeyBase.context(e))?; + + let signer: AccountId = + AccountId::from_str(&key.address.to_hex()).map_err(|e| Kind::KeyBase.context(e))?; + + Ok(signer) + } + + /// Get the signing key + fn get_key(&mut self) -> Result { + // Get the key from key seed file + let key = self + .keybase() + .get_key() + .map_err(|e| Kind::KeyBase.context(e))?; + + Ok(key) + } + /// Queries the packet commitment hashes associated with a channel. + /// TODO - move the chain trait + /// Note: the result Vec has an awkward name but fixed in a future IBC proto variant + /// It will move to Vec + fn query_packet_commitments( + &self, + request: QueryPacketCommitmentsRequest, + ) -> Result<(Vec, ICSHeight), Error> { + let grpc_addr = + Uri::from_str(&self.config().grpc_addr).map_err(|e| Kind::Grpc.context(e))?; + let mut client = self + .block_on( + ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(grpc_addr), + )? + .map_err(|e| Kind::Grpc.context(e))?; + + let request = tonic::Request::new(request); + + let response = self + .block_on(client.packet_commitments(request))? + .map_err(|e| Kind::Grpc.context(e))? + .into_inner(); + + let pc = response.commitments; + + let height = response + .height + .ok_or_else(|| Kind::Grpc.context("missing height in response"))? + .try_into() + .map_err(|_| Kind::Grpc.context("invalid height in response"))?; + + Ok((pc, height)) + } + + /// Queries the packet commitment hashes associated with a channel. + /// TODO - move the chain trait + fn query_unreceived_packets( + &self, + request: QueryUnreceivedPacketsRequest, + ) -> Result, Error> { + let grpc_addr = + Uri::from_str(&self.config().grpc_addr).map_err(|e| Kind::Grpc.context(e))?; + let mut client = self + .block_on( + ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(grpc_addr), + )? + .map_err(|e| Kind::Grpc.context(e))?; + + let request = tonic::Request::new(request); + + let response = self + .block_on(client.unreceived_packets(request))? + .map_err(|e| Kind::Grpc.context(e))? + .into_inner(); + + Ok(response.sequences) + } + + /// Queries the packet data for all packets with sequences included in the request. + /// Note - there is no way to format the query such that it asks for Tx-es with either + /// sequence (the query conditions can only be AND-ed) + /// There is a possibility to include "<=" and ">=" conditions but it doesn't work with + /// string attributes (sequence is emmitted as a string). + /// Therefore, here we perform one tx_search for each query. Alternatively, a single query + /// for all packets could be performed but it would return all packets ever sent. + fn query_txs(&self, request: QueryPacketEventDataRequest) -> Result, Error> { + let mut result: Vec = vec![]; + for seq in request.sequences.iter() { + // query all Tx-es that include events related to packet with given port, channel and sequence + let response = self + .block_on(self.rpc_client.tx_search( + packet_query(&request, seq)?, + false, + 1, + 1, + Order::Ascending, + )) + .unwrap() + .unwrap(); // todo + + let mut events = packet_from_tx_search_response(&request, seq, &response)? + .map_or(vec![], |v| vec![v]); + result.append(&mut events); + } + Ok(result) + } +} + +fn packet_query(request: &QueryPacketEventDataRequest, seq: &u64) -> Result { + Ok(tendermint_rpc::query::Query::eq( + format!("{}.packet_src_channel", request.event_id.as_str()), + request.channel_id.clone(), + ) + .and_eq( + format!("{}.packet_src_port", request.event_id.as_str()), + request.port_id.clone(), + ) + .and_eq( + format!("{}.packet_sequence", request.event_id.as_str()), + seq.to_string(), + )) +} + +// Extract the packet events from the query_tx RPC response. The response includes the full set of events +// from the Tx-es where there is at least one request query match. +// For example, the query request asks for the Tx for packet with sequence 3, and both 3 and 4 were +// committed in one Tx. In this case the response includes the events for 3 and 4. +fn packet_from_tx_search_response( + request: &QueryPacketEventDataRequest, + seq: &u64, + response: &tendermint_rpc::endpoint::tx_search::Response, +) -> Result, Error> { + for r in response.txs.iter() { + let height = r.height; + if height.value() > request.height.version_height { + continue; + } + let mut envelope = PacketEnvelope { + height: r.height, + packet_src_port: Default::default(), + packet_src_channel: Default::default(), + packet_dst_port: Default::default(), + packet_dst_channel: Default::default(), + packet_sequence: 0, + packet_timeout_height: Default::default(), + packet_timeout_stamp: 0, // todo - decoding + }; + for e in r.clone().tx_result.events.iter() { + if e.type_str != request.event_id.as_str() { + continue; + } + for a in e.attributes.iter() { + let key = String::from_utf8(base64::decode(a.key.to_string().as_bytes()).unwrap()) + .unwrap(); + let value = + String::from_utf8(base64::decode(a.value.to_string().as_bytes()).unwrap()) + .unwrap(); + match key.as_str() { + "packet_src_port" => envelope.packet_src_port = value.parse().unwrap(), + "packet_src_channel" => envelope.packet_src_channel = value.parse().unwrap(), + "packet_dst_port" => envelope.packet_dst_port = value.parse().unwrap(), + "packet_dst_channel" => envelope.packet_dst_channel = value.parse().unwrap(), + "packet_sequence" => envelope.packet_sequence = value.parse::().unwrap(), + "packet_timeout_height" => { + let to: Vec<&str> = value.split('-').collect(); + envelope.packet_timeout_height = ibc_proto::ibc::core::client::v1::Height { + version_number: to[0].parse::().unwrap(), + version_height: to[1].parse::().unwrap(), + } + .try_into() + .unwrap(); + } + _ => {} + }; + } + + if envelope.packet_src_port.as_str() != request.port_id.as_str() + || envelope.packet_src_channel.as_str() != request.channel_id.as_str() + || envelope.packet_sequence != *seq + { + continue; + } + match request.event_id { + IBCEventType::SendPacket => { + let mut data = vec![]; + for a in e.attributes.iter() { + let key = String::from_utf8( + base64::decode(a.key.to_string().as_bytes()).unwrap(), + ) + .unwrap(); + let value = String::from_utf8( + base64::decode(a.value.to_string().as_bytes()).unwrap(), + ) + .unwrap(); + match key.as_str() { + "packet_data" => data = Vec::from(value.as_bytes()), + _ => continue, + }; + } + return Ok(Some(IBCEvent::SendPacketChannel(SendPacket { + envelope, + data, + }))); + } + _ => continue, + } + } + } + Ok(None) } /// Perform a generic `abci_query`, and return the corresponding deserialized response data. diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index 2665f6ad9d..d97b4fa59b 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -2,10 +2,15 @@ use std::sync::Arc; use crossbeam_channel as channel; +use ibc_proto::ibc::core::channel::v1::{ + PacketAckCommitment, QueryPacketCommitmentsRequest, QueryUnreceivedPacketsRequest, +}; + use ibc::{ + events::IBCEvent, ics02_client::client_def::{AnyClientState, AnyConsensusState, AnyHeader}, ics03_connection::connection::ConnectionEnd, - ics04_channel::channel::ChannelEnd, + ics04_channel::channel::{ChannelEnd, QueryPacketEventDataRequest}, ics24_host::identifier::{ChannelId, ConnectionId, PortId}, proofs::Proofs, }; @@ -18,15 +23,14 @@ use ibc::{ // FIXME: the handle should not depend on tendermint-specific types use tendermint::account::Id as AccountId; -use crate::connection::ConnectionMsgType; -use crate::{error::Error, event::monitor::EventBatch}; -// use crate::foreign_client::ForeignClient; +use super::QueryResponse; +use crate::connection::ConnectionMsgType; use crate::keyring::store::KeyEntry; - -use super::QueryResponse; +use crate::{error::Error, event::monitor::EventBatch}; mod prod; + pub use prod::ProdChainHandle; pub type Subscription = channel::Receiver>; @@ -56,9 +60,9 @@ pub enum HandleInput { reply_to: ReplyTo, }, - SendTx { + SendMsgs { proto_msgs: Vec, - reply_to: ReplyTo, + reply_to: ReplyTo>, }, // GetHeader { @@ -172,6 +176,29 @@ pub enum HandleInput { height: Height, reply_to: ReplyTo, }, + + ProvenPacketCommitment { + port_id: PortId, + channel_id: ChannelId, + sequence: u64, + height: Height, + reply_to: ReplyTo<(Vec, MerkleProof)>, + }, + + QueryPacketCommitments { + request: QueryPacketCommitmentsRequest, + reply_to: ReplyTo<(Vec, Height)>, + }, + + QueryUnreceivedPackets { + request: QueryUnreceivedPacketsRequest, + reply_to: ReplyTo>, + }, + + QueryPacketEventData { + request: QueryPacketEventDataRequest, + reply_to: ReplyTo>, + }, } pub trait ChainHandle: Clone + Send + Sync { @@ -182,7 +209,7 @@ pub trait ChainHandle: Clone + Send + Sync { fn subscribe(&self, chain_id: ChainId) -> Result; /// Send a transaction with `msgs` to chain. - fn send_tx(&self, proto_msgs: Vec) -> Result; + fn send_msgs(&self, proto_msgs: Vec) -> Result, Error>; fn get_minimal_set(&self, from: Height, to: Height) -> Result, Error>; @@ -262,4 +289,24 @@ pub trait ChainHandle: Clone + Send + Sync { channel_id: &ChannelId, height: Height, ) -> Result; + + fn proven_packet_commitment( + &self, + port_id: &PortId, + channel_id: &ChannelId, + sequence: u64, + height: Height, + ) -> Result<(Vec, MerkleProof), Error>; + + fn query_packet_commitments( + &self, + request: QueryPacketCommitmentsRequest, + ) -> Result<(Vec, Height), Error>; + + fn query_unreceived_packets( + &self, + request: QueryUnreceivedPacketsRequest, + ) -> Result, Error>; + + fn query_txs(&self, request: QueryPacketEventDataRequest) -> Result, Error>; } diff --git a/relayer/src/chain/handle/prod.rs b/relayer/src/chain/handle/prod.rs index a6ae4ec17f..c069d2b33f 100644 --- a/relayer/src/chain/handle/prod.rs +++ b/relayer/src/chain/handle/prod.rs @@ -2,10 +2,15 @@ use std::fmt::Debug; use crossbeam_channel as channel; +use ibc_proto::ibc::core::channel::v1::{ + PacketAckCommitment, QueryPacketCommitmentsRequest, QueryUnreceivedPacketsRequest, +}; + use ibc::{ + events::IBCEvent, ics02_client::client_def::{AnyClientState, AnyConsensusState, AnyHeader}, ics03_connection::connection::ConnectionEnd, - ics04_channel::channel::ChannelEnd, + ics04_channel::channel::{ChannelEnd, QueryPacketEventDataRequest}, ics23_commitment::commitment::CommitmentPrefix, ics23_commitment::merkle::MerkleProof, ics24_host::identifier::ChainId, @@ -18,6 +23,8 @@ use ibc::{ // FIXME: the handle should not depend on tendermint-specific types use tendermint::account::Id as AccountId; +use super::{reply_channel, ChainHandle, HandleInput, ReplyTo, Subscription}; + use crate::{ chain::QueryResponse, connection::ConnectionMsgType, @@ -25,8 +32,6 @@ use crate::{ keyring::store::KeyEntry, }; -use super::{reply_channel, ChainHandle, HandleInput, ReplyTo, Subscription}; - #[derive(Debug, Clone)] pub struct ProdChainHandle { chain_id: ChainId, @@ -77,8 +82,8 @@ impl ChainHandle for ProdChainHandle { }) } - fn send_tx(&self, proto_msgs: Vec) -> Result { - self.send(|reply_to| HandleInput::SendTx { + fn send_msgs(&self, proto_msgs: Vec) -> Result, Error> { + self.send(|reply_to| HandleInput::SendMsgs { proto_msgs, reply_to, }) @@ -264,4 +269,38 @@ impl ChainHandle for ProdChainHandle { reply_to, }) } + + fn proven_packet_commitment( + &self, + port_id: &PortId, + channel_id: &ChannelId, + sequence: u64, + height: Height, + ) -> Result<(Vec, MerkleProof), Error> { + self.send(|reply_to| HandleInput::ProvenPacketCommitment { + port_id: port_id.clone(), + channel_id: channel_id.clone(), + sequence, + height, + reply_to, + }) + } + + fn query_packet_commitments( + &self, + request: QueryPacketCommitmentsRequest, + ) -> Result<(Vec, Height), Error> { + self.send(|reply_to| HandleInput::QueryPacketCommitments { request, reply_to }) + } + + fn query_unreceived_packets( + &self, + request: QueryUnreceivedPacketsRequest, + ) -> Result, Error> { + self.send(|reply_to| HandleInput::QueryUnreceivedPackets { request, reply_to }) + } + + fn query_txs(&self, request: QueryPacketEventDataRequest) -> Result, Error> { + self.send(|reply_to| HandleInput::QueryPacketEventData { request, reply_to }) + } } diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 39ef49f42e..3f4ee30bdc 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -1,15 +1,23 @@ use std::ops::Add; use std::sync::{Arc, Mutex}; +use std::thread; use std::time::Duration; use crossbeam_channel as channel; use prost_types::Any; +use tokio::runtime::Runtime; + use tendermint::account::Id; use tendermint_testgen::light_block::TMLightBlock; -use tokio::runtime::Runtime; + +use ibc_proto::ibc::core::channel::v1::{ + PacketAckCommitment, QueryPacketCommitmentsRequest, QueryUnreceivedPacketsRequest, +}; use ibc::downcast; +use ibc::events::IBCEvent; use ibc::ics02_client::client_def::AnyClientState; +use ibc::ics04_channel::channel::QueryPacketEventDataRequest; use ibc::ics07_tendermint::client_state::ClientState as TendermintClientState; use ibc::ics07_tendermint::consensus_state::ConsensusState as TendermintConsensusState; use ibc::ics07_tendermint::header::Header as TendermintHeader; @@ -29,7 +37,6 @@ use crate::error::{Error, Kind}; use crate::event::monitor::EventBatch; use crate::keyring::store::{KeyEntry, KeyRing}; use crate::light_client::{mock::LightClient as MockLightClient, LightClient}; -use std::thread; /// The representation of a mocked chain as the relayer sees it. /// The relayer runtime and the light client will engage with the MockChain to query/send tx; the @@ -93,12 +100,13 @@ impl Chain for MockChain { unimplemented!() } - fn send_tx(&mut self, proto_msgs: Vec) -> Result { + fn send_msgs(&mut self, proto_msgs: Vec) -> Result, Error> { // Use the ICS18Context interface to submit the set of messages. - self.context + Ok(vec![self + .context .send(proto_msgs) .map(|_| "OK".to_string()) // TODO: establish success return codes. - .map_err(|e| Kind::Rpc.context(e).into()) + .map_err(|e| Kind::Rpc.context(e))?]) } fn get_signer(&mut self) -> Result { @@ -190,6 +198,24 @@ impl Chain for MockChain { ) -> Result<(Self::ConsensusState, MerkleProof), Error> { unimplemented!() } + + fn query_packet_commitments( + &self, + _request: QueryPacketCommitmentsRequest, + ) -> Result<(Vec, Height), Error> { + unimplemented!() + } + + fn query_unreceived_packets( + &self, + _request: QueryUnreceivedPacketsRequest, + ) -> Result, Error> { + unimplemented!() + } + + fn query_txs(&self, _request: QueryPacketEventDataRequest) -> Result, Error> { + unimplemented!() + } } // For integration tests with the modules @@ -212,7 +238,9 @@ pub mod test_utils { key_name: "".to_string(), store_prefix: "".to_string(), client_ids: vec![], - gas: 0, + gas: None, + max_msg_num: None, + max_tx_size: None, clock_drift: Duration::from_secs(5), trusting_period: Duration::from_secs(14 * 24 * 60 * 60), // 14 days trust_threshold: Default::default(), diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index b426d499ac..56bd388dc7 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -7,14 +7,19 @@ use crossbeam_channel as channel; use tokio::runtime::Runtime as TokioRuntime; +use ibc_proto::ibc::core::channel::v1::{ + PacketAckCommitment, QueryPacketCommitmentsRequest, QueryUnreceivedPacketsRequest, +}; + use ibc::{ + events::IBCEvent, ics02_client::{ client_def::{AnyClientState, AnyConsensusState, AnyHeader}, header::Header, state::{ClientState, ConsensusState}, }, ics03_connection::connection::ConnectionEnd, - ics04_channel::channel::ChannelEnd, + ics04_channel::channel::{ChannelEnd, QueryPacketEventDataRequest}, ics23_commitment::{commitment::CommitmentPrefix, merkle::MerkleProof}, ics24_host::identifier::ChannelId, ics24_host::identifier::PortId, @@ -27,6 +32,11 @@ use ibc::{ // FIXME: the handle should not depend on tendermint-specific types use tendermint::account::Id as AccountId; +use super::{ + handle::{ChainHandle, HandleInput, ProdChainHandle, ReplyTo, Subscription}, + Chain, QueryResponse, +}; + use crate::{ config::ChainConfig, connection::ConnectionMsgType, @@ -36,11 +46,6 @@ use crate::{ light_client::LightClient, }; -use super::{ - handle::{ChainHandle, HandleInput, ProdChainHandle, ReplyTo, Subscription}, - Chain, QueryResponse, -}; - pub struct Threads { pub light_client: Option>, pub chain_runtime: thread::JoinHandle<()>, @@ -159,8 +164,8 @@ impl ChainRuntime { self.query(path, height, prove, reply_to)? }, - Ok(HandleInput::SendTx { proto_msgs, reply_to }) => { - self.send_tx(proto_msgs, reply_to)? + Ok(HandleInput::SendMsgs { proto_msgs, reply_to }) => { + self.send_msgs(proto_msgs, reply_to)? }, Ok(HandleInput::GetMinimalSet { from, to, reply_to }) => { @@ -242,6 +247,22 @@ impl ChainRuntime { self.proven_client_consensus(client_id, consensus_height, height, reply_to)? }, + Ok(HandleInput::ProvenPacketCommitment { port_id, channel_id, sequence, height, reply_to }) => { + self.proven_packet_commitment(port_id, channel_id, sequence, height, reply_to)? + }, + + Ok(HandleInput::QueryPacketCommitments { request, reply_to }) => { + self.query_packet_commitments(request, reply_to)? + }, + + Ok(HandleInput::QueryUnreceivedPackets { request, reply_to }) => { + self.query_unreceived_packets(request, reply_to)? + }, + + Ok(HandleInput::QueryPacketEventData { request, reply_to }) => { + self.query_txs(request, reply_to)? + }, + Err(_e) => todo!(), // TODO: Handle error? } }, @@ -290,12 +311,12 @@ impl ChainRuntime { Ok(()) } - fn send_tx( + fn send_msgs( &mut self, proto_msgs: Vec, - reply_to: ReplyTo, + reply_to: ReplyTo>, ) -> Result<(), Error> { - let result = self.chain.send_tx(proto_msgs); + let result = self.chain.send_msgs(proto_msgs); reply_to .send(result) @@ -603,4 +624,65 @@ impl ChainRuntime { Ok(()) } + + fn proven_packet_commitment( + &self, + port_id: PortId, + channel_id: ChannelId, + sequence: u64, + height: Height, + reply_to: ReplyTo<(Vec, MerkleProof)>, + ) -> Result<(), Error> { + let result = self + .chain + .proven_packet_commitment(&port_id, &channel_id, sequence, height); + + reply_to + .send(result) + .map_err(|e| Kind::Channel.context(e))?; + + Ok(()) + } + + fn query_packet_commitments( + &self, + request: QueryPacketCommitmentsRequest, + reply_to: ReplyTo<(Vec, Height)>, + ) -> Result<(), Error> { + let result = self.chain.query_packet_commitments(request); + + reply_to + .send(result) + .map_err(|e| Kind::Channel.context(e))?; + + Ok(()) + } + + fn query_unreceived_packets( + &self, + request: QueryUnreceivedPacketsRequest, + reply_to: ReplyTo>, + ) -> Result<(), Error> { + let result = self.chain.query_unreceived_packets(request); + + reply_to + .send(result) + .map_err(|e| Kind::Channel.context(e))?; + + Ok(()) + } + + fn query_txs( + &self, + request: QueryPacketEventDataRequest, + reply_to: ReplyTo>, + ) -> Result<(), Error> { + let result = self.chain.query_txs(request); + + reply_to + .send(result) + .map_err(|e| Kind::Channel.context(e))?; + + Ok(()) + } } diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 913471d1f5..5dfab960dc 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -3,6 +3,7 @@ use std::time::SystemTime; use prost_types::Any; use thiserror::Error; +use tracing::info; use ibc_proto::ibc::core::channel::v1::MsgChannelOpenAck as RawMsgChannelOpenAck; use ibc_proto::ibc::core::channel::v1::MsgChannelOpenConfirm as RawMsgChannelOpenConfirm; @@ -79,7 +80,6 @@ impl ChannelConfigSide { } #[derive(Clone, Debug)] - pub struct ChannelConfig { pub ordering: Order, pub a_config: ChannelConfigSide, @@ -114,7 +114,7 @@ impl ChannelConfig { #[derive(Clone, Debug)] pub struct Channel { - config: ChannelConfig, + pub config: ChannelConfig, } impl ChannelConfig { @@ -219,24 +219,24 @@ impl Channel { (None, None) => { // Init to src match build_chan_init_and_send(a_chain.clone(), b_chain.clone(), &flipped) { - Err(e) => println!("{:?} Failed ChanInit {:?}", e, config.a_end()), - Ok(_) => println!("{} ChanInit {:?}", done, config.a_end()), + Err(e) => info!("{:?} Failed ChanInit {:?}", e, config.a_end()), + Ok(_) => info!("{} ChanInit {:?}", done, config.a_end()), } } (Some(a_channel), None) => { // Try to dest assert!(a_channel.state_matches(&State::Init)); match build_chan_try_and_send(b_chain.clone(), a_chain.clone(), &config) { - Err(e) => println!("{:?} Failed ChanTry {:?}", e, config.b_end()), - Ok(_) => println!("{} ChanTry {:?}", done, config.b_end()), + Err(e) => info!("{:?} Failed ChanTry {:?}", e, config.b_end()), + Ok(_) => info!("{} ChanTry {:?}", done, config.b_end()), } } (None, Some(b_channel)) => { // Try to src assert!(b_channel.state_matches(&State::Init)); match build_chan_try_and_send(a_chain.clone(), b_chain.clone(), &flipped) { - Err(e) => println!("{:?} Failed ChanTry {:?}", e, config.a_end()), - Ok(_) => println!("{} ChanTry {:?}", done, config.a_end()), + Err(e) => info!("{:?} Failed ChanTry {:?}", e, config.a_end()), + Ok(_) => info!("{} ChanTry {:?}", done, config.a_end()), } } (Some(a_channel), Some(b_channel)) => { @@ -246,16 +246,16 @@ impl Channel { // Try to dest match build_chan_try_and_send(b_chain.clone(), a_chain.clone(), &config) { - Err(e) => println!("{:?} Failed ChanTry {:?}", e, config.b_end()), - Ok(_) => println!("{} ChanTry {:?}", done, config.b_end()), + Err(e) => info!("{:?} Failed ChanTry {:?}", e, config.b_end()), + Ok(_) => info!("{} ChanTry {:?}", done, config.b_end()), } } (&State::TryOpen, &State::Init) => { // Ack to dest match build_chan_ack_and_send(b_chain.clone(), a_chain.clone(), &config) { - Err(e) => println!("{:?} Failed ChanAck {:?}", e, config.b_end()), - Ok(_) => println!("{} ChanAck {:?}", done, config.b_end()), + Err(e) => info!("{:?} Failed ChanAck {:?}", e, config.b_end()), + Ok(_) => info!("{} ChanAck {:?}", done, config.b_end()), } } (&State::Init, &State::TryOpen) | (&State::TryOpen, &State::TryOpen) => { @@ -265,8 +265,8 @@ impl Channel { b_chain.clone(), &flipped, ) { - Err(e) => println!("{:?} Failed ChanAck {:?}", e, config.a_end()), - Ok(_) => println!("{} ChanAck {:?}", done, config.a_end()), + Err(e) => info!("{:?} Failed ChanAck {:?}", e, config.a_end()), + Ok(_) => info!("{} ChanAck {:?}", done, config.a_end()), } } (&State::Open, &State::TryOpen) => { @@ -277,9 +277,9 @@ impl Channel { &config, ) { Err(e) => { - println!("{:?} Failed ChanConfirm {:?}", e, config.b_end()) + info!("{:?} Failed ChanConfirm {:?}", e, config.b_end()) } - Ok(_) => println!("{} ChanConfirm {:?}", done, config.b_end()), + Ok(_) => info!("{} ChanConfirm {:?}", done, config.b_end()), } } (&State::TryOpen, &State::Open) => { @@ -289,12 +289,12 @@ impl Channel { b_chain.clone(), &flipped, ) { - Err(e) => println!("{:?} ChanConfirm {:?}", e, flipped), - Ok(_) => println!("{} ChanConfirm {:?}", done, flipped), + Err(e) => info!("{:?} ChanConfirm {:?}", e, flipped), + Ok(_) => info!("{} ChanConfirm {:?}", done, flipped), } } (&State::Open, &State::Open) => { - println!( + info!( "{} {} {} Channel handshake finished for {:#?}", done, done, done, config ); @@ -304,7 +304,7 @@ impl Channel { } } } - println!("elapsed time {:?}\n", now.elapsed().unwrap().as_secs()); + info!("elapsed time {:?}\n", now.elapsed().unwrap().as_secs()); } Err(ChannelError::Failed(format!( @@ -375,9 +375,9 @@ pub fn build_chan_init_and_send( dst_chain: impl ChainHandle, src_chain: impl ChainHandle, opts: &ChannelConfig, -) -> Result { +) -> Result, Error> { let dst_msgs = build_chan_init(dst_chain.clone(), src_chain, &opts)?; - Ok(dst_chain.send_tx(dst_msgs)?) + Ok(dst_chain.send_msgs(dst_msgs)?) } fn check_destination_channel_state( @@ -564,9 +564,9 @@ pub fn build_chan_try_and_send( dst_chain: impl ChainHandle, src_chain: impl ChainHandle, opts: &ChannelConfig, -) -> Result { +) -> Result, Error> { let dst_msgs = build_chan_try(dst_chain.clone(), src_chain, &opts)?; - Ok(dst_chain.send_tx(dst_msgs)?) + Ok(dst_chain.send_msgs(dst_msgs)?) } pub fn build_chan_ack( @@ -647,9 +647,9 @@ pub fn build_chan_ack_and_send( dst_chain: impl ChainHandle, src_chain: impl ChainHandle, opts: &ChannelConfig, -) -> Result { +) -> Result, Error> { let dst_msgs = build_chan_ack(dst_chain.clone(), src_chain, &opts)?; - Ok(dst_chain.send_tx(dst_msgs)?) + Ok(dst_chain.send_msgs(dst_msgs)?) } pub fn build_chan_confirm( @@ -728,7 +728,7 @@ pub fn build_chan_confirm_and_send( dst_chain: impl ChainHandle, src_chain: impl ChainHandle, opts: &ChannelConfig, -) -> Result { +) -> Result, Error> { let dst_msgs = build_chan_confirm(dst_chain.clone(), src_chain, &opts)?; - Ok(dst_chain.send_tx(dst_msgs)?) + Ok(dst_chain.send_msgs(dst_msgs)?) } diff --git a/relayer/src/config.rs b/relayer/src/config.rs index c7e9fef16d..a3484e14da 100644 --- a/relayer/src/config.rs +++ b/relayer/src/config.rs @@ -90,8 +90,9 @@ pub struct ChainConfig { pub key_name: String, pub store_prefix: String, pub client_ids: Vec, - #[serde(default = "default::gas")] - pub gas: u64, + pub gas: Option, + pub max_msg_num: Option, + pub max_tx_size: Option, #[serde(default = "default::clock_drift", with = "humantime_serde")] pub clock_drift: Duration, #[serde(default = "default::trusting_period", with = "humantime_serde")] diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 68542a039c..1e5ece5e2f 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -2,6 +2,7 @@ use prost_types::Any; use std::str::FromStr; use std::time::SystemTime; use thiserror::Error; +use tracing::info; use ibc_proto::ibc::core::connection::v1::MsgConnectionOpenAck as RawMsgConnectionOpenAck; use ibc_proto::ibc::core::connection::v1::MsgConnectionOpenConfirm as RawMsgConnectionOpenConfirm; @@ -190,22 +191,22 @@ impl Connection { (None, None) => { // Init to src match build_conn_init_and_send(a_chain.clone(), b_chain.clone(), &flipped) { - Err(e) => println!("{:?} Failed ConnInit {:?}", e, config.a_end()), - Ok(_) => println!("{} ConnInit {:?}", done, config.a_end()), + Err(e) => info!("{:?} Failed ConnInit {:?}", e, config.a_end()), + Ok(_) => info!("{} ConnInit {:?}", done, config.a_end()), } } (Some(a_connection), None) => { assert!(a_connection.state_matches(&State::Init)); match build_conn_try_and_send(b_chain.clone(), a_chain.clone(), &config) { - Err(e) => println!("{:?} Failed ConnTry {:?}", e, config.b_end()), - Ok(_) => println!("{} ConnTry {:?}", done, config.b_end()), + Err(e) => info!("{:?} Failed ConnTry {:?}", e, config.b_end()), + Ok(_) => info!("{} ConnTry {:?}", done, config.b_end()), } } (None, Some(b_connection)) => { assert!(b_connection.state_matches(&State::Init)); match build_conn_try_and_send(a_chain.clone(), b_chain.clone(), &flipped) { - Err(e) => println!("{:?} Failed ConnTry {:?}", e, config.a_end()), - Ok(_) => println!("{} ConnTry {:?}", done, config.a_end()), + Err(e) => info!("{:?} Failed ConnTry {:?}", e, config.a_end()), + Ok(_) => info!("{} ConnTry {:?}", done, config.a_end()), } } (Some(a_connection), Some(b_connection)) => { @@ -214,16 +215,16 @@ impl Connection { // Try to dest match build_conn_try_and_send(b_chain.clone(), a_chain.clone(), &config) { - Err(e) => println!("{:?} Failed ConnTry {:?}", e, config.b_end()), - Ok(_) => println!("{} ConnTry {:?}", done, config.b_end()), + Err(e) => info!("{:?} Failed ConnTry {:?}", e, config.b_end()), + Ok(_) => info!("{} ConnTry {:?}", done, config.b_end()), } } (&State::TryOpen, &State::Init) => { // Ack to dest match build_conn_ack_and_send(b_chain.clone(), a_chain.clone(), &config) { - Err(e) => println!("{:?} Failed ConnAck {:?}", e, config.b_end()), - Ok(_) => println!("{} ConnAck {:?}", done, config.b_end()), + Err(e) => info!("{:?} Failed ConnAck {:?}", e, config.b_end()), + Ok(_) => info!("{} ConnAck {:?}", done, config.b_end()), } } (&State::Init, &State::TryOpen) | (&State::TryOpen, &State::TryOpen) => { @@ -233,8 +234,8 @@ impl Connection { b_chain.clone(), &flipped, ) { - Err(e) => println!("{:?} Failed ConnAck {:?}", e, config.a_end()), - Ok(_) => println!("{} ConnAck {:?}", done, config.a_end()), + Err(e) => info!("{:?} Failed ConnAck {:?}", e, config.a_end()), + Ok(_) => info!("{} ConnAck {:?}", done, config.a_end()), } } (&State::Open, &State::TryOpen) => { @@ -245,9 +246,9 @@ impl Connection { &config, ) { Err(e) => { - println!("{:?} Failed ConnConfirm {:?}", e, config.b_end()) + info!("{:?} Failed ConnConfirm {:?}", e, config.b_end()) } - Ok(_) => println!("{} ConnConfirm {:?}", done, config.b_end()), + Ok(_) => info!("{} ConnConfirm {:?}", done, config.b_end()), } } (&State::TryOpen, &State::Open) => { @@ -257,12 +258,12 @@ impl Connection { b_chain.clone(), &flipped, ) { - Err(e) => println!("{:?} ConnConfirm {:?}", e, config.a_end()), - Ok(_) => println!("{} ConnConfirm {:?}", done, config.a_end()), + Err(e) => info!("{:?} ConnConfirm {:?}", e, config.a_end()), + Ok(_) => info!("{} ConnConfirm {:?}", done, config.a_end()), } } (&State::Open, &State::Open) => { - println!( + info!( "{} {} {} Connection handshake finished for [{:#?}]", done, done, done, config ); @@ -272,7 +273,7 @@ impl Connection { } } } - println!("elapsed time {:?}\n", now.elapsed().unwrap().as_secs()); + info!("elapsed time {:?}\n", now.elapsed().unwrap().as_secs()); } Err(ConnectionError::Failed(format!( @@ -336,9 +337,9 @@ pub fn build_conn_init_and_send( dst_chain: impl ChainHandle, src_chain: impl ChainHandle, opts: &ConnectionConfig, -) -> Result { +) -> Result, Error> { let dst_msgs = build_conn_init(dst_chain.clone(), src_chain, opts)?; - Ok(dst_chain.send_tx(dst_msgs)?) + Ok(dst_chain.send_msgs(dst_msgs)?) } fn check_destination_connection_state( @@ -473,7 +474,7 @@ pub fn build_conn_try( &opts.src().client_id(), src_client_target_height, )?; - src_chain.send_tx(client_msgs)?; + src_chain.send_msgs(client_msgs)?; // Build message(s) for updating client on destination let ics_target_height = src_chain.query_latest_height()?; @@ -525,9 +526,9 @@ pub fn build_conn_try_and_send( dst_chain: impl ChainHandle, src_chain: impl ChainHandle, opts: &ConnectionConfig, -) -> Result { +) -> Result, Error> { let dst_msgs = build_conn_try(dst_chain.clone(), src_chain, &opts)?; - Ok(dst_chain.send_tx(dst_msgs)?) + Ok(dst_chain.send_msgs(dst_msgs)?) } /// Attempts to build a MsgConnOpenAck. @@ -573,7 +574,7 @@ pub fn build_conn_ack( &opts.src().client_id(), src_client_target_height, )?; - src_chain.send_tx(client_msgs)?; + src_chain.send_msgs(client_msgs)?; // Build message(s) for updating client on destination let ics_target_height = src_chain.query_latest_height()?; @@ -617,9 +618,9 @@ pub fn build_conn_ack_and_send( dst_chain: impl ChainHandle, src_chain: impl ChainHandle, opts: &ConnectionConfig, -) -> Result { +) -> Result, Error> { let dst_msgs = build_conn_ack(dst_chain.clone(), src_chain, opts)?; - Ok(dst_chain.send_tx(dst_msgs)?) + Ok(dst_chain.send_msgs(dst_msgs)?) } /// Attempts to build a MsgConnOpenConfirm. @@ -694,7 +695,7 @@ pub fn build_conn_confirm_and_send( dst_chain: impl ChainHandle, src_chain: impl ChainHandle, opts: &ConnectionConfig, -) -> Result { +) -> Result, Error> { let dst_msgs = build_conn_confirm(dst_chain.clone(), src_chain, &opts)?; - Ok(dst_chain.send_tx(dst_msgs)?) + Ok(dst_chain.send_msgs(dst_msgs)?) } diff --git a/relayer/src/error.rs b/relayer/src/error.rs index ef56b01ed0..8384d201e4 100644 --- a/relayer/src/error.rs +++ b/relayer/src/error.rs @@ -102,6 +102,10 @@ pub enum Kind { #[error("Failed to build channel open confirm {0}: {1}")] ChanOpenConfirm(ChannelId, String), + /// Packet recv failure + #[error("Failed to build packet recv {0}: {1}")] + PacketRecv(ChannelId, String), + /// A message transaction failure #[error("Message transaction failure: {0}")] MessageTransaction(String), diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 561b9d9f4e..0a132004ce 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -202,7 +202,6 @@ impl EventMonitor { height, events, }; - self.tx_batch.send(batch)?; } } diff --git a/relayer/src/foreign_client.rs b/relayer/src/foreign_client.rs index f90582c5d8..f44f7989f5 100644 --- a/relayer/src/foreign_client.rs +++ b/relayer/src/foreign_client.rs @@ -1,5 +1,6 @@ use prost_types::Any; use thiserror::Error; +use tracing::info; use ibc::ics02_client::header::Header; use ibc::ics02_client::msgs::create_client::MsgCreateAnyClient; @@ -75,7 +76,7 @@ impl ForeignClient { ForeignClientError::ClientCreate(format!("Create client failed ({:?})", e)) })?; } - println!( + info!( "{} Client on {:?} is created {:?}\n", done, config.chain, config.id ); @@ -168,10 +169,10 @@ pub fn build_create_client_and_send( dst_chain: &impl ChainHandle, src_chain: &impl ChainHandle, opts: &ForeignClientConfig, -) -> Result { +) -> Result, Error> { let new_msg = build_create_client(dst_chain, src_chain, opts.client_id())?; - Ok(dst_chain.send_tx(vec![new_msg.to_any::()])?) + Ok(dst_chain.send_msgs(vec![new_msg.to_any::()])?) } pub fn build_update_client( @@ -203,7 +204,7 @@ pub fn build_update_client_and_send( dst_chain: &impl ChainHandle, src_chain: &impl ChainHandle, opts: &ForeignClientConfig, -) -> Result { +) -> Result, Error> { let new_msgs = build_update_client( dst_chain, src_chain, @@ -211,7 +212,7 @@ pub fn build_update_client_and_send( src_chain.query_latest_height()?, )?; - Ok(dst_chain.send_tx(new_msgs)?) + Ok(dst_chain.send_msgs(new_msgs)?) } /// Tests the integration of crates `relayer` plus `relayer-cli` against crate `ibc`. These tests diff --git a/relayer/src/lib.rs b/relayer/src/lib.rs index 1a85ec987f..e586484a89 100644 --- a/relayer/src/lib.rs +++ b/relayer/src/lib.rs @@ -23,7 +23,6 @@ pub mod foreign_client; pub mod keyring; pub mod keys; pub mod light_client; -// pub mod link; -pub mod msgs; +pub mod link; pub mod relay; pub mod util; diff --git a/relayer/src/link.rs b/relayer/src/link.rs index ea6f1a42ee..13f20e856a 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -1,34 +1,36 @@ -use std::ops::Range; - -use itertools::Itertools; -use retry::{delay::Fixed, retry}; +use prost_types::Any; use thiserror::Error; +use tracing::{error, info}; + +use ibc_proto::ibc::core::channel::v1::{ + MsgRecvPacket as RawMsgRecvPacket, QueryPacketCommitmentsRequest, QueryUnreceivedPacketsRequest, +}; use ibc::{ + downcast, + events::{IBCEvent, IBCEventType}, + ics04_channel::channel::{QueryPacketEventDataRequest, State}, + ics04_channel::events::SendPacket, + ics04_channel::msgs::recv_packet::MsgRecvPacket, + ics04_channel::packet::Packet, + ics23_commitment::commitment::CommitmentProof, ics24_host::identifier::{ChainId, ChannelId, ClientId, PortId}, + tx_msg::Msg, Height, }; -use tendermint::Signature; -use crate::chain::handle::ChainHandle; -use crate::chain::{Chain, CosmosSDKChain}; +use crate::chain::handle::{ChainHandle, Subscription}; use crate::channel::{Channel, ChannelError}; +use crate::config::ChainConfig; use crate::connection::ConnectionError; -use crate::foreign_client::{ForeignClient, ForeignClientError}; -use crate::msgs::{ClientUpdate, Datagram, Packet, Transaction}; -use crate::util::iter::SplitResults; - -// TODO: move to config -const MAX_RETRIES: usize = 10_usize; +use crate::error::{Error, Kind}; +use crate::foreign_client::build_update_client; #[derive(Debug, Error)] pub enum LinkError { #[error("Failed")] Failed, - #[error("Foreign client error")] - ForeignClientError(#[from] ForeignClientError), - #[error("ConnectionError:")] ConnectionError(#[from] ConnectionError), @@ -36,135 +38,361 @@ pub enum LinkError { ChannelError(#[from] ChannelError), #[error("ChainError:")] - ChainError(#[from] crate::error::Error), + ChainError(#[from] Error), #[error("exhausted max number of retries:")] RetryError, } -pub enum Order { - Ordered, - Unordered, +pub struct Link { + pub channel: Channel, } -// XXX: There is redundency in the configuration here that can be eliminated by simply passing in -// the dependent objects (ForeignClient, Channel, etc) -// Noting that Links are products of channels -pub struct ConfigSide { - chain_id: ChainId, - client_id: ClientId, - channel_id: ChannelId, - port_id: PortId, +fn send_update_client_and_msgs( + dst_chain: &impl ChainHandle, + src_chain: &impl ChainHandle, + msgs: &mut Vec, + height: &Height, + client_id: &ClientId, +) -> Result<(), Error> { + if !msgs.is_empty() { + let update_height = height.increment(); + let mut msgs_to_send = build_update_client(dst_chain, src_chain, client_id, update_height)?; + msgs_to_send.append(msgs); + info!("sending {:#?} messages", msgs_to_send.len()); + let res = dst_chain.send_msgs(msgs_to_send)?; + info!("result {:#?}", res); + } + Ok(()) } -pub struct LinkConfig { - src_config: ConfigSide, - dst_config: ConfigSide, - order: Order, -} +impl Link { + pub fn new(channel: Channel) -> Link { + Link { channel } + } -impl LinkConfig { - pub fn new(src_config: ConfigSide, dst_config: ConfigSide, order: Order) -> LinkConfig { - Self { - src_config, - dst_config, - order, + pub fn client_of_chain(&self, chain_id: ChainId) -> Option<&ClientId> { + if chain_id == self.channel.config.a_config.chain_id().clone() { + Some(&self.channel.config.a_config.client_id()) + } else if chain_id == self.channel.config.b_config.chain_id().clone() { + Some(&self.channel.config.b_config.client_id()) + } else { + None } } -} - -pub struct Link { - src_chain: Box, - dst_chain: Box, - foreign_client: ForeignClient, -} -impl Link { - pub fn new( - src_chain: impl ChainHandle + 'static, - dst_chain: impl ChainHandle + 'static, - foreign_client: ForeignClient, - _channel: Channel, // We probably need some config here - _config: LinkConfig, - ) -> Result { - // XXX: Validate the inputs - - Ok(Link { - foreign_client, - src_chain: Box::new(src_chain), - dst_chain: Box::new(dst_chain), - }) + // Determines if the event received from the chain `from_chain` is relevant and + // should be processed. + // Only events for a port/channel matching one of the channel ends should be processed. + fn must_process_event(&self, from_chain: &impl ChainHandle, event: &IBCEvent) -> bool { + match event { + IBCEvent::SendPacketChannel(send_packet_ev) => { + self.channel.config.a_config.chain_id().clone() == from_chain.id() + && self.channel.config.a_config.channel_id().clone() + == send_packet_ev.envelope.packet_src_channel + && self.channel.config.a_config.port_id().clone() + == send_packet_ev.envelope.packet_src_port + || self.channel.config.b_config.chain_id().clone() == from_chain.id() + && self.channel.config.b_config.channel_id().clone() + == send_packet_ev.envelope.packet_src_channel + && self.channel.config.b_config.port_id().clone() + == send_packet_ev.envelope.packet_src_port + } + _ => false, + } } - pub fn run(self) -> Result<(), LinkError> { - // XXX: subscriptions are per channel - // Subscriptions have to buffer events as packets can be sent before channels are - // established - // Can subscriptions operate as queues? - let subscription = self.src_chain.subscribe(self.dst_chain.id())?; - let signature = todo!(); - - // XXX: What about Packet Acks for ordered channels - for (chain_id, target_height, events) in subscription.iter() { - let (datagrams, errors) = events - .into_iter() - .map(|event| self.src_chain.create_packet(event)) - .map_results(Datagram::Packet) - .split_results(); - - // TODO: Report the errors? - - let mut tries = 0..MAX_RETRIES; - - let result = retry(Fixed::from_millis(100), || { - if let Some(attempt) = tries.next() { - self.step(target_height, datagrams.clone(), signature) - } else { - Err(LinkError::RetryError) + fn relay_from_events( + &self, + src_chain: &impl ChainHandle, + dst_chain: &impl ChainHandle, + src_subscription: &Subscription, + ) -> Result<(), LinkError> { + let mut prev_height = Height::zero(); + let mut prev_msgs = vec![]; + + // Iterate through the IBC Events, build the message for each and collect all at same height. + // Send a multi message transaction with these, prepending the client update + for batch in src_subscription.try_iter().collect::>().iter() { + for event in batch.events.iter() { + if !self.must_process_event(src_chain, event) { + continue; } - }); + let packet_msg = handle_packet_event(dst_chain, src_chain, event)?; - match result { - Ok(_) => { - println!("Submission successful"); - Ok(()) + // TODO add ICS height to IBC event + let event_height = Height { + version_height: u64::from(event.height()), + version_number: ChainId::chain_version(src_chain.id().to_string().as_str()), + }; + if prev_height == Height::zero() { + prev_height = event_height; + } + if event_height > prev_height { + send_update_client_and_msgs( + dst_chain, + src_chain, + &mut prev_msgs, + &prev_height, + self.client_of_chain(dst_chain.id()).unwrap(), + )?; + prev_height = event_height; } - Err(problem) => { - println!("Submission failed attempt with {:?}", problem); - Err(LinkError::Failed) + if let Some(msg) = packet_msg { + prev_msgs.append(&mut vec![msg]); } - }?; + } } - Ok(()) + Ok(send_update_client_and_msgs( + dst_chain, + src_chain, + &mut prev_msgs, + &prev_height, + self.client_of_chain(dst_chain.id()).unwrap(), + )?) } - fn step( + pub fn run( &self, - target_height: Height, - mut datagrams: Vec, - signature: Signature, + a_chain: impl ChainHandle, + b_chain: impl ChainHandle, ) -> Result<(), LinkError> { - let height = self.dst_chain.query_latest_height(&self.foreign_client)?; - // XXX: Check that height > target_height, no client update needed - let signed_headers = self.src_chain.get_minimal_set(height, target_height)?; + info!("relaying packets for link {:#?}", self.channel.config); + + let a_subscription = &a_chain.subscribe(a_chain.id())?; + let b_subscription = &b_chain.subscribe(b_chain.id())?; + loop { + self.relay_from_events(&a_chain, &b_chain, a_subscription)?; + self.relay_from_events(&b_chain, &a_chain, b_subscription)?; + } + } +} + +fn handle_packet_event( + dst_chain: &impl ChainHandle, + src_chain: &impl ChainHandle, + event: &IBCEvent, +) -> Result, Error> { + match event { + IBCEvent::SendPacketChannel(send_packet_ev) => { + info!("received event {:#?}", send_packet_ev.envelope); + let msg = build_packet_recv_msg_from_send_event(dst_chain, src_chain, &send_packet_ev) + .unwrap(); + Ok(Some(msg.to_any::())) + } + _ => Ok(None), + } +} + +fn build_packet_recv_msg_from_send_event( + dst_chain: &impl ChainHandle, + src_chain: &impl ChainHandle, + event: &SendPacket, +) -> Result { + let packet = Packet { + sequence: event.envelope.clone().packet_sequence.into(), + source_port: event.envelope.clone().packet_src_port, + source_channel: event.envelope.clone().packet_src_channel, + destination_port: event.envelope.clone().packet_dst_port, + destination_channel: event.envelope.clone().packet_dst_channel, + timeout_height: event.envelope.clone().packet_timeout_height, + timeout_timestamp: event.envelope.clone().packet_timeout_stamp, + data: event.clone().data, + }; + + // TODO - change event types to return ICS height + let event_height = Height::new( + ChainId::chain_version(src_chain.id().to_string().as_str()), + u64::from(event.envelope.height), + ); + + // Get signer + let signer = dst_chain + .get_signer() + .map_err(|e| Kind::KeyBase.context(e))?; - let client_update = ClientUpdate::new(signed_headers); + let (_, proof) = src_chain + .proven_packet_commitment( + &event.envelope.packet_src_port, + &event.envelope.packet_src_channel, + event.envelope.packet_sequence, + event_height, + ) + .map_err(|e| Kind::MalformedProof.context(e))?; - datagrams.push(Datagram::ClientUpdate(client_update)); + let msg = MsgRecvPacket::new( + packet.clone(), + CommitmentProof::from(proof), + event_height.increment(), + signer, + ) + .map_err(|e| { + Kind::PacketRecv( + packet.source_channel, + "error while building the recv_packet".to_string(), + ) + .context(e) + })?; - // We are missing fields here like gas and account - let transaction = Transaction::new(datagrams); - let signed_transaction = transaction.sign(signature); - let encoded_transaction = signed_transaction.encode(); + Ok(msg) +} + +fn build_packet_recv_msgs( + dst_chain: &impl ChainHandle, + src_chain: &impl ChainHandle, + src_channel_id: &ChannelId, + src_port: &PortId, + src_height: Height, + sequences: &[u64], +) -> Result, Error> { + if sequences.is_empty() { + return Ok(vec![]); + } + // Set the height of the queries at height - 1 + let query_height = src_height + .decrement() + .map_err(|e| Kind::InvalidHeight.context(e))?; + + let mut events = src_chain.query_txs(QueryPacketEventDataRequest { + event_id: IBCEventType::SendPacket, + port_id: src_port.to_string(), + channel_id: src_channel_id.to_string(), + sequences: Vec::from(sequences), + height: query_height, + })?; + + let mut packet_sequences = vec![]; + for event in events.iter() { + let send_event = downcast!(event => IBCEvent::SendPacketChannel) + .ok_or_else(|| Kind::Query.context("unexpected query tx response"))?; + + packet_sequences.append(&mut vec![send_event.envelope.packet_sequence]); + } + info!("received from query_txs {:?}", packet_sequences); + + let mut msgs = vec![]; + for event in events.iter_mut() { + event.set_height(query_height); + if let Some(new_msg) = handle_packet_event(dst_chain, src_chain, event)? { + msgs.append(&mut vec![new_msg]); + } + } + Ok(msgs) +} + +#[derive(Clone, Debug)] +pub struct PacketOptions { + pub dst_chain_config: ChainConfig, + pub src_chain_config: ChainConfig, + pub dst_client_id: ClientId, + pub src_port_id: PortId, + pub src_channel_id: ChannelId, +} + +fn target_height_and_sequences_of_recv_packets( + dst_chain: &impl ChainHandle, + src_chain: &impl ChainHandle, + opts: &PacketOptions, +) -> Result<(Vec, Height), Error> { + let src_channel = src_chain + .query_channel(&opts.src_port_id, &opts.src_channel_id, Height::default()) + .map_err(|e| { + Kind::PacketRecv( + opts.src_channel_id.clone(), + "source channel does not exist on source".into(), + ) + .context(e) + })?; - // Submission failure cases - // - The full node can fail - // + TODO: The link will fail, and signale recreation with a different full node - // - The transaction can be rejected because the client is not up to date - // + Retry this loop - self.dst_chain.submit(encoded_transaction)?; + let dst_channel_id = src_channel.counterparty().channel_id.ok_or_else(|| { + Kind::PacketRecv( + opts.src_channel_id.clone(), + "missing counterparty channel id".into(), + ) + })?; - Ok(()) + let dst_channel = dst_chain + .query_channel( + &src_channel.counterparty().port_id, + &dst_channel_id, + Height::default(), + ) + .map_err(|e| { + Kind::PacketRecv( + dst_channel_id.clone(), + "channel does not exist on destination chain".into(), + ) + .context(e) + })?; + + if dst_channel.state().clone() != State::Open { + return Err(Kind::PacketRecv( + dst_channel_id, + "channel on destination not in open state".into(), + ) + .into()); } + + let pc_request = QueryPacketCommitmentsRequest { + port_id: src_channel.counterparty().port_id.to_string(), + channel_id: opts.src_channel_id.to_string(), + pagination: None, + }; + + let (packet_commitments, query_height) = src_chain.query_packet_commitments(pc_request)?; + + if packet_commitments.is_empty() { + return Ok((vec![], Height::zero())); + } + + let mut src_sequences = vec![]; + for pc in packet_commitments.iter() { + src_sequences.append(&mut vec![pc.sequence]); + } + info!( + "packets that still have commitments on source {:?}", + src_sequences + ); + + let request = QueryUnreceivedPacketsRequest { + port_id: src_channel.counterparty().port_id.to_string(), + channel_id: dst_channel_id.to_string(), + packet_commitment_sequences: src_sequences, + }; + + let packets_to_send = dst_chain.query_unreceived_packets(request)?; + + info!( + "packets to send out of the ones with commitments on source {:?}", + packets_to_send + ); + + Ok((packets_to_send, query_height)) +} + +pub fn build_and_send_recv_packet_messages( + dst_chain: &impl ChainHandle, + src_chain: &impl ChainHandle, + opts: &PacketOptions, +) -> Result, Error> { + let (sequences, height) = + target_height_and_sequences_of_recv_packets(dst_chain, src_chain, opts)?; + if sequences.is_empty() { + return Ok(vec!["No sent packets on source chain".to_string()]); + } + + let mut msgs = build_update_client(dst_chain, src_chain, &opts.dst_client_id.clone(), height)?; + + let mut packet_msgs = build_packet_recv_msgs( + dst_chain, + src_chain, + &opts.src_channel_id, + &opts.src_port_id, + height, + &sequences, + )?; + + msgs.append(&mut packet_msgs); + Ok(dst_chain.send_msgs(msgs)?) } diff --git a/relayer/src/msgs.rs b/relayer/src/msgs.rs index ecd83d4484..8b13789179 100644 --- a/relayer/src/msgs.rs +++ b/relayer/src/msgs.rs @@ -1,58 +1 @@ -pub use ibc::events::IBCEvent; -use ibc::ics02_client::client_def::AnyHeader; -pub use tendermint::Signature; -// What is the actual type here? -#[derive(Clone)] -pub enum Datagram { - NoOp(), - Packet(Packet), - ClientUpdate(ClientUpdate), -} - -pub type Datagrams = Vec; - -#[derive(Clone)] -pub struct Packet { - // type -// packetData -// seq: number, -// timeout: height, -// timeoutStampt: timestamp, -// commitmentProof: {proof, commitment -} - -pub struct Transaction {} - -impl Transaction { - pub fn new(_datagrams: Vec) -> Transaction { - // calculate the gas - Self {} - } - - pub fn sign(self, _signature: Signature) -> SignedTransaction { - SignedTransaction {} - } -} - -#[derive(Copy, Clone)] -pub struct EncodedTransaction {} - -pub struct SignedTransaction {} - -impl SignedTransaction { - pub fn encode(self) -> EncodedTransaction { - EncodedTransaction {} - } -} - -#[derive(Clone)] -pub struct ClientUpdate { - signed_headers: Vec, -} - -impl ClientUpdate { - pub fn new(signed_headers: Vec) -> ClientUpdate { - ClientUpdate { signed_headers } - } -} diff --git a/relayer/src/relay.rs b/relayer/src/relay.rs index 40e04c34eb..674bac6176 100644 --- a/relayer/src/relay.rs +++ b/relayer/src/relay.rs @@ -4,6 +4,7 @@ use crate::chain::handle::ChainHandle; use crate::channel::{Channel, ChannelConfig}; use crate::connection::{Connection, ConnectionConfig}; use crate::foreign_client::{ForeignClient, ForeignClientConfig}; +use crate::link::Link; pub(crate) const MAX_ITER: u32 = 10; @@ -37,18 +38,16 @@ pub fn channel_relay( )?; // Setup the channel over the connection - let _channel = Channel::new(a_chain_handle, b_chain_handle, connection, chan_cfg)?; - - // TODO: Re-enable `link` module in `relayer/src/lib.rs` - // let link = Link::new( - // a_chain_handle, - // b_chain_handle, - // client_on_src, // Actual dependecy - // channel, // Semantic dependecy - // LinkConfig::new(todo!(), todo!(), todo!()), - // )?; - - // link.run()?; + let channel = Channel::new( + a_chain_handle.clone(), + b_chain_handle.clone(), + connection, + chan_cfg, + )?; + + let link = Link::new(channel); + + link.run(a_chain_handle, b_chain_handle)?; Ok(()) } diff --git a/relayer/tests/config/fixtures/relayer_conf_example.toml b/relayer/tests/config/fixtures/relayer_conf_example.toml index 3907ad51a1..2e4e540d6a 100644 --- a/relayer/tests/config/fixtures/relayer_conf_example.toml +++ b/relayer/tests/config/fixtures/relayer_conf_example.toml @@ -14,6 +14,8 @@ client_ids = [ 'clA2', ] gas = 200000 +max_msg_num = 4 +max_tx_size = 1048576 clock_drift = '5s' trusting_period = '14days' @@ -29,7 +31,6 @@ account_prefix = 'cosmos' key_name = 'testkey' store_prefix = 'ibc' client_ids = ['clB1'] -gas = 200000 clock_drift = '5s' trusting_period = '14days'