From 11308179d2b795446d9f13011130c20fe7ef42d2 Mon Sep 17 00:00:00 2001 From: Negezor Date: Fri, 12 Apr 2024 22:51:48 +1100 Subject: [PATCH 1/3] chore(deps): bump redis to 0.25 --- sea-streamer-redis/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sea-streamer-redis/Cargo.toml b/sea-streamer-redis/Cargo.toml index a3593ed..38c5e32 100644 --- a/sea-streamer-redis/Cargo.toml +++ b/sea-streamer-redis/Cargo.toml @@ -23,7 +23,7 @@ flume = { version = "0.10", default-features = false, features = ["async"] } lazy_static = { version = "1.4" } log = { version = "0.4", default-features = false } mac_address = { version = "1" } -redis = { version = "0.22", default-features = false, features = ["acl", "streams"] } +redis = { version = "0.25", default-features = false, features = ["acl", "streams"] } sea-streamer-types = { version = "0.3", path = "../sea-streamer-types" } sea-streamer-runtime = { version = "0.3", path = "../sea-streamer-runtime" } structopt = { version = "0.3", optional = true } From dd99ff145b42424337686206522b83107674bba1 Mon Sep 17 00:00:00 2001 From: Negezor Date: Fri, 12 Apr 2024 22:53:51 +1100 Subject: [PATCH 2/3] chore(redis): rename redis async-std tls feature flag --- sea-streamer-redis/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sea-streamer-redis/Cargo.toml b/sea-streamer-redis/Cargo.toml index 38c5e32..fe6602e 100644 --- a/sea-streamer-redis/Cargo.toml +++ b/sea-streamer-redis/Cargo.toml @@ -38,7 +38,7 @@ test = ["anyhow", "async-std?/attributes", "tokio?/full", "env_logger"] executables = ["anyhow", "env_logger", "structopt", "runtime-tokio", "tokio/full"] runtime-async-std = ["async-std", "redis/async-std-comp", "sea-streamer-runtime/runtime-async-std"] runtime-tokio = ["tokio", "redis/tokio-comp", "sea-streamer-runtime/runtime-tokio"] -runtime-async-std-native-tls = ["runtime-async-std", "redis/async-std-tls-comp"] +runtime-async-std-native-tls = ["runtime-async-std", "redis/async-std-native-tls-comp"] runtime-tokio-native-tls = ["runtime-tokio", "redis/tokio-native-tls-comp"] [[bin]] From 2ad8d7108b3b1d3d2a2e5b2f75658bc7b1b75240 Mon Sep 17 00:00:00 2001 From: Negezor Date: Fri, 12 Apr 2024 23:02:47 +1100 Subject: [PATCH 3/3] refactor: use aio::MultiplexedConnection instead of deprecated aio::Connection --- sea-streamer-redis/src/cluster.rs | 11 +++++++---- sea-streamer-redis/src/connection.rs | 13 +++++++------ sea-streamer-redis/src/consumer/node.rs | 20 ++++++++++++++++---- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/sea-streamer-redis/src/cluster.rs b/sea-streamer-redis/src/cluster.rs index e023814..ed85c9b 100644 --- a/sea-streamer-redis/src/cluster.rs +++ b/sea-streamer-redis/src/cluster.rs @@ -82,7 +82,7 @@ impl RedisCluster { } /// Get any available connection to the cluster - pub fn get_any(&mut self) -> RedisResult<(&NodeId, &mut redis::aio::Connection)> { + pub fn get_any(&mut self) -> RedisResult<(&NodeId, &mut redis::aio::MultiplexedConnection)> { for (node, inner) in self.conn.iter_mut() { if let Ok(conn) = inner.try_get() { return Ok((node, conn)); @@ -93,7 +93,10 @@ impl RedisCluster { #[inline] /// Get a connection to the specific node, will wait and retry a few times until dead. - pub async fn get(&mut self, node: &NodeId) -> RedisResult<&mut redis::aio::Connection> { + pub async fn get( + &mut self, + node: &NodeId, + ) -> RedisResult<&mut redis::aio::MultiplexedConnection> { Self::get_connection(&mut self.conn, &self.options, node).await } @@ -103,7 +106,7 @@ impl RedisCluster { pub async fn get_connection_for( &mut self, key: &str, - ) -> RedisResult<(&NodeId, &mut redis::aio::Connection)> { + ) -> RedisResult<(&NodeId, &mut redis::aio::MultiplexedConnection)> { let node = Self::get_node_for(&self.keys, &self.cluster, key); Ok(( node, @@ -115,7 +118,7 @@ impl RedisCluster { conn: &'a mut HashMap, options: &Arc, node: &NodeId, - ) -> RedisResult<&'a mut redis::aio::Connection> { + ) -> RedisResult<&'a mut redis::aio::MultiplexedConnection> { assert!(!node.scheme().is_empty(), "Must have protocol"); assert!(node.host_str().is_some(), "Must have host"); assert!(node.port().is_some(), "Must have port"); diff --git a/sea-streamer-redis/src/connection.rs b/sea-streamer-redis/src/connection.rs index 2603efc..8ed220b 100644 --- a/sea-streamer-redis/src/connection.rs +++ b/sea-streamer-redis/src/connection.rs @@ -8,7 +8,7 @@ use sea_streamer_runtime::{sleep, timeout}; use sea_streamer_types::{ConnectOptions, StreamErr}; #[derive(Debug)] -/// A wrapped [`redis::aio::Connection`] that can auto-reconnect. +/// A wrapped [`redis::aio::MultiplexedConnection`] that can auto-reconnect. pub struct Connection { node: NodeId, options: Arc, @@ -16,7 +16,7 @@ pub struct Connection { } enum State { - Alive(redis::aio::Connection), + Alive(redis::aio::MultiplexedConnection), Reconnecting { delay: u32 }, Dead, } @@ -69,7 +69,7 @@ impl Connection { } /// Get a mutable connection, will wait and retry a few times until dead. - pub async fn get(&mut self) -> RedisResult<&mut redis::aio::Connection> { + pub async fn get(&mut self) -> RedisResult<&mut redis::aio::MultiplexedConnection> { match &mut self.state { State::Alive(_) | State::Dead => (), State::Reconnecting { delay } => { @@ -93,7 +93,7 @@ impl Connection { } /// Get a mutable connection, only if it is alive. - pub fn try_get(&mut self) -> RedisResult<&mut redis::aio::Connection> { + pub fn try_get(&mut self) -> RedisResult<&mut redis::aio::MultiplexedConnection> { match &mut self.state { State::Alive(conn) => Ok(conn), State::Dead => Err(StreamErr::Connect(format!( @@ -119,7 +119,7 @@ impl Connection { async fn create_connection( url: NodeId, options: Arc, -) -> RedisResult { +) -> RedisResult { let host = if let Some(host) = url.host_str() { host.to_owned() } else { @@ -132,6 +132,7 @@ async fn create_connection( "rediss" => ConnectionAddr::TcpTls { host, port, + tls_params: None, insecure: options.disable_hostname_verification(), }, "" => return Err(StreamErr::Connect("protocol not set".to_owned())), @@ -147,7 +148,7 @@ async fn create_connection( // I wish we could do `.await_timeout(d)` some day match timeout( options.timeout().unwrap_or(DEFAULT_TIMEOUT), - client.get_async_connection(), + client.get_multiplexed_async_connection(), ) .await { diff --git a/sea-streamer-redis/src/consumer/node.rs b/sea-streamer-redis/src/consumer/node.rs index d39ffa2..57bc840 100644 --- a/sea-streamer-redis/src/consumer/node.rs +++ b/sea-streamer-redis/src/consumer/node.rs @@ -365,7 +365,10 @@ impl Node { && Timestamp::now_utc() - *self.options.auto_commit_interval() > self.group.last_commit } - async fn commit_ack(&mut self, conn: &mut redis::aio::Connection) -> RedisResult<()> { + async fn commit_ack( + &mut self, + conn: &mut redis::aio::MultiplexedConnection, + ) -> RedisResult<()> { for shard in self.shards.iter_mut() { if !shard.pending_ack.is_empty() { match self.options.auto_commit() { @@ -438,7 +441,10 @@ impl Node { } } - async fn read_next(&mut self, conn: &mut redis::aio::Connection) -> RedisResult { + async fn read_next( + &mut self, + conn: &mut redis::aio::MultiplexedConnection, + ) -> RedisResult { let mode = self.running_mode(); if matches!(mode, ConsumerMode::Resumable | ConsumerMode::LoadBalanced) && self.group.first_read @@ -601,7 +607,10 @@ impl Node { } } - async fn auto_claim(&mut self, conn: &mut redis::aio::Connection) -> RedisResult { + async fn auto_claim( + &mut self, + conn: &mut redis::aio::MultiplexedConnection, + ) -> RedisResult { self.group.last_check = Timestamp::now_utc(); let change = self.group.claiming.is_none(); if self.group.claiming.is_none() { @@ -695,7 +704,10 @@ impl Node { } } - async fn move_shards(&mut self, conn: &mut redis::aio::Connection) -> Vec { + async fn move_shards( + &mut self, + conn: &mut redis::aio::MultiplexedConnection, + ) -> Vec { let mut events = Vec::new(); let shards = std::mem::take(&mut self.shards); for shard in shards {