Skip to content

Commit

Permalink
feat: Prepare protocol for multiple task follow
Browse files Browse the repository at this point in the history
  • Loading branch information
Nukesor committed Feb 21, 2025
1 parent 3df8c55 commit 36773bb
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 33 deletions.
17 changes: 12 additions & 5 deletions pueue/src/client/commands/follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use pueue_lib::{
Response,
client::Client,
log::{get_log_file_handle, get_log_path, seek_to_last_lines},
network::message::StreamRequest,
network::message::{StreamRequest, TaskSelection},
};
use tokio::time::sleep;

Expand Down Expand Up @@ -51,18 +51,25 @@ pub async fn remote_follow(
task_id: Option<usize>,
lines: Option<usize>,
) -> Result<()> {
let task_ids = task_id.map(|id| vec![id]).unwrap_or_default();

// Request the log stream.
client
.send_request(StreamRequest { task_id, lines })
.send_request(StreamRequest {
tasks: TaskSelection::TaskIds(task_ids),
lines,
})
.await?;

// Receive the stream until the connection is closed, breaks or another failure appears.
loop {
let response = client.receive_response().await?;
match response {
Response::Stream(text) => {
print!("{text}");
io::stdout().flush().unwrap();
Response::Stream(response) => {
for (_, text) in response.logs {
print!("{text}");
io::stdout().flush().unwrap();
}
continue;
}
Response::Close => break,
Expand Down
64 changes: 38 additions & 26 deletions pueue/src/daemon/network/message_handler/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,32 +70,42 @@ pub async fn follow_log(
// The user can specify the id of the task they want to follow
// If the id isn't specified and there's only a single running task, this task will be used.
// However, if there are multiple running tasks, the user will have to specify an id.
let task_id = if let Some(task_id) = message.task_id {
task_id
//
// NOTE: Read the docs of [StreamRequest] why we're not yet supporting selection of multiple
// tasks yet.
let task_id = if let TaskSelection::TaskIds(task_ids) = message.tasks {
task_ids.first().cloned()
} else {
// Get all ids of running tasks
let state = state.lock().unwrap();
let running_ids: Vec<_> = state
.tasks()
.iter()
.filter_map(|(&id, t)| if t.is_running() { Some(id) } else { None })
.collect();

// Return a message on "no" or multiple running tasks.
match running_ids.len() {
0 => {
return Ok(create_failure_response("There are no running tasks."));
}
1 => running_ids[0],
_ => {
let running_ids = running_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(", ");
return Ok(create_failure_response(format!(
"Multiple tasks are running, please select one of the following: {running_ids}"
)));
None
};

let task_id = match task_id {
Some(id) => id,
None => {
// Get all ids of running tasks
let state = state.lock().unwrap();
let running_ids: Vec<_> = state
.tasks()
.iter()
.filter_map(|(&id, t)| if t.is_running() { Some(id) } else { None })
.collect();

// Return a message on "no" or multiple running tasks.
match running_ids.len() {
0 => {
return Ok(create_failure_response("There are no running tasks."));

Check warning on line 96 in pueue/src/daemon/network/message_handler/log.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/daemon/network/message_handler/log.rs#L96

Added line #L96 was not covered by tests
}
1 => running_ids[0],
_ => {
let running_ids = running_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(", ");
return Ok(create_failure_response(format!(
"Multiple tasks are running, please select one of the following: {running_ids}"
)));

Check warning on line 107 in pueue/src/daemon/network/message_handler/log.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/daemon/network/message_handler/log.rs#L100-L107

Added lines #L100 - L107 were not covered by tests
}
}
}
};
Expand Down Expand Up @@ -159,8 +169,10 @@ pub async fn follow_log(

// Only send a message, if there's actual new content.
if !text.is_empty() {
let mut logs = BTreeMap::new();
logs.insert(task_id, text);
// Send the next chunk.
let response = Response::Stream(text);
let response = Response::Stream(StreamResponse { logs });
send_response(response, stream).await?;
}

Expand Down
1 change: 1 addition & 0 deletions pueue_lib/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The concept of SemVer is applied to the daemon/client API, but not the library A
### Changed

- Streamline all `Request` and `Response` variant names and struct names used in unit variant.
- Prepare `Request::Stream` and `Response::Stream` to be compatible with multiple follow tasks in the scope of [#614](https://github.com/Nukesor/pueue/issues/614).

## [0.28.1] - 2025-02-17

Expand Down
9 changes: 8 additions & 1 deletion pueue_lib/src/network/message/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,16 @@ pub enum ShutdownRequest {
}
impl_into_request!(ShutdownRequest, Request::DaemonShutdown);

/// Request the live streaming of a set of running tasks.
///
/// **WARNING**:
/// Even though this type currently accepts a TaskSelection, only
/// `TaskSelection::TaskIds(vec![])` and `TaskSelection::TaskIds(vec![id])` are accepted.
/// We already use this format in preparation for <https://github.com/Nukesor/pueue/issues/614>
/// That way we can stay forwards compatible without having to break the API.
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
pub struct StreamRequest {
pub task_id: Option<usize>,
pub tasks: TaskSelection,
pub lines: Option<usize>,
}
impl_into_request!(StreamRequest, Request::Stream);
Expand Down
11 changes: 10 additions & 1 deletion pueue_lib/src/network/message/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub enum Response {
Group(GroupResponse),

/// The next chunk of output, that's send to the client.
Stream(String),
Stream(StreamResponse),

Success(String),
Failure(String),
Expand Down Expand Up @@ -116,3 +116,12 @@ pub struct GroupResponse {
pub groups: BTreeMap<String, Group>,
}
impl_into_response!(GroupResponse, Response::Group);

/// Live log output returned by the daemon.
///
/// The logs are ordered by task id.
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
pub struct StreamResponse {
pub logs: BTreeMap<usize, String>,
}
impl_into_response!(StreamResponse, Response::Stream);

0 comments on commit 36773bb

Please sign in to comment.