diff --git a/podman_compose.py b/podman_compose.py index 20011455..189571b8 100755 --- a/podman_compose.py +++ b/podman_compose.py @@ -1,29 +1,24 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- - # https://docs.docker.com/compose/compose-file/#service-configuration-reference # https://docs.docker.com/samples/ # https://docs.docker.com/compose/gettingstarted/ # https://docs.docker.com/compose/django/ # https://docs.docker.com/compose/wordpress/ - # TODO: podman pod logs --color -n -f pod_testlogs - - import sys import os import getpass import argparse import itertools import subprocess -import time import re import hashlib import random import json import glob - -from threading import Thread +import asyncio.subprocess +import signal import shlex @@ -371,7 +366,7 @@ def transform(args, project_name, given_containers): return pods, containers -def assert_volume(compose, mount_dict): +async def assert_volume(compose, mount_dict): """ inspect volume to get directory create volume if needed @@ -398,7 +393,7 @@ def assert_volume(compose, mount_dict): # TODO: might move to using "volume list" # podman volume list --format '{{.Name}}\t{{.MountPoint}}' -f 'label=io.podman.compose.project=HERE' try: - _ = compose.podman.output([], "volume", ["inspect", vol_name]).decode("utf-8") + _ = await compose.podman.output([], "volume", ["inspect", vol_name]).decode("utf-8") except subprocess.CalledProcessError as e: if is_ext: raise RuntimeError(f"External volume [{vol_name}] does not exists") from e @@ -419,8 +414,8 @@ def assert_volume(compose, mount_dict): for opt, value in driver_opts.items(): args.extend(["--opt", f"{opt}={value}"]) args.append(vol_name) - compose.podman.output([], "volume", args) - _ = compose.podman.output([], "volume", ["inspect", vol_name]).decode("utf-8") + await compose.podman.output([], "volume", args) + _ = await compose.podman.output([], "volume", ["inspect", vol_name]).decode("utf-8") def mount_desc_to_mount_args( @@ -710,7 +705,7 @@ def norm_ports(ports_in): return ports_out -def assert_cnt_nets(compose, cnt): +async def assert_cnt_nets(compose, cnt): """ create missing networks """ @@ -733,7 +728,7 @@ def assert_cnt_nets(compose, cnt): ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name ) try: - compose.podman.output([], "network", ["exists", net_name]) + await compose.podman.output([], "network", ["exists", net_name]) except subprocess.CalledProcessError as e: if is_ext: raise RuntimeError( @@ -776,8 +771,8 @@ def assert_cnt_nets(compose, cnt): if gateway: args.extend(("--gateway", gateway)) args.append(net_name) - compose.podman.output([], "network", args) - compose.podman.output([], "network", ["exists", net_name]) + await compose.podman.output([], "network", args) + await compose.podman.output([], "network", ["exists", net_name]) def get_net_args(compose, cnt): @@ -898,7 +893,7 @@ def get_net_args(compose, cnt): return net_args -def container_to_args(compose, cnt, detached=True): +async def container_to_args(compose, cnt, detached=True): # TODO: double check -e , --add-host, -v, --read-only dirname = compose.dirname pod = cnt.get("pod", None) or "" @@ -957,7 +952,7 @@ def container_to_args(compose, cnt, detached=True): for volume in cnt.get("volumes", []): podman_args.extend(get_mount_args(compose, cnt, volume)) - assert_cnt_nets(compose, cnt) + await assert_cnt_nets(compose, cnt) podman_args.extend(get_net_args(compose, cnt)) logging = cnt.get("logging", None) @@ -1161,12 +1156,22 @@ def __init__(self, compose, podman_path="podman", dry_run=False): self.podman_path = podman_path self.dry_run = dry_run - def output(self, podman_args, cmd="", cmd_args=None): + async def output(self, podman_args, cmd="", cmd_args=None): cmd_args = cmd_args or [] xargs = self.compose.get_podman_args(cmd) if cmd else [] cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args log(cmd_ls) - return subprocess.check_output(cmd_ls) + p = await asyncio.subprocess.create_subprocess_exec( + *cmd_ls, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + stdout_data, stderr_data = await p.communicate() + if p.returncode == 0: + return stdout_data + else: + raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data) def exec( self, @@ -1180,55 +1185,71 @@ def exec( log(" ".join([str(i) for i in cmd_ls])) os.execlp(self.podman_path, *cmd_ls) - def run( + async def run( self, podman_args, cmd="", cmd_args=None, wait=True, sleep=1, - obj=None, log_formatter=None, + *, + # Intentionally mutable default argument to hold references to tasks + task_reference=set() ): - if obj is not None: - obj.exit_code = None cmd_args = list(map(str, cmd_args or [])) xargs = self.compose.get_podman_args(cmd) if cmd else [] cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args log(" ".join([str(i) for i in cmd_ls])) if self.dry_run: return None - # subprocess.Popen( - # args, bufsize = 0, executable = None, stdin = None, stdout = None, stderr = None, preexec_fn = None, - # close_fds = False, shell = False, cwd = None, env = None, universal_newlines = False, startupinfo = None, - # creationflags = 0 - # ) if log_formatter is not None: + + async def format_out(stdout): + while True: + l = await stdout.readline() + if l: + print(log_formatter, l.decode('utf-8'), end='') + if stdout.at_eof(): + break + + # read, write = os.pipe() # Pipe podman process output through log_formatter (which can add colored prefix) - p = subprocess.Popen( - cmd_ls, stdout=subprocess.PIPE + p = await asyncio.subprocess.create_subprocess_exec( + *cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # pylint: disable=consider-using-with - _ = subprocess.Popen( - log_formatter, stdin=p.stdout - ) # pylint: disable=consider-using-with - p.stdout.close() # Allow p_process to receive a SIGPIPE if logging process exits. + + # This is hacky to make the tasks not get garbage collected + # https://github.com/python/cpython/issues/91887 + out_t = asyncio.create_task(format_out(p.stdout)) + task_reference.add(out_t) + out_t.add_done_callback(task_reference.discard) + + err_t = asyncio.create_task(format_out(p.stderr)) + task_reference.add(err_t) + err_t.add_done_callback(task_reference.discard) else: - p = subprocess.Popen(cmd_ls) # pylint: disable=consider-using-with + p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with if wait: - exit_code = p.wait() + try: + exit_code = await p.wait() + except asyncio.CancelledError: + p.terminate() + exit_code = await p.wait() + log("exit code:", exit_code) - if obj is not None: - obj.exit_code = exit_code + return exit_code if sleep: - time.sleep(sleep) + log(f"Sleep {sleep}") + await asyncio.sleep(sleep) return p - def volume_ls(self, proj=None): + async def volume_ls(self, proj=None): if not proj: proj = self.compose.project_name - output = self.output( + output = (await self.output( [], "volume", [ @@ -1239,7 +1260,7 @@ def volume_ls(self, proj=None): "--format", "{{.Name}}", ], - ).decode("utf-8") + )).decode("utf-8") volumes = output.splitlines() return volumes @@ -1487,7 +1508,7 @@ def get_podman_args(self, cmd): xargs.extend(shlex.split(args)) return xargs - def run(self): + async def run(self): log("podman-compose version: " + __version__) args = self._parse_args() podman_path = args.podman_path @@ -1504,8 +1525,8 @@ def run(self): # just to make sure podman is running try: self.podman_version = ( - self.podman.output(["--version"], "", []).decode("utf-8").strip() - or "" + (await self.podman.output(["--version"], "", [])).decode("utf-8").strip() + or "" ) self.podman_version = (self.podman_version.split() or [""])[-1] except subprocess.CalledProcessError: @@ -1521,7 +1542,7 @@ def run(self): if compose_required: self._parse_compose_file() cmd = self.commands[cmd_name] - retcode = cmd(self, args) + retcode = await cmd(self, args) if isinstance(retcode, int): sys.exit(retcode) @@ -2063,7 +2084,7 @@ def compose_systemd(compose, args): @cmd_run(podman_compose, "pull", "pull stack images") -def compose_pull(compose, args): +async def compose_pull(compose, args): img_containers = [cnt for cnt in compose.containers if "image" in cnt] if args.services: services = set(args.services) @@ -2072,27 +2093,33 @@ def compose_pull(compose, args): if not args.force_local: local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)} images -= local_images - for image in images: - compose.podman.run([], "pull", [image], sleep=0) + + sem = asyncio.Semaphore(args.parallel) + + async def _pull(image: str): + async with sem: + return await compose.podman.run([], "pull", [image], sleep=0) + + await asyncio.gather(*[_pull(image) for image in images]) @cmd_run(podman_compose, "push", "push stack images") -def compose_push(compose, args): +async def compose_push(compose, args): services = set(args.services) for cnt in compose.containers: if "build" not in cnt: continue if services and cnt["_service"] not in services: continue - compose.podman.run([], "push", [cnt["image"]], sleep=0) + await compose.podman.run([], "push", [cnt["image"]], sleep=0) -def build_one(compose, args, cnt): +async def build_one(compose, args, cnt): if "build" not in cnt: return None if getattr(args, "if_not_exists", None): try: - img_id = compose.podman.output( + img_id = await compose.podman.output( [], "inspect", ["-t", "image", "-f", "{{.Id}}", cnt["image"]] ) except subprocess.CalledProcessError: @@ -2148,40 +2175,41 @@ def build_one(compose, args, cnt): ) ) build_args.append(ctx) - status = compose.podman.run([], "build", build_args, sleep=0) + status = await compose.podman.run([], "build", build_args, sleep=0) + if status != 0: + raise Exception return status @cmd_run(podman_compose, "build", "build stack images") -def compose_build(compose, args): - # keeps the status of the last service/container built - status = 0 +async def compose_build(compose, args): + tasks = [] + sem = asyncio.Semaphore(args.parallel) - def parse_return_code(obj, current_status): - if obj and obj.returncode != 0: - return obj.returncode - return current_status + async def safe_build_task(cnt): + async with sem: + return await build_one(compose, args, cnt) if args.services: container_names_by_service = compose.container_names_by_service compose.assert_services(args.services) for service in args.services: cnt = compose.container_by_name[container_names_by_service[service][0]] - p = build_one(compose, args, cnt) - status = parse_return_code(p, status) - if status != 0: - return status + tasks.append(asyncio.create_task(safe_build_task(cnt))) + else: for cnt in compose.containers: - p = build_one(compose, args, cnt) - status = parse_return_code(p, status) - if status != 0: - return status + tasks.append(asyncio.create_task(safe_build_task(cnt))) - return status + for t in asyncio.as_completed(tasks): + status = await t + if status != 0: + sys.exit(status) + return 0 -def create_pods(compose, args): # pylint: disable=unused-argument + +async def create_pods(compose, args): # pylint: disable=unused-argument for pod in compose.pods: podman_args = [ "create", @@ -2196,7 +2224,7 @@ def create_pods(compose, args): # pylint: disable=unused-argument ports = [ports] for i in ports: podman_args.extend(["-p", str(i)]) - compose.podman.run([], "pod", podman_args) + await compose.podman.run([], "pod", podman_args) def get_excluded(compose, args): @@ -2213,7 +2241,7 @@ def get_excluded(compose, args): @cmd_run( podman_compose, "up", "Create and start the entire stack or some of its services" ) -def compose_up(compose, args): +async def compose_up(compose: PodmanCompose, args): proj_name = compose.project_name excluded = get_excluded(compose, args) if not args.no_build: @@ -2222,7 +2250,7 @@ def compose_up(compose, args): compose.commands["build"](compose, build_args) hashes = ( - compose.podman.output( + (await compose.podman.output( [], "ps", [ @@ -2232,7 +2260,7 @@ def compose_up(compose, args): "--format", '{{ index .Labels "io.podman.compose.config-hash"}}', ], - ) + )) .decode("utf-8") .splitlines() ) @@ -2246,15 +2274,15 @@ def compose_up(compose, args): podman_command = "run" if args.detach and not args.no_start else "create" - create_pods(compose, args) + await create_pods(compose, args) for cnt in compose.containers: if cnt["_service"] in excluded: log("** skipping: ", cnt["name"]) continue - podman_args = container_to_args(compose, cnt, detached=args.detach) - subproc = compose.podman.run([], podman_command, podman_args) - if podman_command == "run" and subproc and subproc.returncode: - compose.podman.run([], "start", [cnt["name"]]) + podman_args = await container_to_args(compose, cnt, detached=args.detach) + subproc = await compose.podman.run([], podman_command, podman_args) + if podman_command == "run" and subproc: + await compose.podman.run([], "start", [cnt["name"]]) if args.no_start or args.detach or args.dry_run: return # TODO: handle already existing @@ -2264,54 +2292,41 @@ def compose_up(compose, args): if exit_code_from: args.abort_on_container_exit = True - threads = [] - max_service_length = 0 for cnt in compose.containers: curr_length = len(cnt["_service"]) max_service_length = ( curr_length if curr_length > max_service_length else max_service_length ) - has_sed = os.path.isfile("/bin/sed") + + tasks = set() + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, lambda: [t.cancel("User exit") for t in tasks]) + for i, cnt in enumerate(compose.containers): # Add colored service prefix to output by piping output through sed color_idx = i % len(compose.console_colors) color = compose.console_colors[color_idx] space_suffix = " " * (max_service_length - len(cnt["_service"]) + 1) - log_formatter = "s/^/{}[{}]{}|\x1B[0m\\ /;".format( + log_formatter = "{}[{}]{}|\x1B[0m".format( color, cnt["_service"], space_suffix ) - log_formatter = ["sed", "-e", log_formatter] if has_sed else None if cnt["_service"] in excluded: log("** skipping: ", cnt["name"]) continue - # TODO: remove sleep from podman.run - obj = compose if exit_code_from == cnt["_service"] else None - thread = Thread( - target=compose.podman.run, - args=[[], "start", ["-a", cnt["name"]]], - kwargs={"obj": obj, "log_formatter": log_formatter}, - daemon=True, - name=cnt["name"], - ) - thread.start() - threads.append(thread) - time.sleep(1) - - while threads: - to_remove = [] - for thread in threads: - thread.join(timeout=1.0) - if not thread.is_alive(): - to_remove.append(thread) - if args.abort_on_container_exit: - time.sleep(1) - exit_code = ( - compose.exit_code if compose.exit_code is not None else -1 - ) - sys.exit(exit_code) - for thread in to_remove: - threads.remove(thread) + + tasks.add(asyncio.create_task(compose.podman.run([], "start", ["-a", cnt["name"]], wait=True, sleep=None, log_formatter=log_formatter), name=cnt["_service"])) + + exit_code = 0 + for task in asyncio.as_completed(tasks): + t = await task + if args.abort_on_container_exit: + [_.cancel() for _ in tasks if not _.cancelling() and not _.cancelled()] + if task.get_name() == exit_code_from: + exit_code = t + + sys.exit(exit_code) def get_volume_names(compose, cnt): @@ -2332,7 +2347,7 @@ def get_volume_names(compose, cnt): @cmd_run(podman_compose, "down", "tear down entire stack") -def compose_down(compose, args): +async def compose_down(compose, args): excluded = get_excluded(compose, args) podman_args = [] timeout_global = getattr(args, "timeout", None) @@ -2348,14 +2363,14 @@ def compose_down(compose, args): timeout = str_to_seconds(timeout_str) if timeout is not None: podman_stop_args.extend(["-t", str(timeout)]) - compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]], sleep=0) + await compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]], sleep=0) for cnt in containers: if cnt["_service"] in excluded: continue - compose.podman.run([], "rm", [cnt["name"]], sleep=0) + await compose.podman.run([], "rm", [cnt["name"]], sleep=0) if args.remove_orphans: names = ( - compose.podman.output( + (await compose.podman.output( [], "ps", [ @@ -2365,14 +2380,14 @@ def compose_down(compose, args): "--format", "{{ .Names }}", ], - ) + )) .decode("utf-8") .splitlines() ) for name in names: - compose.podman.run([], "stop", [*podman_args, name], sleep=0) + await compose.podman.run([], "stop", [*podman_args, name], sleep=0) for name in names: - compose.podman.run([], "rm", [name], sleep=0) + await compose.podman.run([], "rm", [name], sleep=0) if args.volumes: vol_names_to_keep = set() for cnt in containers: @@ -2380,22 +2395,22 @@ def compose_down(compose, args): continue vol_names_to_keep.update(get_volume_names(compose, cnt)) log("keep", vol_names_to_keep) - for volume_name in compose.podman.volume_ls(): + async for volume_name in compose.podman.volume_ls(): if volume_name in vol_names_to_keep: continue - compose.podman.run([], "volume", ["rm", volume_name]) + await compose.podman.run([], "volume", ["rm", volume_name]) if excluded: return for pod in compose.pods: - compose.podman.run([], "pod", ["rm", pod["name"]], sleep=0) + await compose.podman.run([], "pod", ["rm", pod["name"]], sleep=0) @cmd_run(podman_compose, "ps", "show status of containers") -def compose_ps(compose, args): +async def compose_ps(compose, args): proj_name = compose.project_name if args.quiet is True: - compose.podman.run( + await compose.podman.run( [], "ps", [ @@ -2407,7 +2422,7 @@ def compose_ps(compose, args): ], ) else: - compose.podman.run( + await compose.podman.run( [], "ps", ["-a", "--filter", f"label=io.podman.compose.project={proj_name}"] ) @@ -2417,7 +2432,7 @@ def compose_ps(compose, args): "run", "create a container similar to a service to run a one-off command", ) -def compose_run(compose, args): +async def compose_run(compose, args): create_pods(compose, args) compose.assert_services(args.service) container_names = compose.container_names_by_service[args.service] @@ -2483,17 +2498,17 @@ def compose_run(compose, args): if args.rm and "restart" in cnt: del cnt["restart"] # run podman - podman_args = container_to_args(compose, cnt, args.detach) + podman_args = await container_to_args(compose, cnt, args.detach) if not args.detach: podman_args.insert(1, "-i") if args.rm: podman_args.insert(1, "--rm") - p = compose.podman.run([], "run", podman_args, sleep=0) - sys.exit(p.returncode) + p = await compose.podman.run([], "run", podman_args, sleep=0) + sys.exit(p) @cmd_run(podman_compose, "exec", "execute a command in a running container") -def compose_exec(compose, args): +async def compose_exec(compose, args): compose.assert_services(args.service) container_names = compose.container_names_by_service[args.service] container_name = container_names[args.index - 1] @@ -2518,11 +2533,11 @@ def compose_exec(compose, args): podman_args += [container_name] if args.cnt_command is not None and len(args.cnt_command) > 0: podman_args += args.cnt_command - p = compose.podman.run([], "exec", podman_args, sleep=0) - sys.exit(p.returncode) + p = await compose.podman.run([], "exec", podman_args, sleep=0) + sys.exit(p) -def transfer_service_status(compose, args, action): +async def transfer_service_status(compose, args, action): # TODO: handle dependencies, handle creations container_names_by_service = compose.container_names_by_service if not args.services: @@ -2537,6 +2552,7 @@ def transfer_service_status(compose, args, action): targets = list(reversed(targets)) podman_args = [] timeout_global = getattr(args, "timeout", None) + tasks = [] for target in targets: if action != "start": timeout = timeout_global @@ -2548,26 +2564,27 @@ def transfer_service_status(compose, args, action): timeout = str_to_seconds(timeout_str) if timeout is not None: podman_args.extend(["-t", str(timeout)]) - compose.podman.run([], action, podman_args + [target], sleep=0) + tasks.append(asyncio.create_task(compose.podman.run([], action, podman_args + [target], sleep=0))) + await asyncio.gather(*tasks) @cmd_run(podman_compose, "start", "start specific services") -def compose_start(compose, args): +async def compose_start(compose, args): transfer_service_status(compose, args, "start") @cmd_run(podman_compose, "stop", "stop specific services") -def compose_stop(compose, args): +async def compose_stop(compose, args): transfer_service_status(compose, args, "stop") @cmd_run(podman_compose, "restart", "restart specific services") -def compose_restart(compose, args): +async def compose_restart(compose, args): transfer_service_status(compose, args, "restart") @cmd_run(podman_compose, "logs", "show logs from services") -def compose_logs(compose, args): +async def compose_logs(compose, args): container_names_by_service = compose.container_names_by_service if not args.services and not args.latest: args.services = container_names_by_service.keys() @@ -2594,7 +2611,7 @@ def compose_logs(compose, args): podman_args.extend(["--until", args.until]) for target in targets: podman_args.append(target) - compose.podman.run([], "logs", podman_args) + await compose.podman.run([], "logs", podman_args) @cmd_run(podman_compose, "config", "displays the compose file") @@ -2635,31 +2652,31 @@ def _published_target(port_string): @cmd_run(podman_compose, "pause", "Pause all running containers") -def compose_pause(compose, args): +async def compose_pause(compose, args): container_names_by_service = compose.container_names_by_service if not args.services: args.services = container_names_by_service.keys() targets = [] for service in args.services: targets.extend(container_names_by_service[service]) - compose.podman.run([], "pause", targets) + await compose.podman.run([], "pause", targets) @cmd_run(podman_compose, "unpause", "Unpause all running containers") -def compose_unpause(compose, args): +async def compose_unpause(compose, args): container_names_by_service = compose.container_names_by_service if not args.services: args.services = container_names_by_service.keys() targets = [] for service in args.services: targets.extend(container_names_by_service[service]) - compose.podman.run([], "unpause", targets) + await compose.podman.run([], "unpause", targets) @cmd_run( podman_compose, "kill", "Kill one or more running containers with a specific signal" ) -def compose_kill(compose, args): +async def compose_kill(compose, args): # to ensure that the user did not execute the command by mistake if not args.services and not args.all: print( @@ -2680,15 +2697,14 @@ def compose_kill(compose, args): targets.extend(container_names_by_service[service]) for target in targets: podman_args.append(target) - compose.podman.run([], "kill", podman_args) - - if args.services: + await compose.podman.run([], "kill", podman_args) + elif args.services: targets = [] for service in args.services: targets.extend(container_names_by_service[service]) for target in targets: podman_args.append(target) - compose.podman.run([], "kill", podman_args) + await compose.podman.run([], "kill", podman_args) @cmd_run( @@ -2696,7 +2712,7 @@ def compose_kill(compose, args): "stats", "Display percentage of CPU, memory, network I/O, block I/O and PIDs for services.", ) -def compose_stats(compose, args): +async def compose_stats(compose, args): container_names_by_service = compose.container_names_by_service if not args.services: args.services = container_names_by_service.keys() @@ -2717,7 +2733,7 @@ def compose_stats(compose, args): podman_args.append(target) try: - compose.podman.run([], "stats", podman_args) + await compose.podman.run([], "stats", podman_args) except KeyboardInterrupt: pass @@ -3075,6 +3091,15 @@ def compose_ps_parse(parser): ) +@cmd_parse(podman_compose, ["build", "pull"]) +def compose_build_pull_parse(parser): + parser.add_argument( + "--parallel", + type=int, + default=os.environ.get("PODMAN_PARALLEL", sys.maxsize) + ) + + @cmd_parse(podman_compose, ["build", "up"]) def compose_build_up_parse(parser): parser.add_argument( @@ -3201,9 +3226,9 @@ def compose_stats_parse(parser): ) -def main(): - podman_compose.run() +async def main(): + await podman_compose.run() if __name__ == "__main__": - main() + asyncio.run(main(), debug=True)