Skip to content

Commit

Permalink
Add syscall-free & cas-ony buffering send variant
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Nov 25, 2023
1 parent 18afbb6 commit 87030a4
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 11 deletions.
18 changes: 16 additions & 2 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,19 @@ impl<T> Sender<T> {
})
}

pub fn send_buffered(&self, msg: T) -> Result<(), SendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send_buffered(msg, None),
SenderFlavor::List(chan) => chan.send_buffered(msg, None),
// n/a duo to the synchronized req between senders and receivers.
SenderFlavor::Zero(chan) => chan.send(msg, None),
}
.map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => SendError(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
})
}

/// Waits for a message to be sent into the channel, but only for a limited time.
///
/// If the channel is full and not disconnected, this call will block until the send operation
Expand Down Expand Up @@ -1512,8 +1525,9 @@ impl<T> SelectHandle for Receiver<T> {
/// Writes a message into the channel.
pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
match &s.flavor {
SenderFlavor::Array(chan) => chan.write(token, msg),
SenderFlavor::List(chan) => chan.write(token, msg),
SenderFlavor::Array(chan) => chan.write(token, msg, true),
SenderFlavor::List(chan) => chan.write(token, msg, true),
// n/a duo to the synchronized req between senders and receivers.
SenderFlavor::Zero(chan) => chan.write(token, msg),
}
}
Expand Down
33 changes: 27 additions & 6 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl<T> Channel<T> {
}

/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T, notify_buffered: bool) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.array.slot.is_null() {
return Err(msg);
Expand All @@ -223,7 +223,9 @@ impl<T> Channel<T> {
slot.stamp.store(token.array.stamp, Ordering::Release);

// Wake a sleeping receiver.
self.receivers.notify();
if notify_buffered {
self.receivers.notify();
}
Ok(())
}

Expand Down Expand Up @@ -322,25 +324,25 @@ impl<T> Channel<T> {
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
if self.start_send(token) {
unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
unsafe { self.write(token, msg, true).map_err(TrySendError::Disconnected) }
} else {
Err(TrySendError::Full(msg))
}
}

/// Sends a message into the channel.
pub(crate) fn send(
fn send_internal(
&self,
msg: T,
deadline: Option<Instant>,
notify_buffered: bool,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
loop {
// Try sending a message several times.
let backoff = Backoff::new();
loop {
if self.start_send(token) {
let res = unsafe { self.write(token, msg) };
let res = unsafe { self.write(token, msg, notify_buffered) };
return res.map_err(SendTimeoutError::Disconnected);
}

Expand Down Expand Up @@ -381,6 +383,25 @@ impl<T> Channel<T> {
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
msg: T,
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
self.send_internal(msg, deadline, true)
}

/// Sends a message into the channel.
pub(crate) fn send_buffered(
&self,
msg: T,
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
self.send_internal(msg, deadline, false)
}


/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
Expand Down
22 changes: 19 additions & 3 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl<T> Channel<T> {
}

/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T, notify_buffered: bool) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.list.block.is_null() {
return Err(msg);
Expand All @@ -292,7 +292,9 @@ impl<T> Channel<T> {
slot.state.fetch_or(WRITE, Ordering::Release);

// Wake a sleeping receiver.
self.receivers.notify();
if notify_buffered {
self.receivers.notify();
}
Ok(())
}

Expand Down Expand Up @@ -423,7 +425,21 @@ impl<T> Channel<T> {
let token = &mut Token::default();
assert!(self.start_send(token));
unsafe {
self.write(token, msg)
self.write(token, msg, true)
.map_err(SendTimeoutError::Disconnected)
}
}

/// Sends a message into the channel.
pub(crate) fn send_buffered(
&self,
msg: T,
_deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
assert!(self.start_send(token));
unsafe {
self.write(token, msg, false)
.map_err(SendTimeoutError::Disconnected)
}
}
Expand Down

0 comments on commit 87030a4

Please sign in to comment.