From d11b2ded1cd3043b925c4108614172528167d07e Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Wed, 18 Aug 2021 10:35:36 +0100 Subject: [PATCH] utils/async: Add new utils.async module Home for async-related utilities. --- devlib/module/cpufreq.py | 160 +++++++++++++++++----------- devlib/target.py | 97 ++++++++++++++--- devlib/utils/asyn.py | 219 +++++++++++++++++++++++++++++++++++++++ setup.py | 2 + 4 files changed, 403 insertions(+), 75 deletions(-) create mode 100644 devlib/utils/asyn.py diff --git a/devlib/module/cpufreq.py b/devlib/module/cpufreq.py index f559ef6cc..7af290d3e 100644 --- a/devlib/module/cpufreq.py +++ b/devlib/module/cpufreq.py @@ -13,10 +13,12 @@ # limitations under the License. # from contextlib import contextmanager +from operator import itemgetter from devlib.module import Module from devlib.exception import TargetStableError from devlib.utils.misc import memoized +import devlib.utils.asyn as asyn # a dict of governor name and a list of it tunables that can't be read @@ -52,22 +54,25 @@ def __init__(self, target): self._governor_tunables = {} @memoized - def list_governors(self, cpu): + @asyn.asyncf + async def list_governors(self, cpu): """Returns a list of governors supported by the cpu.""" if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) sysfile = '/sys/devices/system/cpu/{}/cpufreq/scaling_available_governors'.format(cpu) - output = self.target.read_value(sysfile) + output = await self.target.read_value.asyn(sysfile) return output.strip().split() - def get_governor(self, cpu): + @asyn.asyncf + async def get_governor(self, cpu): """Returns the governor currently set for the specified CPU.""" if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) sysfile = '/sys/devices/system/cpu/{}/cpufreq/scaling_governor'.format(cpu) - return self.target.read_value(sysfile) + return await self.target.read_value.asyn(sysfile) - def set_governor(self, cpu, governor, **kwargs): + @asyn.asyncf + async def set_governor(self, cpu, governor, **kwargs): """ Set the governor for the specified CPU. See https://www.kernel.org/doc/Documentation/cpu-freq/governors.txt @@ -90,15 +95,15 @@ def set_governor(self, cpu, governor, **kwargs): """ if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) - supported = self.list_governors(cpu) + supported = await self.list_governors.asyn(cpu) if governor not in supported: raise TargetStableError('Governor {} not supported for cpu {}'.format(governor, cpu)) sysfile = '/sys/devices/system/cpu/{}/cpufreq/scaling_governor'.format(cpu) - self.target.write_value(sysfile, governor) - self.set_governor_tunables(cpu, governor, **kwargs) + await self.target.write_value.asyn(sysfile, governor) + return await self.set_governor_tunables.asyn(cpu, governor, **kwargs) - @contextmanager - def use_governor(self, governor, cpus=None, **kwargs): + @asyn.asynccontextmanager + async def use_governor(self, governor, cpus=None, **kwargs): """ Use a given governor, then restore previous governor(s) @@ -111,66 +116,97 @@ def use_governor(self, governor, cpus=None, **kwargs): :Keyword Arguments: Governor tunables, See :meth:`set_governor_tunables` """ if not cpus: - cpus = self.target.list_online_cpus() - - # Setting a governor & tunables for a cpu will set them for all cpus - # in the same clock domain, so only manipulating one cpu per domain - # is enough - domains = set(self.get_affected_cpus(cpu)[0] for cpu in cpus) - prev_governors = {cpu : (self.get_governor(cpu), self.get_governor_tunables(cpu)) - for cpu in domains} - - # Special case for userspace, frequency is not seen as a tunable - userspace_freqs = {} - for cpu, (prev_gov, _) in prev_governors.items(): - if prev_gov == "userspace": - userspace_freqs[cpu] = self.get_frequency(cpu) - - for cpu in domains: - self.set_governor(cpu, governor, **kwargs) + cpus = await self.target.list_online_cpus.asyn() + + async def get_cpu_info(cpu): + return await asyn.concurrently(( + self.get_affected_cpus.asyn(cpu), + self.get_governor.asyn(cpu), + self.get_governor_tunables.asyn(cpu), + # We won't always use the frequency, but it's much quicker to + # do concurrently anyway so do it now + self.get_frequency.asyn(cpu), + )) + + cpus_infos = await asyn.map_concurrently(get_cpu_info, cpus) + + # Setting a governor & tunables for a cpu will set them for all cpus in + # the same cpufreq policy, so only manipulating one cpu per domain is + # enough + domains = set( + info[0][0] + for info in cpus_infos.values() + ) + + await asyn.concurrently( + self.set_governor.asyn(cpu, governor, **kwargs) + for cpu in domains + ) try: yield - finally: - for cpu, (prev_gov, tunables) in prev_governors.items(): - self.set_governor(cpu, prev_gov, **tunables) + async def set_gov(cpu): + domain, prev_gov, tunables, freq = cpus_infos[cpu] + await self.set_governor.asyn(cpu, prev_gov, **tunables) + # Special case for userspace, frequency is not seen as a tunable if prev_gov == "userspace": - self.set_frequency(cpu, userspace_freqs[cpu]) + await self.set_frequency.asyn(cpu, freq) + + await asyn.concurrently( + set_gov(cpu) + for cpu in domains + ) - def list_governor_tunables(self, cpu): + @asyn.asyncf + async def list_governor_tunables(self, cpu): """Returns a list of tunables available for the governor on the specified CPU.""" if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) - governor = self.get_governor(cpu) + governor = await self.get_governor.asyn(cpu) if governor not in self._governor_tunables: try: tunables_path = '/sys/devices/system/cpu/{}/cpufreq/{}'.format(cpu, governor) - self._governor_tunables[governor] = self.target.list_directory(tunables_path) + self._governor_tunables[governor] = await self.target.list_directory.asyn(tunables_path) except TargetStableError: # probably an older kernel try: tunables_path = '/sys/devices/system/cpu/cpufreq/{}'.format(governor) - self._governor_tunables[governor] = self.target.list_directory(tunables_path) + self._governor_tunables[governor] = await self.target.list_directory.asyn(tunables_path) except TargetStableError: # governor does not support tunables self._governor_tunables[governor] = [] return self._governor_tunables[governor] - def get_governor_tunables(self, cpu): + @asyn.asyncf + async def get_governor_tunables(self, cpu): if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) - governor = self.get_governor(cpu) + governor, tunable_list = await asyn.concurrently(( + self.get_governor.asyn(cpu), + self.list_governor_tunables.asyn(cpu) + )) + + write_only = set(WRITE_ONLY_TUNABLES.get(governor, [])) + tunable_list = [ + tunable + for tunable in tunable_list + if tunable not in write_only + ] + tunables = {} - for tunable in self.list_governor_tunables(cpu): - if tunable not in WRITE_ONLY_TUNABLES.get(governor, []): - try: - path = '/sys/devices/system/cpu/{}/cpufreq/{}/{}'.format(cpu, governor, tunable) - tunables[tunable] = self.target.read_value(path) - except TargetStableError: # May be an older kernel - path = '/sys/devices/system/cpu/cpufreq/{}/{}'.format(governor, tunable) - tunables[tunable] = self.target.read_value(path) + async def get_tunable(tunable): + try: + path = '/sys/devices/system/cpu/{}/cpufreq/{}/{}'.format(cpu, governor, tunable) + x = await self.target.read_value.asyn(path) + except TargetStableError: # May be an older kernel + path = '/sys/devices/system/cpu/cpufreq/{}/{}'.format(governor, tunable) + x = await self.target.read_value.asyn(path) + return x + + tunables = await asyn.map_concurrently(get_tunable, tunable_list) return tunables - def set_governor_tunables(self, cpu, governor=None, **kwargs): + @asyn.asyncf + async def set_governor_tunables(self, cpu, governor=None, **kwargs): """ Set tunables for the specified governor. Tunables should be specified as keyword arguments. Which tunables and values are valid depends on the @@ -191,20 +227,20 @@ def set_governor_tunables(self, cpu, governor=None, **kwargs): if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) if governor is None: - governor = self.get_governor(cpu) - valid_tunables = self.list_governor_tunables(cpu) + governor = await self.get_governor.asyn(cpu) + valid_tunables = await self.list_governor_tunables.asyn(cpu) for tunable, value in kwargs.items(): if tunable in valid_tunables: path = '/sys/devices/system/cpu/{}/cpufreq/{}/{}'.format(cpu, governor, tunable) try: - self.target.write_value(path, value) + await self.target.write_value.asyn(path, value) except TargetStableError: - if self.target.file_exists(path): + if await self.target.file_exists.asyn(path): # File exists but we did something wrong raise # Expected file doesn't exist, try older sysfs layout. path = '/sys/devices/system/cpu/cpufreq/{}/{}'.format(governor, tunable) - self.target.write_value(path, value) + await self.target.write_value.asyn(path, value) else: message = 'Unexpected tunable {} for governor {} on {}.\n'.format(tunable, governor, cpu) message += 'Available tunables are: {}'.format(valid_tunables) @@ -301,7 +337,8 @@ def set_min_frequency(self, cpu, frequency, exact=True): except ValueError: raise ValueError('Frequency must be an integer; got: "{}"'.format(frequency)) - def get_frequency(self, cpu, cpuinfo=False): + @asyn.asyncf + async def get_frequency(self, cpu, cpuinfo=False): """ Returns the current frequency currently set for the specified CPU. @@ -321,9 +358,10 @@ def get_frequency(self, cpu, cpuinfo=False): sysfile = '/sys/devices/system/cpu/{}/cpufreq/{}'.format( cpu, 'cpuinfo_cur_freq' if cpuinfo else 'scaling_cur_freq') - return self.target.read_int(sysfile) + return await self.target.read_int.asyn(sysfile) - def set_frequency(self, cpu, frequency, exact=True): + @asyn.asyncf + async def set_frequency(self, cpu, frequency, exact=True): """ Set's the minimum value for CPU frequency. Actual frequency will depend on the Governor used and may vary during execution. The value should be @@ -347,16 +385,16 @@ def set_frequency(self, cpu, frequency, exact=True): try: value = int(frequency) if exact: - available_frequencies = self.list_frequencies(cpu) + available_frequencies = await self.list_frequencies.asyn(cpu) if available_frequencies and value not in available_frequencies: raise TargetStableError('Can\'t set {} frequency to {}\nmust be in {}'.format(cpu, value, available_frequencies)) - if self.get_governor(cpu) != 'userspace': + if await self.get_governor.asyn(cpu) != 'userspace': raise TargetStableError('Can\'t set {} frequency; governor must be "userspace"'.format(cpu)) sysfile = '/sys/devices/system/cpu/{}/cpufreq/scaling_setspeed'.format(cpu) - self.target.write_value(sysfile, value, verify=False) - cpuinfo = self.get_frequency(cpu, cpuinfo=True) + await self.target.write_value.asyn(sysfile, value, verify=False) + cpuinfo = await self.get_frequency.asyn(cpu, cpuinfo=True) if cpuinfo != value: self.logger.warning( 'The cpufreq value has not been applied properly cpuinfo={} request={}'.format(cpuinfo, value)) @@ -495,7 +533,8 @@ def trace_frequencies(self): # pylint: disable=protected-access return self.target._execute_util('cpufreq_trace_all_frequencies', as_root=True) - def get_affected_cpus(self, cpu): + @asyn.asyncf + async def get_affected_cpus(self, cpu): """ Get the online CPUs that share a frequency domain with the given CPU """ @@ -504,7 +543,8 @@ def get_affected_cpus(self, cpu): sysfile = '/sys/devices/system/cpu/{}/cpufreq/affected_cpus'.format(cpu) - return [int(c) for c in self.target.read_value(sysfile).split()] + content = await self.target.read_value.asyn(sysfile) + return [int(c) for c in content.split()] @memoized def get_related_cpus(self, cpu): diff --git a/devlib/target.py b/devlib/target.py index b78ca6dff..b70f64aa5 100644 --- a/devlib/target.py +++ b/devlib/target.py @@ -13,6 +13,7 @@ # limitations under the License. # +import asyncio import io import base64 import functools @@ -57,7 +58,9 @@ from devlib.utils.misc import commonprefix, merge_lists from devlib.utils.misc import ABI_MAP, get_cpu_name, ranges_to_list from devlib.utils.misc import batch_contextmanager, tls_property, nullcontext +from devlib.utils.misc import strip_bash_colors from devlib.utils.types import integer, boolean, bitmask, identifier, caseless_string, bytes_regex +import devlib.utils.asyn as asyn FSTAB_ENTRY_REGEX = re.compile(r'(\S+) on (.+) type (\S+) \((\S+)\)') @@ -728,7 +731,58 @@ def _prepare_cmd(self, command, force_locale): return command @call_conn - def execute(self, command, timeout=None, check_exit_code=True, + @asyn.asyncf + async def _execute_async(self, command, timeout=None, as_root=False, strip_colors=True, will_succeed=False, check_exit_code=True, force_locale='C'): + bg = self.background( + command=command, + as_root=as_root, + force_locale=force_locale, + ) + + def process(streams): + # Make sure we don't accidentally end up with "\n" if both streams + # are empty + res = b'\n'.join(x for x in streams if x).decode() + if strip_colors: + res = strip_bash_colors(res) + return res + + def thread_f(): + streams = (None, None) + excep = None + try: + with bg as _bg: + streams = _bg.communicate(timeout=timeout) + except BaseException as e: + excep = e + + if isinstance(excep, subprocess.CalledProcessError): + if check_exit_code: + excep = TargetStableError(excep) + else: + streams = (excep.output, excep.stderr) + excep = None + + if will_succeed and isinstance(excep, TargetStableError): + excep = TargetTransientError(excep) + + if excep is None: + res = process(streams) + loop.call_soon_threadsafe(future.set_result, res) + else: + loop.call_soon_threadsafe(future.set_exception, excep) + + loop = asyncio.get_running_loop() + future = asyncio.Future() + thread = threading.Thread( + target=thread_f, + daemon=True, + ) + thread.start() + return await future + + @call_conn + def _execute(self, command, timeout=None, check_exit_code=True, as_root=False, strip_colors=True, will_succeed=False, force_locale='C'): @@ -737,6 +791,11 @@ def execute(self, command, timeout=None, check_exit_code=True, check_exit_code=check_exit_code, as_root=as_root, strip_colors=strip_colors, will_succeed=will_succeed) + execute = asyn._AsyncPolymorphicFunction( + asyn=_execute_async.asyn, + blocking=_execute, + ) + @call_conn def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False, force_locale='C', timeout=None): @@ -821,18 +880,22 @@ def kick_off(self, command, as_root=False): # sysfs interaction - def read_value(self, path, kind=None): - output = self.execute('cat {}'.format(quote(path)), as_root=self.needs_su).strip() # pylint: disable=E1103 + @asyn.asyncf + async def read_value(self, path, kind=None): + output = await self.execute.asyn('cat {}'.format(quote(path)), as_root=self.needs_su) # pylint: disable=E1103 + output = output.strip() if kind: return kind(output) else: return output - def read_int(self, path): - return self.read_value(path, kind=integer) + @asyn.asyncf + async def read_int(self, path): + return await self.read_value.asyn(path, kind=integer) - def read_bool(self, path): - return self.read_value(path, kind=boolean) + @asyn.asyncf + async def read_bool(self, path): + return await self.read_value.asyn(path, kind=boolean) @contextmanager def revertable_write_value(self, path, value, verify=True): @@ -846,7 +909,8 @@ def revertable_write_value(self, path, value, verify=True): def batch_revertable_write_value(self, kwargs_list): return batch_contextmanager(self.revertable_write_value, kwargs_list) - def write_value(self, path, value, verify=True): + @asyn.asyncf + async def write_value(self, path, value, verify=True): value = str(value) if verify: @@ -875,7 +939,7 @@ def write_value(self, path, value, verify=True): cmd = cmd.format(busybox=quote(self.busybox), path=quote(path), value=quote(value)) try: - self.execute(cmd, check_exit_code=True, as_root=True) + await self.execute.asyn(cmd, check_exit_code=True, as_root=True) except TargetCalledProcessError as e: if e.returncode == 10: raise TargetStableError('Could not write "{value}" to {path}: {e.output}'.format( @@ -929,9 +993,10 @@ def ps(self, **kwargs): def makedirs(self, path): self.execute('mkdir -p {}'.format(quote(path))) - def file_exists(self, filepath): + @asyn.asyncf + async def file_exists(self, filepath): command = 'if [ -e {} ]; then echo 1; else echo 0; fi' - output = self.execute(command.format(quote(filepath)), as_root=self.is_rooted) + output = await self.execute.asyn(command.format(quote(filepath)), as_root=self.is_rooted) return boolean(output.strip()) def directory_exists(self, filepath): @@ -978,9 +1043,10 @@ def remove(self, path, as_root=False): def core_cpus(self, core): return [i for i, c in enumerate(self.core_names) if c == core] - def list_online_cpus(self, core=None): + @asyn.asyncf + async def list_online_cpus(self, core=None): path = self.path.join('/sys/devices/system/cpu/online') - output = self.read_value(path) + output = await self.read_value.asyn(path) all_online = ranges_to_list(output) if core: cpus = self.core_cpus(core) @@ -1414,8 +1480,9 @@ def ps(self, threads=False, **kwargs): filtered_result.append(entry) return filtered_result - def list_directory(self, path, as_root=False): - contents = self.execute('ls -1 {}'.format(quote(path)), as_root=as_root) + @asyn.asyncf + async def list_directory(self, path, as_root=False): + contents = await self.execute.asyn('ls -1 {}'.format(quote(path)), as_root=as_root) return [x.strip() for x in contents.split('\n') if x.strip()] def install(self, filepath, timeout=None, with_name=None): # pylint: disable=W0221 diff --git a/devlib/utils/asyn.py b/devlib/utils/asyn.py new file mode 100644 index 000000000..f77d7524f --- /dev/null +++ b/devlib/utils/asyn.py @@ -0,0 +1,219 @@ +# Copyright 2013-2018 ARM Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +""" +Async-related utilities +""" + +import asyncio +import functools +import sys +import logging +import contextlib + +# Allow nesting asyncio loops, which is necessary for: +# * Being able to call the blocking variant of a function from an async +# function for backward compat +# * Critically, run the blocking variant of a function in a Jupyter notebook +# environment, since it also uses asyncio. +# +# Maybe there is still hope for future versions of Python though: +# https://bugs.python.org/issue22239 +import nest_asyncio +nest_asyncio.apply() + + +if sys.version_info >= (3, 7): + def _create_task(awaitable): + return asyncio.create_task(awaitable) + + _asynccontextmanager = contextlib.asynccontextmanager + + def run(coro): + return asyncio.run(coro) + +else: + def _create_task(awaitable): + return asyncio.get_running_loop().create_task(awaitable) + + _asynccontextmanager = async_generator.asynccontextmanager + + def run(coro): + loop = asyncio.get_event_loop() + # This will work thanks to nest_asyncio even if loop.is_running() is True + result = loop.run_until_complete(coro) + return result + + +def create_task(awaitable): + if isinstance(awaitable, asyncio.Task): + return awaitable + else: + return _create_task(awaitable) + + +async def concurrently(awaitables): + """ + Await concurrently for the given awaitables, and cancel them as soon as one + raises an exception. + """ + tasks = list(map(make_task, awaitables)) + try: + return await asyncio.gather(*tasks) + except BaseException: + for task in tasks: + task.cancel() + raise + + +async def map_concurrently(f, keys): + """ + Similar to :func:`devlib.asyn.concurrently`, but maps the given function + ``f`` on the given ``keys``. + + :return: A dictionary with ``keys`` as keys, and function result as values. + """ + keys = list(keys) + return dict(zip( + keys, + await concurrently(map(f, keys)) + )) + + +def compose(*coros): + """ + Compose coroutines, feeding the output of each as the input of the next + one. + + ``await compose(f, g)(x)`` is equivalent to ``await f(await g(x))`` + + .. note:: In Haskell, ``compose f g h`` would be equivalent to ``f <=< g <=< h`` + """ + async def f(*args, **kwargs): + empty_dict = {} + for coro in reversed(coros): + x = coro(*args, **kwargs) + # Allow mixing corountines and regular functions + if asyncio.isfuture(x): + x = await x + args = [x] + kwargs = empty_dict + + return x + return f + + +class _AsyncPolymorphicFunction: + """ + A callable that allows exposing both a synchronous and asynchronous API. + + When called, the blocking synchronous operation is called. The ```asyn`` + attribute gives access to the asynchronous version of the function, and all + the other attribute access will be redirected to the async function. + """ + def __init__(self, asyn, blocking): + self.asyn = asyn + self.blocking = blocking + + def __get__(self, *args, **kwargs): + return self.__class__( + asyn=self.asyn.__get__(*args, **kwargs), + blocking=self.blocking.__get__(*args, **kwargs), + ) + + def __call__(self, *args, **kwargs): + return self.blocking(*args, **kwargs) + + def __getattr__(self, attr): + return getattr(self.asyn, attr) + + +def asyncf(f): + """ + Decorator used to turn a coroutine into a blocking function, with an + optional asynchronous API. + + **Example**:: + + @asyncf + async def foo(x): + await do_some_async_things(x) + return x + + # Blocking call, just as if the function was synchronous, except it may + # use asynchronous code inside, e.g. to do concurrent operations. + foo(42) + + # Asynchronous API, foo.asyn being a corountine + await foo.asyn(42) + + This allows the same implementation to be both used as blocking for ease of + use and backward compatibility, or exposed as a corountine for callers that + can deal with awaitables. + """ + @functools.wraps(f) + def blocking(*args, **kwargs): + # Since run() needs a corountine, make sure we provide one + async def wrapper(): + return await f(*args, **kwargs) + return run(wrapper()) + + @functools.wraps(f) + def concurrent(*args, **kwargs): + awaitable = f(*args, **kwargs) + task = make_task(awaitable) + return task + + return _AsyncPolymorphicFunction( + asyn=f, + blocking=blocking, + ) + + +class _AsyncPolymorphicCM: + """ + Wrap an async context manager such that it exposes a synchronous API as + well for backward compatibility. + """ + def __init__(self, async_cm): + self.cm = async_cm + + def __aenter__(self, *args, **kwargs): + return self.cm.__aenter__(*args, **kwargs) + + def __aexit__(self, *args, **kwargs): + return self.cm.__aexit__(*args, **kwargs) + + def __enter__(self, *args, **kwargs): + return run(self.cm.__aenter__(*args, **kwargs)) + + def __exit__(self, *args, **kwargs): + return run(self.cm.__aexit__(*args, **kwargs)) + + +def asynccontextmanager(f): + """ + Same as :func:`contextlib.asynccontextmanager` except that it can also be + used with a regular ``with`` statement for backward compatibility. + """ + f = _asynccontextmanager(f) + + @functools.wraps(f) + def wrapper(*args, **kwargs): + cm = f(*args, **kwargs) + return _AsyncPolymorphicCM(cm) + + return wrapper diff --git a/setup.py b/setup.py index 0365683e6..7dcf2ce78 100644 --- a/setup.py +++ b/setup.py @@ -97,6 +97,8 @@ 'pandas<=0.24.2; python_version<"3"', 'pandas; python_version>"3"', 'lxml', # More robust xml parsing + 'nest_asyncio', # Allows running nested asyncio loops + 'async_generator; python_version<"3.7"', # Backport of contextlib.asynccontextmanager ], extras_require={ 'daq': ['daqpower>=2'],