Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a SourcesIdQueueInput that allows manipulating items in the queue #506

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 207 additions & 12 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ use crossbeam_channel::{unbounded as channel, Receiver, Sender};
#[cfg(not(feature = "crossbeam-channel"))]
use std::sync::mpsc::{channel, Receiver, Sender};

type BoxedSource<S> = Box<dyn Source<Item = S> + Send>;
type QueueNextItem<S> = (BoxedSource<S>, Option<Sender<()>>);

trait InputQueue<S> {
fn keep_alive_if_empty(&self) -> bool;
fn has_next(&self) -> bool;
fn next(&self) -> Option<QueueNextItem<S>>;
}

/// Builds a new queue. It consists of an input and an output.
///
/// The input can be used to add sounds to the end of the queue, while the output implements
Expand Down Expand Up @@ -41,11 +50,32 @@ where
(input, output)
}

pub fn id_queue<S, I>(
keep_alive_if_empty: bool,
) -> (Arc<SourcesIdQueueInput<S, I>>, SourcesQueueOutput<S>)
where
S: Sample + Send + 'static,
I: Eq + PartialEq + Send + 'static,
{
let input = Arc::new(SourcesIdQueueInput {
next_sounds: Mutex::new(Vec::new()),
keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
});

let output = SourcesQueueOutput {
current: Box::new(Empty::<S>::new()) as Box<_>,
signal_after_end: None,
input: input.clone(),
};

(input, output)
}

// TODO: consider reimplementing this with `from_factory`

/// The input of the queue.
pub struct SourcesQueueInput<S> {
next_sounds: Mutex<Vec<(Box<dyn Source<Item = S> + Send>, Option<Sender<()>>)>>,
next_sounds: Mutex<Vec<QueueNextItem<S>>>,

// See constructor.
keep_alive_if_empty: AtomicBool,
Expand Down Expand Up @@ -101,6 +131,123 @@ where
len
}
}

impl<S> InputQueue<S> for SourcesQueueInput<S>
where
S: Sample + Send + 'static,
{
fn keep_alive_if_empty(&self) -> bool {
self.keep_alive_if_empty.load(Ordering::Acquire)
}

fn has_next(&self) -> bool {
self.next_sounds.lock().unwrap().len() > 0
}

fn next(&self) -> Option<QueueNextItem<S>> {
let mut next = self.next_sounds.lock().unwrap();
if next.len() > 0 {
Some(next.remove(0))
} else {
None
}
}
}

/// A queue input that can associate ids with each item. This allows the queue to be used more like
/// a playlist where items can be removed and/or reordered
pub struct SourcesIdQueueInput<S, I> {
next_sounds: Mutex<Vec<(I, QueueNextItem<S>)>>,

// See constructor.
keep_alive_if_empty: AtomicBool,
}

impl<S, I> SourcesIdQueueInput<S, I>
where
S: Sample + Send + 'static,
I: Eq + PartialEq,
{
/// Adds a new source to the end of the queue.
#[inline]
pub fn append<T>(&self, id: I, source: T)
where
T: Source<Item = S> + Send + 'static,
{
self.next_sounds
.lock()
.unwrap()
.push((id, (Box::new(source) as Box<_>, None)));
}

/// Adds a new source to the end of the queue.
///
/// The `Receiver` will be signalled when the sound has finished playing.
///
/// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver` instead.
#[inline]
pub fn append_with_signal<T>(&self, id: I, source: T) -> Receiver<()>
where
T: Source<Item = S> + Send + 'static,
{
let (tx, rx) = channel();
self.next_sounds
.lock()
.unwrap()
.push((id, (Box::new(source) as Box<_>, Some(tx))));
rx
}

/// Remove the item with id `id` from the queue
#[inline]
pub fn remove(&self, id: I) {
let mut next = self.next_sounds.lock().unwrap();
next.retain(|i| i.0 != id);
}

/// Swap item having id `id_a` with the item having id `id_b`. If either item does not exist,
/// this is a no-op
pub fn swap(&self, id_a: I, id_b: I) {
let mut next_sounds = self.next_sounds.lock().unwrap();
let mut index_a = None;
let mut index_b = None;
let mut p = 0;
for (id, _) in next_sounds.iter() {
if index_a.is_none() && *id == id_a {
index_a = Some(p);
} else if index_b.is_none() && *id == id_b {
index_b = Some(p);
}
p += 1;
}
if let (Some(index_a), Some(index_b)) = (index_a, index_b) {
next_sounds.swap(index_a, index_b);
}
}
}

impl<S, I> InputQueue<S> for SourcesIdQueueInput<S, I>
where
S: Sample + Send + 'static,
{
fn keep_alive_if_empty(&self) -> bool {
self.keep_alive_if_empty.load(Ordering::Acquire)
}

fn has_next(&self) -> bool {
self.next_sounds.lock().unwrap().len() > 0
}

fn next(&self) -> Option<QueueNextItem<S>> {
let mut next = self.next_sounds.lock().unwrap();
if next.len() > 0 {
Some(next.remove(0).1)
} else {
None
}
}
}

/// The output of the queue. Implements `Source`.
pub struct SourcesQueueOutput<S> {
// The current iterator that produces samples.
Expand All @@ -110,7 +257,7 @@ pub struct SourcesQueueOutput<S> {
signal_after_end: Option<Sender<()>>,

// The next sounds.
input: Arc<SourcesQueueInput<S>>,
input: Arc<dyn InputQueue<S> + Send + Sync>,
}

const THRESHOLD: usize = 512;
Expand All @@ -135,9 +282,7 @@ where
if let Some(val) = self.current.current_frame_len() {
if val != 0 {
return Some(val);
} else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
&& self.input.next_sounds.lock().unwrap().is_empty()
{
} else if self.input.keep_alive_if_empty() && self.input.has_next() {
// The next source will be a filler silence which will have the length of `THRESHOLD`
return Some(THRESHOLD);
}
Expand Down Expand Up @@ -212,19 +357,16 @@ where
let _ = signal_after_end.send(());
}

let (next, signal_after_end) = {
let mut next = self.input.next_sounds.lock().unwrap();

if next.len() == 0 {
let (next, signal_after_end) = match self.input.next() {
Some(t) => t,
None => {
let silence = Box::new(Zero::<S>::new_samples(1, 44100, THRESHOLD)) as Box<_>;
if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
if self.input.keep_alive_if_empty() {
// Play a short silence in order to avoid spinlocking.
(silence, None)
} else {
return Err(());
}
} else {
next.remove(0)
}
};

Expand Down Expand Up @@ -263,6 +405,59 @@ mod tests {
assert_eq!(rx.next(), None);
}

#[test]
fn simple_id() {
let (tx, mut rx) = queue::id_queue(false);
let s1 = "sb1".to_string();
let s2 = "sb2".to_string();
let v1 = vec![10i16, -10, 10, -10];
let v2 = vec![10i16, -9, 9, -9];
tx.append(s1, SamplesBuffer::new(1, 48000, v1.clone()));
tx.append(s2, SamplesBuffer::new(1, 48000, v2.clone()));
assert_eq!(rx.channels(), 1);
assert_eq!(rx.sample_rate(), 48000);
for v in v1.into_iter().chain(v2.into_iter()) {
assert_eq!(rx.next(), Some(v));
}
}

#[test]
fn id_with_remove() {
let (tx, mut rx) = queue::id_queue(false);
let s1 = "sb1".to_string();
let s2 = "sb2".to_string();
tx.append(s1, SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
tx.append(s2, SamplesBuffer::new(1, 48000, vec![10i16, -9, 9, -9]));
assert_eq!(rx.channels(), 1);
assert_eq!(rx.sample_rate(), 48000);
assert_eq!(rx.next(), Some(10));
assert_eq!(rx.next(), Some(-10));
tx.remove("sb2".to_string());
assert_eq!(rx.next(), Some(10));
assert_eq!(rx.next(), Some(-10));
assert_eq!(rx.next(), None);
}

#[test]
fn id_swap() {
let (tx, mut rx) = queue::id_queue(false);
let s1 = "sb1".to_string();
let s2 = "sb2".to_string();
let s3 = "sb3".to_string();
let v1 = vec![10i16, -10, 10, -10];
let v2 = vec![10i16, -9, 9, -9];
let v3 = vec![12i16, -12, 12, -12];
tx.append(s1, SamplesBuffer::new(1, 48000, v1.clone()));
tx.append(s2.clone(), SamplesBuffer::new(1, 48000, v2.clone()));
tx.append(s3.clone(), SamplesBuffer::new(1, 48000, v3.clone()));
assert_eq!(rx.channels(), 1);
assert_eq!(rx.sample_rate(), 48000);
tx.swap(s3, s2);
for v in v1.into_iter().chain(v3.into_iter()).chain(v2.into_iter()) {
assert_eq!(rx.next(), Some(v));
}
}

#[test]
fn immediate_end() {
let (_, mut rx) = queue::queue::<i16>(false);
Expand Down