Skip to content

Commit

Permalink
Use channels to return the derivation descriptions
Browse files Browse the repository at this point in the history
This results in a simple iterator, that can then be collected() as needed.
  • Loading branch information
Erin van der Veen committed Feb 14, 2024
1 parent 2940392 commit 24e4aa6
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 113 deletions.
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,14 @@ pub enum Error {

#[error("IO error when calling Nix: {0}")]
NixIO(#[from] std::io::Error),

#[error("Erorr when sending data to a mpsc channel: {0}")]
Mpsc(Box<std::sync::mpsc::SendError<crate::nix::DerivationDescription>>),
}

// Cannot automatically derive using #[from] because of the Box
impl From<std::sync::mpsc::SendError<crate::nix::DerivationDescription>> for Error {
fn from(e: std::sync::mpsc::SendError<crate::nix::DerivationDescription>) -> Self {
Error::Mpsc(Box::new(e))
}
}
154 changes: 79 additions & 75 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use ::std::sync::{Arc, Mutex};
use rayon::prelude::*;
use std::sync::mpsc;

use error::Result;
use nix::{DerivationDescription, FoundDrv};
Expand All @@ -9,109 +10,112 @@ pub mod nix;

fn process(
collected_paths: &Arc<Mutex<std::collections::HashSet<String>>>,
flake_ref: &str,
system: Option<&str>,
attribute_path: &str,
offline: &bool,
flake_ref: &String,
system: &Option<String>,
attribute_path: String,
offline: bool,
lib: &nix::lib::Lib,
) -> Vec<DerivationDescription> {
// Sender channel to communicate DerivationDescription to the main thread
tx: mpsc::Sender<DerivationDescription>,
) -> Result<()> {
// check if the build_input has already be processed
let done = {
let mut collected_paths = collected_paths.lock().unwrap();
!collected_paths.insert(attribute_path.to_string())
};

if done {
log::debug!("Skipping already processed derivation: {}", attribute_path);
return Ok(());
}

log::debug!("Processing derivation: {:?}", attribute_path);

// call describe_derivation to get the derivation description
let description =
nix::describe_derivation(flake_ref, system, attribute_path, offline, lib).unwrap();
nix::describe_derivation(flake_ref, system, &attribute_path, &offline, lib).unwrap();

// Send the DerivationDescription to the main thread
// TODO: Error handling
tx.send(description.clone())?;

// use par_iter to call process on all children of this derivation
let children: Vec<DerivationDescription> = description
description
.build_inputs
.par_iter()
.map(|build_input| {
// check if the build_input has already be processed
let done = {
let mut collected_paths = collected_paths.lock().unwrap();
match &build_input.output_path {
Some(output_path) => !collected_paths.insert(output_path.clone()),
None => false,
}
};

if done {
log::debug!(
"Skipping already processed derivation: {}",
build_input.attribute_path
);
Vec::new()
} else {
process(
collected_paths,
flake_ref,
system,
&build_input.attribute_path,
offline,
lib,
)
}
.into_par_iter()
.map(|build_input| -> Result<()> {
process(
collected_paths,
flake_ref,
system,
build_input.attribute_path,
offline,
lib,
tx.clone(),
)
})
.flatten()
.collect();
.collect::<Result<Vec<()>>>()?;

// combine the children and this derivation into a single Vec
let mut descriptions = children;
descriptions.push(description);
descriptions
Ok(())
}

pub fn nixtract(
flake_ref: impl AsRef<str>,
system: Option<impl AsRef<str>>,
attribute_path: Option<impl AsRef<str>>,
offline: &bool,
) -> Result<Vec<DerivationDescription>> {
let flake_ref = flake_ref.as_ref();
// Convert system to a Option<&str>
let system = system.as_ref().map(AsRef::as_ref);
let attribute_path = attribute_path.as_ref().map(AsRef::as_ref);
flake_ref: impl Into<String>,
system: Option<impl Into<String>>,
attribute_path: Option<impl Into<String>>,
offline: bool,
) -> Result<impl Iterator<Item = DerivationDescription>> {
// Convert the arguments to the expected types
let flake_ref = flake_ref.into();
let system = system.map(Into::into);
let attribute_path = attribute_path.map(Into::into);

// Writes the `lib.nix` file to the tempdir and stores its path
let lib = nix::lib::Lib::new()?;

// Create a channel to communicate DerivationDescription to the main thread
let (tx, rx) = mpsc::channel();

log::info!(
"Starting nixtract with flake_ref: {}, system: {}, attribute_path: {:?}",
flake_ref,
system
.map(AsRef::as_ref)
.unwrap_or("builtins.currentSystem"),
attribute_path.map(AsRef::as_ref).unwrap_or("")
.clone()
.unwrap_or("builtins.currentSystem".to_owned()),
attribute_path.clone().unwrap_or_default()
);

let collected_paths: Arc<Mutex<std::collections::HashSet<String>>> =
Arc::new(Mutex::new(std::collections::HashSet::new()));
// Spawn a new rayon thread to call process on every foundDrv
rayon::spawn(move || {
let collected_paths: Arc<Mutex<std::collections::HashSet<String>>> =
Arc::new(Mutex::new(std::collections::HashSet::new()));

// call find_attribute_paths to get the initial set of derivations
let attribute_paths =
nix::find_attribute_paths(flake_ref, system, attribute_path, offline, &lib)?;
// call find_attribute_paths to get the initial set of derivations
let attribute_paths =
nix::find_attribute_paths(&flake_ref, &system, &attribute_path, &offline, &lib)
.unwrap();

// Combine all AttributePaths into a single Vec
let mut derivations: Vec<FoundDrv> = Vec::new();
for attribute_path in attribute_paths {
derivations.extend(attribute_path.found_drvs);
}
// Combine all AttributePaths into a single Vec
let mut derivations: Vec<FoundDrv> = Vec::new();
for attribute_path in attribute_paths {
derivations.extend(attribute_path.found_drvs);
}

// call process on every foundDrv
let descriptions: Vec<DerivationDescription> = derivations
.par_iter()
.map(|found_drv| {
process(
derivations.into_par_iter().for_each(|found_drv| {
match process(
&collected_paths,
flake_ref,
system,
&found_drv.attribute_path,
&flake_ref,
&system,
found_drv.attribute_path,
offline,
&lib,
)
})
.flatten()
.collect();
tx.clone(),
) {
Ok(_) => {}
Err(e) => log::error!("Error processing derivation: {}", e),
}
});
});

Ok(descriptions)
Ok(rx.into_iter())
}
35 changes: 20 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,30 @@ fn main() -> Result<(), Box<dyn Error>> {
opts.flake_ref,
opts.system,
opts.attribute_path,
&opts.offline,
opts.offline,
)?;

// Print the results
let output = if opts.pretty {
serde_json::to_string_pretty(&results)?
} else {
serde_json::to_string(&results)?
};

match opts.output_path.as_deref() {
None | Some("-") => {
println!("{}", output);
}
Some(output_path) => {
log::info!("Writing results to {:?}", output_path);
std::fs::write(output_path, output)?;
// Create the out writer
let mut out_writer = match opts.output_path.as_deref() {
None | Some("-") => Box::new(std::io::stdout()) as Box<dyn std::io::Write>,
Some(path) => {
let file = std::fs::File::create(path)?;
Box::new(file) as Box<dyn std::io::Write>
}
};

// Print the results
for result in results {
let output = if opts.pretty {
serde_json::to_string_pretty(&result)?
} else {
serde_json::to_string(&result)?
};

// Append to the out_writer
out_writer.write_all(output.as_bytes())?;
out_writer.write_all(b"\n")?;
}

Ok(())
}
26 changes: 13 additions & 13 deletions src/nix/describe_derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use super::lib::Lib;
use crate::error::{Error, Result};

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DerivationDescription {
pub attribute_path: String,
Expand All @@ -19,21 +19,21 @@ pub struct DerivationDescription {
pub build_inputs: Vec<BuiltInput>,
}

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Output {
pub name: String,
pub output_path: Option<String>,
}

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ParsedName {
pub name: String,
pub version: String,
}

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
pub struct NixpkgsMetadata {
pub description: String,
Expand All @@ -44,23 +44,23 @@ pub struct NixpkgsMetadata {
pub licenses: Option<Vec<License>>,
}

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Source {
pub git_repo_url: String,
// Revision or tag of the git repo
pub rev: String,
}

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
pub struct License {
// Not all licenses in nixpkgs have an associated spdx id
pub spdx_id: Option<String>,
pub full_name: String,
}

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BuiltInput {
pub attribute_path: String,
Expand All @@ -69,9 +69,9 @@ pub struct BuiltInput {
}

pub fn describe_derivation(
flake_ref: impl AsRef<str>,
system: Option<impl AsRef<str>>,
attribute_path: impl AsRef<str>,
flake_ref: &String,
system: &Option<String>,
attribute_path: &String,
offline: &bool,
lib: &Lib,
) -> Result<DerivationDescription> {
Expand All @@ -80,17 +80,17 @@ pub fn describe_derivation(
// Create a scope so env_vars isn't needlessly mutable
let env_vars: HashMap<String, String> = {
let mut res = HashMap::from([
("TARGET_FLAKE_REF".to_owned(), flake_ref.as_ref().to_owned()),
("TARGET_FLAKE_REF".to_owned(), flake_ref.to_owned()),
(
"TARGET_ATTRIBUTE_PATH".to_owned(),
attribute_path.as_ref().to_owned(),
attribute_path.to_owned(),
),
("NIXPKGS_ALLOW_UNFREE".to_owned(), "1".to_owned()),
("NIXPKGS_ALLOW_INSECURE".to_owned(), "1".to_owned()),
("NIXPKGS_ALLOW_BROKEN".to_owned(), "1".to_owned()),
]);
if let Some(system) = system {
res.insert("TARGET_SYSTEM".to_owned(), system.as_ref().to_owned());
res.insert("TARGET_SYSTEM".to_owned(), system.to_owned());
}
res
};
Expand Down
15 changes: 6 additions & 9 deletions src/nix/find_attribute_paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ pub struct FoundDrv {
}

pub fn find_attribute_paths(
flake_ref: impl AsRef<str>,
system: Option<impl AsRef<str>>,
attribute_path: Option<impl AsRef<str>>,
flake_ref: &String,
system: &Option<String>,
attribute_path: &Option<String>,
offline: &bool,
lib: &Lib,
) -> Result<Vec<AttributePaths>> {
Expand All @@ -31,19 +31,16 @@ pub fn find_attribute_paths(
// Create a scope so env_vars isn't needlessly mutable
let env_vars: HashMap<String, String> = {
let mut res = HashMap::from([
("TARGET_FLAKE_REF".to_owned(), flake_ref.as_ref().to_owned()),
("TARGET_FLAKE_REF".to_owned(), flake_ref.to_owned()),
("NIXPKGS_ALLOW_UNFREE".to_owned(), "1".to_owned()),
("NIXPKGS_ALLOW_INSECURE".to_owned(), "1".to_owned()),
("NIXPKGS_ALLOW_BROKEN".to_owned(), "1".to_owned()),
]);
if let Some(attribute_path) = attribute_path {
res.insert(
"TARGET_ATTRIBUTE_PATH".to_owned(),
attribute_path.as_ref().to_owned(),
);
res.insert("TARGET_ATTRIBUTE_PATH".to_owned(), attribute_path.clone());
}
if let Some(system) = system {
res.insert("TARGET_SYSTEM".to_owned(), system.as_ref().to_owned());
res.insert("TARGET_SYSTEM".to_owned(), system.to_owned());
}
res
};
Expand Down
1 change: 0 additions & 1 deletion src/nix/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! Nix only accepts a file as included files, so we need to create a temporary file to pass to it
//! TODO: Allow sharing of a single lib instance so we don't copy the file so many times
use crate::error::Result;
use std::io::Write;
Expand Down

0 comments on commit 24e4aa6

Please sign in to comment.