From f2df3b44973148f6e159b3fd9709f6eff67f59f4 Mon Sep 17 00:00:00 2001 From: Mathieu Amiot Date: Wed, 7 Nov 2018 22:07:13 +0100 Subject: [PATCH 1/6] WIP --- Cargo.toml | 13 + README.md | 25 +- build.rs | 13 + src/{ => client}/client.rs | 160 +-------- src/client/mod.rs | 16 + src/client/multiplexer.rs | 80 +++++ src/client/sender.rs | 38 ++ src/lib.rs | 11 +- src/streaming/error.rs | 38 ++ src/streaming/mod.rs | 10 + src/streaming/protocol/nats-streaming.proto | 138 ++++++++ src/streaming/streaming_client.rs | 368 ++++++++++++++++++++ src/streaming/streaming_protocol.rs | 1 + src/streaming/subscription.rs | 110 ++++++ 14 files changed, 871 insertions(+), 150 deletions(-) create mode 100644 build.rs rename src/{ => client}/client.rs (68%) create mode 100644 src/client/mod.rs create mode 100644 src/client/multiplexer.rs create mode 100644 src/client/sender.rs create mode 100644 src/streaming/error.rs create mode 100644 src/streaming/mod.rs create mode 100644 src/streaming/protocol/nats-streaming.proto create mode 100644 src/streaming/streaming_client.rs create mode 100644 src/streaming/streaming_protocol.rs create mode 100644 src/streaming/subscription.rs diff --git a/Cargo.toml b/Cargo.toml index 419b8aa..903c612 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,14 @@ name = "nitox" readme = "README.md" repository = "https://github.com/YellowInnovation/nitox" version = "0.1.10" +build = "build.rs" + +[package.metadata.docs.rs] +features = ["nats-streaming"] + +[features] +default = ["nats-streaming"] +nats-streaming = ["prost", "prost-derive", "prost-build"] [[bench]] harness = false @@ -45,6 +53,8 @@ tokio-executor = "0.1" tokio-tcp = "0.1" tokio-tls = "0.2" url = "1.7" +prost = { version = "0.4", optional = true } +prost-derive = { version = "0.4", optional = true } [dependencies.serde_json] features = ["preserve_order"] @@ -55,6 +65,9 @@ criterion = "0.2" env_logger = "0.6" tokio = "0.1" +[build-dependencies] +prost-build = { version = "0.4", optional = true } + [[example]] name = "request_with_reply" diff --git a/README.md b/README.md index 29ab7cb..dc41959 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Nitox - Tokio-based async NATS client +# Nitox - Tokio-based async NATS / NATS Streaming Server client [![Crates.io](https://img.shields.io/crates/v/nitox.svg)](https://crates.io/crates/nitox) [![docs.rs](https://docs.rs/nitox/badge.svg)](https://docs.rs/nitox) @@ -11,12 +11,12 @@ A lot of features are currently missing, so feel free to contribute and help us Missing features: -- [x] Find a way to integration test the reconnection mechanism - but it has actually been hand-tested and works -- [x] Auto-pruning of subscriptions being unsubscribed after X messages - It's actually a bug, since a stream stays open albeit sleeping +- [x] Find a way to integration test the reconnection mechanism - The 10M requests integration test provokes a slow consumer and a disconnection, which is handled well +- [x] Auto-pruning of subscriptions being unsubscribed after X messages - [ ] Handle verbose mode - [x] Handle pedantic mode - Should work OOB since we're closely following the protocol (Edit: it does) -- [ ] Switch parsing to using `nom` - We're not sure we can handle very weird clients; we're fine talking to official ones right now -- [ ] Add support for NATS Streaming Server - Should be pretty easy with `prost` since we already have the async architecture going on +- [x] Switch parsing to using `nom` - well, our parser is speedy/robust enough so I don't think it's worth the effort +- [x] Add support for NATS Streaming Server - Available under the `nats-streaming` feature flag! *There's a small extra in the `tests/` folder, some of our integration tests rely on a custom NATS server implemented with `tokio` that only implements a subset of the protocol to fit our needs for the integration testing.* @@ -28,7 +28,14 @@ Here: [http://docs.rs/nitox](http://docs.rs/nitox) ```toml [dependencies] -nitox = "0.1" +nitox = "0.2" +``` + +With NATS Streaming Server support enabled + +```toml +[dependencies] +nitox = { version = "0.2", features = ["nats-streaming"] } ``` ## Usage @@ -37,7 +44,7 @@ nitox = "0.1" extern crate nitox; extern crate futures; use futures::{prelude::*, future}; -use nitox::{NatsClient, NatsClientOptions, NatsError, commands::*}; +use nitox::{NatsClient, NatsClientOptions, NatsError, commands::*, streaming::*}; fn connect_to_nats() -> impl Future { // Defaults as recommended per-spec, but you can customize them @@ -57,6 +64,10 @@ fn connect_to_nats() -> impl Future { // Client has sent its CONNECT command and is ready for usage future::ok(client) }) + .and_then(|client| { + // Also, you can switch to NATS Streaming Server client seamlessly + future::ok(NatsStreamingClient::from(client)) + }) } ``` diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..5ce864c --- /dev/null +++ b/build.rs @@ -0,0 +1,13 @@ +#[cfg(feature = "nats-streaming")] +extern crate prost_build; + +fn main() -> Result<(), std::io::Error> { + if cfg!(feature = "nats-streaming") == false { + return Ok(()); + } + + prost_build::compile_protos(&["src/streaming/protocol/nats-streaming.proto"], &["src/"])?; + + println!("Protobuf translation done."); + Ok(()) +} diff --git a/src/client.rs b/src/client/client.rs similarity index 68% rename from src/client.rs rename to src/client/client.rs index 9be1f9c..1fe1fd7 100644 --- a/src/client.rs +++ b/src/client/client.rs @@ -3,138 +3,26 @@ use bytes::Bytes; use futures::{ future::{self, Either}, prelude::*, - stream, sync::mpsc, Future, }; use parking_lot::RwLock; use std::{ - collections::HashMap, net::{SocketAddr, ToSocketAddrs}, str::FromStr, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, }; use tokio_executor; use url::Url; +use super::{NatsClientMultiplexer, NatsClientSender, NatsSink, NatsStream}; use error::NatsError; use net::*; use protocol::{commands::*, Op}; -/// Sink (write) part of a TCP stream -type NatsSink = stream::SplitSink; -/// Stream (read) part of a TCP stream -type NatsStream = stream::SplitStream; -/// Useless pretty much, just for code semantics -type NatsSubscriptionId = String; - -/// Keep-alive for the sink, also supposed to take care of handling verbose messaging, but can't for now -#[derive(Clone, Debug)] -struct NatsClientSender { - tx: mpsc::UnboundedSender, - verbose: bool, -} - -impl NatsClientSender { - pub fn new(sink: NatsSink) -> Self { - let (tx, rx) = mpsc::unbounded(); - let rx = rx.map_err(|_| NatsError::InnerBrokenChain); - let work = sink.send_all(rx).map(|_| ()).map_err(|_| ()); - tokio_executor::spawn(work); - - NatsClientSender { tx, verbose: false } - } - - #[allow(dead_code)] - pub fn set_verbose(&mut self, verbose: bool) { - self.verbose = verbose; - } - - /// Sends an OP to the server - pub fn send(&self, op: Op) -> impl Future { - //let _verbose = self.verbose.clone(); - self.tx - .unbounded_send(op) - .map_err(|_| NatsError::InnerBrokenChain) - .into_future() - } -} - -#[derive(Debug)] -struct SubscriptionSink { - tx: mpsc::UnboundedSender, - max_count: Option, - count: u32, -} - -/// Internal multiplexer for incoming streams and subscriptions. Quite a piece of code, with almost no overhead yay -#[derive(Debug)] -struct NatsClientMultiplexer { - other_tx: Arc>, - subs_tx: Arc>>, -} - -impl NatsClientMultiplexer { - pub fn new(stream: NatsStream) -> (Self, mpsc::UnboundedReceiver) { - let subs_tx: Arc>> = - Arc::new(RwLock::new(HashMap::default())); - - let (other_tx, other_rx) = mpsc::unbounded(); - let other_tx = Arc::new(other_tx); - - let stx_inner = Arc::clone(&subs_tx); - let otx_inner = Arc::clone(&other_tx); - - // Here we filter the incoming TCP stream Messages by subscription ID and sending it to the appropriate Sender - let work_tx = stream - .for_each(move |op| { - match op { - Op::MSG(msg) => { - debug!(target: "nitox", "Found MSG from global Stream {:?}", msg); - if let Some(s) = (*stx_inner.read()).get(&msg.sid) { - debug!(target: "nitox", "Found multiplexed receiver to send to {}", msg.sid); - let _ = s.tx.unbounded_send(msg); - } - } - // Forward the rest of the messages to the owning client - op => { - debug!(target: "nitox", "Sending OP to the rest of the queue: {:?}", op); - let _ = otx_inner.unbounded_send(op); - } - } - - future::ok::<(), NatsError>(()) - }) - .map(|_| ()) - .map_err(|_| ()); - - tokio_executor::spawn(work_tx); - - (NatsClientMultiplexer { subs_tx, other_tx }, other_rx) - } - - pub fn for_sid(&self, sid: NatsSubscriptionId) -> impl Stream + Send + Sync { - let (tx, rx) = mpsc::unbounded(); - (*self.subs_tx.write()).insert( - sid, - SubscriptionSink { - tx, - max_count: None, - count: 0, - }, - ); - - rx.map_err(|_| NatsError::InnerBrokenChain) - } - - pub fn remove_sid(&self, sid: &str) { - if let Some(mut tx) = (*self.subs_tx.write()).remove(sid) { - let _ = tx.tx.close(); - drop(tx); - } - } -} - /// Options that are to be given to the client for initialization #[derive(Debug, Default, Clone, Builder)] #[builder(setter(into))] @@ -155,7 +43,11 @@ impl NatsClientOptions { /// the system messages that are forwarded on the `Stream` that the client implements pub struct NatsClient { /// Backup of options - opts: NatsClientOptions, + pub(crate) opts: NatsClientOptions, + /// Ack for verbose + got_ack: Arc, + /// Verbose setting + verbose: AtomicBool, /// Server info server_info: Arc>>, /// Stream of the messages that are not caught for subscriptions (only system messages like PING/PONG should be here) @@ -188,8 +80,6 @@ impl Stream for NatsClient { impl NatsClient { /// Creates a client and initiates a connection to the server - /// - /// Returns `impl Future` pub fn from_options(opts: NatsClientOptions) -> impl Future + Send + Sync { let tls_required = opts.connect_command.tls_required; @@ -231,10 +121,14 @@ impl NatsClient { server_info: Arc::new(RwLock::new(None)), other_rx: Box::new(tmp_other_rx.map_err(|_| NatsError::InnerBrokenChain)), rx: Arc::new(rx), + verbose: AtomicBool::from(opts.connect_command.verbose), + got_ack: Arc::new(AtomicBool::default()), opts, }; let server_info_arc = Arc::clone(&client.server_info); + let ack_arc = Arc::clone(&client.got_ack); + let is_verbose = client.verbose.load(Ordering::SeqCst); tokio_executor::spawn( other_rx @@ -247,6 +141,11 @@ impl NatsClient { Op::INFO(server_info) => { *server_info_arc.write() = Some(server_info); } + Op::OK => { + if is_verbose { + ack_arc.store(true, Ordering::SeqCst); + } + } op => { let _ = tmp_other_tx.unbounded_send(op); } @@ -263,28 +162,13 @@ impl NatsClient { } /// Sends the CONNECT command to the server to setup connection - /// - /// Returns `impl Future` pub fn connect(self) -> impl Future + Send + Sync { self.tx .send(Op::CONNECT(self.opts.connect_command.clone())) .and_then(move |_| future::ok(self)) } - /// Send a raw command to the server - /// - /// Returns `impl Future` - #[deprecated( - since = "0.1.4", - note = "Using this method prevents the library to track what you are sending to the server and causes memory leaks in case of subscriptions/unsubs, it'll be fully removed in v0.2.0" - )] - pub fn send(self, op: Op) -> impl Future { - self.tx.send(op).and_then(move |_| future::ok(self)) - } - /// Send a PUB command to the server - /// - /// Returns `impl Future` pub fn publish(&self, cmd: PubCommand) -> impl Future + Send + Sync { if let Some(ref server_info) = *self.server_info.read() { if cmd.payload.len() > server_info.max_payload as usize { @@ -296,8 +180,6 @@ impl NatsClient { } /// Send a UNSUB command to the server and de-register stream in the multiplexer - /// - /// Returns `impl Future` pub fn unsubscribe(&self, cmd: UnsubCommand) -> impl Future + Send + Sync { let mut unsub_now = true; if let Some(max) = cmd.max_msgs { @@ -320,8 +202,6 @@ impl NatsClient { } /// Send a SUB command and register subscription stream in the multiplexer and return that `Stream` in a future - /// - /// Returns `impl Future>` pub fn subscribe( &self, cmd: SubCommand, @@ -361,8 +241,6 @@ impl NatsClient { } /// Performs a request to the server following the Request/Reply pattern. Returns a future containing the MSG that will be replied at some point by a third party - /// - /// Returns `impl Future` pub fn request( &self, subject: String, diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..7e54710 --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,16 @@ +use futures::stream; +use net::*; + +/// Sink (write) part of a TCP stream +type NatsSink = stream::SplitSink; +/// Stream (read) part of a TCP stream +type NatsStream = stream::SplitStream; +/// Useless pretty much, just for code semantics +type NatsSubscriptionId = String; + +mod sender; +pub(crate) use self::sender::*; +mod multiplexer; +pub(crate) use self::multiplexer::*; +mod client; +pub use self::client::*; diff --git a/src/client/multiplexer.rs b/src/client/multiplexer.rs new file mode 100644 index 0000000..dad4622 --- /dev/null +++ b/src/client/multiplexer.rs @@ -0,0 +1,80 @@ +use futures::{future, prelude::*, sync::mpsc}; +use parking_lot::RwLock; +use std::{collections::HashMap, sync::Arc}; +use tokio_executor; + +use super::NatsStream; +use super::NatsSubscriptionId; +use error::NatsError; +use protocol::{commands::*, Op}; + +#[derive(Debug)] +pub(crate) struct SubscriptionSink { + tx: mpsc::UnboundedSender, + pub(crate) max_count: Option, + pub(crate) count: u32, +} + +/// Internal multiplexer for incoming streams and subscriptions. Quite a piece of code, with almost no overhead yay +#[derive(Debug)] +pub(crate) struct NatsClientMultiplexer { + pub(crate) other_tx: Arc>, + pub(crate) subs_tx: Arc>>, +} + +impl NatsClientMultiplexer { + pub fn new(stream: NatsStream) -> (Self, mpsc::UnboundedReceiver) { + let subs_tx: Arc>> = + Arc::new(RwLock::new(HashMap::default())); + + let (other_tx, other_rx) = mpsc::unbounded(); + let other_tx = Arc::new(other_tx); + + let stx_inner = Arc::clone(&subs_tx); + let otx_inner = Arc::clone(&other_tx); + + // Here we filter the incoming TCP stream Messages by subscription ID and sending it to the appropriate Sender + let work_tx = stream + .for_each(move |op| { + match op { + Op::MSG(msg) => { + debug!(target: "nitox", "Found MSG from global Stream {:?}", msg); + if let Some(s) = (*stx_inner.read()).get(&msg.sid) { + debug!(target: "nitox", "Found multiplexed receiver to send to {}", msg.sid); + let _ = s.tx.unbounded_send(msg); + } + } + // Forward the rest of the messages to the owning client + op => { + debug!(target: "nitox", "Sending OP to the rest of the queue: {:?}", op); + let _ = otx_inner.unbounded_send(op); + } + } + + future::ok::<(), NatsError>(()) + }).map(|_| ()) + .map_err(|_| ()); + + tokio_executor::spawn(work_tx); + + (NatsClientMultiplexer { subs_tx, other_tx }, other_rx) + } + + pub fn for_sid(&self, sid: NatsSubscriptionId) -> impl Stream + Send + Sync { + let (tx, rx) = mpsc::unbounded(); + (*self.subs_tx.write()).insert( + sid, + SubscriptionSink { + tx, + max_count: None, + count: 0, + }, + ); + + rx.map_err(|_| NatsError::InnerBrokenChain) + } + + pub fn remove_sid(&self, sid: &str) { + (*self.subs_tx.write()).remove(sid); + } +} diff --git a/src/client/sender.rs b/src/client/sender.rs new file mode 100644 index 0000000..4763952 --- /dev/null +++ b/src/client/sender.rs @@ -0,0 +1,38 @@ +use futures::{prelude::*, sync::mpsc, Future}; +use tokio_executor; + +use super::NatsSink; +use error::NatsError; +use protocol::Op; + +/// Keep-alive for the sink, also supposed to take care of handling verbose messaging, but can't for now +#[derive(Clone, Debug)] +pub(crate) struct NatsClientSender { + tx: mpsc::UnboundedSender, + verbose: bool, +} + +impl NatsClientSender { + pub fn new(sink: NatsSink) -> Self { + let (tx, rx) = mpsc::unbounded(); + let rx = rx.map_err(|_| NatsError::InnerBrokenChain); + let work = sink.send_all(rx).map(|_| ()).map_err(|_| ()); + tokio_executor::spawn(work); + + NatsClientSender { tx, verbose: false } + } + + #[allow(dead_code)] + pub fn set_verbose(&mut self, verbose: bool) { + self.verbose = verbose; + } + + /// Sends an OP to the server + pub fn send(&self, op: Op) -> impl Future { + //let _verbose = self.verbose.clone(); + self.tx + .unbounded_send(op) + .map_err(|_| NatsError::InnerBrokenChain) + .into_future() + } +} diff --git a/src/lib.rs b/src/lib.rs index 248e3bd..6f35705 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -99,18 +99,25 @@ extern crate tokio_tcp; extern crate tokio_tls; extern crate url; +#[cfg(feature = "nats-streaming")] +extern crate prost; +#[cfg(feature = "nats-streaming")] +#[macro_use] +extern crate prost_derive; + #[macro_use] mod error; // TODO: Handle verbose mode -// TODO: Switch parsing to using `nom` -// TODO: Support NATS Streaming Server pub use self::error::*; pub mod codec; mod protocol; pub use self::protocol::*; +#[cfg(feature = "nats-streaming")] +pub mod streaming; + pub(crate) mod net; mod client; diff --git a/src/streaming/error.rs b/src/streaming/error.rs new file mode 100644 index 0000000..0151093 --- /dev/null +++ b/src/streaming/error.rs @@ -0,0 +1,38 @@ +use NatsError; + +/// Error enum for all cases of internal/external errors occuring during client execution +#[derive(Debug, Fail)] +pub enum NatsStreamingError { + #[fail(display = "NatsError: {}", _0)] + NatsError(NatsError), + #[fail(display = "ProtobufDecodeError: {}", _0)] + ProtobufDecodeError(prost::DecodeError), + #[fail(display = "ProtobufEncodeError: {}", _0)] + ProtobufEncodeError(prost::EncodeError), + #[fail(display = "CannotAck for GUID {}", _0)] + CannotAck(String), + #[fail(display = "ServerError: {}", _0)] + ServerError(String), + #[fail(display = "Please provide a Cluster ID")] + MissingClusterId, + #[fail(display = "An error has occured in the Subscription Stream")] + SubscriptionError, +} + +impl From> for NatsStreamingError { + fn from(_: futures::sync::mpsc::SendError) -> Self { + NatsStreamingError::SubscriptionError + } +} + +from_error!(NatsError, NatsStreamingError, NatsStreamingError::NatsError); +from_error!( + prost::DecodeError, + NatsStreamingError, + NatsStreamingError::ProtobufDecodeError +); +from_error!( + prost::EncodeError, + NatsStreamingError, + NatsStreamingError::ProtobufEncodeError +); diff --git a/src/streaming/mod.rs b/src/streaming/mod.rs new file mode 100644 index 0000000..5e7f8e6 --- /dev/null +++ b/src/streaming/mod.rs @@ -0,0 +1,10 @@ +pub(crate) mod streaming_protocol; + +pub mod error; +mod streaming_client; +mod subscription; + +pub mod client { + pub use super::streaming_client::*; + pub use super::subscription::*; +} diff --git a/src/streaming/protocol/nats-streaming.proto b/src/streaming/protocol/nats-streaming.proto new file mode 100644 index 0000000..4741110 --- /dev/null +++ b/src/streaming/protocol/nats-streaming.proto @@ -0,0 +1,138 @@ +// Copyright 2016-2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Uses https://github.com/gogo/protobuf +// compiled via `protoc -I=. -I=$GOPATH/src --gogofaster_out=. protocol.proto` + +syntax = "proto3"; +package nats.streaming; + +// How messages are delivered to the STAN cluster +message PubMsg { + string clientID = 1; // ClientID + string guid = 2; // guid + string subject = 3; // subject + string reply = 4; // optional reply + bytes data = 5; // payload + bytes connID = 6; // Connection ID. For servers that know about this field, clientID can be omitted + + bytes sha256 = 10; // optional sha256 of data +} + +// Used to ACK to publishers +message PubAck { + string guid = 1; // guid + string error = 2; // err string, empty/omitted if no error +} + +// Msg struct. Sequence is assigned for global ordering by +// the cluster after the publisher has been acknowledged. +message MsgProto { + uint64 sequence = 1; // globally ordered sequence number for the subject's channel + string subject = 2; // subject + string reply = 3; // optional reply + bytes data = 4; // payload + int64 timestamp = 5; // received timestamp + bool redelivered = 6; // Flag specifying if the message is being redelivered + + uint32 CRC32 = 10; // optional IEEE CRC32 +} + +// Ack will deliver an ack for a delivered msg. +message Ack { + string subject = 1; // Subject + uint64 sequence = 2; // Sequence to acknowledge +} + +// Connection Request +message ConnectRequest { + string clientID = 1; // Client name/identifier. + string heartbeatInbox = 2; // Inbox for server initiated heartbeats. + int32 protocol = 3; // Protocol the client is at. + bytes connID = 4; // Connection ID, a way to uniquely identify a connection (no connection should ever have the same) + int32 pingInterval = 5; // Interval at which client wishes to send PINGs (expressed in seconds). + int32 pingMaxOut = 6; // Maximum number of PINGs without a response after which the connection can be considered lost. +} + +// Response to a client connect +message ConnectResponse { + string pubPrefix = 1; // Prefix to use when publishing to this STAN cluster + string subRequests = 2; // Subject to use for subscription requests + string unsubRequests = 3; // Subject to use for unsubscribe requests + string closeRequests = 4; // Subject for closing the stan connection + string error = 5; // err string, empty/omitted if no error + string subCloseRequests = 6; // Subject to use for subscription close requests + string pingRequests = 7; // Subject to use for PING requests + int32 pingInterval = 8; // Interval at which client should send PINGs (expressed in seconds). + int32 pingMaxOut = 9; // Maximum number of PINGs without a response after which the connection can be considered lost + int32 protocol = 10; // Protocol version the server is at + + string publicKey = 100; // Possibly used to sign acks, etc. +} + +// PING from client to server +message Ping { + bytes connID = 1; // Connection ID +} + +// PING response from the server +message PingResponse { + string error = 1; // Error string, empty/omitted if no error +} + +// Enum for start position type. +enum StartPosition { + NewOnly = 0; + LastReceived = 1; + TimeDeltaStart = 2; + SequenceStart = 3; + First = 4; + } + +// Protocol for a client to subscribe +message SubscriptionRequest { + string clientID = 1; // ClientID + string subject = 2; // Formal subject to subscribe to, e.g. foo.bar + string qGroup = 3; // Optional queue group + string inbox = 4; // Inbox subject to deliver messages on + int32 maxInFlight = 5; // Maximum inflight messages without an ack allowed + int32 ackWaitInSecs = 6; // Timeout for receiving an ack from the client + string durableName = 7; // Optional durable name which survives client restarts + StartPosition startPosition = 10; // Start position + uint64 startSequence = 11; // Optional start sequence number + int64 startTimeDelta = 12; // Optional start time +} + +// Response for SubscriptionRequest and UnsubscribeRequests +message SubscriptionResponse { + string ackInbox = 2; // ackInbox for sending acks + string error = 3; // err string, empty/omitted if no error +} + +// Protocol for a clients to unsubscribe. Will return a SubscriptionResponse +message UnsubscribeRequest { + string clientID = 1; // ClientID + string subject = 2; // subject for the subscription + string inbox = 3; // Inbox subject to identify subscription + string durableName = 4; // Optional durable name which survives client restarts +} + +// Protocol for a client to close a connection +message CloseRequest { + string clientID = 1; // Client name provided to Connect() requests +} + +// Response for CloseRequest +message CloseResponse { + string error = 1; // err string, empty/omitted if no error +} diff --git a/src/streaming/streaming_client.rs b/src/streaming/streaming_client.rs new file mode 100644 index 0000000..1e72a2e --- /dev/null +++ b/src/streaming/streaming_client.rs @@ -0,0 +1,368 @@ +use super::{ + client::{StreamingSubscription, StreamingSubscriptionSettings}, + error::NatsStreamingError, + streaming_protocol as streaming, +}; +use bytes::{Bytes, BytesMut}; +use client::NatsClient; +use futures::{ + future::{self, Either}, + prelude::*, + sync::oneshot, +}; +use parking_lot::RwLock; +use prost::Message; +use protocol::commands; +use rand::Rng; +use std::collections::HashMap; +use std::sync::Arc; +use NatsError; + +static DISCOVER_PREFIX: &'static str = "_STAN.discover"; +static ACK_PREFIX: &'static str = "_NITOX.acks"; + +#[derive(Debug, Clone, Default)] +pub(crate) struct NatsStreamingClientConfiguration { + pub(crate) pub_prefix: String, + pub(crate) sub_requests: String, + pub(crate) unsub_requests: String, + pub(crate) close_requests: String, + pub(crate) sub_close_requests: String, + pub(crate) ping_requests: String, + pub(crate) hb_subject: String, + pub(crate) ack_subject: String, +} + +impl NatsStreamingClientConfiguration { + pub(crate) fn assign_from_connect_resp(&mut self, resp: streaming::ConnectResponse) { + self.pub_prefix = resp.pub_prefix; + self.sub_requests = resp.sub_requests; + self.unsub_requests = resp.unsub_requests; + self.close_requests = resp.close_requests; + self.sub_close_requests = resp.sub_close_requests; + self.ping_requests = resp.ping_requests; + } +} + +#[derive(Debug, Clone, Default, Builder)] +#[builder(default)] +pub struct SubscribeOptions { + queue_group: Option, + #[builder(default = "16384")] + max_in_flight: i32, + #[builder(default = "30000")] + ack_max_wait_in_secs: i32, + durable_name: Option, + start_position: streaming::StartPosition, + start_sequence: Option, + start_sequence_delta: Option, +} + +#[derive(Debug)] +pub struct NatsStreamingClient { + pub(crate) nats: Arc, + ack: Arc>>>>, + pub(crate) client_id: String, + cluster_id: Option, + pub(crate) config: Arc>, +} + +impl From for NatsStreamingClient { + fn from(client: NatsClient) -> Self { + NatsStreamingClient { + nats: Arc::new(client), + ack: Arc::new(RwLock::new(HashMap::new())), + client_id: format!("nitox.streaming.{}", Self::generate_guid()), + cluster_id: None, + config: Arc::new(RwLock::new(NatsStreamingClientConfiguration { + hb_subject: Self::generate_guid(), + ack_subject: format!("{}.{}", ACK_PREFIX, Self::generate_guid()), + ..Default::default() + })), + } + } +} + +impl NatsStreamingClient { + pub(crate) fn encode_message(msg: T) -> Result { + let mut buf = BytesMut::with_capacity(msg.encoded_len()); + msg.encode(&mut buf)?; + Ok(buf.freeze()) + } + + pub(crate) fn generate_guid() -> String { + let mut rng = rand::thread_rng(); + rng.sample_iter(&rand::distributions::Alphanumeric).take(16).collect() + } + + pub fn try_eject_streaming(mut self) -> Result { + match Arc::try_unwrap(self.nats) { + Ok(nats) => Ok(nats), + Err(nats_arc) => { + self.nats = nats_arc; + Err(self) + } + } + } + + pub fn cluster_id(mut self, cluster_id: String) -> Self { + self.cluster_id = Some(cluster_id); + self + } + + fn setup_hb(&self) { + let nats_hb = Arc::clone(&self.nats); + tokio_executor::spawn( + self.nats + .subscribe( + commands::SubCommand::builder() + .subject((*self.config.read()).hb_subject.clone()) + .build() + .unwrap(), + ).from_err() + .and_then(|hb_stream| { + hb_stream + .for_each(move |msg| { + if let Some(reply) = msg.reply_to { + Either::A( + nats_hb.publish(commands::PubCommand::builder().subject(reply).build().unwrap()), + ) + } else { + Either::B(future::err(NatsError::ServerDisconnected(None))) + } + }).into_future() + }).map(|_| ()) + .map_err(|_| ()), + ); + } + + fn setup_ack(&self) { + let ack_arc = Arc::clone(&self.ack); + tokio_executor::spawn( + self.nats + .subscribe( + commands::SubCommand::builder() + .subject((*self.config.read()).ack_subject.clone()) + .build() + .unwrap(), + ).from_err() + .and_then(|ack_stream| { + ack_stream + .from_err() + .for_each(move |msg| match streaming::PubAck::decode(msg.payload) { + Ok(ack) => { + if let Some(inner_ack) = (*ack_arc.write()).remove(&ack.guid) { + let err = if ack.error.len() > 0 { + Some(ack.error.clone()) + } else { + None + }; + + future::result( + inner_ack.send(err).map_err(|_| NatsStreamingError::CannotAck(ack.guid)), + ) + } else { + future::ok(()) + } + } + Err(decode_err) => future::err(decode_err.into()), + }).into_future() + }).map(|_| ()) + .map_err(|_| ()), + ); + } + + pub fn connect(self) -> impl Future { + if self.cluster_id.is_none() { + return Either::A(future::err(NatsStreamingError::MissingClusterId)); + } + + self.setup_hb(); + self.setup_ack(); + + let connect_buf = match Self::encode_message(streaming::ConnectRequest { + client_id: self.client_id.clone(), + heartbeat_inbox: (*self.config.read()).hb_subject.clone(), + ..Default::default() + }) { + Ok(buf) => buf, + Err(e) => { + return Either::A(future::err(e.into())); + } + }; + + Either::B( + self.nats + .request( + format!("{}.{}", DISCOVER_PREFIX, self.cluster_id.clone().unwrap()), + connect_buf, + ).from_err() + .and_then(move |msg| { + future::result(streaming::ConnectResponse::decode(&msg.payload).map_err(|e| e.into())) + }).map(move |resp| { + (*self.config.write()).assign_from_connect_resp(resp); + + self + }), + ) + } + + pub fn publish(&self, subject: String, payload: Bytes) -> impl Future { + let internal_subject = format!("{}.{}", (*self.config.read()).pub_prefix, subject); + let guid = Self::generate_guid(); + + let pub_buf = match Self::encode_message(streaming::PubMsg { + client_id: self.client_id.clone(), + guid: guid.clone(), + subject, + data: payload.to_vec(), + ..Default::default() + }) { + Ok(buf) => buf, + Err(e) => { + return Either::A(future::err(e.into())); + } + }; + + let (tx, rx) = oneshot::channel(); + + (*self.ack.write()).insert(guid, tx); + + let pub_cmd = commands::PubCommand::builder() + .subject(internal_subject) + .payload(pub_buf) + .reply_to(Some((*self.config.read()).ack_subject.clone())) + .build() + .unwrap(); + + Either::B( + self.nats + .publish(pub_cmd) + .and_then(|_| rx.into_future().map_err(|_| NatsError::InnerBrokenChain)) + .from_err() + .and_then(|maybe_err| { + if let Some(err) = maybe_err { + future::err(NatsStreamingError::CannotAck(err)) + } else { + future::ok(()) + } + }), + ) + } + + pub fn subscribe( + &self, + subject: String, + mut opts: SubscribeOptions, + ) -> impl Future { + let sub_inbox = Self::generate_guid(); + let mut sub_request = streaming::SubscriptionRequest { + client_id: self.client_id.clone(), + subject: subject.clone(), + inbox: sub_inbox.clone(), + max_in_flight: opts.max_in_flight, + ack_wait_in_secs: opts.ack_max_wait_in_secs, + start_position: opts.start_position as i32, + ..Default::default() + }; + + if let Some(qgroup) = opts.queue_group.take() { + sub_request.q_group = qgroup; + } + + if let Some(durable_name) = opts.durable_name.take() { + sub_request.durable_name = durable_name; + } + + match opts.start_position { + streaming::StartPosition::TimeDeltaStart => { + if let Some(start_sequence_delta) = opts.start_sequence_delta.take() { + sub_request.start_time_delta = start_sequence_delta; + } + } + streaming::StartPosition::SequenceStart => { + if let Some(start_sequence) = opts.start_sequence.take() { + sub_request.start_sequence = start_sequence; + } + } + _ => (), + } + + let sub_request_buf = match Self::encode_message(sub_request) { + Ok(buf) => buf, + Err(e) => { + return Either::A(future::err(e.into())); + } + }; + + let sub_command = commands::SubCommand::builder().subject(sub_inbox).build().unwrap(); + + let sub_sid = sub_command.sid.clone(); + + let nats = Arc::clone(&self.nats); + let nats_ack = Arc::clone(&self.nats); + let sub_requests = self.config.read().sub_requests.clone(); + let client_id = self.client_id.clone(); + let sub_config = Arc::clone(&self.config); + + Either::B(self.nats.subscribe(sub_command).from_err().and_then(move |sub_stream| { + nats.request(sub_requests, sub_request_buf) + .from_err() + .and_then(|msg| { + future::result(streaming::SubscriptionResponse::decode(&msg.payload).map_err(|e| e.into())) + }).and_then(move |resp| { + let ack_inbox = resp.ack_inbox; + + let (tx, rx) = futures::sync::mpsc::unbounded(); + + let settings = StreamingSubscriptionSettings::builder() + .sid(sub_sid) + .subject(subject) + .ack_inbox(ack_inbox.clone()) + .client_id(client_id) + .build() + .unwrap(); + + tokio_executor::spawn( + sub_stream + .map_err(|e| NatsStreamingError::from(e)) + .and_then(move |msg| { + let msg_pbuf = match streaming::MsgProto::decode(&msg.payload) { + Ok(msg) => msg, + Err(e) => { + return Either::A(future::err(e.into())); + } + }; + + let sub_request_buf = match Self::encode_message(streaming::Ack { + subject: msg_pbuf.subject.clone(), + sequence: msg_pbuf.sequence.clone(), + ..Default::default() + }) { + Ok(buf) => buf, + Err(e) => { + return Either::A(future::err(e.into())); + } + }; + + let ack_pub_msg = commands::PubCommand::builder() + .subject(ack_inbox.clone()) + .payload(sub_request_buf) + .build() + .unwrap(); + + Either::B(nats_ack.publish(ack_pub_msg).from_err().map(move |_| msg_pbuf)) + }).forward(tx) + .map(|_| ()) + .map_err(|_| ()), + ); + + future::ok(StreamingSubscription::new(Arc::clone(&nats), sub_config, rx, settings)) + }) + })) + } + + /*pub fn close(self) -> impl Future { + + }*/ +} diff --git a/src/streaming/streaming_protocol.rs b/src/streaming/streaming_protocol.rs new file mode 100644 index 0000000..3d88e54 --- /dev/null +++ b/src/streaming/streaming_protocol.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/nats.streaming.rs")); diff --git a/src/streaming/subscription.rs b/src/streaming/subscription.rs new file mode 100644 index 0000000..d27c44e --- /dev/null +++ b/src/streaming/subscription.rs @@ -0,0 +1,110 @@ +use super::{client::NatsStreamingClientConfiguration, error::NatsStreamingError, streaming_protocol as streaming}; +use bytes::BytesMut; +use client::NatsClient; +use futures::{ + future::{self, Either}, + prelude::*, + sync::mpsc::UnboundedReceiver, +}; +use parking_lot::RwLock; +use prost::Message; +use protocol::commands; +use std::sync::Arc; + +#[derive(Debug, Clone, Builder, Default)] +pub(crate) struct StreamingSubscriptionSettings { + sid: String, + subject: String, + ack_inbox: String, + client_id: String, +} + +impl StreamingSubscriptionSettings { + pub fn builder() -> StreamingSubscriptionSettingsBuilder { + StreamingSubscriptionSettingsBuilder::default() + } +} + +#[derive(Debug)] +pub struct StreamingSubscription { + nats: Arc, + config: Arc>, + rx: UnboundedReceiver, + settings: StreamingSubscriptionSettings, +} + +impl Stream for StreamingSubscription { + type Item = streaming::MsgProto; + type Error = NatsStreamingError; + + fn poll(&mut self) -> Poll, Self::Error> { + self.rx.poll().map_err(|_| NatsStreamingError::SubscriptionError) + } +} + +impl StreamingSubscription { + pub(crate) fn new( + nats: Arc, + config: Arc>, + rx: UnboundedReceiver, + settings: StreamingSubscriptionSettings, + ) -> Self { + StreamingSubscription { + nats, + config, + rx, + settings, + } + } + + fn unsub_or_close(self, close: bool) -> impl Future { + let subject = if close { + self.config.read().sub_close_requests.clone() + } else { + self.config.read().unsub_requests.clone() + }; + + let unsub_cmd = commands::UnsubCommand::builder() + .sid(self.settings.sid.clone()) + .build() + .unwrap(); + + self.nats + .unsubscribe(unsub_cmd) + .from_err() + .and_then(move |_| { + let unsub_req = streaming::UnsubscribeRequest { + client_id: self.settings.client_id, + subject: self.settings.subject, + inbox: self.settings.ack_inbox, + ..Default::default() + }; + + let mut buf = BytesMut::with_capacity(unsub_req.encoded_len()); + match unsub_req.encode(&mut buf) { + Err(encode_err) => { + return Either::B(future::err(encode_err.into())); + }, + _ => () + } + + Either::A(self.nats.request(subject, buf.freeze()).from_err()) + }).and_then(|msg| { + future::result(streaming::SubscriptionResponse::decode(&msg.payload).map_err(|e| e.into())) + }).and_then(|sub_res| { + if sub_res.error.len() > 0 { + future::err(NatsStreamingError::ServerError(sub_res.error)) + } else { + future::ok(()) + } + }) + } + + pub fn unsubscribe(self) -> impl Future { + self.unsub_or_close(false) + } + + pub fn close(self) -> impl Future { + self.unsub_or_close(true) + } +} From 39cc55fd25459591d5d4ed922024cbbf055ca1d0 Mon Sep 17 00:00:00 2001 From: Anthony Dodd Date: Wed, 16 Jan 2019 14:56:01 -0600 Subject: [PATCH 2/6] Make streaming_protocol code public (you need it in order to consume). --- src/streaming/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/streaming/mod.rs b/src/streaming/mod.rs index 5e7f8e6..8b66a46 100644 --- a/src/streaming/mod.rs +++ b/src/streaming/mod.rs @@ -1,7 +1,6 @@ -pub(crate) mod streaming_protocol; - pub mod error; mod streaming_client; +pub mod streaming_protocol; mod subscription; pub mod client { From a6faf301a9259960cb0183fb26f94c1e08cf4a45 Mon Sep 17 00:00:00 2001 From: Anthony Dodd Date: Wed, 16 Jan 2019 22:07:40 -0600 Subject: [PATCH 3/6] Client IDs can not contain '.'. --- src/streaming/streaming_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/streaming/streaming_client.rs b/src/streaming/streaming_client.rs index 1e72a2e..721d545 100644 --- a/src/streaming/streaming_client.rs +++ b/src/streaming/streaming_client.rs @@ -72,7 +72,7 @@ impl From for NatsStreamingClient { NatsStreamingClient { nats: Arc::new(client), ack: Arc::new(RwLock::new(HashMap::new())), - client_id: format!("nitox.streaming.{}", Self::generate_guid()), + client_id: format!("nitox_streaming_{}", Self::generate_guid()), cluster_id: None, config: Arc::new(RwLock::new(NatsStreamingClientConfiguration { hb_subject: Self::generate_guid(), From cd3c6867ec3a53392829ca4282ba3c3ffe952ece Mon Sep 17 00:00:00 2001 From: Anthony Dodd Date: Thu, 17 Jan 2019 23:53:24 -0600 Subject: [PATCH 4/6] This may do the trick for manual & auto ack (defaults to auto). --- src/streaming/streaming_client.rs | 120 +++++++++++++++++------------- src/streaming/subscription.rs | 34 ++++++++- 2 files changed, 101 insertions(+), 53 deletions(-) diff --git a/src/streaming/streaming_client.rs b/src/streaming/streaming_client.rs index 721d545..4e44fa5 100644 --- a/src/streaming/streaming_client.rs +++ b/src/streaming/streaming_client.rs @@ -1,10 +1,4 @@ -use super::{ - client::{StreamingSubscription, StreamingSubscriptionSettings}, - error::NatsStreamingError, - streaming_protocol as streaming, -}; use bytes::{Bytes, BytesMut}; -use client::NatsClient; use futures::{ future::{self, Either}, prelude::*, @@ -12,11 +6,20 @@ use futures::{ }; use parking_lot::RwLock; use prost::Message; -use protocol::commands; use rand::Rng; use std::collections::HashMap; use std::sync::Arc; -use NatsError; + +use crate::{ + NatsError, + client::NatsClient, + protocol::commands, + streaming::{ + client::{StreamingMessage, StreamingSubscription, StreamingSubscriptionSettings}, + error::NatsStreamingError, + streaming_protocol as streaming, + }, +}; static DISCOVER_PREFIX: &'static str = "_STAN.discover"; static ACK_PREFIX: &'static str = "_NITOX.acks"; @@ -52,12 +55,29 @@ pub struct SubscribeOptions { max_in_flight: i32, #[builder(default = "30000")] ack_max_wait_in_secs: i32, + ack_mode: SubscriptionAckMode, durable_name: Option, start_position: streaming::StartPosition, start_sequence: Option, start_sequence_delta: Option, } +/// Control whether a subscription will automatically or manually ack received messages. +/// +/// By default, subscriptions will be setup to automatically ack messages which are received on +/// the stream. Use `Manual` mode to control the acks yourself. +#[derive(Debug, Clone)] +pub enum SubscriptionAckMode { + Auto, + Manual, +} + +impl Default for SubscriptionAckMode { + fn default() -> Self { + SubscriptionAckMode::Auto + } +} + #[derive(Debug)] pub struct NatsStreamingClient { pub(crate) nats: Arc, @@ -250,11 +270,9 @@ impl NatsStreamingClient { ) } - pub fn subscribe( - &self, - subject: String, - mut opts: SubscribeOptions, - ) -> impl Future { + pub fn subscribe(&self, subject: String, mut opts: SubscribeOptions) + -> impl Future + { let sub_inbox = Self::generate_guid(); let mut sub_request = streaming::SubscriptionRequest { client_id: self.client_id.clone(), @@ -304,6 +322,7 @@ impl NatsStreamingClient { let sub_requests = self.config.read().sub_requests.clone(); let client_id = self.client_id.clone(); let sub_config = Arc::clone(&self.config); + let ack_mode = opts.ack_mode.clone(); Either::B(self.nats.subscribe(sub_command).from_err().and_then(move |sub_stream| { nats.request(sub_requests, sub_request_buf) @@ -311,52 +330,53 @@ impl NatsStreamingClient { .and_then(|msg| { future::result(streaming::SubscriptionResponse::decode(&msg.payload).map_err(|e| e.into())) }).and_then(move |resp| { - let ack_inbox = resp.ack_inbox; + // Setup sink for decoding received messages & auto acking if needed. let (tx, rx) = futures::sync::mpsc::unbounded(); + let ack_inbox_autoack = resp.ack_inbox.clone(); + tokio_executor::spawn(sub_stream + .map_err(|e| NatsStreamingError::from(e)) + .and_then(move |msg| { + let msg_pbuf = match streaming::MsgProto::decode(&msg.payload) { + Ok(msg) => msg, + Err(e) => { + return Err(NatsStreamingError::from(e)); + } + }; + + let sub_request_buf = match Self::encode_message(streaming::Ack { + subject: msg_pbuf.subject.clone(), + sequence: msg_pbuf.sequence.clone(), + ..Default::default() + }) { + Ok(buf) => buf, + Err(e) => { + return Err(NatsStreamingError::from(e)); + } + }; + + let ack_pub_msg = commands::PubCommand::builder() + .subject(ack_inbox_autoack.clone()) + .payload(sub_request_buf) + .build() + .unwrap(); + + Ok((StreamingMessage::new(msg_pbuf, Some((nats_ack.clone(), ack_pub_msg))), ack_mode.clone())) + }) + .and_then(|(mut stream_msg, ack_mode)| match ack_mode { + SubscriptionAckMode::Auto => Either::A(stream_msg.ack().map(move |_| stream_msg)), + SubscriptionAckMode::Manual => Either::B(future::ok(()).map(move |_| stream_msg)), + }) + .forward(tx).map(|_| ()).map_err(|_| ()) + ); let settings = StreamingSubscriptionSettings::builder() .sid(sub_sid) .subject(subject) - .ack_inbox(ack_inbox.clone()) + .ack_inbox(resp.ack_inbox) .client_id(client_id) .build() .unwrap(); - - tokio_executor::spawn( - sub_stream - .map_err(|e| NatsStreamingError::from(e)) - .and_then(move |msg| { - let msg_pbuf = match streaming::MsgProto::decode(&msg.payload) { - Ok(msg) => msg, - Err(e) => { - return Either::A(future::err(e.into())); - } - }; - - let sub_request_buf = match Self::encode_message(streaming::Ack { - subject: msg_pbuf.subject.clone(), - sequence: msg_pbuf.sequence.clone(), - ..Default::default() - }) { - Ok(buf) => buf, - Err(e) => { - return Either::A(future::err(e.into())); - } - }; - - let ack_pub_msg = commands::PubCommand::builder() - .subject(ack_inbox.clone()) - .payload(sub_request_buf) - .build() - .unwrap(); - - Either::B(nats_ack.publish(ack_pub_msg).from_err().map(move |_| msg_pbuf)) - }).forward(tx) - .map(|_| ()) - .map_err(|_| ()), - ); - future::ok(StreamingSubscription::new(Arc::clone(&nats), sub_config, rx, settings)) }) })) diff --git a/src/streaming/subscription.rs b/src/streaming/subscription.rs index d27c44e..db6055c 100644 --- a/src/streaming/subscription.rs +++ b/src/streaming/subscription.rs @@ -29,12 +29,12 @@ impl StreamingSubscriptionSettings { pub struct StreamingSubscription { nats: Arc, config: Arc>, - rx: UnboundedReceiver, + rx: UnboundedReceiver, settings: StreamingSubscriptionSettings, } impl Stream for StreamingSubscription { - type Item = streaming::MsgProto; + type Item = StreamingMessage; type Error = NatsStreamingError; fn poll(&mut self) -> Poll, Self::Error> { @@ -46,7 +46,7 @@ impl StreamingSubscription { pub(crate) fn new( nats: Arc, config: Arc>, - rx: UnboundedReceiver, + rx: UnboundedReceiver, settings: StreamingSubscriptionSettings, ) -> Self { StreamingSubscription { @@ -108,3 +108,31 @@ impl StreamingSubscription { self.unsub_or_close(true) } } + +/// A message coming from a subscription stream. +#[derive(Debug)] +pub struct StreamingMessage { + /// The protobuf message from the Nats stream. + pub proto: streaming::MsgProto, + + /// The data used for acking this message. + ack: Option<(Arc, commands::PubCommand)>, +} + +impl StreamingMessage { + pub fn new(proto: streaming::MsgProto, ack: Option<(Arc, commands::PubCommand)>) -> Self { + StreamingMessage{proto, ack} + } + + /// Ack this message. + /// + /// If this message came from a stream configured with `SubscriptionAckMode::Auto`, then this + /// will be a no-op returning an immediately resolved `future::ok(())`. + pub fn ack(&mut self) -> impl Future { + if let Some((client, ack_cmd)) = self.ack.take() { + Either::A(client.publish(ack_cmd).from_err()) + } else { + Either::B(future::ok(())) + } + } +} From 85a41073923a928cfc5ed88decc12b8be54997b0 Mon Sep 17 00:00:00 2001 From: Anthony Dodd Date: Fri, 18 Jan 2019 00:19:38 -0600 Subject: [PATCH 5/6] A small update to some docs. --- src/streaming/streaming_client.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/streaming/streaming_client.rs b/src/streaming/streaming_client.rs index 4e44fa5..4dbcf6e 100644 --- a/src/streaming/streaming_client.rs +++ b/src/streaming/streaming_client.rs @@ -65,7 +65,8 @@ pub struct SubscribeOptions { /// Control whether a subscription will automatically or manually ack received messages. /// /// By default, subscriptions will be setup to automatically ack messages which are received on -/// the stream. Use `Manual` mode to control the acks yourself. +/// the stream. Use `Manual` mode to control the acks yourself. Simply call `StreamingMessage.ack` +/// to ack a message. #[derive(Debug, Clone)] pub enum SubscriptionAckMode { Auto, From b52bb6dc8fa87bfa9e841fed3542950a6112ef6e Mon Sep 17 00:00:00 2001 From: Mathieu Amiot Date: Mon, 4 Mar 2019 23:14:04 +0100 Subject: [PATCH 6/6] Added support for verbose mode + test --- src/client/ack_trigger.rs | 65 +++++++++++++++++++++++++++++++++++++++ src/client/client.rs | 31 ++++++++++--------- src/client/mod.rs | 2 ++ src/client/multiplexer.rs | 6 +++- src/client/sender.rs | 36 ++++++++++++++++------ src/lib.rs | 2 -- tests/all.rs | 29 +++++++++++++++++ 7 files changed, 144 insertions(+), 27 deletions(-) create mode 100644 src/client/ack_trigger.rs diff --git a/src/client/ack_trigger.rs b/src/client/ack_trigger.rs new file mode 100644 index 0000000..64f31af --- /dev/null +++ b/src/client/ack_trigger.rs @@ -0,0 +1,65 @@ +use error::NatsError; +use futures::prelude::*; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +#[derive(Debug, Default)] +pub struct AckTrigger(Arc); + +impl AckTrigger { + #[inline(always)] + fn store(&self, val: bool) { + self.0.store(val, Ordering::SeqCst); + } + + #[inline(always)] + pub fn pull_down(&self) { + self.store(false); + } + + #[inline(always)] + pub fn pull_up(&self) { + self.store(true); + } + + #[inline(always)] + pub fn is_up(&self) -> bool { + self.0.load(Ordering::SeqCst) + } + + #[inline(always)] + pub fn fire(self) -> impl Future + Send + Sync { + self.pull_down(); + self + } +} + +impl From for AckTrigger { + fn from(v: bool) -> Self { + AckTrigger(Arc::new(AtomicBool::from(v))) + } +} + +impl Clone for AckTrigger { + #[inline(always)] + fn clone(&self) -> Self { + AckTrigger(Arc::clone(&self.0)) + } +} + +impl Future for AckTrigger { + type Item = (); + type Error = NatsError; + + #[inline(always)] + fn poll(&mut self) -> Result, Self::Error> { + Ok(if self.is_up() { + Async::Ready(()) + } else { + futures::task::current().notify(); + Async::NotReady + }) + } +} diff --git a/src/client/client.rs b/src/client/client.rs index 1fe1fd7..cbd766a 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -10,15 +10,12 @@ use parking_lot::RwLock; use std::{ net::{SocketAddr, ToSocketAddrs}, str::FromStr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, }; use tokio_executor; use url::Url; -use super::{NatsClientMultiplexer, NatsClientSender, NatsSink, NatsStream}; +use super::{AckTrigger, NatsClientMultiplexer, NatsClientSender, NatsSink, NatsStream}; use error::NatsError; use net::*; use protocol::{commands::*, Op}; @@ -44,10 +41,6 @@ impl NatsClientOptions { pub struct NatsClient { /// Backup of options pub(crate) opts: NatsClientOptions, - /// Ack for verbose - got_ack: Arc, - /// Verbose setting - verbose: AtomicBool, /// Server info server_info: Arc>>, /// Stream of the messages that are not caught for subscriptions (only system messages like PING/PONG should be here) @@ -112,7 +105,11 @@ impl NatsClient { .and_then(move |connection| { let (sink, stream): (NatsSink, NatsStream) = connection.split(); let (rx, other_rx) = NatsClientMultiplexer::new(stream); - let tx = NatsClientSender::new(sink); + + let is_verbose = opts.connect_command.verbose; + let trigger = AckTrigger::from(is_verbose); + let mut tx = NatsClientSender::new(sink, trigger.clone()); + tx.set_verbose(is_verbose); let (tmp_other_tx, tmp_other_rx) = mpsc::unbounded(); let tx_inner = tx.clone(); @@ -121,29 +118,33 @@ impl NatsClient { server_info: Arc::new(RwLock::new(None)), other_rx: Box::new(tmp_other_rx.map_err(|_| NatsError::InnerBrokenChain)), rx: Arc::new(rx), - verbose: AtomicBool::from(opts.connect_command.verbose), - got_ack: Arc::new(AtomicBool::default()), opts, }; let server_info_arc = Arc::clone(&client.server_info); - let ack_arc = Arc::clone(&client.got_ack); - let is_verbose = client.verbose.load(Ordering::SeqCst); tokio_executor::spawn( other_rx .for_each(move |op| { + debug!(target: "nitox", "Got rest OP from multiplexer: {:?}", op); match op { Op::PING => { + debug!(target: "nitox", "Got PING +PING"); tokio_executor::spawn(tx_inner.send(Op::PONG).map_err(|_| ())); let _ = tmp_other_tx.unbounded_send(op); } Op::INFO(server_info) => { + debug!(target: "nitox", "Got server info"); *server_info_arc.write() = Some(server_info); } Op::OK => { + debug!(target: "nitox", "Got verbose ack +OK"); if is_verbose { - ack_arc.store(true, Ordering::SeqCst); + debug!(target: "nitox", "Verbose mode is enabled, pulling up trigger"); + trigger.pull_up(); + debug!(target: "nitox", "Verbose mode trigger is pulled up"); + } else { + debug!(target: "nitox", "Verbose mode is NOT enabled, that's weird"); } } op => { diff --git a/src/client/mod.rs b/src/client/mod.rs index 7e54710..126795c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -8,6 +8,8 @@ type NatsStream = stream::SplitStream; /// Useless pretty much, just for code semantics type NatsSubscriptionId = String; +mod ack_trigger; +pub(crate) use self::ack_trigger::*; mod sender; pub(crate) use self::sender::*; mod multiplexer; diff --git a/src/client/multiplexer.rs b/src/client/multiplexer.rs index dad4622..60efa69 100644 --- a/src/client/multiplexer.rs +++ b/src/client/multiplexer.rs @@ -23,6 +23,7 @@ pub(crate) struct NatsClientMultiplexer { } impl NatsClientMultiplexer { + /// Creates the multiplexer from a general, unfiltered stream of events pub fn new(stream: NatsStream) -> (Self, mpsc::UnboundedReceiver) { let subs_tx: Arc>> = Arc::new(RwLock::new(HashMap::default())); @@ -52,7 +53,8 @@ impl NatsClientMultiplexer { } future::ok::<(), NatsError>(()) - }).map(|_| ()) + }) + .map(|_| ()) .map_err(|_| ()); tokio_executor::spawn(work_tx); @@ -60,6 +62,7 @@ impl NatsClientMultiplexer { (NatsClientMultiplexer { subs_tx, other_tx }, other_rx) } + /// Creates a stream for a subscription ID pub fn for_sid(&self, sid: NatsSubscriptionId) -> impl Stream + Send + Sync { let (tx, rx) = mpsc::unbounded(); (*self.subs_tx.write()).insert( @@ -74,6 +77,7 @@ impl NatsClientMultiplexer { rx.map_err(|_| NatsError::InnerBrokenChain) } + /// Remove a subscription ID from the multiplexer. Also fuses the stream pub fn remove_sid(&self, sid: &str) { (*self.subs_tx.write()).remove(sid); } diff --git a/src/client/sender.rs b/src/client/sender.rs index 4763952..f9a9ff4 100644 --- a/src/client/sender.rs +++ b/src/client/sender.rs @@ -1,38 +1,56 @@ -use futures::{prelude::*, sync::mpsc, Future}; +use futures::{future::Either, prelude::*, sync::mpsc, Future}; use tokio_executor; -use super::NatsSink; +use super::{AckTrigger, NatsSink}; use error::NatsError; use protocol::Op; -/// Keep-alive for the sink, also supposed to take care of handling verbose messaging, but can't for now +/// Keep-alive for the sink, also takes care of handling verbose signaling #[derive(Clone, Debug)] pub(crate) struct NatsClientSender { tx: mpsc::UnboundedSender, verbose: bool, + trigger: AckTrigger, } impl NatsClientSender { - pub fn new(sink: NatsSink) -> Self { + pub fn new(sink: NatsSink, trigger: AckTrigger) -> Self { let (tx, rx) = mpsc::unbounded(); let rx = rx.map_err(|_| NatsError::InnerBrokenChain); let work = sink.send_all(rx).map(|_| ()).map_err(|_| ()); tokio_executor::spawn(work); - NatsClientSender { tx, verbose: false } + NatsClientSender { + tx, + verbose: false, + trigger, + } } - #[allow(dead_code)] pub fn set_verbose(&mut self, verbose: bool) { self.verbose = verbose; } /// Sends an OP to the server pub fn send(&self, op: Op) -> impl Future { - //let _verbose = self.verbose.clone(); - self.tx + debug!(target: "nitox", "Sending OP: {:?}", op); + debug!(target: "nitox", "Sender is verbose: {}", self.verbose); + + let fut = self + .tx .unbounded_send(op) .map_err(|_| NatsError::InnerBrokenChain) - .into_future() + .into_future(); + + if !self.verbose { + return Either::A(fut); + } + + debug!(target: "nitox", "Verbose mode is enabled, will try firing trigger"); + let trigger = self.trigger.clone(); + Either::B(fut.and_then(move |_| { + debug!(target: "nitox", "Command sent, now pulling down and firing trigger"); + trigger.fire() + })) } } diff --git a/src/lib.rs b/src/lib.rs index 6f35705..a212bca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -108,8 +108,6 @@ extern crate prost_derive; #[macro_use] mod error; -// TODO: Handle verbose mode - pub use self::error::*; pub mod codec; mod protocol; diff --git a/tests/all.rs b/tests/all.rs index c016bef..a4082db 100644 --- a/tests/all.rs +++ b/tests/all.rs @@ -397,6 +397,35 @@ fn can_request_a_lot_pedantic() { assert!(connection_result.is_ok()); } +#[test] +fn can_take_verbose_in_account() { + elog!(); + + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + + let connect_cmd = ConnectCommand::builder().verbose(true).build().unwrap(); + let options = NatsClientOptions::builder() + .connect_command(connect_cmd) + .cluster_uri("127.0.0.1:4222") + .build() + .unwrap(); + + let connection = NatsClient::from_options(options) + .and_then(|client| client.connect()) + .and_then(|client| { + client + .publish(PubCommand::builder().subject("foo").payload("bar").build().unwrap()) + .and_then(move |_| futures::future::ok(client)) + }); + + let (tx, rx) = oneshot::channel(); + runtime.spawn(connection.then(move |r| tx.send(r).map_err(|e| panic!("Cannot send Result {:?}", e)))); + let connection_result = rx.wait().expect("Cannot wait for a result"); + let _ = runtime.shutdown_now().wait(); + debug!(target: "nitox", "can_take_verbose_in_account::connection_result {:#?}", connection_result); + assert!(connection_result.is_ok()); +} + #[test] fn can_pong_to_ping() { elog!();