From 42dd4c6e2657b8ce733b8090a5d76b31fefe3c51 Mon Sep 17 00:00:00 2001 From: Gaetan Lepage Date: Sun, 22 Dec 2024 22:06:03 +0100 Subject: [PATCH] local evaluation: use nixpkgs ci.eval parallel implementation --- nixpkgs_review/eval_ci.py | 75 ++++++++++++++++++ nixpkgs_review/review.py | 162 ++++++++++++++++---------------------- nixpkgs_review/utils.py | 12 ++- 3 files changed, 154 insertions(+), 95 deletions(-) create mode 100644 nixpkgs_review/eval_ci.py diff --git a/nixpkgs_review/eval_ci.py b/nixpkgs_review/eval_ci.py new file mode 100644 index 00000000..db0a3ea9 --- /dev/null +++ b/nixpkgs_review/eval_ci.py @@ -0,0 +1,75 @@ +from pathlib import Path + +from .utils import System, sh + + +def _ci_command( + worktree_dir: Path, + command: str, + output_dir: str, + options: dict[str, str] | None = None, + args: dict[str, str] | None = None, +) -> None: + cmd: list[str] = [ + "nix-build", + str(worktree_dir.joinpath(Path("ci"))), + "-A", + f"eval.{command}", + ] + if options is not None: + for option, value in options.items(): + cmd.extend([option, value]) + + if args is not None: + for arg, value in args.items(): + cmd.extend(["--arg", arg, value]) + + cmd.extend(["--out-link", output_dir]) + sh(cmd, capture_output=True) + + +def local_eval( + worktree_dir: Path, + systems: set[System], + max_jobs: int, + n_cores: int, + chunk_size: int, + output_dir: str, +) -> None: + options: dict[str, str] = { + "--max-jobs": str(max_jobs), + "--cores": str(n_cores), + } + + eval_systems: str = " ".join(f'"{system}"' for system in systems) + eval_systems = f"[{eval_systems}]" + args: dict[str, str] = { + "evalSystems": eval_systems, + "chunkSize": str(chunk_size), + } + + _ci_command( + worktree_dir=worktree_dir, + command="full", + options=options, + args=args, + output_dir=output_dir, + ) + + +def compare( + worktree_dir: Path, + before_dir: str, + after_dir: str, + output_dir: str, +) -> None: + args: dict[str, str] = { + "beforeResultDir": before_dir, + "afterResultDir": after_dir, + } + _ci_command( + worktree_dir=worktree_dir, + command="compare", + args=args, + output_dir=output_dir, + ) diff --git a/nixpkgs_review/review.py b/nixpkgs_review/review.py index bc340bee..81be0268 100644 --- a/nixpkgs_review/review.py +++ b/nixpkgs_review/review.py @@ -1,5 +1,6 @@ import argparse -import concurrent.futures +import json +import multiprocessing import os import subprocess import sys @@ -8,9 +9,10 @@ from enum import Enum from pathlib import Path from re import Pattern -from typing import IO +from typing import IO, Any from xml.etree import ElementTree as ET +from . import eval_ci from .allow import AllowedFeatures from .builddir import Builddir from .errors import NixpkgsReviewError @@ -207,7 +209,10 @@ def apply_unstaged(self, staged: bool = False) -> None: sys.exit(1) def build_commit( - self, base_commit: str, reviewed_commit: str | None, staged: bool = False + self, + base_commit: str, + reviewed_commit: str | None, + staged: bool = False, ) -> dict[System, list[Attr]]: """ Review a local git commit @@ -216,29 +221,59 @@ def build_commit( print("Local evaluation for computing rebuilds") - # TODO: nix-eval-jobs ? - base_packages: dict[System, list[Package]] = list_packages( - self.builddir.nix_path, - self.systems, - self.allow, - n_threads=self.num_parallel_evals, - ) + # Source: https://github.com/NixOS/nixpkgs/blob/master/ci/eval/README.md + # TODO: make those overridable + max_jobs: int = len(self.systems) + n_cores: int = multiprocessing.cpu_count() // max_jobs + chunk_size: int = 10_000 + + with tempfile.TemporaryDirectory() as temp_dir: + before_dir: str = str(temp_dir / Path("before_eval_results")) + after_dir: str = str(temp_dir / Path("after_eval_results")) + # TODO: handle `self.allow` settings + eval_ci.local_eval( + worktree_dir=self.builddir.worktree_dir, + systems=self.systems, + max_jobs=max_jobs, + n_cores=n_cores, + chunk_size=chunk_size, + output_dir=before_dir, + ) - if reviewed_commit is None: - self.apply_unstaged(staged) - elif self.checkout == CheckoutOption.MERGE: - self.git_checkout(reviewed_commit) - else: - self.git_merge(reviewed_commit) + if reviewed_commit is None: + self.apply_unstaged(staged) + elif self.checkout == CheckoutOption.MERGE: + self.git_checkout(reviewed_commit) + else: + self.git_merge(reviewed_commit) + + eval_ci.local_eval( + worktree_dir=self.builddir.worktree_dir, + systems=self.systems, + max_jobs=max_jobs, + n_cores=n_cores, + chunk_size=chunk_size, + output_dir=after_dir, + ) - # TODO: nix-eval-jobs ? - merged_packages: dict[System, list[Package]] = list_packages( - self.builddir.nix_path, - self.systems, - self.allow, - n_threads=self.num_parallel_evals, - check_meta=True, - ) + # merged_packages: dict[System, list[Package]] = list_packages( + # self.builddir.nix_path, + # self.systems, + # self.allow, + # n_threads=self.num_parallel_evals, + # check_meta=True, + # ) + + output_dir: Path = temp_dir / Path("comparison") + eval_ci.compare( + worktree_dir=self.builddir.worktree_dir, + before_dir=before_dir, + after_dir=after_dir, + output_dir=str(output_dir), + ) + + with (output_dir / Path("changed-paths.json")).open() as compare_result: + outpaths_dict: dict[str, Any] = json.load(compare_result) # Systems ordered correctly (x86_64-linux, aarch64-linux, x86_64-darwin, aarch64-darwin) sorted_systems: list[System] = sorted( @@ -246,15 +281,20 @@ def build_commit( key=system_order_key, reverse=True, ) + changed_attrs: dict[System, set[str]] = {} for system in sorted_systems: - changed_pkgs, removed_pkgs = differences( - base_packages[system], merged_packages[system] + print(f"--------- Rebuilds on '{system}' ---------") + + rebuilds: set[str] = set( + outpaths_dict["rebuildsByPlatform"].get(system, []) + ) + print_packages( + names=list(rebuilds), + msg="to rebuild", ) - print(f"--------- Impacted packages on '{system}' ---------") - print_updates(changed_pkgs, removed_pkgs) - changed_attrs[system] = {p.attr_path for p in changed_pkgs} + changed_attrs[system] = rebuilds return self.build(changed_attrs, self.build_args) @@ -451,70 +491,6 @@ def parse_packages_xml(stdout: IO[str]) -> list[Package]: return packages -def _list_packages_system( - system: System, - nix_path: str, - allow: AllowedFeatures, - check_meta: bool = False, -) -> list[Package]: - cmd = [ - "nix-env", - "--extra-experimental-features", - "" if allow.url_literals else "no-url-literals", - "--option", - "system", - system, - "-f", - "", - "--nix-path", - nix_path, - "-qaP", - "--xml", - "--out-path", - "--show-trace", - "--allow-import-from-derivation" - if allow.ifd - else "--no-allow-import-from-derivation", - ] - if check_meta: - cmd.append("--meta") - info("$ " + " ".join(cmd)) - with tempfile.NamedTemporaryFile(mode="w") as tmp: - res = subprocess.run(cmd, stdout=tmp, check=False) - if res.returncode != 0: - msg = f"Failed to list packages: nix-env failed with exit code {res.returncode}" - raise NixpkgsReviewError(msg) - tmp.flush() - with Path(tmp.name).open() as f: - return parse_packages_xml(f) - - -def list_packages( - nix_path: str, - systems: set[System], - allow: AllowedFeatures, - n_threads: int, - check_meta: bool = False, -) -> dict[System, list[Package]]: - results: dict[System, list[Package]] = {} - with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor: - future_to_system = { - executor.submit( - _list_packages_system, - system=system, - nix_path=nix_path, - allow=allow, - check_meta=check_meta, - ): system - for system in systems - } - for future in concurrent.futures.as_completed(future_to_system): - system = future_to_system[future] - results[system] = future.result() - - return results - - def package_attrs( package_set: set[str], system: str, diff --git a/nixpkgs_review/utils.py b/nixpkgs_review/utils.py index 17d69593..913972f9 100644 --- a/nixpkgs_review/utils.py +++ b/nixpkgs_review/utils.py @@ -30,10 +30,18 @@ def wrapper(text: str) -> None: def sh( - command: list[str], cwd: Path | str | None = None + command: list[str], + cwd: Path | str | None = None, + capture_output: bool = False, ) -> "subprocess.CompletedProcess[str]": info("$ " + shlex.join(command)) - return subprocess.run(command, cwd=cwd, text=True, check=False) + return subprocess.run( + command, + cwd=cwd, + text=True, + check=False, + capture_output=capture_output, + ) def verify_commit_hash(commit: str) -> str: