diff --git a/packages/server/src/target/build.rs b/packages/server/src/target/build.rs index 64ae4dc73..4cf541d70 100644 --- a/packages/server/src/target/build.rs +++ b/packages/server/src/target/build.rs @@ -116,72 +116,127 @@ impl Server { return Ok(Some(output)); } - // Get a remote build if one exists that satisfies the retry constraint. - 'a: { - // Find a build. - let futures = self - .get_remote_clients() - .await? - .into_values() - .map(|client| { - let arg = arg.clone(); - Box::pin(async move { - let arg = tg::target::build::Arg { - create: false, + // Create a build id for the local build, in order to avoid borrow checking errors when canceling in the case that a remote returns first. + let build_id = tg::build::Id::new(); + + // Create futures. + let local = self.try_create_local_build(build_id.clone(), id.clone(), arg.clone()); + let remote = self.try_get_remote_build(id.clone(), arg.clone()); + + // Race the local/remote builds. + let build = match future::select(std::pin::pin!(local), std::pin::pin!(remote)).await { + future::Either::Left((local, remote)) => { + if let Ok(Some(local)) = local { + Some(local) + } else { + remote.await? + } + }, + future::Either::Right((remote, local)) => { + if let Ok(Some(build)) = remote { + // Cancel the local build in the case that the remote won the race. + let server = self.clone(); + tokio::spawn(async move { + let arg = tg::build::finish::Arg { + status: tg::build::Status::Canceled, + error: None, + output: None, remote: None, - ..arg.clone() }; - let tg::target::build::Output { build } = - client.build_target(id, arg).await?; - let build = tg::Build::with_id(build); - Ok::<_, tg::Error>(Some((build, client))) - }) - }) - .collect_vec(); + server.try_finish_build(&build_id, arg).await.ok(); + }); + Some(build) + } else { + local.await? + } + }, + }; - // Wait for the first build. - if futures.is_empty() { - break 'a; - } - let Ok((Some((build, _remote)), _)) = future::select_ok(futures).await else { - break 'a; - }; + // Bail if no build was found/spawned. + let Some(build) = build else { + return Ok(None); + }; - // Add the build as a child of the parent. - if let Some(parent) = arg.parent.as_ref() { - self.try_add_build_child(parent, build.id()).await.map_err( - |source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"), - )?; - } + // Add the build as a child of the parent. + if let Some(parent) = arg.parent.as_ref() { + self.try_add_build_child(parent, build.id()).await.map_err( + |source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"), + )?; + } - // Touch the build. - tokio::spawn({ - let server = self.clone(); - let build = build.clone(); - async move { - let arg = tg::build::touch::Arg { remote: None }; - server.touch_build(build.id(), arg).await.ok(); - } - }); + // Create the output. + let output = tg::target::build::Output { + build: build.id().clone(), + }; + Ok(Some(output)) + } - // Create the output. - let output = tg::target::build::Output { - build: build.id().clone(), - }; + async fn try_get_remote_build( + &self, + id: tg::target::Id, + arg: tg::target::build::Arg, + ) -> tg::Result> { + // Find a build. + let futures = self + .get_remote_clients() + .await? + .into_values() + .map(|client| { + let arg = arg.clone(); + let id = id.clone(); + Box::pin(async move { + let arg = tg::target::build::Arg { + create: false, + remote: None, + ..arg.clone() + }; + let tg::target::build::Output { build } = client.build_target(&id, arg).await?; + let build = tg::Build::with_id(build); + Ok::<_, tg::Error>(Some((build, client))) + }) + }) + .collect_vec(); - return Ok(Some(output)); + // Wait for the first build. + if futures.is_empty() { + return Ok(None); + } + let Ok((Some((build, _remote)), _)) = future::select_ok(futures).await else { + return Ok(None); }; - // If the create arg is false, then return `None`. + // Add the build as a child of the parent. + if let Some(parent) = arg.parent.as_ref() { + self.try_add_build_child(parent, build.id()).await.map_err( + |source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"), + )?; + } + + // Touch the build. + tokio::spawn({ + let server = self.clone(); + let build = build.clone(); + async move { + let arg = tg::build::touch::Arg { remote: None }; + server.touch_build(build.id(), arg).await.ok(); + } + }); + + Ok(Some(build)) + } + + async fn try_create_local_build( + &self, + build_id: tg::build::Id, + target_id: tg::target::Id, + arg: tg::target::build::Arg, + ) -> tg::Result> { if !arg.create { return Ok(None); } - // Otherwise, create a new build. - let build_id = tg::build::Id::new(); - // Get the host. - let target = tg::Target::with_id(id.clone()); + let target = tg::Target::with_id(target_id.clone()); let host = target.host(self).await?; // Put the build. @@ -195,7 +250,7 @@ impl Server { output: None, retry: arg.retry, status: tg::build::Status::Enqueued, - target: id.clone(), + target: target_id.clone(), created_at: time::OffsetDateTime::now_utc(), enqueued_at: Some(time::OffsetDateTime::now_utc()), dequeued_at: None, @@ -215,13 +270,6 @@ impl Server { )?; } - // Add the build to the parent. - if let Some(parent) = arg.parent.as_ref() { - self.try_add_build_child(parent, build.id()).await.map_err( - |source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"), - )?; - } - // Publish the message. tokio::spawn({ let server = self.clone(); @@ -236,35 +284,35 @@ impl Server { }); // Spawn a task to spawn the build when the parent's permit is available. - let server = self.clone(); - let parent = arg.parent.clone(); - let build = build.clone(); - tokio::spawn(async move { - // Acquire the parent's permit. - let Some(permit) = parent.as_ref().and_then(|parent| { + tokio::spawn({ + let server = self.clone(); + let parent = arg.parent.clone(); + let build = build.clone(); + async move { + // Acquire the parent's permit. + let Some(permit) = parent.as_ref().and_then(|parent| { + server + .build_permits + .get(parent) + .map(|permit| permit.clone()) + }) else { + return; + }; + let permit = permit + .lock_owned() + .map(|guard| BuildPermit(Either::Right(guard))) + .await; + + // Attempt to spawn the build. server - .build_permits - .get(parent) - .map(|permit| permit.clone()) - }) else { - return; - }; - let permit = permit - .lock_owned() - .map(|guard| BuildPermit(Either::Right(guard))) - .await; - - // Attempt to spawn the build. - server - .spawn_build(build, permit, None) - .await - .inspect_err(|error| tracing::error!(?error, "failed to spawn the build")) - .ok(); + .spawn_build(build, permit, None) + .await + .inspect_err(|error| tracing::error!(?error, "failed to spawn the build")) + .ok(); + } }); - let output = tg::target::build::Output { build: build_id }; - - Ok(Some(output)) + Ok(Some(build)) } async fn detect_build_cycle(