Skip to content

Commit

Permalink
Add proper delivery handling on receiver side (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Mar 10, 2024
1 parent 7d0cb48 commit efd8a84
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 124 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.1.0] - 2024-03-08

* Add proper delivery handling on receiver side

## [2.0.0] - 2024-03-06

* Add proper delivery handling
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "2.0.0"
version = "2.1.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down
5 changes: 1 addition & 4 deletions codec/codegen/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@

use std::u8;
use derive_more::From;
use ntex_bytes::{BufMut, Bytes, BytesMut, ByteString};
use uuid::Uuid;

use super::*;
use crate::error::AmqpParseError;
use crate::codec::{self, decode_format_code, decode_list_header, Decode, DecodeFormatted, Encode};
use crate::codec::{decode_format_code, decode_list_header};

#[derive(Clone, Debug, PartialEq, Eq, From)]
pub enum Frame {
Expand Down
1 change: 0 additions & 1 deletion codec/src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use ntex_bytes::{Bytes, BytesMut};
use std::marker::Sized;

use crate::error::AmqpParseError;

Expand Down
6 changes: 2 additions & 4 deletions codec/src/protocol/definitions.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#![allow(unused_assignments, unused_variables, unreachable_patterns)]
use super::*;
use crate::codec::{self, decode_format_code, decode_list_header, Decode, DecodeFormatted, Encode};
use crate::error::AmqpParseError;
use crate::codec::{decode_format_code, decode_list_header};
use derive_more::From;
use ntex_bytes::{BufMut, ByteString, Bytes, BytesMut};
use std::u8;
use uuid::Uuid;

#[derive(Clone, Debug, PartialEq, Eq, From)]
pub enum Frame {
Open(Open),
Expand Down
4 changes: 2 additions & 2 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ntex::util::{HashMap, PoolRef, Ready};
use crate::codec::protocol::{self as codec, Begin, Close, End, Error, Frame, Role};
use crate::codec::{AmqpCodec, AmqpFrame};
use crate::dispatcher::ControlQueue;
use crate::session::{Session, SessionInner};
use crate::session::{Session, SessionInner, INITIAL_OUTGOING_ID};
use crate::sndlink::{SenderLink, SenderLinkInner};
use crate::{cell::Cell, error::AmqpProtocolError, types::Action, Configuration};

Expand Down Expand Up @@ -194,7 +194,7 @@ impl ConnectionRef {

let begin = Begin(Box::new(codec::BeginInner {
remote_channel: None,
next_outgoing_id: 1,
next_outgoing_id: INITIAL_OUTGOING_ID,
incoming_window: std::u32::MAX,
outgoing_window: std::u32::MAX,
handle_max: std::u32::MAX,
Expand Down
76 changes: 49 additions & 27 deletions src/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bitflags::bitflags! {
}
}

#[derive(Debug)]
pub struct Delivery {
id: DeliveryNumber,
session: Session,
Expand All @@ -34,12 +35,36 @@ pub(crate) struct DeliveryInner {
}

impl Delivery {
pub(crate) fn new_rcv(id: DeliveryNumber, settled: bool, session: Session) -> Delivery {
if !settled {
session
.inner
.get_mut()
.unsettled_rcv_deliveries
.insert(id, DeliveryInner::new());
}

Delivery {
id,
session,
flags: StdCell::new(if settled {
Flags::LOCAL_SETTLED
} else {
Flags::empty()
}),
}
}

pub fn id(&self) -> DeliveryNumber {
self.id
}

pub fn remote_state(&self) -> Option<DeliveryState> {
if let Some(inner) = self
.session
.inner
.get_mut()
.unsettled_deliveries
.unsettled_deliveries(self.is_set(Flags::SENDER))
.get_mut(&self.id)
{
inner.state.clone()
Expand Down Expand Up @@ -113,7 +138,7 @@ impl Delivery {
.session
.inner
.get_mut()
.unsettled_deliveries
.unsettled_deliveries(self.is_set(Flags::SENDER))
.get_mut(&self.id)
{
if let Some(st) = self.check_inner(inner) {
Expand All @@ -134,7 +159,7 @@ impl Delivery {
.session
.inner
.get_mut()
.unsettled_deliveries
.unsettled_deliveries(self.is_set(Flags::SENDER))
.get_mut(&self.id)
{
if inner.settled {
Expand All @@ -159,19 +184,20 @@ impl Delivery {
// return clone of terminal state
Some(Ok(Some(st.clone())))
}
} else if let Some(ref err) = inner.error {
Some(Err(err.clone()))
} else {
None
inner.error.as_ref().map(|err| Err(err.clone()))
}
}
}

impl Drop for Delivery {
fn drop(&mut self) {
let inner = self.session.inner.get_mut();
let deliveries = inner.unsettled_deliveries(self.is_set(Flags::SENDER));

if deliveries.contains_key(&self.id) {
deliveries.remove(&self.id);

if inner.unsettled_deliveries.contains_key(&self.id) {
if !self.is_set(Flags::REMOTE_SETTLED) && !self.is_set(Flags::LOCAL_SETTLED) {
let err = Error::build()
.condition(ErrorCondition::Custom(Symbol(Str::Static(
Expand All @@ -193,8 +219,6 @@ impl Drop for Delivery {
}));
inner.post_frame(disp.into());
}

inner.unsettled_deliveries.remove(&self.id);
}
}
}
Expand Down Expand Up @@ -269,26 +293,24 @@ impl DeliveryBuilder {

if let Some(ref err) = inner.error {
Err(err.clone())
} else if inner
.max_message_size
.map(|l| self.data.len() > l as usize)
.unwrap_or_default()
{
Err(AmqpProtocolError::BodyTooLarge)
} else {
if inner
.max_message_size
.map(|l| self.data.len() > l as usize)
.unwrap_or_default()
{
Err(AmqpProtocolError::BodyTooLarge)
} else {
let id = self.sender.get_mut().send(self.data, self.tag).await?;
let id = self.sender.get_mut().send(self.data, self.tag).await?;

Ok(Delivery {
id,
session: self.sender.get_ref().session.clone(),
flags: StdCell::new(if self.settled {
Flags::SENDER | Flags::LOCAL_SETTLED
} else {
Flags::SENDER
}),
})
}
Ok(Delivery {
id,
session: self.sender.get_ref().session.clone(),
flags: StdCell::new(if self.settled {
Flags::SENDER | Flags::LOCAL_SETTLED
} else {
Flags::SENDER
}),
})
}
}
}
Loading

0 comments on commit efd8a84

Please sign in to comment.