Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel execution of tests enabled #28

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ name = "pypi"
[packages]
termcolor = "==1.1.0"
requests = "*"
pathos = "*"

[dev-packages]
black = "*"
Expand Down
156 changes: 82 additions & 74 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions autotest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@

import json, re, sys, traceback
from collections import OrderedDict
import multiprocessing as mp


# add autotest directory to module path
# set start method to spawn (for portability to all 3 major OS)
if __name__ == "__main__":
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
mp.set_start_method("spawn")

from util import AutotestException
from command_line_arguments import process_arguments
Expand Down
21 changes: 21 additions & 0 deletions parameter_descriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,27 @@ def finalize_dcc_output_checking(name, value, parameters):
),
]

# parallelisation parameter options
# parallelisation parameter options
PARAMETER_LIST += [
    # NOTE(review): len(os.sched_getaffinity(0)) would report the CPU count
    # actually usable by this process, but it is not available on all
    # platforms (e.g. macOS/Windows), so os.cpu_count() is used instead.
    Parameter(
        "test_threads_max",
        # os.cpu_count() is documented to return None when the core count
        # cannot be determined; fall back to 1 so consumers doing
        # min(test_threads_n, test_threads_max) never compare against None.
        default=os.cpu_count() or 1,
        description="""
    Maximum number of threads (processes) to run autotests in parallel.
    Defaults to current computer core count.
    """,
    ),
    Parameter(
        "test_threads_n",
        default=1,
        description="""
    Number of threads (processes) to run autotests in parallel.
    Defaults to 1 (serial execution).
    """,
    ),
]


PARAMETERS = collections.OrderedDict(
(bv.name, bv) for bv in PARAMETER_LIST if isinstance(bv, Parameter)
Expand Down
54 changes: 46 additions & 8 deletions run_tests.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# run all the tests
# This code needs extensive revision.

import copy, glob, io, os, re, subprocess, sys
import copy, glob, io, os, re, subprocess, sys, tempfile
from contextlib import contextmanager
from termcolor import colored as termcolor_colored
from parse_test_specification import output_file_without_parameters
from copy_files_to_temp_directory import copy_files_to_temp_directory
from copy_files_to_temp_directory import copy_directory
from util import die
import pathos.multiprocessing as mp # type: ignore
from functools import partial

# necessary for typehinting
from typing import Dict, List, Any, Union
Expand All @@ -14,6 +17,10 @@
from run_test import _Test
from argparse import Namespace

# pathos multiprocessing is used instead of stdlib multiprocessing due to
# requirement of passing along file descriptors to child workers with minimal refactor
# See more here: https://stackoverflow.com/a/36366426


def run_tests(
tests: Dict[str, _Test],
Expand Down Expand Up @@ -48,12 +55,16 @@ def run_tests(
print(error_msg, flush=True, file=file)
return 1

results = []
for (label, test) in tests.items():
if label not in args.labels:
continue
result = run_one_test(test, args, file=file)
results.append(result)
# multiprocess tests via multiprocessing pool map
thread_count = min(
global_parameters["test_threads_n"], global_parameters["test_threads_max"]
)
pool = mp.ProcessPool(nodes=thread_count)
test_list = [test for (label, test) in tests.items() if label in args.labels]

results = pool.map(partial(run_one_test_wrapper, args=args, file=file), test_list)
pool.close()
pool.join()

if debug > 3:
subprocess.call("echo after tests run;ls -l;pwd", shell=True)
Expand All @@ -78,6 +89,33 @@ def run_tests(
return 1 if n_tests_failed + n_tests_not_run else 0


# Context manager that temporarily changes the current working directory.
# Not strictly necessary, but it keeps the per-test directory handling simple.
@contextmanager
def cwd(path):
    """Temporarily make *path* the current working directory.

    The previous working directory is restored on exit, even when the
    managed body raises an exception.
    """
    previous = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)


# Wrapper so that temporary-directory creation and cleanup for each
# parallel test run is handled in one place.
def run_one_test_wrapper(
    test: _Test,
    args: Namespace,
    file=sys.stdout,
    previous_errors: Union[Dict[str, Any], None] = None,
) -> int:
    """Run a single test inside a fresh temporary copy of the working directory.

    Each parallel worker gets its own temporary directory (cleaned up
    automatically) so concurrently running tests cannot clobber each
    other's files.

    :param test: the test to execute
    :param args: parsed command-line arguments
    :param file: stream test output is written to (defaults to stdout)
    :param previous_errors: accumulated errors from earlier tests,
        forwarded to ``run_one_test``
    :return: the exit status returned by ``run_one_test``
    """
    # Use a None sentinel: a mutable ``{}`` default would be shared across
    # every call of this function.
    if previous_errors is None:
        previous_errors = {}
    # transfer test to another temp directory (for parallelisation)
    with tempfile.TemporaryDirectory() as temp_dir:
        # transfer over current directory contents to temp_dir
        copy_directory(os.getcwd(), temp_dir)
        # run test within temp directory context
        with cwd(temp_dir):
            # NOTE(review): previously ``previous_errors`` was accepted but
            # silently dropped; it is now forwarded to run_one_test, which
            # takes the same parameter.
            return run_one_test(
                test, args, file=file, previous_errors=previous_errors
            )


# TODO: provide stricter type for previous_errors
def run_one_test(
test: _Test, args: Namespace, file=sys.stdout, previous_errors: Dict[str, Any] = {}
Expand Down