From 2b8bd69202f79e111d49719c5efb259855e2e4f4 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Mon, 15 Nov 2021 14:47:16 +0000 Subject: [PATCH] devlib: Use async Target API Make use of the new async API to speedup other parts of devlib. --- devlib/collector/ftrace.py | 23 +++-- devlib/module/cgroups.py | 24 +++-- devlib/module/cpufreq.py | 204 +++++++++++++++++++++++-------------- devlib/module/cpuidle.py | 79 ++++++++------ 4 files changed, 205 insertions(+), 125 deletions(-) diff --git a/devlib/collector/ftrace.py b/devlib/collector/ftrace.py index 9849f4e21..292aa8271 100644 --- a/devlib/collector/ftrace.py +++ b/devlib/collector/ftrace.py @@ -28,6 +28,7 @@ from devlib.host import PACKAGE_BIN_DIRECTORY from devlib.exception import TargetStableError, HostError from devlib.utils.misc import check_output, which, memoized +from devlib.utils.asyn import asyncf TRACE_MARKER_START = 'TRACE_MARKER_START' @@ -243,7 +244,8 @@ def reset(self): self.target.write_value(self.function_profile_file, 0, verify=False) self._reset_needed = False - def start(self): + @asyncf + async def start(self): self.start_time = time.time() if self._reset_needed: self.reset() @@ -282,14 +284,17 @@ def start(self): self.target.cpuidle.perturb_cpus() # Enable kernel function profiling if self.functions and self.tracer is None: - self.target.execute('echo nop > {}'.format(self.current_tracer_file), - as_root=True) - self.target.execute('echo 0 > {}'.format(self.function_profile_file), - as_root=True) - self.target.execute('echo {} > {}'.format(self.function_string, self.ftrace_filter_file), - as_root=True) - self.target.execute('echo 1 > {}'.format(self.function_profile_file), - as_root=True) + target = self.target + await target.async_manager.concurrently( + execute.asyn('echo nop > {}'.format(self.current_tracer_file), + as_root=True), + execute.asyn('echo 0 > {}'.format(self.function_profile_file), + as_root=True), + execute.asyn('echo {} > {}'.format(self.function_string, self.ftrace_filter_file), + as_root=True), + execute.asyn('echo 1 > {}'.format(self.function_profile_file), + as_root=True), + ) def stop(self): diff --git a/devlib/module/cgroups.py b/devlib/module/cgroups.py index 6cbab19e0..ea71e2487 100644 --- a/devlib/module/cgroups.py +++ b/devlib/module/cgroups.py @@ -19,11 +19,13 @@ from shlex import quote import itertools import warnings +import asyncio from devlib.module import Module from devlib.exception import TargetStableError from devlib.utils.misc import list_to_ranges, isiterable from devlib.utils.types import boolean +from devlib.utils.asyn import asyncf class Controller(object): @@ -55,7 +57,8 @@ def __init__(self, kind, hid, clist): self.mount_point = None self._cgroups = {} - def mount(self, target, mount_root): + @asyncf + async def mount(self, target, mount_root): mounted = target.list_file_systems() if self.mount_name in [e.device for e in mounted]: @@ -68,16 +71,16 @@ def mount(self, target, mount_root): else: # Mount the controller if not already in use self.mount_point = target.path.join(mount_root, self.mount_name) - target.execute('mkdir -p {} 2>/dev/null'\ + await target.execute.asyn('mkdir -p {} 2>/dev/null'\ .format(self.mount_point), as_root=True) - target.execute('mount -t cgroup -o {} {} {}'\ + await target.execute.asyn('mount -t cgroup -o {} {} {}'\ .format(','.join(self.clist), self.mount_name, self.mount_point), as_root=True) # Check if this controller uses "noprefix" option - output = target.execute('mount | grep "{} "'.format(self.mount_name)) + output = await target.execute.asyn('mount | grep "{} "'.format(self.mount_name)) if 'noprefix' in output: self._noprefix = True # self.logger.debug('Controller %s using "noprefix" option', @@ -394,11 +397,12 @@ def __init__(self, target): # Initialize controllers self.logger.info('Available controllers:') self.controllers = {} - for ss in subsys: + + async def register_controller(ss): hid = ss.hierarchy controller = Controller(ss.name, hid, hierarchy[hid]) try: - controller.mount(self.target, self.cgroup_root) + await controller.mount.asyn(self.target, self.cgroup_root) except TargetStableError: message = 'Failed to mount "{}" controller' raise TargetStableError(message.format(controller.kind)) @@ -406,6 +410,14 @@ def __init__(self, target): controller.mount_point) self.controllers[ss.name] = controller + asyncio.run( + target.async_manager.map_concurrently( + register_controller, + subsys, + ) + ) + + def list_subsystems(self): subsystems = [] for line in self.target.execute('{} cat /proc/cgroups'\ diff --git a/devlib/module/cpufreq.py b/devlib/module/cpufreq.py index f559ef6cc..1ffa4d8ca 100644 --- a/devlib/module/cpufreq.py +++ b/devlib/module/cpufreq.py @@ -13,10 +13,12 @@ # limitations under the License. # from contextlib import contextmanager +from operator import itemgetter from devlib.module import Module from devlib.exception import TargetStableError from devlib.utils.misc import memoized +import devlib.utils.asyn as asyn # a dict of governor name and a list of it tunables that can't be read @@ -30,44 +32,52 @@ class CpufreqModule(Module): name = 'cpufreq' @staticmethod - def probe(target): - - # x86 with Intel P-State driver - if target.abi == 'x86_64': - path = '/sys/devices/system/cpu/intel_pstate' - if target.file_exists(path): - return True - - # Generic CPUFreq support (single policy) - path = '/sys/devices/system/cpu/cpufreq/policy0' - if target.file_exists(path): - return True - - # Generic CPUFreq support (per CPU policy) - path = '/sys/devices/system/cpu/cpu0/cpufreq' - return target.file_exists(path) + @asyn.asyncf + async def probe(target): + paths = [ + # x86 with Intel P-State driver + (target.abi == 'x86_64', '/sys/devices/system/cpu/intel_pstate'), + # Generic CPUFreq support (single policy) + (True, '/sys/devices/system/cpu/cpufreq/policy0'), + # Generic CPUFreq support (per CPU policy) + (True, '/sys/devices/system/cpu/cpu0/cpufreq'), + ] + paths = [ + path[1] for path in paths + if path[0] + ] + + exists = await target.async_manager.map_concurrently( + target.file_exists.asyn, + paths, + ) + + return any(exists.values()) def __init__(self, target): super(CpufreqModule, self).__init__(target) self._governor_tunables = {} @memoized - def list_governors(self, cpu): + @asyn.asyncf + async def list_governors(self, cpu): """Returns a list of governors supported by the cpu.""" if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) sysfile = '/sys/devices/system/cpu/{}/cpufreq/scaling_available_governors'.format(cpu) - output = self.target.read_value(sysfile) + output = await self.target.read_value.asyn(sysfile) return output.strip().split() - def get_governor(self, cpu): + @asyn.asyncf + async def get_governor(self, cpu): """Returns the governor currently set for the specified CPU.""" if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) sysfile = '/sys/devices/system/cpu/{}/cpufreq/scaling_governor'.format(cpu) - return self.target.read_value(sysfile) + return await self.target.read_value.asyn(sysfile) - def set_governor(self, cpu, governor, **kwargs): + @asyn.asyncf + async def set_governor(self, cpu, governor, **kwargs): """ Set the governor for the specified CPU. See https://www.kernel.org/doc/Documentation/cpu-freq/governors.txt @@ -90,15 +100,15 @@ def set_governor(self, cpu, governor, **kwargs): """ if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) - supported = self.list_governors(cpu) + supported = await self.list_governors.asyn(cpu) if governor not in supported: raise TargetStableError('Governor {} not supported for cpu {}'.format(governor, cpu)) sysfile = '/sys/devices/system/cpu/{}/cpufreq/scaling_governor'.format(cpu) - self.target.write_value(sysfile, governor) - self.set_governor_tunables(cpu, governor, **kwargs) + await self.target.write_value.asyn(sysfile, governor) + return await self.set_governor_tunables.asyn(cpu, governor, **kwargs) - @contextmanager - def use_governor(self, governor, cpus=None, **kwargs): + @asyn.asynccontextmanager + async def use_governor(self, governor, cpus=None, **kwargs): """ Use a given governor, then restore previous governor(s) @@ -111,66 +121,97 @@ def use_governor(self, governor, cpus=None, **kwargs): :Keyword Arguments: Governor tunables, See :meth:`set_governor_tunables` """ if not cpus: - cpus = self.target.list_online_cpus() - - # Setting a governor & tunables for a cpu will set them for all cpus - # in the same clock domain, so only manipulating one cpu per domain - # is enough - domains = set(self.get_affected_cpus(cpu)[0] for cpu in cpus) - prev_governors = {cpu : (self.get_governor(cpu), self.get_governor_tunables(cpu)) - for cpu in domains} - - # Special case for userspace, frequency is not seen as a tunable - userspace_freqs = {} - for cpu, (prev_gov, _) in prev_governors.items(): - if prev_gov == "userspace": - userspace_freqs[cpu] = self.get_frequency(cpu) - - for cpu in domains: - self.set_governor(cpu, governor, **kwargs) + cpus = await self.target.list_online_cpus.asyn() + + async def get_cpu_info(cpu): + return await self.target.async_manager.concurrently(( + self.get_affected_cpus.asyn(cpu), + self.get_governor.asyn(cpu), + self.get_governor_tunables.asyn(cpu), + # We won't always use the frequency, but it's much quicker to + # do concurrently anyway so do it now + self.get_frequency.asyn(cpu), + )) + + cpus_infos = await self.target.async_manager.map_concurrently(get_cpu_info, cpus) + + # Setting a governor & tunables for a cpu will set them for all cpus in + # the same cpufreq policy, so only manipulating one cpu per domain is + # enough + domains = set( + info[0][0] + for info in cpus_infos.values() + ) + + await self.target.async_manager.concurrently( + self.set_governor.asyn(cpu, governor, **kwargs) + for cpu in domains + ) try: yield - finally: - for cpu, (prev_gov, tunables) in prev_governors.items(): - self.set_governor(cpu, prev_gov, **tunables) + async def set_gov(cpu): + domain, prev_gov, tunables, freq = cpus_infos[cpu] + await self.set_governor.asyn(cpu, prev_gov, **tunables) + # Special case for userspace, frequency is not seen as a tunable if prev_gov == "userspace": - self.set_frequency(cpu, userspace_freqs[cpu]) + await self.set_frequency.asyn(cpu, freq) + + await self.target.async_manager.concurrently( + set_gov(cpu) + for cpu in domains + ) - def list_governor_tunables(self, cpu): + @asyn.asyncf + async def list_governor_tunables(self, cpu): """Returns a list of tunables available for the governor on the specified CPU.""" if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) - governor = self.get_governor(cpu) + governor = await self.get_governor.asyn(cpu) if governor not in self._governor_tunables: try: tunables_path = '/sys/devices/system/cpu/{}/cpufreq/{}'.format(cpu, governor) - self._governor_tunables[governor] = self.target.list_directory(tunables_path) + self._governor_tunables[governor] = await self.target.list_directory.asyn(tunables_path) except TargetStableError: # probably an older kernel try: tunables_path = '/sys/devices/system/cpu/cpufreq/{}'.format(governor) - self._governor_tunables[governor] = self.target.list_directory(tunables_path) + self._governor_tunables[governor] = await self.target.list_directory.asyn(tunables_path) except TargetStableError: # governor does not support tunables self._governor_tunables[governor] = [] return self._governor_tunables[governor] - def get_governor_tunables(self, cpu): + @asyn.asyncf + async def get_governor_tunables(self, cpu): if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) - governor = self.get_governor(cpu) + governor, tunable_list = await self.target.async_manager.concurrently(( + self.get_governor.asyn(cpu), + self.list_governor_tunables.asyn(cpu) + )) + + write_only = set(WRITE_ONLY_TUNABLES.get(governor, [])) + tunable_list = [ + tunable + for tunable in tunable_list + if tunable not in write_only + ] + tunables = {} - for tunable in self.list_governor_tunables(cpu): - if tunable not in WRITE_ONLY_TUNABLES.get(governor, []): - try: - path = '/sys/devices/system/cpu/{}/cpufreq/{}/{}'.format(cpu, governor, tunable) - tunables[tunable] = self.target.read_value(path) - except TargetStableError: # May be an older kernel - path = '/sys/devices/system/cpu/cpufreq/{}/{}'.format(governor, tunable) - tunables[tunable] = self.target.read_value(path) + async def get_tunable(tunable): + try: + path = '/sys/devices/system/cpu/{}/cpufreq/{}/{}'.format(cpu, governor, tunable) + x = await self.target.read_value.asyn(path) + except TargetStableError: # May be an older kernel + path = '/sys/devices/system/cpu/cpufreq/{}/{}'.format(governor, tunable) + x = await self.target.read_value.asyn(path) + return x + + tunables = await self.target.async_manager.map_concurrently(get_tunable, tunable_list) return tunables - def set_governor_tunables(self, cpu, governor=None, **kwargs): + @asyn.asyncf + async def set_governor_tunables(self, cpu, governor=None, **kwargs): """ Set tunables for the specified governor. Tunables should be specified as keyword arguments. Which tunables and values are valid depends on the @@ -191,34 +232,35 @@ def set_governor_tunables(self, cpu, governor=None, **kwargs): if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) if governor is None: - governor = self.get_governor(cpu) - valid_tunables = self.list_governor_tunables(cpu) + governor = await self.get_governor.asyn(cpu) + valid_tunables = await self.list_governor_tunables.asyn(cpu) for tunable, value in kwargs.items(): if tunable in valid_tunables: path = '/sys/devices/system/cpu/{}/cpufreq/{}/{}'.format(cpu, governor, tunable) try: - self.target.write_value(path, value) + await self.target.write_value.asyn(path, value) except TargetStableError: - if self.target.file_exists(path): + if await self.target.file_exists.asyn(path): # File exists but we did something wrong raise # Expected file doesn't exist, try older sysfs layout. path = '/sys/devices/system/cpu/cpufreq/{}/{}'.format(governor, tunable) - self.target.write_value(path, value) + await self.target.write_value.asyn(path, value) else: message = 'Unexpected tunable {} for governor {} on {}.\n'.format(tunable, governor, cpu) message += 'Available tunables are: {}'.format(valid_tunables) raise TargetStableError(message) @memoized - def list_frequencies(self, cpu): + @asyn.asyncf + async def list_frequencies(self, cpu): """Returns a sorted list of frequencies supported by the cpu or an empty list if not could be found.""" if isinstance(cpu, int): cpu = 'cpu{}'.format(cpu) try: cmd = 'cat /sys/devices/system/cpu/{}/cpufreq/scaling_available_frequencies'.format(cpu) - output = self.target.execute(cmd) + output = await self.target.execute.asyn(cmd) available_frequencies = list(map(int, output.strip().split())) # pylint: disable=E1103 except TargetStableError: # On some devices scaling_frequencies is not generated. @@ -226,7 +268,7 @@ def list_frequencies(self, cpu): # Fall back to parsing stats/time_in_state path = '/sys/devices/system/cpu/{}/cpufreq/stats/time_in_state'.format(cpu) try: - out_iter = iter(self.target.read_value(path).split()) + out_iter = (await self.target.read_value.asyn(path)).split() except TargetStableError: if not self.target.file_exists(path): # Probably intel_pstate. Can't get available freqs. @@ -301,7 +343,8 @@ def set_min_frequency(self, cpu, frequency, exact=True): except ValueError: raise ValueError('Frequency must be an integer; got: "{}"'.format(frequency)) - def get_frequency(self, cpu, cpuinfo=False): + @asyn.asyncf + async def get_frequency(self, cpu, cpuinfo=False): """ Returns the current frequency currently set for the specified CPU. @@ -321,9 +364,10 @@ def get_frequency(self, cpu, cpuinfo=False): sysfile = '/sys/devices/system/cpu/{}/cpufreq/{}'.format( cpu, 'cpuinfo_cur_freq' if cpuinfo else 'scaling_cur_freq') - return self.target.read_int(sysfile) + return await self.target.read_int.asyn(sysfile) - def set_frequency(self, cpu, frequency, exact=True): + @asyn.asyncf + async def set_frequency(self, cpu, frequency, exact=True): """ Set's the minimum value for CPU frequency. Actual frequency will depend on the Governor used and may vary during execution. The value should be @@ -347,16 +391,16 @@ def set_frequency(self, cpu, frequency, exact=True): try: value = int(frequency) if exact: - available_frequencies = self.list_frequencies(cpu) + available_frequencies = await self.list_frequencies.asyn(cpu) if available_frequencies and value not in available_frequencies: raise TargetStableError('Can\'t set {} frequency to {}\nmust be in {}'.format(cpu, value, available_frequencies)) - if self.get_governor(cpu) != 'userspace': + if await self.get_governor.asyn(cpu) != 'userspace': raise TargetStableError('Can\'t set {} frequency; governor must be "userspace"'.format(cpu)) sysfile = '/sys/devices/system/cpu/{}/cpufreq/scaling_setspeed'.format(cpu) - self.target.write_value(sysfile, value, verify=False) - cpuinfo = self.get_frequency(cpu, cpuinfo=True) + await self.target.write_value.asyn(sysfile, value, verify=False) + cpuinfo = await self.get_frequency.asyn(cpu, cpuinfo=True) if cpuinfo != value: self.logger.warning( 'The cpufreq value has not been applied properly cpuinfo={} request={}'.format(cpuinfo, value)) @@ -495,7 +539,8 @@ def trace_frequencies(self): # pylint: disable=protected-access return self.target._execute_util('cpufreq_trace_all_frequencies', as_root=True) - def get_affected_cpus(self, cpu): + @asyn.asyncf + async def get_affected_cpus(self, cpu): """ Get the online CPUs that share a frequency domain with the given CPU """ @@ -504,7 +549,8 @@ def get_affected_cpus(self, cpu): sysfile = '/sys/devices/system/cpu/{}/cpufreq/affected_cpus'.format(cpu) - return [int(c) for c in self.target.read_value(sysfile).split()] + content = await self.target.read_value.asyn(sysfile) + return [int(c) for c in content.split()] @memoized def get_related_cpus(self, cpu): diff --git a/devlib/module/cpuidle.py b/devlib/module/cpuidle.py index 93774bee0..9213ad082 100644 --- a/devlib/module/cpuidle.py +++ b/devlib/module/cpuidle.py @@ -20,6 +20,7 @@ from devlib.module import Module from devlib.utils.types import integer, boolean +import devlib.utils.asyn as asyn class CpuidleState(object): @@ -57,19 +58,23 @@ def __init__(self, target, index, path, name, desc, power, latency, residency): self.id = self.target.path.basename(self.path) self.cpu = self.target.path.basename(self.target.path.dirname(path)) - def enable(self): - self.set('disable', 0) + @asyn.asyncf + async def enable(self): + await self.set.asyn('disable', 0) - def disable(self): - self.set('disable', 1) + @asyn.asyncf + async def disable(self): + await self.set.asyn('disable', 1) - def get(self, prop): + @asyn.asyncf + async def get(self, prop): property_path = self.target.path.join(self.path, prop) - return self.target.read_value(property_path) + return await self.target.read_value.asyn(property_path) - def set(self, prop, value): + @asyn.asyncf + async def set(self, prop, value): property_path = self.target.path.join(self.path, prop) - self.target.write_value(property_path, value) + await self.target.write_value.asyn(property_path, value) def __eq__(self, other): if isinstance(other, CpuidleState): @@ -94,8 +99,9 @@ class Cpuidle(Module): root_path = '/sys/devices/system/cpu/cpuidle' @staticmethod - def probe(target): - return target.file_exists(Cpuidle.root_path) + @asyn.asyncf + async def probe(target): + return await target.file_exists.asyn(Cpuidle.root_path) def __init__(self, target): super(Cpuidle, self).__init__(target) @@ -146,32 +152,43 @@ def get_state(self, state, cpu=0): return s raise ValueError('Cpuidle state {} does not exist'.format(state)) - def enable(self, state, cpu=0): - self.get_state(state, cpu).enable() - - def disable(self, state, cpu=0): - self.get_state(state, cpu).disable() - - def enable_all(self, cpu=0): - for state in self.get_states(cpu): - state.enable() - - def disable_all(self, cpu=0): - for state in self.get_states(cpu): - state.disable() - - def perturb_cpus(self): + @asyn.asyncf + async def enable(self, state, cpu=0): + await self.get_state(state, cpu).enable.asyn() + + @asyn.asyncf + async def disable(self, state, cpu=0): + await self.get_state(state, cpu).disable.asyn() + + @asyn.asyncf + async def enable_all(self, cpu=0): + await self.target.async_manager.concurrently( + state.enable.asyn() + for state in self.get_states(cpu) + ) + + @asyn.asyncf + async def disable_all(self, cpu=0): + await self.target.async_manager.concurrently( + state.disable.asyn() + for state in self.get_states(cpu) + ) + + @asyn.asyncf + async def perturb_cpus(self): """ Momentarily wake each CPU. Ensures cpu_idle events in trace file. """ # pylint: disable=protected-access - self.target._execute_util('cpuidle_wake_all_cpus') + await self.target._execute_util.asyn('cpuidle_wake_all_cpus') - def get_driver(self): - return self.target.read_value(self.target.path.join(self.root_path, 'current_driver')) + @asyn.asyncf + async def get_driver(self): + return await self.target.read_value.asyn(self.target.path.join(self.root_path, 'current_driver')) - def get_governor(self): + @asyn.asyncf + async def get_governor(self): path = self.target.path.join(self.root_path, 'current_governor_ro') - if not self.target.file_exists(path): + if not await self.target.file_exists.asyn(path): path = self.target.path.join(self.root_path, 'current_governor') - return self.target.read_value(path) + return await self.target.read_value.asyn(path)