Source code for lisa.wlgen.workload

# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2018, Arm Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
from pathlib import Path
from shlex import quote
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from subprocess import CalledProcessError
import functools
import uuid
import warnings
import contextlib
import shutil
import inspect

from devlib.utils.misc import list_to_mask

from lisa.utils import ArtifactPath, Loggable, PartialInit, deprecate, destroyablecontextmanager, ContextManagerExit


class _WorkloadRunCMDecorator:
    def __init__(self, gen_f):
        self.gen_f = gen_f
        # Set __doc__ and the likes
        functools.update_wrapper(self, gen_f)

    def __call__(self, wload, *args, **kwargs):
        return _WorkloadRunCM(wload, self.gen_f, args, kwargs)

    def __get__(self, instance, owner=None):
        """
        Behave like a regular method
        """
        if instance is None:
            return self
        else:
            return functools.partial(self, instance)


class _NotSet:
    def __init__(self, excep):
        self.excep = excep


class _WorkloadRunCM(Loggable):
    def __init__(self, wload, f, args, kwargs):
        self.gen = f(wload, *args, **kwargs)
        self._bg_cmd = None
        self._futures = None
        self._output = _NotSet(RuntimeError('The output cannot be used inside the "with" statement that computes it'))
        self.name = f.__qualname__.split('.')[0]
        self.wload = wload

    @property
    def output(self):
        out = self._output
        if isinstance(out, _NotSet):
            raise out.excep
        else:
            return out

    def __getattr__(self, attr):
        """
        Make the context manager look like the
        :class:`devlib.connection.BackgroundCommand` it wraps.
        """
        if attr in ('stdout', 'stderr'):
            raise AttributeError(f'Attribute "{attr}" is not available, use "output" attribute outside the "with" block to get the postprocessed output')
        return getattr(self._bg_cmd, attr)

    @staticmethod
    def _read_streams(bg_cmd):

        def read(pipe):
            return pipe.read()

        pipes = dict(
            stderr=bg_cmd.stderr,
            stdout=bg_cmd.stdout,
        )
        executor = None
        try:
            executor = ThreadPoolExecutor(max_workers=len(pipes))
            futures = {
                name: executor.submit(read, pipe)
                for name, pipe in pipes.items()
            }
        except BaseException:
            # If something bad happened, ensure the executor is correctly
            # shutdown and all futures canceled to avoid leaking any thread
            if executor is not None:
                executor.shutdown(wait=False, cancel_futures=True)
            raise
        finally:
            # Ask the executor to free the resources as soon as all the futures
            # are completed
            if executor is not None:
                executor.shutdown(wait=False)

        return futures

    def __enter__(self):
        try:
            bg_cmd = next(self.gen)
        except StopIteration:
            raise RuntimeError('Generator did not yield')

        bg_cmd = bg_cmd.__enter__()
        self._bg_cmd = bg_cmd
        futures = self._read_streams(bg_cmd)
        self._futures = futures
        return self

    def __exit__(self, *exc_info):
        inner_exit = self._bg_cmd.__exit__
        wload = self.wload
        logger = self.logger

        try:
            suppress = inner_exit(*exc_info)
        except BaseException as e:
            exc_info = (type(e), e, e.__traceback__)
        else:
            if suppress:
                exc_info = (None, None, None)

        type_, value, traceback = exc_info

        returncode = self._bg_cmd.poll()

        if exc_info[0] is not None:
            try:
                self.gen.throw(*exc_info)
            except StopIteration as e:
                if e is value:
                    self._output = _NotSet(e)
                    return False
                else:
                    self._output = e.value
                    return True
            except BaseException as e:
                # Place a "bomb" value: if the user tries to access
                # "self.output", the exception will be raised again
                self._output = _NotSet(e)
                # __exit__ is not expected to re-raise the exception it was
                # given, instead it returns a falsy value to indicate it should
                # not be swallowed
                if e is value:
                    return False
                else:
                    raise
            # This cannot happen: throw() has to raise the exception or swallow
            # it and then later raise a StopIteration because it is finished
            else:
                assert False
        else:
            try:
                futures = self._futures
            except ValueError:
                results = dict(stdout=None, stderr=None)
            else:
                results = {
                    name: future.result()
                    for name, future in futures.items()
                }
                if wload._settings['log_std_streams']:
                    # Dump the stdout/stderr content to log files for easier
                    # debugging
                    for name, content in results.items():
                        path = ArtifactPath.join(wload.res_dir, f'{name}.log')
                        logger.debug(f'Saving {name} to {path}...')

                        with open(path, 'wb') as f:
                            f.write(content)

            # For convenience and to avoid depending too much on devlib's
            # BackgroundCommand in simple cases
            results['returncode'] = returncode

            if returncode:
                action = lambda: self.gen.throw(
                    CalledProcessError(
                        returncode=returncode,
                        cmd=f'<Workload {self.name}>',
                        output=results['stdout'],
                        stderr=results['stderr'],
                    )
                )
            else:
                action = lambda: self.gen.send(results)

            try:
                action()
            except StopIteration as e:
                output = e.value
                excep = None
            except Exception as e:
                output = _NotSet(e)
                excep = e
            else:
                excep = None

            self._output = output
            if excep is not None:
                raise excep


[docs] class _WorkloadBase: """ :meta public: Dummy base class so that :class:`Workload` is processed by ``__init_subclass__`` as well. """
[docs] def __init_subclass__(cls, *args, **kwargs): """ Automatically decorate ``_run()`` so that it returns a context manager. """ try: _run = cls.__dict__['_run'] except KeyError: pass else: cls._run = _WorkloadRunCMDecorator(_run) tools = { tool for cls in inspect.getmro(cls) for tool in getattr(cls, 'REQUIRED_TOOLS', []) } cls.REQUIRED_TOOLS = sorted(tools) super().__init_subclass__(*args, **kwargs)
[docs] class Workload(_WorkloadBase, PartialInit, Loggable): """ Handle the execution of a command on a target, with custom output processing. :param target: The Target on which to execute this workload :type target: Target :param name: Name of the workload. Useful for naming related artefacts. :type name: str or None :param res_dir: Host directory into which artifacts will be stored :type res_dir: str or None :param run_dir: Target directory into which artifacts will be created. :type run_dir: str or None :param cpus: CPUs on which to restrict the workload execution (taskset) :type cpus: list(int) or None :param cgroup: cgroup in which to run the workload :type cgroup: str or None :param as_root: Whether to run the workload as root or not :type as_root: bool :param timeout: Timeout in seconds for the workload execution. :type timeout: int .. note:: A :class:`Workload` instance can be used as a context manager, which ensures :meth:`cleanup` is eventually invoked. """ REQUIRED_TOOLS = [] """ The tools required to execute the workload. See :meth:`lisa.target.Target.install_tools`. """ def __init__(self, *, target, name=None, res_dir=None, run_dir=None, cpus=None, cgroup=None, as_root=False, timeout=None, command=None, log_std_streams=True, wipe_run_dir=True, wipe_res_dir=False, ): res_dir = res_dir if res_dir else target.get_res_dir( name='{}{}'.format( self.__class__.__qualname__, f'-{name}' if name else '') ) name = name or self.__class__.__qualname__ self.target = target self.name = name self.res_dir = res_dir self._wipe_run_dir = wipe_run_dir self._wipe_res_dir = wipe_res_dir self._setup_cm = None # Generic settings that will be used by _basic_run() self._settings = dict( cpus=cpus, cgroup=cgroup, as_root=as_root, timeout=timeout, command=command, log_std_streams=log_std_streams, ) if run_dir is None: now = datetime.now().strftime('%Y%m%d_%H%M%S') uuid_ = uuid.uuid4().hex run_dir = f"{now}_{uuid_}" wlgen_dir = self.target.path.join( target.working_directory, 'lisa', 'wlgen', ) self.run_dir = os.path.join(wlgen_dir, run_dir) # Deprecated self._output = '' @property def _deployed(self): return self._setup_cm is not None @destroyablecontextmanager def _setup(self): """ Context manager function called to setup the target before the execution, and cleanup anything that was setup. .. note:: Ensure you call ``super()._setup()`` as a context manager in custom implementation:: @contextlib.contextmanager def _setup(self): with super()._setup(): ... yield ... """ os.makedirs(self.res_dir, exist_ok=True) self.target.execute(f'mkdir -p {quote(self.run_dir)}') self.logger.info(f"Created workload's run target directory: {self.run_dir}") self.target.install_tools(self.REQUIRED_TOOLS) try: yield except ContextManagerExit: if self._wipe_run_dir: self.wipe_run_dir() if self._wipe_res_dir: shutil.rmtree(self.res_dir) @property @deprecate('Processed output is returned by run() or by the ".output" attribute of the value returned by the run_background() context manager', deprecated_in='2.0', removed_in='4.0') def output(self): return self._output
[docs] def deploy(self): """ Deploy the workload on the target. If not called manually, it will be called: * If the workload is used as a context manager, in ``__enter__``. * If not, in :meth:`run` or :meth:`run_background`. Calling it manually ahead of time makes can allow less garbage while tracing during the execution of the workload. .. note:: This method can be called any number of times, but will only have an effect the first time. .. note:: This method should not be overridden, see ``_setup()``. """ if self._deployed: return else: cm = self._setup() self._setup_cm = cm cm.__enter__()
[docs] def cleanup(self): """ Remove all the artifacts installed on the target with :meth:`deploy`. If not called manually, it will be called in :meth:`__exit__` or ``__del__`` if the workload is used as a context manager. .. note:: This method can be called any number of times, but will only have an effect the first time and if the target was deployed. .. note:: This method should not be overridden, see ``_setup()``. """ # Since this is called from __del__, we might have to deal with a # partially initialized object if getattr(self, '_deployed', False): cm = self._setup_cm self._setup_cm = None cm.__exit__(None, None, None)
[docs] def run_background(self): """ Run the command asynchronously and give control back to the caller. Returns a transparent wrapper for :class:`devlib.connection.BackgroundCommand`. **Example**:: wload = Workload(...) with wload.run_background() as bg: # do something. "bg" can be monitored, except for stdout/stderr # which are captured for later post-processing by the class. ... time.sleep(42) # Post-processed output, which would be equal to: # print(wload.run()) print(bg.output) .. note:: Subclasses should implement ``_run()``. """ self.deploy() return self._run()
[docs] def run(self, cpus=None, cgroup=None, as_root=False, timeout=None): """ Run the workload and returns the post-processed output. Calls :meth:`deploy` if it has not already been done. stdout and stderr will be saved into files in ``self.res_dir`` .. note:: Subclasses should implement ``_run()``. """ # For backward compatibility. # Note: if the values are the same as the current state, the same # instance will be returned so it's cheap compat = dict( cpus=cpus, cgroup=cgroup, timeout=timeout, ) compat = { key: val for key, val in compat.items() if val is not None } if compat: params = ', '.join( f'{param}={val}' for param, val in compat.items() if val ) warnings.warn(f'Workload.run({params}) parameters are deprecated, please pass them to the constructor instead', DeprecationWarning) self = self(**compat) self.deploy() with self._run() as x: # Wait on the command explicitly, as relying on x.__exit__() will # close its standard streams (stdin/stdout/stderr), leading to an # early termination. x.wait() # Only there to satisfy a deprecated API, do not rely on that in any # new code self._output = x.output return x.output
def _run(self): """ Run the workload. This method must be implemented by all subclasses and is used by :meth:`run` and :meth:`run_background`. The following constraints apply: * It must yield an instance of :class:`devlib.connection.BackgroundCommand`. This can be obtained using :meth:`_basic_run` or using :meth:`lisa.target.Target.background` directly. * It must only yield once. * It must return the post-processed output of the command. .. note:: This API mirrors what is expected from functions decorated with :func:`contextlib.contextmanager`, except that they have to return a post-processed value, rather than having their return value ignored. **Example**:: def _run(self): cmd = 'echo hello world' try: out = yield self._basic_run(cmd) except subprocess.CalledProcessError as e: # The command returned with non-zero exit code. ... # "out" is a dict with the following keys: # * "stdout": content of captured stdout of the command, as a bytestring # * "stderr": content of captured stderr of the command, as a bytestring # * "returncode": return code of the command # Arbitrary post-processing is allowed. If there is no # interesting output to return, the method should return None, # so that it can be repurposed in the future to something # useful. # DO NOT RETURN stdout unless it's something truly interesting # and of use to the caller. The content of stdout is already # dumped to a file for debugging purposes. return out['stdout'].split() """ yield self._basic_run() return None
[docs] def wipe_run_dir(self): """ Wipe all content from the ``run_dir`` target directory and all its empty parents. .. note :: This function should only be called directly in interactive sessions. For other purposes, use :class:`Workload` instances as a context manager. """ logger = self.logger logger.info(f"Wiping target run directory: {self.run_dir}") self.target.remove(self.run_dir) # Also get rid of empty parent folders parent = str(Path(self.run_dir).parent) self.target.execute(f'rmdir -p {quote(parent)}', check_exit_code=False)
def __enter__(self): self.deploy() return self def __del__(self): # This cannot be relied upon, but might improve things self.cleanup()
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """ Cleanup the artifacts of the workload on the target. """ self.cleanup()
def _basic_run(self, **kwargs): """ Basic run function to be used in subclasses' implementation of ``_run()``. By default, it will use the settings saved in ``_settings`` attribute by :class:`Workload`'s ``__init__``, but they can all be overridden manually with keyword arguments if that is really necessary. """ logger = self.logger target = self.target settings = { **self._settings, **kwargs } as_root = settings.get('as_root') timeout = settings.get('timeout') command = settings.get('command') cpus = settings.get('cpus') cgroup = settings.get('cgroup') if command is None: raise ValueError('command must not be None') # Log the "clean" unmodified command. If the user wants the details, # debug log from devlib will provide it logger.info(f"Execution start: {command}") if cpus: target.install_tools(['taskset']) cpumask = list_to_mask(cpus) taskset_cmd = f"taskset {quote(f'0x{cpumask:x}')}" command = f'{taskset_cmd} {command}' if cgroup: command = target.cgroups.run_into_cmd(cgroup, command) command = f'cd {quote(self.run_dir)} && {command}' bg = target.background(command, as_root=as_root, timeout=timeout) return bg
# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab