Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/async support #95

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions bbqtest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ crossbeam-utils = "0.7"
crossbeam = "0.7"
heapless = "0.7"
cfg-if = "0.1"
futures = "0.3"


[[bench]]
name = "benches"
Expand Down
175 changes: 175 additions & 0 deletions bbqtest/src/async_framed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#[cfg(test)]
mod tests {

use bbqueue::BBBuffer;
use futures::executor::block_on;

#[test]
fn frame_wrong_size() {
block_on(async {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like some of these tests are invoking the sync and not async versions (grant instead of grant_async). Is that intended?

let bb: BBBuffer<256> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split_framed().unwrap();

// Create largeish grants
let mut wgr = prod.grant(127).unwrap();
for (i, by) in wgr.iter_mut().enumerate() {
*by = i as u8;
}
// Note: In debug mode, this hits a debug_assert
wgr.commit(256);

let rgr = cons.read().unwrap();
assert_eq!(rgr.len(), 127);
for (i, by) in rgr.iter().enumerate() {
assert_eq!((i as u8), *by);
}
rgr.release();
});
}

#[test]
fn full_size() {
block_on(async {
let bb: BBBuffer<256> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split_framed().unwrap();
let mut ctr = 0;

for _ in 0..10_000 {
// Create largeish grants
if let Ok(mut wgr) = prod.grant(127) {
ctr += 1;
for (i, by) in wgr.iter_mut().enumerate() {
*by = i as u8;
}
wgr.commit(127);

let rgr = cons.read().unwrap();
assert_eq!(rgr.len(), 127);
for (i, by) in rgr.iter().enumerate() {
assert_eq!((i as u8), *by);
}
rgr.release();
} else {
// Create smallish grants
let mut wgr = prod.grant(1).unwrap();
for (i, by) in wgr.iter_mut().enumerate() {
*by = i as u8;
}
wgr.commit(1);

let rgr = cons.read().unwrap();
assert_eq!(rgr.len(), 1);
for (i, by) in rgr.iter().enumerate() {
assert_eq!((i as u8), *by);
}
rgr.release();
};
}

assert!(ctr > 1);
});
}

#[test]
fn frame_overcommit() {
block_on(async {
let bb: BBBuffer<256> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split_framed().unwrap();

// Create largeish grants
let mut wgr = prod.grant(128).unwrap();
for (i, by) in wgr.iter_mut().enumerate() {
*by = i as u8;
}
wgr.commit(255);

let mut wgr = prod.grant(64).unwrap();
for (i, by) in wgr.iter_mut().enumerate() {
*by = (i as u8) + 128;
}
wgr.commit(127);

let rgr = cons.read().unwrap();
assert_eq!(rgr.len(), 128);
rgr.release();

let rgr = cons.read().unwrap();
assert_eq!(rgr.len(), 64);
rgr.release();
});
}

#[test]
fn frame_undercommit() {
block_on(async {
let bb: BBBuffer<512> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split_framed().unwrap();

for _ in 0..100_000 {
// Create largeish grants
let mut wgr = prod.grant(128).unwrap();
for (i, by) in wgr.iter_mut().enumerate() {
*by = i as u8;
}
wgr.commit(13);

let mut wgr = prod.grant(64).unwrap();
for (i, by) in wgr.iter_mut().enumerate() {
*by = (i as u8) + 128;
}
wgr.commit(7);

let mut wgr = prod.grant(32).unwrap();
for (i, by) in wgr.iter_mut().enumerate() {
*by = (i as u8) + 192;
}
wgr.commit(0);

let rgr = cons.read().unwrap();
assert_eq!(rgr.len(), 13);
rgr.release();

let rgr = cons.read().unwrap();
assert_eq!(rgr.len(), 7);
rgr.release();

let rgr = cons.read().unwrap();
assert_eq!(rgr.len(), 0);
rgr.release();
}
});
}

#[test]
fn frame_auto_commit_release() {
block_on(async {
let bb: BBBuffer<256> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split_framed().unwrap();

for _ in 0..100 {
{
let mut wgr = prod.grant(64).unwrap();
wgr.to_commit(64);
for (i, by) in wgr.iter_mut().enumerate() {
*by = i as u8;
}
// drop
}

{
let mut rgr = cons.read().unwrap();
rgr.auto_release(true);
let rgr = rgr;

for (i, by) in rgr.iter().enumerate() {
assert_eq!(*by, i as u8);
}
assert_eq!(rgr.len(), 64);
// drop
}
}

assert!(cons.read().is_none());
});
}
}
169 changes: 169 additions & 0 deletions bbqtest/src/async_usage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#[cfg(test)]
mod tests {
use bbqueue::BBBuffer;
use bbqueue::Error;
use futures::{executor::block_on, future::join};

#[test]
fn test_read() {
let bb: BBBuffer<6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();

{
let mut grant = prod.grant_exact(4).unwrap();
let buf = grant.buf();
buf[0] = 0xDE;
buf[1] = 0xAD;
buf[2] = 0xC0;
buf[3] = 0xDE;
grant.commit(4);
}

let r_grant = block_on(cons.read_async()).unwrap();

assert_eq!(4, r_grant.len());
assert_eq!(r_grant[0], 0xDE);
assert_eq!(r_grant[1], 0xAD);
assert_eq!(r_grant[2], 0xC0);
assert_eq!(r_grant[3], 0xDE);
}

#[test]
fn test_write() {
let bb: BBBuffer<6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();

let mut w_grant = block_on(prod.grant_exact_async(4)).unwrap();
assert_eq!(4, w_grant.len());
w_grant[0] = 0xDE;
w_grant[1] = 0xAD;
w_grant[2] = 0xC0;
w_grant[3] = 0xDE;
w_grant.commit(4);

let grant = cons.read().unwrap();
let rx_buf = grant.buf();
assert_eq!(4, rx_buf.len());
assert_eq!(rx_buf[0], 0xDE);
assert_eq!(rx_buf[1], 0xAD);
assert_eq!(rx_buf[2], 0xC0);
assert_eq!(rx_buf[3], 0xDE);
}

#[test]
fn test_read_after_write() {
let bb: BBBuffer<6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();

let read_fut = async {
let r_grant = cons.read_async().await.unwrap();
r_grant.release(4);

let time = std::time::Instant::now(); // TODO: Remove time dependence in test
#[cfg(feature = "verbose")]
println!("Read completed at {:?}", time);
time
};

let write_fut = async {
let mut w_grant = prod.grant_exact_async(4).await.unwrap();
w_grant[0] = 0xDE;
w_grant[1] = 0xAD;
w_grant[2] = 0xC0;
w_grant[3] = 0xDE;
w_grant.commit(4);

let time = std::time::Instant::now(); // TODO: Remove time dependence in test
#[cfg(feature = "verbose")]
println!("Write completed at {:?}", time);
time
};

let (r_time, w_time) = block_on(join(read_fut, write_fut));
assert!(r_time > w_time)
}

#[test]
fn grant_exact_too_big() {
let bb: BBBuffer<6> = BBBuffer::new();
let (mut prod, mut _cons) = bb.try_split().unwrap();
let w_grant_res = block_on(async { prod.grant_exact_async(8).await });

assert_eq!(w_grant_res.unwrap_err(), Error::InsufficientSize);
}

#[test]
fn grant_exact_loop() {
let bb: BBBuffer<6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();
let w_grant = prod.grant_exact(4).unwrap();
w_grant.commit(4);

let read_fut = async {
let r_grant = cons.read_async().await.unwrap();
r_grant.release(4);

let time = std::time::Instant::now(); // TODO: Remove time dependence in test
#[cfg(feature = "verbose")]
println!("Read completed at {:?}", time);
time
};

let write_fut = async {
let w_grant = prod.grant_exact_async(3).await.unwrap();
w_grant.commit(4);

let time = std::time::Instant::now(); // TODO: Remove time dependence in test
#[cfg(feature = "verbose")]
println!("Write completed at {:?}", time);
time
};

let (w_time, r_time) = block_on(join(write_fut, read_fut));
assert!(r_time < w_time);
}

#[test]
fn grant_exact_loop_too_big() {
let bb: BBBuffer<6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();
let w_grant = prod.grant_exact(4).unwrap();
w_grant.commit(4);

let read_fut = async {
let r_grant = cons.read_async().await.unwrap();
r_grant.release(4);
};

let write_fut = async {
let w_grant = prod.grant_exact_async(4).await;
assert_eq!(w_grant.unwrap_err(), Error::InsufficientSize);
};

block_on(join(write_fut, read_fut));
}

#[test]
fn write_cancelled() {
let bb: BBBuffer<6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();
let w_grant_fut = prod.grant_exact_async(6);
drop(w_grant_fut);
let r_grant = cons.read();
assert_eq!(r_grant.unwrap_err(), Error::InsufficientSize);
}

#[test]
fn read_cancelled() {
let bb: BBBuffer<6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();
let w_grant = prod.grant_exact(6).unwrap();
w_grant.commit(6);

let r_grant_fut = cons.read_async();
drop(r_grant_fut);

let w_grant = prod.grant_max_remaining(4);
assert_eq!(w_grant.unwrap_err(), Error::InsufficientSize);
}
}
2 changes: 2 additions & 0 deletions bbqtest/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! NOTE: this crate is really just a shim for testing
//! the other no-std crate.

mod async_framed;
mod async_usage;
mod framed;
mod multi_thread;
mod ring_around_the_senders;
Expand Down
Loading