From 73bb33710741f1dc47da8df0ef28c518fd4fb69f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?F=C3=A9lix=20Pi=C3=A9dallu?=
Date: Thu, 12 Sep 2024 18:03:30 +0200
Subject: [PATCH] Refactor autopatch: use an AppToPatch class that groups the actions, and move the loops out of the action functions

---
 tools/autopatches/autopatch.py | 300 ++++++++++++++++-----------------
 1 file changed, 147 insertions(+), 153 deletions(-)

diff --git a/tools/autopatches/autopatch.py b/tools/autopatches/autopatch.py
index cfc3742b56..ca911b525b 100755
--- a/tools/autopatches/autopatch.py
+++ b/tools/autopatches/autopatch.py
@@ -5,29 +5,37 @@
 import os
 import subprocess
 import sys
+import logging
 import time
+from typing import Optional, TypeVar, Iterable, Generator
 from pathlib import Path
 
 import requests
+import tqdm
 import toml
 
+from git import Repo, Head, Actor
+
 # add apps/tools to sys.path
 sys.path.insert(0, str(Path(__file__).parent.parent))
 
+from app_caches import AppDir
 from appslib.utils import (  # noqa: E402 pylint: disable=import-error,wrong-import-position
     get_catalog,
 )
+import appslib.get_apps_repo as get_apps_repo
 
 TOOLS_DIR = Path(__file__).resolve().parent.parent
 
 my_env = os.environ.copy()
 my_env["GIT_TERMINAL_PROMPT"] = "0"
-os.makedirs(".apps_cache", exist_ok=True)
 
-login = (TOOLS_DIR / ".github_login").open("r", encoding="utf-8").read().strip()
-token = (TOOLS_DIR / ".github_token").open("r", encoding="utf-8").read().strip()
+LOGIN = (TOOLS_DIR / ".github_login").open("r", encoding="utf-8").read().strip()
+TOKEN = (TOOLS_DIR / ".github_token").open("r", encoding="utf-8").read().strip()
 github_api = "https://api.github.com"
 
+PATCHES_PATH = Path(__file__).resolve().parent / "patches"
+
 
 def apps(min_level=4):
     for app, infos in get_catalog().items():
@@ -36,162 +44,128 @@ def apps(min_level=4):
             yield infos
 
 
-def app_cache_folder(app):
-    return os.path.join(".apps_cache", app)
-
+class AppToPatch:
+    def __init__(self, id: str, path: Path, info: dict) -> None:
+        self.id = id
+        self.path = path
+        self.info = info
+        self.patch: Optional[str] = None
+        self._repo: Optional[Repo] = None
+
+    @property
+    def repo(self) -> Repo:
+        if self._repo is None:
+            self._repo = Repo(self.path)
+        return self._repo
+
+    def cache(self) -> None:
+        appdir = AppDir(self.id, self.path)
+        appdir.ensure(self.info["url"], self.info.get("branch", "master"), False, False)
+
+    def reset(self) -> None:
+        if self.get_diff():
+            logging.warning("%s had local changes, they were stashed.", self.id)
+            self.repo.git.stash("save")
+        self.repo.git.checkout("testing")
+
+    def apply(self, patch: str) -> None:
+        current_branch = self.repo.active_branch
+        self.repo.head.reset(f"{current_branch}", index=True, working_tree=True)
+        subprocess.call([PATCHES_PATH / patch / "patch"], cwd=self.path)
+
+    def get_diff(self) -> str:
+        return " ".join(str(d) for d in self.repo.index.diff(None, create_patch=True))
+
+    def diff(self) -> None:
+        diff = self.get_diff()
+        if not diff:
+            return
+        print(80 * "=")
+        print(f"Changes in : {self.id}")
+        print(80 * "=")
+        print()
+        print(diff)
+        print("\n\n\n")
+
+    def on_github(self) -> bool:
+        return "github.com/yunohost-apps" in self.info["url"].lower()
+
+    def fork_if_needed(self, session: requests.Session) -> None:
+        repo_name = self.info["url"].split("/")[-1]
+        r = session.get(github_api + f"/repos/{LOGIN}/{repo_name}")
+        if r.status_code == 200:
+            return
+
+        fork_repo_name = self.info["url"].split("github.com/")[-1]
+        r = session.post(github_api + f"/repos/{fork_repo_name}/forks")
+        r.raise_for_status()
+        time.sleep(2)  # to avoid rate limiting lol
+
+    def commit(self, patch: str) -> None:
+        pr_title = (PATCHES_PATH / patch / "pr_title.md").open().read().strip()
+        title = f"[autopatch] {pr_title}"
+        self.repo.git.add(all=True)
+        self.repo.index.commit(title, author=Actor("Yunohost-Bot", None))
+
+    def push(self, patch: str, session: requests.Session) -> None:
+        if not self.get_diff():
+            return
+
+        if not self.on_github():
+            return
+
+        self.fork_if_needed(session)
+
+        base_branch = self.repo.active_branch
+        if patch in self.repo.heads:
+            self.repo.delete_head(patch)
+        head_branch = self.repo.create_head(patch)
+        head_branch.checkout()
+
+        self.commit(patch)
+
+        if "fork" in self.repo.remotes:
+            self.repo.delete_remote(self.repo.remote("fork"))
+        reponame = self.info["url"].rsplit("/", 1)[-1]
+        self.repo.create_remote(
+            "fork", url=f"https://{LOGIN}:{TOKEN}@github.com/{LOGIN}/{reponame}"
+        )
 
-def git(cmd, in_folder=None):
-    if not isinstance(cmd, list):
-        cmd = cmd.split()
-    if in_folder:
-        cmd = ["-C", in_folder] + cmd
-    cmd = ["git"] + cmd
-    return subprocess.check_output(cmd, env=my_env).strip().decode("utf-8")
+        self.repo.remote(name="fork").push(progress=None, force=True)
 
-
-# Progress bar helper, stolen from https://stackoverflow.com/a/34482761
-def progressbar(it, prefix="", size=60, file=sys.stdout):
-    it = list(it)
-    count = len(it)
+        self.create_pull_request(patch, head_branch, base_branch, session)
 
-    def show(j, name=""):
-        name += " "
-        x = int(size * j / count)
-        file.write(
-            "%s[%s%s] %i/%i %s\r" % (prefix, "#" * x, "." * (size - x), j, count, name)
-        )
-        file.flush()
-
-    show(0)
-    for i, item in enumerate(it):
-        yield item
-        show(i + 1, item["id"])
-    file.write("\n")
-    file.flush()
-
-
-def build_cache():
-    for app in progressbar(apps(), "Git cloning: ", 40):
-        folder = os.path.join(".apps_cache", app["id"])
-        reponame = app["url"].rsplit("/", 1)[-1]
-        git(f"clone --quiet --depth 1 --single-branch {app['url']} {folder}")
-        git(
-            f"remote add fork https://{login}:{token}@github.com/{login}/{reponame}",
-            in_folder=folder,
-        )
+    def create_pull_request(
+        self, patch: str, head: Head, base: Head, session: requests.Session
+    ) -> None:
+        pr_title = (PATCHES_PATH / patch / "pr_title.md").open().read().strip()
+        pr_body = (PATCHES_PATH / patch / "pr_body.md").open().read().strip()
+        PR = {
+            "title": f"[autopatch] {pr_title}",
+            "body": f"This is an automatic PR\n\n{pr_body}",
+            "head": f"{LOGIN}:{head.name}",
+            "base": base.name,
+            "maintainer_can_modify": True,
+        }
 
+        fork_repo_name = self.info["url"].split("github.com/")[-1]
+        repo_name = self.info["url"].split("/")[-1]
+        r = session.post(github_api + f"/repos/{fork_repo_name}/pulls", json.dumps(PR))
+        r.raise_for_status()
+        print(json.loads(r.text)["html_url"])
+        time.sleep(4)  # to avoid rate limiting lol
 
-def apply(patch):
-    patch_path = os.path.abspath(os.path.join("patches", patch, "patch.sh"))
-
-    for app in progressbar(apps(), "Apply to: ", 40):
-        folder = os.path.join(".apps_cache", app["id"])
-        current_branch = git(f"symbolic-ref --short HEAD", in_folder=folder)
-        git(f"reset --hard origin/{current_branch}", in_folder=folder)
-        os.system(f"cd {folder} && bash {patch_path}")
-
-
-def diff():
-    for app in apps():
-        folder = os.path.join(".apps_cache", app["id"])
-        if bool(
-            subprocess.check_output(f"cd {folder} && git diff", shell=True)
-            .strip()
-            .decode("utf-8")
-        ):
-            print("\n\n\n")
-            print("=================================")
-            print("Changes in : " + app["id"])
-            print("=================================")
-            print("\n")
-            os.system(f"cd {folder} && git --no-pager diff")
-
-
-def push(patch):
-    title = (
-        "[autopatch] "
-        + open(os.path.join("patches", patch, "pr_title.md")).read().strip()
-    )
+
+IterType = TypeVar("IterType")
 
-    def diff_not_empty(app):
-        folder = os.path.join(".apps_cache", app["id"])
-        return bool(
-            subprocess.check_output(f"cd {folder} && git diff", shell=True)
-            .strip()
-            .decode("utf-8")
-        )
 
-    def app_is_on_github(app):
-        return "github.com" in app["url"]
+def progressbar(elements: list[IterType]) -> Generator[IterType, None, None]:
+    return tqdm.tqdm(elements, total=len(elements), ascii=" ·#")
 
-    apps_to_push = [
-        app for app in apps() if diff_not_empty(app) and app_is_on_github(app)
-    ]
 
-    with requests.Session() as s:
-        s.headers.update({"Authorization": f"token {token}"})
-        for app in progressbar(apps_to_push, "Forking: ", 40):
-            app["repo"] = app["url"][len("https://github.com/") :].strip("/")
-            fork_if_needed(app["repo"], s)
-            time.sleep(2)  # to avoid rate limiting lol
-
-        for app in progressbar(apps_to_push, "Pushing: ", 40):
-            app["repo"] = app["url"][len("https://github.com/") :].strip("/")
-            app_repo_name = app["url"].rsplit("/", 1)[-1]
-            folder = os.path.join(".apps_cache", app["id"])
-            current_branch = git(f"symbolic-ref --short HEAD", in_folder=folder)
-            git(f"reset origin/{current_branch}", in_folder=folder)
-            git(
-                ["commit", "-a", "-m", title, "--author='Yunohost-Bot <>'"],
-                in_folder=folder,
-            )
-            try:
-                git(f"remote remove fork", in_folder=folder)
-            except Exception:
-                pass
-            git(
-                f"remote add fork https://{login}:{token}@github.com/{login}/{app_repo_name}",
-                in_folder=folder,
-            )
-            git(f"push fork {current_branch}:{patch} --quiet --force", in_folder=folder)
-            create_pull_request(app["repo"], patch, current_branch, s)
-            time.sleep(4)  # to avoid rate limiting lol
-
-
-def fork_if_needed(repo, s):
-    repo_name = repo.split("/")[-1]
-    r = s.get(github_api + f"/repos/{login}/{repo_name}")
-
-    if r.status_code == 200:
-        return
-
-    r = s.post(github_api + f"/repos/{repo}/forks")
-
-    if r.status_code != 200:
-        print(r.text)
-
-
-def create_pull_request(repo, patch, base_branch, s):
-    PR = {
-        "title": "[autopatch] "
-        + open(os.path.join("patches", patch, "pr_title.md")).read().strip(),
-        "body": "This is an automatic PR\n\n"
-        + open(os.path.join("patches", patch, "pr_body.md")).read().strip(),
-        "head": login + ":" + patch,
-        "base": base_branch,
-        "maintainer_can_modify": True,
-    }
-
-    r = s.post(github_api + f"/repos/{repo}/pulls", json.dumps(PR))
-
-    if r.status_code != 200:
-        print(r.text)
-    else:
-        json.loads(r.text)["html_url"]
-
-
-def main():
+def main() -> None:
     parser = argparse.ArgumentParser()
+    get_apps_repo.add_args(parser)
     parser.add_argument(
         "the_patch", type=str, nargs="?", help="The name of the patch to apply"
     )
@@ -212,24 +186,44 @@ def main():
     )
     args = parser.parse_args()
 
+    get_apps_repo.from_args(args)
+    cache_path = get_apps_repo.cache_path(args)
+    cache_path.mkdir(exist_ok=True, parents=True)
+
     if not (args.cache or args.apply or args.diff or args.push):
         parser.error("We required --cache, --apply, --diff or --push.")
 
+    apps_to_patch: list[AppToPatch] = [
+        AppToPatch(info["id"], cache_path / info["id"], info) for info in apps()
+    ]
+
     if args.cache:
-        build_cache()
+        print("Caching apps...")
+        for app in progressbar(apps_to_patch):
+            app.cache()
 
     if args.apply:
         if not args.the_patch:
             parser.error("--apply requires the patch name to be passed")
-        apply(args.the_patch)
+        print(f"Applying patch '{args.the_patch}' to apps...")
+        for app in progressbar(apps_to_patch):
+            app.reset()
+            app.apply(args.the_patch)
 
     if args.diff:
-        diff()
+        print("Printing diff of apps...")
+        for app in progressbar(apps_to_patch):
+            app.diff()
 
     if args.push:
         if not args.the_patch:
            parser.error("--push requires the patch name to be passed")
-        push(args.the_patch)
+        print("Pushing apps...")
+        with requests.Session() as session:
+            session.headers.update({"Authorization": f"token {TOKEN}"})
+            for app in progressbar(apps_to_patch):
+                app.push(args.the_patch, session)
 
 
-main()
+if __name__ == "__main__":
+    main()
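
As an illustration of how the refactored pieces fit together, here is a minimal, hypothetical driver for a single app. It is only a sketch: it assumes the module is importable as autopatch, that tools/.github_login and tools/.github_token exist (they are read at import time), and the catalog entry, cache directory and patch name below are made up. In the real script, main() builds the AppToPatch list from get_catalog() and get_apps_repo.cache_path(args).

    from pathlib import Path

    import requests

    from autopatch import AppToPatch

    # Hypothetical catalog entry, shaped like the dicts that apps() yields.
    info = {
        "id": "my_app",
        "url": "https://github.com/YunoHost-Apps/my_app_ynh",
        "branch": "testing",
    }
    app = AppToPatch(info["id"], Path(".apps_cache") / info["id"], info)

    app.cache()                 # clone or update the app repository
    app.reset()                 # stash any local changes, check out "testing"
    app.apply("example-patch")  # run patches/example-patch/patch inside the clone
    app.diff()                  # print the resulting working-tree diff, if any

    with requests.Session() as session:
        session.headers.update({"Authorization": "token <TOKEN>"})  # GitHub API token
        app.push("example-patch", session)  # fork, commit, push and open the PR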