From cac187441e6ddf8e32317cb4aa89f33364efa064 Mon Sep 17 00:00:00 2001 From: Guilherme Felipe da Silva Date: Mon, 29 May 2023 12:45:10 +0200 Subject: [PATCH] publish: QoS2 Lot's of fixes in order to get QoS2 to work (in memory storage only). Signed-off-by: Guilherme Felipe da Silva --- mercurio-core/src/codec.rs | 2 +- mercurio-core/src/message.rs | 1 + mercurio-core/src/properties.rs | 4 +- mercurio-packets/src/puback.rs | 36 ++- mercurio-packets/src/pubcomp.rs | 35 ++- mercurio-packets/src/publish.rs | 4 +- mercurio-packets/src/pubrec.rs | 31 ++- mercurio-packets/src/pubrel.rs | 31 ++- mercurio-server/src/bin/main.rs | 1 + mercurio-server/src/broker.rs | 10 +- mercurio-server/src/lib.rs | 1 - mercurio-server/src/server.rs | 4 + mercurio-server/src/session.rs | 295 ++++++++++++++++++------- mercurio-server/src/session_manager.rs | 4 +- mercurio-server/src/topic_tree.rs | 4 +- 15 files changed, 329 insertions(+), 134 deletions(-) diff --git a/mercurio-core/src/codec.rs b/mercurio-core/src/codec.rs index 841164d..02fdba4 100644 --- a/mercurio-core/src/codec.rs +++ b/mercurio-core/src/codec.rs @@ -63,7 +63,7 @@ fn decode_var_byte_integer(encoded: &mut T) -> crate::Result, pub topic: String, pub dup: bool, pub qos: QoS, diff --git a/mercurio-core/src/properties.rs b/mercurio-core/src/properties.rs index 0bae9b9..5c7e8d1 100644 --- a/mercurio-core/src/properties.rs +++ b/mercurio-core/src/properties.rs @@ -7,8 +7,8 @@ use crate::{ macro_rules! def_prop { ($t:ident {$i:ident: $a:expr, $($n:tt: $s:ty),*}) => { - #[derive(Debug, Default, PartialEq, Eq)] - pub struct $t {$($n: $s,)*} + #[derive(Debug, Default, PartialEq, Eq, Clone)] + pub struct $t {$(pub $n: $s,)*} impl $t { pub const $i: u32 = $a; diff --git a/mercurio-packets/src/puback.rs b/mercurio-packets/src/puback.rs index 5b30619..4199b9e 100644 --- a/mercurio-packets/src/puback.rs +++ b/mercurio-packets/src/puback.rs @@ -65,9 +65,9 @@ impl Decoder for PubAckProperties { #[derive(Default, Debug, PartialEq, Eq)] pub struct PubAckPacket { - packet_id: u16, - reason: ReasonCode, - properties: Option, + pub packet_id: u16, + pub reason: ReasonCode, + pub properties: Option, } const PACKET_TYPE: u8 = 0x04; @@ -79,29 +79,43 @@ impl Encoder for PubAckPacket { buffer.put_u8(PACKET_TYPE << 4); remaining_len += self.packet_id.encoded_size(); - remaining_len += self.reason.encoded_size(); - remaining_len += VariableByteInteger(self.properties.encoded_size() as u32).encoded_size(); - remaining_len += self.properties.encoded_size(); + + if self.properties.is_some() || self.reason != ReasonCode::Success { + remaining_len += self.reason.encoded_size(); + remaining_len += + VariableByteInteger(self.properties.encoded_size() as u32).encoded_size(); + remaining_len += self.properties.encoded_size(); + } VariableByteInteger(remaining_len as u32).encode(buffer); self.packet_id.encode(buffer); + + if remaining_len == 2 { + return; + } + self.reason.encode(buffer); VariableByteInteger(self.properties.encoded_size() as u32).encode(buffer); self.properties.encode(buffer); } - - fn encoded_size(&self) -> usize { - unimplemented!() - } } impl Decoder for PubAckPacket { fn decode(buffer: &mut T) -> crate::Result { buffer.advance(1); - let _ = VariableByteInteger::decode(buffer)?; + let remaining_len = VariableByteInteger::decode(buffer)?; let packet_id = u16::decode(buffer)?; + + if remaining_len.0 == 2 { + return Ok(PubAckPacket { + packet_id, + reason: ReasonCode::Success, + properties: None, + }); + } + let reason = ReasonCode::decode(buffer)?; let properties = Some(PubAckProperties::decode(buffer)?); diff --git a/mercurio-packets/src/pubcomp.rs b/mercurio-packets/src/pubcomp.rs index 3ea40e4..6e3b19d 100644 --- a/mercurio-packets/src/pubcomp.rs +++ b/mercurio-packets/src/pubcomp.rs @@ -65,9 +65,9 @@ impl Decoder for PubCompProperties { #[derive(Default, Debug, PartialEq, Eq)] pub struct PubCompPacket { - packet_id: u16, - reason: ReasonCode, - properties: Option, + pub packet_id: u16, + pub reason: ReasonCode, + pub properties: Option, } const PACKET_TYPE: u8 = 0x07; @@ -79,29 +79,42 @@ impl Encoder for PubCompPacket { buffer.put_u8(PACKET_TYPE << 4); remaining_len += self.packet_id.encoded_size(); - remaining_len += self.reason.encoded_size(); - remaining_len += VariableByteInteger(self.properties.encoded_size() as u32).encoded_size(); - remaining_len += self.properties.encoded_size(); + if self.properties.is_some() || self.reason != ReasonCode::Success { + remaining_len += self.reason.encoded_size(); + remaining_len += + VariableByteInteger(self.properties.encoded_size() as u32).encoded_size(); + remaining_len += self.properties.encoded_size(); + } VariableByteInteger(remaining_len as u32).encode(buffer); self.packet_id.encode(buffer); + + if remaining_len == 2 { + return; + } + self.reason.encode(buffer); VariableByteInteger(self.properties.encoded_size() as u32).encode(buffer); self.properties.encode(buffer); } - - fn encoded_size(&self) -> usize { - unimplemented!() - } } impl Decoder for PubCompPacket { fn decode(buffer: &mut T) -> crate::Result { buffer.advance(1); - let _ = VariableByteInteger::decode(buffer)?; + let remaining_len = VariableByteInteger::decode(buffer)?; let packet_id = u16::decode(buffer)?; + + if remaining_len.0 == 2 { + return Ok(PubCompPacket { + packet_id, + reason: ReasonCode::Success, + properties: None, + }); + } + let reason = ReasonCode::decode(buffer)?; let properties = Some(PubCompProperties::decode(buffer)?); diff --git a/mercurio-packets/src/publish.rs b/mercurio-packets/src/publish.rs index 8c01a73..4ae7a3c 100644 --- a/mercurio-packets/src/publish.rs +++ b/mercurio-packets/src/publish.rs @@ -8,7 +8,7 @@ use mercurio_core::{ reason::ReasonCode, }; -#[derive(Default, Debug, PartialEq, Eq)] +#[derive(Default, Debug, PartialEq, Eq, Clone)] pub struct PublishProperties { payload_format_indicator: Option, message_expiry_interval: Option, @@ -87,7 +87,7 @@ impl Decoder for PublishProperties { } } -#[derive(Default, Debug, PartialEq, Eq)] +#[derive(Default, Debug, PartialEq, Eq, Clone)] pub struct PublishPacket { pub dup: bool, pub qos_level: QoS, diff --git a/mercurio-packets/src/pubrec.rs b/mercurio-packets/src/pubrec.rs index c79a988..1e5f264 100644 --- a/mercurio-packets/src/pubrec.rs +++ b/mercurio-packets/src/pubrec.rs @@ -65,9 +65,9 @@ impl Decoder for PubRecProperties { #[derive(Default, Debug, PartialEq, Eq)] pub struct PubRecPacket { - packet_id: u16, - reason: ReasonCode, - properties: Option, + pub packet_id: u16, + pub reason: ReasonCode, + pub properties: Option, } const PACKET_TYPE: u8 = 0x05; @@ -79,13 +79,21 @@ impl Encoder for PubRecPacket { buffer.put_u8(PACKET_TYPE << 4); remaining_len += self.packet_id.encoded_size(); - remaining_len += self.reason.encoded_size(); - remaining_len += VariableByteInteger(self.properties.encoded_size() as u32).encoded_size(); - remaining_len += self.properties.encoded_size(); + if self.properties.is_some() || self.reason != ReasonCode::Success { + remaining_len += self.reason.encoded_size(); + remaining_len += + VariableByteInteger(self.properties.encoded_size() as u32).encoded_size(); + remaining_len += self.properties.encoded_size(); + } VariableByteInteger(remaining_len as u32).encode(buffer); self.packet_id.encode(buffer); + + if remaining_len == 2 { + return; + } + self.reason.encode(buffer); VariableByteInteger(self.properties.encoded_size() as u32).encode(buffer); self.properties.encode(buffer); @@ -100,8 +108,17 @@ impl Decoder for PubRecPacket { fn decode(buffer: &mut T) -> crate::Result { buffer.advance(1); - let _ = VariableByteInteger::decode(buffer)?; + let remaining_len = VariableByteInteger::decode(buffer)?; let packet_id = u16::decode(buffer)?; + + if remaining_len.0 == 2 { + return Ok(PubRecPacket { + packet_id, + reason: ReasonCode::Success, + properties: None, + }); + } + let reason = ReasonCode::decode(buffer)?; let properties = Some(PubRecProperties::decode(buffer)?); diff --git a/mercurio-packets/src/pubrel.rs b/mercurio-packets/src/pubrel.rs index da9158f..b9f5bc3 100644 --- a/mercurio-packets/src/pubrel.rs +++ b/mercurio-packets/src/pubrel.rs @@ -65,9 +65,9 @@ impl Decoder for PubRelProperties { #[derive(Default, Debug, PartialEq, Eq)] pub struct PubRelPacket { - packet_id: u16, - reason: ReasonCode, - properties: Option, + pub packet_id: u16, + pub reason: ReasonCode, + pub properties: Option, } const PACKET_TYPE: u8 = 0x06; @@ -79,13 +79,21 @@ impl Encoder for PubRelPacket { buffer.put_u8((PACKET_TYPE << 4) | 0x02); remaining_len += self.packet_id.encoded_size(); - remaining_len += self.reason.encoded_size(); - remaining_len += VariableByteInteger(self.properties.encoded_size() as u32).encoded_size(); - remaining_len += self.properties.encoded_size(); + if self.properties.is_some() || self.reason != ReasonCode::Success { + remaining_len += self.reason.encoded_size(); + remaining_len += + VariableByteInteger(self.properties.encoded_size() as u32).encoded_size(); + remaining_len += self.properties.encoded_size(); + } VariableByteInteger(remaining_len as u32).encode(buffer); self.packet_id.encode(buffer); + + if remaining_len == 2 { + return; + } + self.reason.encode(buffer); VariableByteInteger(self.properties.encoded_size() as u32).encode(buffer); self.properties.encode(buffer); @@ -100,8 +108,17 @@ impl Decoder for PubRelPacket { fn decode(buffer: &mut T) -> crate::Result { buffer.advance(1); - let _ = VariableByteInteger::decode(buffer)?; + let remaining_len = VariableByteInteger::decode(buffer)?; let packet_id = u16::decode(buffer)?; + + if remaining_len.0 == 2 { + return Ok(PubRelPacket { + packet_id, + reason: ReasonCode::Success, + properties: None, + }); + } + let reason = ReasonCode::decode(buffer)?; let properties = Some(PubRelProperties::decode(buffer)?); diff --git a/mercurio-server/src/bin/main.rs b/mercurio-server/src/bin/main.rs index 3a598d8..edb1bcf 100644 --- a/mercurio-server/src/bin/main.rs +++ b/mercurio-server/src/bin/main.rs @@ -14,6 +14,7 @@ async fn main() -> mercurio_core::Result<()> { .finish(); tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + let listener = TcpListener::bind("127.0.0.1:1883").await?; server::run(listener, signal::ctrl_c()).await; diff --git a/mercurio-server/src/broker.rs b/mercurio-server/src/broker.rs index eccb667..686f4c7 100644 --- a/mercurio-server/src/broker.rs +++ b/mercurio-server/src/broker.rs @@ -3,12 +3,13 @@ use std::{ sync::{Arc, Mutex}, }; -use rand::{self, prelude::SliceRandom}; use tokio::sync::broadcast; use crate::topic_tree::TopicTree; use mercurio_core::{message::Message, Result}; +const CHANNEL_SIZE: usize = 5; + #[derive(Debug, Clone)] pub(crate) struct Broker { shared: Arc, @@ -25,13 +26,6 @@ struct State { shared_subscriptions: HashMap>>>, } -fn get_random_element(vec: &[T]) -> Option<&T> { - let mut rng = rand::thread_rng(); - vec.choose(&mut rng) -} - -const CHANNEL_SIZE: usize = 5; - impl Broker { pub(crate) fn new() -> Broker { let shared = Arc::new(Shared { diff --git a/mercurio-server/src/lib.rs b/mercurio-server/src/lib.rs index 338d70e..8f90525 100644 --- a/mercurio-server/src/lib.rs +++ b/mercurio-server/src/lib.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] mod broker; pub mod connection; pub mod server; diff --git a/mercurio-server/src/server.rs b/mercurio-server/src/server.rs index 051bd46..546e7f1 100644 --- a/mercurio-server/src/server.rs +++ b/mercurio-server/src/server.rs @@ -57,6 +57,7 @@ impl Listener { async fn run(&mut self) -> Result<()> { loop { let socket = self.accept().await?; + info!("Got a connection: {:#?}", socket.peer_addr()); let mut handler = Handler { @@ -128,12 +129,15 @@ impl Handler { ).await?; if let Some(res) = maybe_res { + tracing::debug!("Sending response packet:{:#?} to client {:?}", res, session.get_client_id().await); self.connection.write_packet(res).await?; } } // Try to send outgoing packet Some(packet) = session.process_outgoing() => { + tracing::debug!("Sending outgoing packet: {:#?} to client {:?}", packet, session.get_client_id().await); + self.connection.write_packet(packet).await?; } diff --git a/mercurio-server/src/session.rs b/mercurio-server/src/session.rs index 825961a..f39c381 100644 --- a/mercurio-server/src/session.rs +++ b/mercurio-server/src/session.rs @@ -1,9 +1,9 @@ use std::{pin::Pin, sync::Arc}; -use rand::Rng; use tokio::sync::{broadcast, Mutex}; use tokio_stream::{Stream, StreamExt, StreamMap}; use tracing::info; +use uuid::Uuid; type Messages = Pin + Send>>; @@ -15,7 +15,10 @@ use mercurio_packets::{ connect::ConnectPacket, pingresp::PingRespPacket, puback::PubAckPacket, + pubcomp::PubCompPacket, publish::PublishPacket, + pubrec::PubRecPacket, + pubrel::PubRelPacket, suback::{SubAckPacket, SubAckPayload}, ControlPacket, }; @@ -36,8 +39,12 @@ struct Shared { } struct State { - connect_packet: ConnectPacket, + pub connect_packet: ConnectPacket, subscriptions: StreamMap, + unacknowledged_messages: Vec, + pubrecs: Vec, + tx_pending_messages: Vec, + expiry: Option, } impl SessionDropGuard { @@ -54,11 +61,28 @@ impl SessionDropGuard { impl Session { pub fn new(connect_packet: ConnectPacket) -> Self { + let expiry = connect_packet + .properties + .as_ref() + .map(|p| { + p.session_expiry_interval + .as_ref() + .map(|session_expiry_interval| { + tokio::time::Instant::now() + + std::time::Duration::from_secs(session_expiry_interval.value.into()) + }) + }) + .flatten(); + Session { shared: Arc::new(Shared { state: Mutex::new(State { connect_packet, subscriptions: StreamMap::new(), + unacknowledged_messages: Vec::new(), + pubrecs: Vec::new(), + tx_pending_messages: Vec::new(), + expiry, }), }), } @@ -69,6 +93,11 @@ impl Session { session.connect_packet = connect_packet; } + pub(crate) async fn get_client_id(&self) -> String { + let session = self.shared.state.lock().await; + session.connect_packet.payload.client_id.clone() + } + pub async fn begin(&mut self, connection: &mut Connection, resume: bool) -> Result<()> { let mut ack = ConnAckPacket::default(); ack.flags.session_present = resume; @@ -77,14 +106,8 @@ impl Session { let mut session = self.shared.state.lock().await; if session.connect_packet.payload.client_id.is_empty() { - let random_bytes: [u8; 16] = rand::thread_rng().gen(); - session.connect_packet.payload.client_id = uuid::Builder::from_bytes(random_bytes) - .set_variant(uuid::Variant::RFC4122) - .set_version(uuid::Version::Random) - .build() - .to_hyphenated() - .to_string(); - + let uuid = Uuid::new_v4(); + session.connect_packet.payload.client_id = uuid.hyphenated().to_string(); ack.properties = Some(ConnAckProperties { assigned_client_id: Some(AssignedClientIdentifier::new( session.connect_packet.payload.client_id.clone(), @@ -108,76 +131,181 @@ impl Session { Ok(()) } - pub(crate) async fn process_incoming( + async fn handle_publish( &mut self, - packet: ControlPacket, + packet: PublishPacket, broker: &Broker, ) -> Result> { - match packet { - ControlPacket::Connect(_) => Err(ReasonCode::ProtocolError.into()), - ControlPacket::ConnAck(_) => Err(ReasonCode::ProtocolError.into()), - ControlPacket::Publish(p) => { - let topic = p.topic_name.clone(); - - let message = Message { - topic: p.topic_name, - dup: p.dup, - retain: p.retain, - qos: p.qos_level, - payload: p.payload, - }; + match packet.qos_level { + QoS::AtMostOnce => Ok(None), + QoS::AtLeastOnce => { + if let Some(packet_id) = packet.packet_id { + Ok(ControlPacket::PubAck(PubAckPacket { + packet_id, + reason: ReasonCode::Success, + properties: None, + }) + .into()) + } else { + Err(ReasonCode::ProtocolError.into()) + } + } + QoS::ExactlyOnce => { + if let Some(packet_id) = packet.packet_id { + let mut session = self.shared.state.lock().await; + session.unacknowledged_messages.push(packet.clone()); + Ok(ControlPacket::PubRec(PubRecPacket { + packet_id, + reason: ReasonCode::Success, + properties: None, + }) + .into()) + } else { + Err(ReasonCode::ProtocolError.into()) + } + } + QoS::Invalid => Err(ReasonCode::ProtocolError.into()), + } + .and_then(|res| { + let topic = packet.topic_name.clone(); + let message = Message { + packet_id: packet.packet_id, + topic: packet.topic_name, + dup: packet.dup, + retain: packet.retain, + qos: packet.qos_level, + payload: packet.payload, + }; - broker.publish(&topic, message)?; + broker.publish(&topic, message)?; - if p.qos_level == QoS::AtMostOnce { - return Ok(None); - } + Ok(res) + }) + } - Ok(ControlPacket::PubAck(PubAckPacket::default()).into()) - } - ControlPacket::PubAck(_) => todo!(), - ControlPacket::PubRec(_) => todo!(), - ControlPacket::PubRel(_) => todo!(), - ControlPacket::PubComp(_) => todo!(), - ControlPacket::Subscribe(p) => { - let mut session = self.shared.state.lock().await; - let mut ack = SubAckPacket { - packet_id: p.packet_id, - properties: None, - payload: Vec::new(), - }; + async fn handle_puback(&mut self, packet: PubAckPacket) -> Result> { + let mut session = self.shared.state.lock().await; + if let Some(index) = session + .unacknowledged_messages + .iter() + .position(|p| p.packet_id == Some(packet.packet_id)) + { + session.unacknowledged_messages.remove(index); + } + + Ok(None) + } + + async fn handle_pubrec(&mut self, packet: PubRecPacket) -> Result> { + let mut session = self.shared.state.lock().await; + if let Some(index) = session + .unacknowledged_messages + .iter() + .position(|p| p.packet_id == Some(packet.packet_id)) + { + session.unacknowledged_messages.remove(index); + } + + let packet_id = packet.packet_id; + session.pubrecs.push(packet); + + Ok(ControlPacket::PubRel(PubRelPacket { + packet_id, + reason: ReasonCode::Success, + properties: None, + }) + .into()) + } + + async fn handle_pubcomp(&mut self, packet: PubCompPacket) -> Result> { + let mut session = self.shared.state.lock().await; + if let Some(index) = session + .pubrecs + .iter() + .position(|p| p.packet_id == packet.packet_id) + { + session.pubrecs.remove(index); + } + + Ok(None) + } + + async fn handle_pubrel(&mut self, packet: PubRelPacket) -> Result> { + let mut session = self.shared.state.lock().await; + if let Some(index) = session + .pubrecs + .iter() + .position(|p| p.packet_id == packet.packet_id) + { + session.pubrecs.remove(index); + } + + Ok(ControlPacket::PubComp(PubCompPacket { + packet_id: packet.packet_id, + reason: ReasonCode::Success, + properties: None, + }) + .into()) + } - for sub in &p.payload { - let mut rx = broker.subscribe(sub.topic_filter.to_string()); - ack.payload.push(SubAckPayload { - reason_code: ReasonCode::GrantedQoS0, - }); - - let rx = Box::pin(async_stream::stream! { - loop { - match rx.recv().await { - Ok(msg) => yield msg, - // If we lagged in consuming messages, just resume. - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(_) => break, - } - } - }); - - session - .subscriptions - .insert(sub.topic_filter.to_string(), rx); + async fn handle_subscribe( + &mut self, + packet: mercurio_packets::subscribe::SubscribePacket, + broker: &Broker, + ) -> Result> { + let mut session = self.shared.state.lock().await; + let mut ack = SubAckPacket { + packet_id: packet.packet_id, + properties: None, + payload: Vec::new(), + }; + + for sub in &packet.payload { + let mut rx = broker.subscribe(sub.topic_filter.to_string()); + ack.payload.push(SubAckPayload { + reason_code: ReasonCode::GrantedQoS0, + }); + + let rx = Box::pin(async_stream::stream! { + loop { + match rx.recv().await { + Ok(msg) => yield msg, + // If we lagged in consuming messages, just resume. + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(_) => break, + } } + }); - Ok(ControlPacket::SubAck(ack).into()) - } - ControlPacket::SubAck(_) => todo!(), + session + .subscriptions + .insert(sub.topic_filter.to_string(), rx); + } + + Ok(ControlPacket::SubAck(ack).into()) + } + + pub(crate) async fn process_incoming( + &mut self, + packet: ControlPacket, + broker: &Broker, + ) -> Result> { + match packet { + ControlPacket::Publish(packet) => self.handle_publish(packet, broker).await, + ControlPacket::PubAck(packet) => self.handle_puback(packet).await, + ControlPacket::PubRec(packet) => self.handle_pubrec(packet).await, + ControlPacket::PubRel(packet) => self.handle_pubrel(packet).await, + ControlPacket::PubComp(packet) => self.handle_pubcomp(packet).await, + ControlPacket::Subscribe(packet) => self.handle_subscribe(packet, broker).await, ControlPacket::Unsubscribe(_) => todo!(), - ControlPacket::UnsubAck(_) => todo!(), ControlPacket::PingReq(_) => Ok(ControlPacket::PingResp(PingRespPacket {}).into()), - ControlPacket::PingResp(_) => todo!(), - ControlPacket::Disconnect(p) => Ok(ControlPacket::Disconnect(p).into()), + ControlPacket::Disconnect(packet) => Ok(ControlPacket::Disconnect(packet).into()), ControlPacket::Auth(_) => todo!(), + + // Some packets are not supposed to be received by the server. + // Namely: ConnAck, Unsubscribe, UnsubAck, PingResp + // The Connect packet is handled before the session is created. + _ => Err(ReasonCode::ProtocolError.into()), } } @@ -185,16 +313,27 @@ impl Session { let mut session = self.shared.state.lock().await; match session.subscriptions.next().await { - Some((topic, message)) => ControlPacket::Publish(PublishPacket { - dup: message.dup, - qos_level: message.qos, - retain: false, - topic_name: topic, - packet_id: 0.into(), - properties: None, - payload: message.payload, - }) - .into(), + Some((topic, message)) => { + let publish = PublishPacket { + dup: message.dup, + qos_level: message.qos, + retain: false, + topic_name: topic, + packet_id: message.packet_id, + properties: None, + payload: message.payload, + }; + + match message.qos { + mercurio_core::qos::QoS::AtMostOnce => {} + mercurio_core::qos::QoS::AtLeastOnce | mercurio_core::qos::QoS::ExactlyOnce => { + session.unacknowledged_messages.push(publish.clone()); + } + mercurio_core::qos::QoS::Invalid => unreachable!(), + }; + + Some(ControlPacket::Publish(publish)) + } None => None, } } diff --git a/mercurio-server/src/session_manager.rs b/mercurio-server/src/session_manager.rs index df8dbf5..48d5498 100644 --- a/mercurio-server/src/session_manager.rs +++ b/mercurio-server/src/session_manager.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{Mutex, Notify}; +use tokio::sync::Mutex; use mercurio_core::Result; use mercurio_packets::connect::ConnectPacket; @@ -21,7 +21,6 @@ pub(crate) struct SessionManager { struct Shared { state: Mutex, - background_task: Notify, } struct State { @@ -46,7 +45,6 @@ impl SessionManager { state: Mutex::new(State { sessions: HashMap::new(), }), - background_task: Notify::new(), }); SessionManager { shared } diff --git a/mercurio-server/src/topic_tree.rs b/mercurio-server/src/topic_tree.rs index ee9b6d3..1873915 100644 --- a/mercurio-server/src/topic_tree.rs +++ b/mercurio-server/src/topic_tree.rs @@ -11,18 +11,16 @@ use tracing::error; struct TopicNode { channel: broadcast::Sender, children: HashMap>, - n_subs: u32, level: usize, } impl TopicNode { pub fn new(level: usize) -> TopicNode { - let (sender, _) = broadcast::channel(5); //TODO: What size should this actually be? + let (sender, _) = broadcast::channel(5); // TODO: What size should this actually be? TopicNode { channel: sender, children: HashMap::new(), - n_subs: 0, level, } }