Skip to content

Commit

Permalink
Released v0.1.8: Resolve to an error when payload exceeds server sett…
Browse files Browse the repository at this point in the history
…ings
  • Loading branch information
Mathieu Amiot committed Oct 29, 2018
1 parent 6212c60 commit 0aa18c9
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ license = "MIT/Apache-2.0"
name = "nitox"
readme = "README.md"
repository = "https://github.com/YellowInnovation/nitox"
version = "0.1.7"
version = "0.1.8"

[[bench]]
harness = false
Expand Down
59 changes: 43 additions & 16 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ impl NatsClientOptions {
pub struct NatsClient {
/// Backup of options
opts: NatsClientOptions,
/// Server info
server_info: Arc<RwLock<Option<ServerInfo>>>,
/// Stream of the messages that are not caught for subscriptions (only system messages like PING/PONG should be here)
other_rx: Box<dyn Stream<Item = Op, Error = NatsError> + Send + Sync>,
/// Sink part to send commands
Expand Down Expand Up @@ -219,26 +221,37 @@ impl NatsClient {

let (tmp_other_tx, tmp_other_rx) = mpsc::unbounded();
let tx_inner = tx.clone();
let client = NatsClient {
tx,
server_info: Arc::new(RwLock::new(None)),
other_rx: Box::new(tmp_other_rx.map_err(|_| NatsError::InnerBrokenChain)),
rx: Arc::new(rx),
opts,
};

let server_info_arc = Arc::clone(&client.server_info);

tokio_executor::spawn(
other_rx
.for_each(move |op| {
if let Op::PING = op {
tokio_executor::spawn(tx_inner.send(Op::PONG).map_err(|_| ()));
match op {
Op::PING => {
tokio_executor::spawn(tx_inner.send(Op::PONG).map_err(|_| ()));
let _ = tmp_other_tx.unbounded_send(op);
}
Op::INFO(server_info) => {
*server_info_arc.write() = Some(server_info);
}
op => {
let _ = tmp_other_tx.unbounded_send(op);
}
}

let _ = tmp_other_tx.unbounded_send(op);
future::ok(())
}).into_future()
.map_err(|_| ()),
);

let client = NatsClient {
tx,
other_rx: Box::new(tmp_other_rx.map_err(|_| NatsError::InnerBrokenChain)),
rx: Arc::new(rx),
opts,
};

future::ok(client)
})
}
Expand Down Expand Up @@ -267,7 +280,13 @@ impl NatsClient {
///
/// Returns `impl Future<Item = (), Error = NatsError>`
pub fn publish(&self, cmd: PubCommand) -> impl Future<Item = (), Error = NatsError> + Send + Sync {
self.tx.send(Op::PUB(cmd))
if let Some(ref server_info) = *self.server_info.read() {
if cmd.payload.len() > server_info.max_payload as usize {
return Either::A(future::err(NatsError::MaxPayloadOverflow(server_info.max_payload)));
}
}

Either::B(self.tx.send(Op::PUB(cmd)))
}

/// Send a UNSUB command to the server and de-register stream in the multiplexer
Expand Down Expand Up @@ -333,6 +352,12 @@ impl NatsClient {
subject: String,
payload: Bytes,
) -> impl Future<Item = Message, Error = NatsError> + Send + Sync {
if let Some(ref server_info) = *self.server_info.read() {
if payload.len() > server_info.max_payload as usize {
return Either::A(future::err(NatsError::MaxPayloadOverflow(server_info.max_payload)));
}
}

let inbox = PubCommand::generate_reply_to();
let pub_cmd = PubCommand {
subject,
Expand Down Expand Up @@ -370,10 +395,12 @@ impl NatsClient {
future::ok(msg)
});

self.tx
.send(Op::SUB(sub_cmd))
.and_then(move |_| tx1.send(Op::UNSUB(unsub_cmd)))
.and_then(move |_| tx2.send(Op::PUB(pub_cmd)))
.and_then(move |_| stream)
Either::B(
self.tx
.send(Op::SUB(sub_cmd))
.and_then(move |_| tx1.send(Op::UNSUB(unsub_cmd)))
.and_then(move |_| tx2.send(Op::PUB(pub_cmd)))
.and_then(move |_| stream),
)
}
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ pub enum NatsError {
/// Something went wrong in one of the Reciever/Sender pairs
#[fail(display = "InnerBrokenChain: the sender/receiver pair has been disconnected")]
InnerBrokenChain,
/// The user supplied a too big payload for the server
#[fail(
display = "MaxPayloadOverflow: the given payload exceeds the server setting (max_payload_size = {})",
_0
)]
MaxPayloadOverflow(u32),
/// Generic string error
#[fail(display = "GenericError: {}", _0)]
GenericError(String),
Expand Down
24 changes: 12 additions & 12 deletions src/protocol/server/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,50 @@ use serde_json as json;
pub struct ServerInfo {
/// The unique identifier of the NATS server
#[builder(setter(into))]
server_id: String,
pub(crate) server_id: String,
/// The version of the NATS server
#[builder(setter(into))]
version: String,
pub(crate) version: String,
/// The version of golang the NATS server was built with
#[builder(setter(into))]
go: String,
pub(crate) go: String,
/// The IP address used to start the NATS server, by default this will be 0.0.0.0 and can be configured with
/// `-client_advertise host:port`
#[builder(setter(into))]
host: String,
pub(crate) host: String,
/// The port number the NATS server is configured to listen on
#[builder(setter(into))]
port: u32,
pub(crate) port: u32,
/// Maximum payload size, in bytes, that the server will accept from the client.
#[builder(setter(into))]
max_payload: u32,
pub(crate) max_payload: u32,
/// An integer indicating the protocol version of the server. The server version 1.2.0 sets this to 1 to indicate
/// that it supports the “Echo” feature.
#[builder(default)]
#[serde(skip_serializing_if = "Option::is_none")]
proto: Option<u8>,
pub(crate) proto: Option<u8>,
/// An optional unsigned integer (64 bits) representing the internal client identifier in the server. This can be
/// used to filter client connections in monitoring, correlate with error logs, etc…
#[builder(default)]
#[serde(skip_serializing_if = "Option::is_none")]
client_id: Option<u64>,
pub(crate) client_id: Option<u64>,
/// If this is set, then the client should try to authenticate upon connect.
#[builder(default)]
#[serde(skip_serializing_if = "Option::is_none")]
auth_required: Option<bool>,
pub(crate) auth_required: Option<bool>,
/// If this is set, then the client must perform the TLS/1.2 handshake. Note, this used to be ssl_required and has
/// been updated along with the protocol from SSL to TLS.
#[builder(default)]
#[serde(skip_serializing_if = "Option::is_none")]
tls_required: Option<bool>,
pub(crate) tls_required: Option<bool>,
/// If this is set, the client must provide a valid certificate during the TLS handshake.
#[builder(default)]
#[serde(skip_serializing_if = "Option::is_none")]
tls_verify: Option<bool>,
pub(crate) tls_verify: Option<bool>,
/// An optional list of server urls that a client can connect to.
#[builder(default)]
#[serde(skip_serializing_if = "Option::is_none")]
connect_urls: Option<Vec<String>>,
pub(crate) connect_urls: Option<Vec<String>>,
}

impl ServerInfo {
Expand Down

0 comments on commit 0aa18c9

Please sign in to comment.