Skip to content

Commit

Permalink
wip: concurrent metadata fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
aochagavia committed Jan 30, 2024
1 parent d39c732 commit fe79c92
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 146 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ tracing = "0.1.37"
elsa = "1.9.0"
bitvec = "1.0.1"
serde = { version = "1.0", features = ["derive"], optional = true }
smol = "2.0.0"

[dev-dependencies]
insta = "1.31.0"
indexmap = "2.0.0"
proptest = "1.2.0"
tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
static_assertions = "1.1.0"
tokio = { version = "1.35.1", features = ["rt-multi-thread", "time"] }
52 changes: 48 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,27 @@ pub trait VersionSet: Debug + Display + Clone + Eq + Hash {
fn contains(&self, v: &Self::V) -> bool;
}

/// Used to send a value, possibly asynchronously, from the dependency provider to the solver.
pub struct OneShotSender<T> {
tx: Option<smol::channel::Sender<T>>,
}

impl<T> OneShotSender<T> {
pub(crate) fn new() -> (Self, smol::channel::Receiver<T>) {
let (tx, receiver) = smol::channel::unbounded();
(Self { tx: Some(tx) }, receiver)
}

/// Send a value, possibly asynchronously, from the dependency provider to the solver.
pub fn send(mut self, value: T) -> Result<(), T> {
self.tx
.take()
.unwrap()
.send_blocking(value)
.map_err(|error| error.0)
}
}

/// Defines implementation specific behavior for the solver and a way for the solver to access the
/// packages that are available in the system.
pub trait DependencyProvider<VS: VersionSet, N: PackageName = String>: Sized {
Expand All @@ -68,14 +89,28 @@ pub trait DependencyProvider<VS: VersionSet, N: PackageName = String>: Sized {
/// version the next version is tried. This continues until a solution is found.
fn sort_candidates(&self, solver: &SolverCache<VS, N, Self>, solvables: &mut [SolvableId]);

/// Returns a list of solvables that should be considered when a package with the given name is
/// Obtains a list of solvables that should be considered when a package with the given name is
/// requested.
///
/// Returns `None` if no such package exist.
fn get_candidates(&self, name: NameId) -> Option<Candidates>;
/// The result should be submitted through the provided sender, which gives the trait
/// implementor the freedom to obtain the metadata in an asynchronous way if desired. In that
/// case, the trait implementor is responsible for rate-limiting requests (e.g. by using an
/// internal semaphore) and for triggering timeouts when they take too long.
///
/// Important: if no package exists, you still need to send a value (`None`), to notify the
/// solver. Otherwise the solver will crash when [OneShotSender] is dropped.
fn get_candidates(&self, name: NameId, sender: OneShotSender<Option<Candidates>>);

/// Returns the dependencies for the specified solvable.
fn get_dependencies(&self, solvable: SolvableId) -> Dependencies;
///
/// The result should be submitted through the provided sender, which gives the trait
/// implementor the freedom to obtain the metadata in an asynchronous way if desired. In that
/// case, the trait implementor is responsible for rate-limiting requests (e.g. using an
/// internal semaphore) and for triggering timeouts when they take too long.
///
/// Important: make sure to always send a value. Otherwise the solver will crash when
/// [OneShotSender] is dropped.
fn get_dependencies(&self, solvable: SolvableId, sender: OneShotSender<Dependencies>);

/// Whether the solver should stop the dependency resolution algorithm.
///
Expand Down Expand Up @@ -180,3 +215,12 @@ where
.join(" | ")
}
}

#[cfg(test)]
mod test {
use super::*;
use static_assertions::assert_impl_any;

assert_impl_any!(OneShotSender<Option<Candidates>>: Send, Sync);
assert_impl_any!(OneShotSender<Dependencies>: Send, Sync);
}
4 changes: 2 additions & 2 deletions src/problem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Problem {
let unresolved_node = graph.add_node(ProblemNode::UnresolvedDependency);

for clause_id in &self.clauses {
let clause = &solver.clauses[*clause_id].kind;
let clause = &solver.clauses.borrow()[*clause_id].kind;
match clause {
Clause::InstallRoot => (),
Clause::Excluded(solvable, reason) => {
Expand All @@ -65,7 +65,7 @@ impl Problem {
&Clause::Requires(package_id, version_set_id) => {
let package_node = Self::add_node(&mut graph, &mut nodes, package_id);

let candidates = solver.cache.get_or_cache_sorted_candidates(version_set_id).unwrap_or_else(|_| {
let candidates = smol::block_on(solver.cache.get_or_cache_sorted_candidates(version_set_id)).unwrap_or_else(|_| {
unreachable!("The version set was used in the solver, so it must have been cached. Therefore cancellation is impossible here and we cannot get an `Err(...)`")
});
if candidates.is_empty() {
Expand Down
40 changes: 25 additions & 15 deletions src/solver/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::{
frozen_copy_map::FrozenCopyMap,
id::{CandidatesId, DependenciesId},
},
Candidates, Dependencies, DependencyProvider, NameId, PackageName, Pool, SolvableId,
VersionSet, VersionSetId,
Candidates, Dependencies, DependencyProvider, NameId, OneShotSender, PackageName, Pool,
SolvableId, VersionSet, VersionSetId,
};
use bitvec::vec::BitVec;
use elsa::FrozenMap;
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
///
/// If the provider has requested the solving process to be cancelled, the cancellation value
/// will be returned as an `Err(...)`.
pub fn get_or_cache_candidates(
pub async fn get_or_cache_candidates(
&self,
package_name: NameId,
) -> Result<&Candidates, Box<dyn Any>> {
Expand All @@ -90,9 +90,12 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
}

// Otherwise we have to get them from the DependencyProvider
let candidates = self
.provider
.get_candidates(package_name)
let (tx, candidates_receiver) = OneShotSender::new();
self.provider.get_candidates(package_name, tx);
let candidates = candidates_receiver
.recv()
.await
.expect("bug: dependency provider failed to send candidates")
.unwrap_or_default();

// Store information about which solvables dependency information is easy to
Expand Down Expand Up @@ -126,7 +129,7 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
///
/// If the provider has requested the solving process to be cancelled, the cancellation value
/// will be returned as an `Err(...)`.
pub fn get_or_cache_matching_candidates(
pub async fn get_or_cache_matching_candidates(
&self,
version_set_id: VersionSetId,
) -> Result<&[SolvableId], Box<dyn Any>> {
Expand All @@ -135,7 +138,7 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
None => {
let package_name = self.pool().resolve_version_set_package_name(version_set_id);
let version_set = self.pool().resolve_version_set(version_set_id);
let candidates = self.get_or_cache_candidates(package_name)?;
let candidates = self.get_or_cache_candidates(package_name).await?;

let matching_candidates = candidates
.candidates
Expand All @@ -158,7 +161,7 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
///
/// If the provider has requested the solving process to be cancelled, the cancellation value
/// will be returned as an `Err(...)`.
pub fn get_or_cache_non_matching_candidates(
pub async fn get_or_cache_non_matching_candidates(
&self,
version_set_id: VersionSetId,
) -> Result<&[SolvableId], Box<dyn Any>> {
Expand All @@ -167,7 +170,7 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
None => {
let package_name = self.pool().resolve_version_set_package_name(version_set_id);
let version_set = self.pool().resolve_version_set(version_set_id);
let candidates = self.get_or_cache_candidates(package_name)?;
let candidates = self.get_or_cache_candidates(package_name).await?;

let matching_candidates = candidates
.candidates
Expand All @@ -191,16 +194,18 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
///
/// If the provider has requested the solving process to be cancelled, the cancellation value
/// will be returned as an `Err(...)`.
pub fn get_or_cache_sorted_candidates(
pub async fn get_or_cache_sorted_candidates(
&self,
version_set_id: VersionSetId,
) -> Result<&[SolvableId], Box<dyn Any>> {
match self.version_set_to_sorted_candidates.get(&version_set_id) {
Some(candidates) => Ok(candidates),
None => {
let package_name = self.pool().resolve_version_set_package_name(version_set_id);
let matching_candidates = self.get_or_cache_matching_candidates(version_set_id)?;
let candidates = self.get_or_cache_candidates(package_name)?;
let matching_candidates = self
.get_or_cache_matching_candidates(version_set_id)
.await?;
let candidates = self.get_or_cache_candidates(package_name).await?;

// Sort all the candidates in order in which they should be tried by the solver.
let mut sorted_candidates = Vec::new();
Expand Down Expand Up @@ -228,7 +233,7 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
///
/// If the provider has requested the solving process to be cancelled, the cancellation value
/// will be returned as an `Err(...)`.
pub fn get_or_cache_dependencies(
pub async fn get_or_cache_dependencies(
&self,
solvable_id: SolvableId,
) -> Result<&Dependencies, Box<dyn Any>> {
Expand All @@ -242,7 +247,12 @@ impl<VS: VersionSet, N: PackageName, D: DependencyProvider<VS, N>> SolverCache<V
return Err(value);
}

let dependencies = self.provider.get_dependencies(solvable_id);
let (tx, dependencies_receiver) = OneShotSender::new();
self.provider.get_dependencies(solvable_id, tx);
let dependencies = dependencies_receiver
.recv()
.await
.expect("bug: dependency provider failed to send dependencies");
let dependencies_id = self.solvable_dependencies.alloc(dependencies);
self.solvable_to_dependencies
.insert_copy(solvable_id, dependencies_id);
Expand Down
Loading

0 comments on commit fe79c92

Please sign in to comment.