Skip to content

Commit

Permalink
publish: QoS2
Browse files Browse the repository at this point in the history
Lot's of fixes in order to get QoS2 to work (in memory storage only).

Signed-off-by: Guilherme Felipe da Silva <[email protected]>
  • Loading branch information
frenzox committed May 29, 2023
1 parent eec24f3 commit cac1874
Show file tree
Hide file tree
Showing 15 changed files with 329 additions and 134 deletions.
2 changes: 1 addition & 1 deletion mercurio-core/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fn decode_var_byte_integer<T: Buf>(encoded: &mut T) -> crate::Result<VariableByt
Ok(VariableByteInteger(value))
}

#[derive(PartialEq, Eq, Debug, Default)]
#[derive(PartialEq, Eq, Debug, Default, Clone)]
pub struct VariableByteInteger(pub u32);

impl Encoder for VariableByteInteger {
Expand Down
1 change: 1 addition & 0 deletions mercurio-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::qos::QoS;

#[derive(Clone, Debug)]
pub struct Message {
pub packet_id: Option<u16>,
pub topic: String,
pub dup: bool,
pub qos: QoS,
Expand Down
4 changes: 2 additions & 2 deletions mercurio-core/src/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{

macro_rules! def_prop {
($t:ident {$i:ident: $a:expr, $($n:tt: $s:ty),*}) => {
#[derive(Debug, Default, PartialEq, Eq)]
pub struct $t {$($n: $s,)*}
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct $t {$(pub $n: $s,)*}

impl $t {
pub const $i: u32 = $a;
Expand Down
36 changes: 25 additions & 11 deletions mercurio-packets/src/puback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ impl Decoder for PubAckProperties {

#[derive(Default, Debug, PartialEq, Eq)]
pub struct PubAckPacket {
packet_id: u16,
reason: ReasonCode,
properties: Option<PubAckProperties>,
pub packet_id: u16,
pub reason: ReasonCode,
pub properties: Option<PubAckProperties>,
}

const PACKET_TYPE: u8 = 0x04;
Expand All @@ -79,29 +79,43 @@ impl Encoder for PubAckPacket {
buffer.put_u8(PACKET_TYPE << 4);

remaining_len += self.packet_id.encoded_size();
remaining_len += self.reason.encoded_size();
remaining_len += VariableByteInteger(self.properties.encoded_size() as u32).encoded_size();
remaining_len += self.properties.encoded_size();

if self.properties.is_some() || self.reason != ReasonCode::Success {
remaining_len += self.reason.encoded_size();
remaining_len +=
VariableByteInteger(self.properties.encoded_size() as u32).encoded_size();
remaining_len += self.properties.encoded_size();
}

VariableByteInteger(remaining_len as u32).encode(buffer);

self.packet_id.encode(buffer);

if remaining_len == 2 {
return;
}

self.reason.encode(buffer);
VariableByteInteger(self.properties.encoded_size() as u32).encode(buffer);
self.properties.encode(buffer);
}

fn encoded_size(&self) -> usize {
unimplemented!()
}
}

impl Decoder for PubAckPacket {
fn decode<T: Buf>(buffer: &mut T) -> crate::Result<Self> {
buffer.advance(1);

let _ = VariableByteInteger::decode(buffer)?;
let remaining_len = VariableByteInteger::decode(buffer)?;
let packet_id = u16::decode(buffer)?;

if remaining_len.0 == 2 {
return Ok(PubAckPacket {
packet_id,
reason: ReasonCode::Success,
properties: None,
});
}

let reason = ReasonCode::decode(buffer)?;
let properties = Some(PubAckProperties::decode(buffer)?);

Expand Down
35 changes: 24 additions & 11 deletions mercurio-packets/src/pubcomp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ impl Decoder for PubCompProperties {

#[derive(Default, Debug, PartialEq, Eq)]
pub struct PubCompPacket {
packet_id: u16,
reason: ReasonCode,
properties: Option<PubCompProperties>,
pub packet_id: u16,
pub reason: ReasonCode,
pub properties: Option<PubCompProperties>,
}

const PACKET_TYPE: u8 = 0x07;
Expand All @@ -79,29 +79,42 @@ impl Encoder for PubCompPacket {
buffer.put_u8(PACKET_TYPE << 4);

remaining_len += self.packet_id.encoded_size();
remaining_len += self.reason.encoded_size();
remaining_len += VariableByteInteger(self.properties.encoded_size() as u32).encoded_size();
remaining_len += self.properties.encoded_size();
if self.properties.is_some() || self.reason != ReasonCode::Success {
remaining_len += self.reason.encoded_size();
remaining_len +=
VariableByteInteger(self.properties.encoded_size() as u32).encoded_size();
remaining_len += self.properties.encoded_size();
}

VariableByteInteger(remaining_len as u32).encode(buffer);

self.packet_id.encode(buffer);

if remaining_len == 2 {
return;
}

self.reason.encode(buffer);
VariableByteInteger(self.properties.encoded_size() as u32).encode(buffer);
self.properties.encode(buffer);
}

fn encoded_size(&self) -> usize {
unimplemented!()
}
}

impl Decoder for PubCompPacket {
fn decode<T: Buf>(buffer: &mut T) -> crate::Result<Self> {
buffer.advance(1);

let _ = VariableByteInteger::decode(buffer)?;
let remaining_len = VariableByteInteger::decode(buffer)?;
let packet_id = u16::decode(buffer)?;

if remaining_len.0 == 2 {
return Ok(PubCompPacket {
packet_id,
reason: ReasonCode::Success,
properties: None,
});
}

let reason = ReasonCode::decode(buffer)?;
let properties = Some(PubCompProperties::decode(buffer)?);

Expand Down
4 changes: 2 additions & 2 deletions mercurio-packets/src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use mercurio_core::{
reason::ReasonCode,
};

#[derive(Default, Debug, PartialEq, Eq)]
#[derive(Default, Debug, PartialEq, Eq, Clone)]
pub struct PublishProperties {
payload_format_indicator: Option<PayloadFormatIndicator>,
message_expiry_interval: Option<MessageExpiryInterval>,
Expand Down Expand Up @@ -87,7 +87,7 @@ impl Decoder for PublishProperties {
}
}

#[derive(Default, Debug, PartialEq, Eq)]
#[derive(Default, Debug, PartialEq, Eq, Clone)]
pub struct PublishPacket {
pub dup: bool,
pub qos_level: QoS,
Expand Down
31 changes: 24 additions & 7 deletions mercurio-packets/src/pubrec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ impl Decoder for PubRecProperties {

#[derive(Default, Debug, PartialEq, Eq)]
pub struct PubRecPacket {
packet_id: u16,
reason: ReasonCode,
properties: Option<PubRecProperties>,
pub packet_id: u16,
pub reason: ReasonCode,
pub properties: Option<PubRecProperties>,
}

const PACKET_TYPE: u8 = 0x05;
Expand All @@ -79,13 +79,21 @@ impl Encoder for PubRecPacket {
buffer.put_u8(PACKET_TYPE << 4);

remaining_len += self.packet_id.encoded_size();
remaining_len += self.reason.encoded_size();
remaining_len += VariableByteInteger(self.properties.encoded_size() as u32).encoded_size();
remaining_len += self.properties.encoded_size();
if self.properties.is_some() || self.reason != ReasonCode::Success {
remaining_len += self.reason.encoded_size();
remaining_len +=
VariableByteInteger(self.properties.encoded_size() as u32).encoded_size();
remaining_len += self.properties.encoded_size();
}

VariableByteInteger(remaining_len as u32).encode(buffer);

self.packet_id.encode(buffer);

if remaining_len == 2 {
return;
}

self.reason.encode(buffer);
VariableByteInteger(self.properties.encoded_size() as u32).encode(buffer);
self.properties.encode(buffer);
Expand All @@ -100,8 +108,17 @@ impl Decoder for PubRecPacket {
fn decode<T: Buf>(buffer: &mut T) -> crate::Result<Self> {
buffer.advance(1);

let _ = VariableByteInteger::decode(buffer)?;
let remaining_len = VariableByteInteger::decode(buffer)?;
let packet_id = u16::decode(buffer)?;

if remaining_len.0 == 2 {
return Ok(PubRecPacket {
packet_id,
reason: ReasonCode::Success,
properties: None,
});
}

let reason = ReasonCode::decode(buffer)?;
let properties = Some(PubRecProperties::decode(buffer)?);

Expand Down
31 changes: 24 additions & 7 deletions mercurio-packets/src/pubrel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ impl Decoder for PubRelProperties {

#[derive(Default, Debug, PartialEq, Eq)]
pub struct PubRelPacket {
packet_id: u16,
reason: ReasonCode,
properties: Option<PubRelProperties>,
pub packet_id: u16,
pub reason: ReasonCode,
pub properties: Option<PubRelProperties>,
}

const PACKET_TYPE: u8 = 0x06;
Expand All @@ -79,13 +79,21 @@ impl Encoder for PubRelPacket {
buffer.put_u8((PACKET_TYPE << 4) | 0x02);

remaining_len += self.packet_id.encoded_size();
remaining_len += self.reason.encoded_size();
remaining_len += VariableByteInteger(self.properties.encoded_size() as u32).encoded_size();
remaining_len += self.properties.encoded_size();
if self.properties.is_some() || self.reason != ReasonCode::Success {
remaining_len += self.reason.encoded_size();
remaining_len +=
VariableByteInteger(self.properties.encoded_size() as u32).encoded_size();
remaining_len += self.properties.encoded_size();
}

VariableByteInteger(remaining_len as u32).encode(buffer);

self.packet_id.encode(buffer);

if remaining_len == 2 {
return;
}

self.reason.encode(buffer);
VariableByteInteger(self.properties.encoded_size() as u32).encode(buffer);
self.properties.encode(buffer);
Expand All @@ -100,8 +108,17 @@ impl Decoder for PubRelPacket {
fn decode<T: Buf>(buffer: &mut T) -> crate::Result<Self> {
buffer.advance(1);

let _ = VariableByteInteger::decode(buffer)?;
let remaining_len = VariableByteInteger::decode(buffer)?;
let packet_id = u16::decode(buffer)?;

if remaining_len.0 == 2 {
return Ok(PubRelPacket {
packet_id,
reason: ReasonCode::Success,
properties: None,
});
}

let reason = ReasonCode::decode(buffer)?;
let properties = Some(PubRelProperties::decode(buffer)?);

Expand Down
1 change: 1 addition & 0 deletions mercurio-server/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async fn main() -> mercurio_core::Result<()> {
.finish();

tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let listener = TcpListener::bind("127.0.0.1:1883").await?;
server::run(listener, signal::ctrl_c()).await;

Expand Down
10 changes: 2 additions & 8 deletions mercurio-server/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use std::{
sync::{Arc, Mutex},
};

use rand::{self, prelude::SliceRandom};
use tokio::sync::broadcast;

use crate::topic_tree::TopicTree;
use mercurio_core::{message::Message, Result};

const CHANNEL_SIZE: usize = 5;

#[derive(Debug, Clone)]
pub(crate) struct Broker {
shared: Arc<Shared>,
Expand All @@ -25,13 +26,6 @@ struct State {
shared_subscriptions: HashMap<String, HashMap<String, Vec<broadcast::Sender<Message>>>>,
}

fn get_random_element<T>(vec: &[T]) -> Option<&T> {
let mut rng = rand::thread_rng();
vec.choose(&mut rng)
}

const CHANNEL_SIZE: usize = 5;

impl Broker {
pub(crate) fn new() -> Broker {
let shared = Arc::new(Shared {
Expand Down
1 change: 0 additions & 1 deletion mercurio-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![allow(dead_code)]
mod broker;
pub mod connection;
pub mod server;
Expand Down
4 changes: 4 additions & 0 deletions mercurio-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl Listener {
async fn run(&mut self) -> Result<()> {
loop {
let socket = self.accept().await?;

info!("Got a connection: {:#?}", socket.peer_addr());

let mut handler = Handler {
Expand Down Expand Up @@ -128,12 +129,15 @@ impl Handler {
).await?;

if let Some(res) = maybe_res {
tracing::debug!("Sending response packet:{:#?} to client {:?}", res, session.get_client_id().await);
self.connection.write_packet(res).await?;
}
}

// Try to send outgoing packet
Some(packet) = session.process_outgoing() => {
tracing::debug!("Sending outgoing packet: {:#?} to client {:?}", packet, session.get_client_id().await);

self.connection.write_packet(packet).await?;
}

Expand Down
Loading

0 comments on commit cac1874

Please sign in to comment.