From 0aa18c90a960e7d21e3f18e43b53da41fbe19093 Mon Sep 17 00:00:00 2001 From: Mathieu Amiot Date: Mon, 29 Oct 2018 16:15:23 +0100 Subject: [PATCH] Released v0.1.8: Resolve to an error when payload exceeds server settings --- Cargo.toml | 2 +- src/client.rs | 59 +++++++++++++++++++++++++++---------- src/error.rs | 6 ++++ src/protocol/server/info.rs | 24 +++++++-------- 4 files changed, 62 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5444254..58796cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ license = "MIT/Apache-2.0" name = "nitox" readme = "README.md" repository = "https://github.com/YellowInnovation/nitox" -version = "0.1.7" +version = "0.1.8" [[bench]] harness = false diff --git a/src/client.rs b/src/client.rs index 4b0e21c..005afd4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -152,6 +152,8 @@ impl NatsClientOptions { pub struct NatsClient { /// Backup of options opts: NatsClientOptions, + /// Server info + server_info: Arc>>, /// Stream of the messages that are not caught for subscriptions (only system messages like PING/PONG should be here) other_rx: Box + Send + Sync>, /// Sink part to send commands @@ -219,26 +221,37 @@ impl NatsClient { let (tmp_other_tx, tmp_other_rx) = mpsc::unbounded(); let tx_inner = tx.clone(); + let client = NatsClient { + tx, + server_info: Arc::new(RwLock::new(None)), + other_rx: Box::new(tmp_other_rx.map_err(|_| NatsError::InnerBrokenChain)), + rx: Arc::new(rx), + opts, + }; + + let server_info_arc = Arc::clone(&client.server_info); + tokio_executor::spawn( other_rx .for_each(move |op| { - if let Op::PING = op { - tokio_executor::spawn(tx_inner.send(Op::PONG).map_err(|_| ())); + match op { + Op::PING => { + tokio_executor::spawn(tx_inner.send(Op::PONG).map_err(|_| ())); + let _ = tmp_other_tx.unbounded_send(op); + } + Op::INFO(server_info) => { + *server_info_arc.write() = Some(server_info); + } + op => { + let _ = tmp_other_tx.unbounded_send(op); + } } - let _ = tmp_other_tx.unbounded_send(op); future::ok(()) }).into_future() .map_err(|_| ()), ); - let client = NatsClient { - tx, - other_rx: Box::new(tmp_other_rx.map_err(|_| NatsError::InnerBrokenChain)), - rx: Arc::new(rx), - opts, - }; - future::ok(client) }) } @@ -267,7 +280,13 @@ impl NatsClient { /// /// Returns `impl Future` pub fn publish(&self, cmd: PubCommand) -> impl Future + Send + Sync { - self.tx.send(Op::PUB(cmd)) + if let Some(ref server_info) = *self.server_info.read() { + if cmd.payload.len() > server_info.max_payload as usize { + return Either::A(future::err(NatsError::MaxPayloadOverflow(server_info.max_payload))); + } + } + + Either::B(self.tx.send(Op::PUB(cmd))) } /// Send a UNSUB command to the server and de-register stream in the multiplexer @@ -333,6 +352,12 @@ impl NatsClient { subject: String, payload: Bytes, ) -> impl Future + Send + Sync { + if let Some(ref server_info) = *self.server_info.read() { + if payload.len() > server_info.max_payload as usize { + return Either::A(future::err(NatsError::MaxPayloadOverflow(server_info.max_payload))); + } + } + let inbox = PubCommand::generate_reply_to(); let pub_cmd = PubCommand { subject, @@ -370,10 +395,12 @@ impl NatsClient { future::ok(msg) }); - self.tx - .send(Op::SUB(sub_cmd)) - .and_then(move |_| tx1.send(Op::UNSUB(unsub_cmd))) - .and_then(move |_| tx2.send(Op::PUB(pub_cmd))) - .and_then(move |_| stream) + Either::B( + self.tx + .send(Op::SUB(sub_cmd)) + .and_then(move |_| tx1.send(Op::UNSUB(unsub_cmd))) + .and_then(move |_| tx2.send(Op::PUB(pub_cmd))) + .and_then(move |_| stream), + ) } } diff --git a/src/error.rs b/src/error.rs index 1a8be51..86d66ee 100644 --- a/src/error.rs +++ b/src/error.rs @@ -53,6 +53,12 @@ pub enum NatsError { /// Something went wrong in one of the Reciever/Sender pairs #[fail(display = "InnerBrokenChain: the sender/receiver pair has been disconnected")] InnerBrokenChain, + /// The user supplied a too big payload for the server + #[fail( + display = "MaxPayloadOverflow: the given payload exceeds the server setting (max_payload_size = {})", + _0 + )] + MaxPayloadOverflow(u32), /// Generic string error #[fail(display = "GenericError: {}", _0)] GenericError(String), diff --git a/src/protocol/server/info.rs b/src/protocol/server/info.rs index 3bc3126..ac97a28 100644 --- a/src/protocol/server/info.rs +++ b/src/protocol/server/info.rs @@ -12,50 +12,50 @@ use serde_json as json; pub struct ServerInfo { /// The unique identifier of the NATS server #[builder(setter(into))] - server_id: String, + pub(crate) server_id: String, /// The version of the NATS server #[builder(setter(into))] - version: String, + pub(crate) version: String, /// The version of golang the NATS server was built with #[builder(setter(into))] - go: String, + pub(crate) go: String, /// The IP address used to start the NATS server, by default this will be 0.0.0.0 and can be configured with /// `-client_advertise host:port` #[builder(setter(into))] - host: String, + pub(crate) host: String, /// The port number the NATS server is configured to listen on #[builder(setter(into))] - port: u32, + pub(crate) port: u32, /// Maximum payload size, in bytes, that the server will accept from the client. #[builder(setter(into))] - max_payload: u32, + pub(crate) max_payload: u32, /// An integer indicating the protocol version of the server. The server version 1.2.0 sets this to 1 to indicate /// that it supports the “Echo” feature. #[builder(default)] #[serde(skip_serializing_if = "Option::is_none")] - proto: Option, + pub(crate) proto: Option, /// An optional unsigned integer (64 bits) representing the internal client identifier in the server. This can be /// used to filter client connections in monitoring, correlate with error logs, etc… #[builder(default)] #[serde(skip_serializing_if = "Option::is_none")] - client_id: Option, + pub(crate) client_id: Option, /// If this is set, then the client should try to authenticate upon connect. #[builder(default)] #[serde(skip_serializing_if = "Option::is_none")] - auth_required: Option, + pub(crate) auth_required: Option, /// If this is set, then the client must perform the TLS/1.2 handshake. Note, this used to be ssl_required and has /// been updated along with the protocol from SSL to TLS. #[builder(default)] #[serde(skip_serializing_if = "Option::is_none")] - tls_required: Option, + pub(crate) tls_required: Option, /// If this is set, the client must provide a valid certificate during the TLS handshake. #[builder(default)] #[serde(skip_serializing_if = "Option::is_none")] - tls_verify: Option, + pub(crate) tls_verify: Option, /// An optional list of server urls that a client can connect to. #[builder(default)] #[serde(skip_serializing_if = "Option::is_none")] - connect_urls: Option>, + pub(crate) connect_urls: Option>, } impl ServerInfo {