Skip to content

Commit

Permalink
Merge pull request #2150 from iced-rs/feature/command-run
Browse files Browse the repository at this point in the history
`Stream` support for `Command`
  • Loading branch information
hecrj authored Nov 29, 2023
2 parents 7e7d65a + a761448 commit 7f8b176
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 4 deletions.
25 changes: 24 additions & 1 deletion futures/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Run commands and keep track of subscriptions.
use crate::core::event::{self, Event};
use crate::subscription;
use crate::{BoxFuture, Executor, MaybeSend};
use crate::{BoxFuture, BoxStream, Executor, MaybeSend};

use futures::{channel::mpsc, Sink};
use std::marker::PhantomData;
Expand Down Expand Up @@ -69,6 +69,29 @@ where
self.executor.spawn(future);
}

/// Runs a [`Stream`] in the [`Runtime`] until completion.
///
/// The resulting `Message`s will be forwarded to the `Sender` of the
/// [`Runtime`].
///
/// [`Stream`]: BoxStream
pub fn run(&mut self, stream: BoxStream<Message>) {
use futures::{FutureExt, StreamExt};

let sender = self.sender.clone();
let future =
stream.map(Ok).forward(sender).map(|result| match result {
Ok(()) => (),
Err(error) => {
log::warn!(
"Stream could not run until completion: {error}"
);
}
});

self.executor.spawn(future);
}

/// Tracks a [`Subscription`] in the [`Runtime`].
///
/// It will spawn new streams or close old ones as necessary! See
Expand Down
35 changes: 34 additions & 1 deletion runtime/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ mod action;
pub use action::Action;

use crate::core::widget;
use crate::futures::futures;
use crate::futures::MaybeSend;

use futures::channel::mpsc;
use futures::Stream;
use std::fmt;
use std::future::Future;

Expand Down Expand Up @@ -43,11 +46,21 @@ impl<T> Command<T> {
future: impl Future<Output = A> + 'static + MaybeSend,
f: impl FnOnce(A) -> T + 'static + MaybeSend,
) -> Command<T> {
use iced_futures::futures::FutureExt;
use futures::FutureExt;

Command::single(Action::Future(Box::pin(future.map(f))))
}

/// Creates a [`Command`] that runs the given stream to completion.
pub fn run<A>(
stream: impl Stream<Item = A> + 'static + MaybeSend,
f: impl Fn(A) -> T + 'static + MaybeSend,
) -> Command<T> {
use futures::StreamExt;

Command::single(Action::Stream(Box::pin(stream.map(f))))
}

/// Creates a [`Command`] that performs the actions of all the given
/// commands.
///
Expand Down Expand Up @@ -106,3 +119,23 @@ impl<T> fmt::Debug for Command<T> {
command.fmt(f)
}
}

/// Creates a [`Command`] that produces the `Message`s published from a [`Future`]
/// to an [`mpsc::Sender`] with the given bounds.
pub fn channel<Fut, Message>(
size: usize,
f: impl FnOnce(mpsc::Sender<Message>) -> Fut + MaybeSend + 'static,
) -> Command<Message>
where
Fut: Future<Output = ()> + MaybeSend + 'static,
Message: 'static + MaybeSend,
{
use futures::future;
use futures::stream::{self, StreamExt};

let (sender, receiver) = mpsc::channel(size);

let runner = stream::once(f(sender)).filter_map(|_| future::ready(None));

Command::single(Action::Stream(Box::pin(stream::select(receiver, runner))))
}
9 changes: 8 additions & 1 deletion runtime/src/command/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ pub enum Action<T> {
/// [`Future`]: iced_futures::BoxFuture
Future(iced_futures::BoxFuture<T>),

/// Run a [`Stream`] to completion.
///
/// [`Stream`]: iced_futures::BoxStream
Stream(iced_futures::BoxStream<T>),

/// Run a clipboard action.
Clipboard(clipboard::Action<T>),

Expand Down Expand Up @@ -52,10 +57,11 @@ impl<T> Action<T> {
A: 'static,
T: 'static,
{
use iced_futures::futures::FutureExt;
use iced_futures::futures::{FutureExt, StreamExt};

match self {
Self::Future(future) => Action::Future(Box::pin(future.map(f))),
Self::Stream(stream) => Action::Stream(Box::pin(stream.map(f))),
Self::Clipboard(action) => Action::Clipboard(action.map(f)),
Self::Window(window) => Action::Window(window.map(f)),
Self::System(system) => Action::System(system.map(f)),
Expand All @@ -74,6 +80,7 @@ impl<T> fmt::Debug for Action<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Future(_) => write!(f, "Action::Future"),
Self::Stream(_) => write!(f, "Action::Stream"),
Self::Clipboard(action) => {
write!(f, "Action::Clipboard({action:?})")
}
Expand Down
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ pub use crate::core::{
color, Alignment, Background, BorderRadius, Color, ContentFit, Degrees,
Gradient, Length, Padding, Pixels, Point, Radians, Rectangle, Size, Vector,
};
pub use crate::runtime::Command;

pub mod clipboard {
//! Access the clipboard.
Expand Down Expand Up @@ -239,6 +238,11 @@ pub mod mouse {
};
}

pub mod command {
//! Run asynchronous actions.
pub use crate::runtime::command::{channel, Command};
}

pub mod subscription {
//! Listen to external events in your application.
pub use iced_futures::subscription::{
Expand Down Expand Up @@ -287,6 +291,7 @@ pub mod widget {
}

pub use application::Application;
pub use command::Command;
pub use error::Error;
pub use event::Event;
pub use executor::Executor;
Expand Down
3 changes: 3 additions & 0 deletions winit/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,9 @@ pub fn run_command<A, C, E>(
command::Action::Future(future) => {
runtime.spawn(future);
}
command::Action::Stream(stream) => {
runtime.run(stream);
}
command::Action::Clipboard(action) => match action {
clipboard::Action::Read(tag) => {
let message = tag(clipboard.read());
Expand Down

0 comments on commit 7f8b176

Please sign in to comment.