Skip to content

Commit

Permalink
feat: allow enabling jetstream for nats (#268)
Browse files Browse the repository at this point in the history
NATS has a built-in persistence engine called JetStream which enables
messages to be stored and replayed at a later time.

https://docs.nats.io/nats-concepts/jetstream
  • Loading branch information
cedricziel authored Jan 13, 2025
1 parent 0ed99c2 commit d4a6151
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 2 deletions.
10 changes: 8 additions & 2 deletions examples/nats.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use async_nats::connect;
use futures::StreamExt;
use testcontainers_modules::{nats::Nats, testcontainers::runners::AsyncRunner};
use testcontainers_modules::{
nats::{Nats, NatsServerCmd},
testcontainers::{runners::AsyncRunner, ImageExt},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
// optional: customize command, here we enable jetstream
let nats_cmd = NatsServerCmd::default().with_jetstream();

// startup the module
let node = Nats::default().start().await?;
let node = Nats::default().with_cmd(&nats_cmd).start().await?;

// default docker username/password
let default_username = "ruser";
Expand Down
103 changes: 103 additions & 0 deletions src/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub struct Nats {
pub struct NatsServerCmd {
user: Option<String>,
pass: Option<String>,

jetstream: Option<bool>,
}

impl NatsServerCmd {
Expand All @@ -35,6 +37,22 @@ impl NatsServerCmd {
self.pass = Some(password.to_owned());
self
}

/// Enable JetStream in the Nats server to use the built-in persistence
/// features of NATS.
///
/// See: https://docs.nats.io/nats-concepts/jetstream
///
/// Example:
/// ```rust,ignore
/// # use testcontainers_modules::nats::{Nats, NatsServerCmd};
/// let nats_cmd = NatsServerCmd::default().with_jetstream();
/// let node = Nats::default().with_cmd(&nats_cmd).start().await?;
/// ```
pub fn with_jetstream(mut self) -> Self {
self.jetstream = Some(true);
self
}
}

impl IntoIterator for &NatsServerCmd {
Expand All @@ -53,6 +71,12 @@ impl IntoIterator for &NatsServerCmd {
args.push(pass.to_owned())
}

if let Some(ref jetstream) = self.jetstream {
if *jetstream {
args.push("--jetstream".to_owned());
}
}

args.into_iter()
}
}
Expand Down Expand Up @@ -80,6 +104,9 @@ impl Image for Nats {

#[cfg(test)]
mod tests {
use std::time::Duration;

use async_nats::jetstream::{self, consumer::PushConsumer};
use futures::StreamExt;
use testcontainers::{runners::AsyncRunner, ImageExt};

Expand All @@ -99,9 +126,17 @@ mod tests {
let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
}

#[test]
fn enable_jetstream() {
let nats_cmd_args = NatsServerCmd::default().with_jetstream();
assert_eq!(nats_cmd_args.jetstream, Some(true));
let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
}

#[tokio::test]
async fn it_works() -> Result<(), Box<dyn std::error::Error + 'static>> {
let container = Nats::default().start().await?;

let host = container.get_host().await?;
let host_port = container.get_host_port_ipv4(4222).await?;
let url = format!("{host}:{host_port}");
Expand All @@ -126,4 +161,72 @@ mod tests {
assert_eq!(message.payload, "data");
Ok(())
}

#[tokio::test]
/// Show how to use the Nats module with the Jetstream feature.
/// See: https://github.com/nats-io/nats.rs/blob/main/async-nats/examples/jetstream_push.rs
async fn it_works_with_jetstream() -> Result<(), Box<dyn std::error::Error + 'static>> {
let nats_cmd = NatsServerCmd::default().with_jetstream();
let container = Nats::default().with_cmd(&nats_cmd).start().await?;

let host = container.get_host().await?;
let host_port = container.get_host_port_ipv4(4222).await?;
let url = format!("{host}:{host_port}");

let nats_client = async_nats::ConnectOptions::default()
.connect(url)
.await
.expect("failed to connect to nats server");

let inbox = nats_client.new_inbox();

let jetstream = jetstream::new(nats_client);

let stream_name = String::from("EVENTS");

let consumer: PushConsumer = jetstream
.create_stream(jetstream::stream::Config {
name: stream_name,
subjects: vec!["events.>".to_string()],
..Default::default()
})
.await?
.create_consumer(jetstream::consumer::push::Config {
deliver_subject: inbox.clone(),
inactive_threshold: Duration::from_secs(60),
..Default::default()
})
.await?;

// Publish a few messages for the example.
for i in 0..10 {
jetstream
.publish(format!("events.{i}"), "data".into())
.await?
.await?;
}

let mut messages_processed = 0;

let mut messages = consumer.messages().await?.take(10);

// Iterate over messages.
while let Some(message) = messages.next().await {
let message = message?;

assert_eq!(
message.subject.to_string(),
format!("events.{messages_processed}")
);

// acknowledge the message
message.ack().await.unwrap();

messages_processed += 1;
}

assert_eq!(messages_processed, 10);

Ok(())
}
}

0 comments on commit d4a6151

Please sign in to comment.