Source code for lisa.wa

# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2020, Arm Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections.abc import Mapping
from collections import defaultdict
import inspect
import os
import abc
import contextlib
import sqlite3
import pathlib
import warnings
from functools import lru_cache

import pandas as pd

from wa import discover_wa_outputs, Status

from lisa.version import VERSION_TOKEN
from lisa.stats import Stats
from lisa.utils import Loggable, memoized, get_subclasses, LazyMapping
from lisa._git import find_shortest_symref, get_commit_message
from lisa.trace import Trace

def _df_concat(dfs):
    return pd.concat(dfs, ignore_index=True, copy=False, sort=False)


class WAOutputNotFoundError(Exception):
    def __init__(self, collectors):
        # pylint: disable=super-init-not-called
        self.collectors = collectors

    def __str__(self):
        sep = '\n    '
        return 'Could not find output for collectors{}{}'.format(
            ':' + sep if len(self.collectors) > 1 else ' ',
            sep.join(
                f'{collector.NAME} ({excep.__class__.__qualname__}): {excep}'
                for collector, excep in self.collectors.items()
            )
        )

    @classmethod
    def from_collector(cls, collector, excep):
        return cls({collector: excep})

    @classmethod
    def from_excep_list(cls, exceps):
        return cls(
            collectors={
                collector: excep
                for excep in exceps
                for collector, excep in excep.collectors.items()
            }
        )

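# Hypothetical illustration, not part of the original module: how per-collector
# failures can be merged into a single WAOutputNotFoundError. The stub
# collector class only carries the NAME attribute that __str__ relies on.
def _example_collector_error():
    class _StubCollector:
        NAME = 'results'

    err = WAOutputNotFoundError.from_collector(
        _StubCollector(), FileNotFoundError('results.csv missing')
    )
    # from_excep_list() merges the per-collector mappings of several errors
    print(WAOutputNotFoundError.from_excep_list([err]))
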
class StatsProp:
    """
    Provides a ``stats`` property.
    """

    _STATS_GROUPS = ['board', 'kernel']
    """
    Tag columns commonly used to group plots of WA dataframes.
    """

    _AGG_COLS = ['iteration', 'wa_path']
    """
    Columns that are guaranteed to be found in the dataframes and will always
    be used as aggregation columns, in addition to what the user selects.
    """

    def get_stats(self, ensure_default_groups=True, ref_group=None, agg_cols=None, **kwargs):
        """
        Returns a :class:`lisa.stats.Stats` loaded with the result
        :class:`pandas.DataFrame`.

        :param ensure_default_groups: If ``True``, ensure ``ref_group`` will
            contain appropriate keys for usual Workload Automation result
            display.
        :type ensure_default_groups: bool

        :param ref_group: Forwarded to :class:`lisa.stats.Stats`

        :Variable keyword arguments: Forwarded to :class:`lisa.stats.Stats`
        """
        def filter_cols(cols):
            return [
                col
                for col in cols
                if col in df.columns
            ]

        df = self.df

        if ensure_default_groups:
            # Make sure the tags to group on are present in the ref_group,
            # even if the user did not specify them
            ref_group = {
                **dict.fromkeys(filter_cols(self._STATS_GROUPS)),
                **(ref_group or {}),
            }

        agg_cols = (agg_cols or []) + self._AGG_COLS
        agg_cols = filter_cols(agg_cols)

        return Stats(
            df,
            ref_group=ref_group,
            agg_cols=agg_cols,
            **kwargs,
        )

    @property
    def stats(self):
        """
        Short-hand property equivalent to ``self.get_stats()``.

        .. seealso:: :meth:`get_stats`
        """
        return self.get_stats()

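# A minimal usage sketch, assuming a hypothetical output path and reference
# kernel name; not part of the original module. get_stats() injects the
# 'board' and 'kernel' tag columns into ref_group when they are present in the
# dataframe, and always aggregates over 'iteration' and 'wa_path' in addition
# to any user-provided agg_cols.
def _example_get_stats(path='wa/output/path'):
    stats = WAOutput(path)['results'].get_stats(
        # Compare every kernel against the hypothetical 'base' one
        ref_group={'kernel': 'base'},
    )
    stats.plot_stats(filename='stats.html')
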
class WAOutput(StatsProp, Mapping, Loggable):
    """
    Recursively parse a ``Workload Automation`` output, using registered
    collectors (leaf subclasses of :class:`WACollectorBase`). The data
    collected are accessible through a :class:`pandas.DataFrame` in
    "database" format:

    * meaningless index
    * all values are tagged using tag columns

    :param path: Path containing a Workload Automation output.
    :type path: str

    :param kernel_path: Kernel source path. Used to resolve the name of the
        kernel which ran the workload.
    :type kernel_path: str

    **Example**::

        wa_output = WAOutput('wa/output/path')
        # Pick a specific collector. See also WAOutput.get_collector()
        stats = wa_output['results'].stats
        stats.plot_stats(filename='stats.html')
    """

    def __init__(self, path, kernel_path=None):
        self.path = path
        self.kernel_path = kernel_path

        collector_classes = {
            cls.NAME: cls
            for cls in get_subclasses(WACollectorBase, only_leaves=True)
        }
        auto_collectors = {
            name: cls
            for name, cls in collector_classes.items()
            if not self._needs_params(cls)
        }
        self._auto_collectors = auto_collectors
        self._available_collectors = collector_classes

    def __hash__(self):
        """
        Each instance is different, like regular objects, and unlike
        dictionaries.
        """
        return id(self)

    def __eq__(self, other):
        return self is other

    @memoized
    def __getitem__(self, key):
        cls = self._available_collectors[key]
        if key not in self._auto_collectors:
            raise KeyError(f"Collector {key} needs mandatory parameters, use get_collector('{key}', ...) instead")
        else:
            return cls(self)

    def __iter__(self):
        return iter(self._auto_collectors)

    def __contains__(self, key):
        return key in self._auto_collectors

    def __len__(self):
        return len(self._auto_collectors)

    @property
    @memoized
    def df(self):
        """
        DataFrame containing the data collected by all the registered
        :class:`WAOutput` collectors.
        """
        dfs = []
        exceps = {}
        for name, collector in self.items():
            try:
                df = collector.df
            except Exception as e:  # pylint: disable=broad-except
                exceps[collector] = e
                self.logger.debug(f'Could not get dataframe of collector {name}: {e}')
            else:
                dfs.append(df)

        if not dfs:
            raise WAOutputNotFoundError.from_excep_list([
                e if isinstance(e, WAOutputNotFoundError)
                # Wrap other exceptions in a WAOutputNotFoundError
                else WAOutputNotFoundError.from_collector(collector, e)
                for collector, e in exceps.items()
            ])

        return _df_concat(dfs)

    def get_collector(self, name, **kwargs):
        """
        Returns a new collector with custom parameters passed to it.

        :param name: Name of the collector.
        :type name: str

        :Variable keyword arguments: Forwarded to the collector's constructor.

        **Example**::

            WAOutput('wa/output/path').get_collector('energy', postprocess=func)
        """
        return self._available_collectors[name](self, **kwargs)

    @staticmethod
    def _needs_params(col):
        """
        Whether a collector has mandatory parameters.
        """
        parameters = list(inspect.signature(col).parameters.items())
        return any(
            param.default is param.empty and
            param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
            # The first parameter is provided by us, so we can skip it
            for name, param in parameters[1:]
        )

    @property
    @memoized
    def jobs(self):
        """
        List containing all the jobs present in the output of ``wa run``.
        """
        if len(self._jobs) > 1:
            raise ValueError("jobs is only meant to be used with one output of 'wa run'. Please use _jobs instead.")
        return list(self._jobs.values())[0][1]

    @property
    @memoized
    def _jobs(self):
        return {
            name: (output, [
                job
                for job in output.jobs
                if job.status == Status.OK
            ])
            for name, output in self.outputs.items()
        }

    @property
    @memoized
    def outputs(self):
        """
        Dict mapping ``wa run`` output names to :class:`RunOutput` objects.
        """
        wa_outputs = list(discover_wa_outputs(self.path))
        if len(wa_outputs) > 1:
            warnings.warn('Passing a directory containing multiple outputs of "wa run" to WAOutput is deprecated. Please create one object per run.', DeprecationWarning)

        wa_outputs = {
            pathlib.Path(
                wa_output.basepath
            ).resolve(): wa_output
            for wa_output in wa_outputs
        }

        if len(wa_outputs) > 1:
            common_prefix = pathlib.Path(
                os.path.commonpath(wa_outputs.keys())
            )
        elif wa_outputs:
            path, = wa_outputs.keys()
            common_prefix = pathlib.Path(path).parent
        else:
            common_prefix = None

        wa_outputs = {
            str(
                name.relative_to(common_prefix)
                if common_prefix else name
            ): wa_output
            for name, wa_output in wa_outputs.items()
        }
        return wa_outputs

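# Hypothetical sketch, not part of the original module: WAOutput is a
# read-only mapping of the collectors that need no mandatory parameters, and
# also exposes the discovered runs and their successful jobs.
def _example_explore_output(path='wa/output/path'):
    wa_output = WAOutput(path)

    # Mapping interface: iterate over auto-instantiable collector names
    for name in wa_output:
        print(f'available collector: {name}')

    # One RunOutput per discovered 'wa run' directory
    for name, run_output in wa_output.outputs.items():
        print(name, run_output.basepath)

    # Jobs with Status.OK, only valid when a single run was discovered
    for job in wa_output.jobs:
        print(job.id, job.label, job.iteration)
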
class WACollectorBase(StatsProp, Loggable, abc.ABC):
    """
    Base class for all ``Workload Automation`` dataframe collectors.

    :param wa_output: :class:`WAOutput` parent object.
    :type wa_output: WAOutput

    :param df_postprocess: Function called to postprocess the collected
        :class:`pandas.DataFrame`.
    :type df_postprocess: collections.abc.Callable

    .. seealso:: Instances of this class are typically built using
        :meth:`WAOutput.get_collector` rather than instantiated directly.
    """

    _EXPECTED_WORKLOAD_NAME = None

    _PURE_GET_JOB_DF = True
    """
    If ``True``, :meth:`WACollectorBase._get_job_df` is expected to be pure
    and its result will be cached. If the result depends in any way on
    user-provided parameters, this should be ``False``.
    """

    def __init__(self, wa_output, df_postprocess=None):
        self.wa_output = wa_output
        self._df_postprocess = df_postprocess or (lambda x: x)

    @abc.abstractclassmethod
    def _get_job_df(cls, job):
        """
        Process a :class:`wa.framework.JobOutput` and return a
        :class:`pandas.DataFrame` with the results.

        :param job: WA job run output to process
        :type job: wa.framework.JobOutput

        It is a good idea to then feed the dataframe to
        :meth:`_add_job_info` to get all the tags from WA before returning
        it.

        .. note:: If one of these columns provides the same information as a
            column from the artifact dataframe, consider the following:

            * Which column has better-looking values? The values will be used
              in legends, titles etc.
            * If the :meth:`_add_job_info` column looks worse, can there
              still be value in keeping it? Maybe its values can be fed back
              to other WA APIs?
            * Are you sure the column is *always* provided by
              :meth:`_add_job_info`? Users can inject arbitrary classifiers
              from the WA config, so some columns cannot be relied upon.
        """

    @property
    @memoized
    def df(self):
        """
        :class:`pandas.DataFrame` containing the data collected.
        """
        return self._get_df()

    def _get_df(self):
        self.logger.debug(f"Collecting dataframe for {self.NAME}")

        def load_df(job):
            def loader(job):
                cache_path = os.path.join(
                    job.basepath,
                    f'.{self.NAME}-cache.{VERSION_TOKEN}.parquet'
                )

                # _get_job_df usually returns fairly large dataframes, so
                # cache the result for faster reloading
                get_df = lambda: self._get_job_df(job)
                if self._PURE_GET_JOB_DF:
                    try:
                        df = pd.read_parquet(cache_path)
                    except OSError:
                        df = get_df()
                        df.to_parquet(cache_path)
                    return df
                else:
                    return get_df()

            try:
                df = loader(job)
            except Exception as e:  # pylint: disable=broad-except
                # Swallow the error if that job was not from the expected
                # workload
                expected_name = self._EXPECTED_WORKLOAD_NAME
                if expected_name is None or job.spec.workload_name == expected_name:
                    self.logger.error(f'Could not load {self.NAME} dataframe for job {job}: {e}')
                else:
                    return None
            else:
                return self._df_postprocess(df)

        dfs = [
            self._add_output_info(wa_output, name, df)
            for name, (wa_output, jobs) in self.wa_output._jobs.items()
            for df in [
                load_df(job)
                for job in jobs
            ]
            if df is not None
        ]
        if not dfs:
            raise WAOutputNotFoundError.from_collector(self, 'Could not find any valid job output')

        # It is unfortunately not safe to cache the output of load_df, as the
        # user postprocessing could change at any time
        df = _df_concat(dfs)
        return self._add_kernel_id(df)

    @staticmethod
    def _add_job_info(job, df):
        df['iteration'] = job.iteration
        df['workload'] = job.label
        df['id'] = job.id
        df = df.assign(**job.classifiers)
        return df

    @staticmethod
    def _add_output_info(wa_output, name, df):
        # Kernel version
        kver = wa_output.target_info.kernel_version
        df['kernel_name'] = kver.release
        df['kernel_sha1'] = kver.sha1

        # Folder of origin
        df['wa_path'] = name
        return df

    def _add_kernel_id(self, df):
        kernel_path = self.wa_output.kernel_path

        resolvers = [
            find_shortest_symref,
            get_commit_message,
        ]

        def resolve_readable(sha1):
            if kernel_path:
                for resolver in resolvers:
                    with contextlib.suppress(ValueError):
                        return resolver(kernel_path, sha1)
            return sha1

        kernel_ids = {
            sha1: resolve_readable(sha1)
            for sha1 in df['kernel_sha1'].unique()
            if sha1 is not None
        }

        # Reduce the human readable id if possible
        common_prefix = os.path.commonprefix(list(kernel_ids.values()))
        kernel_ids = {
            sha1: ref[len(common_prefix):] or ref
            for sha1, ref in kernel_ids.items()
        }

        df['kernel'] = df['kernel_sha1'].map(kernel_ids).fillna(
            df['kernel_name']
        )
        df.drop(columns=['kernel_sha1', 'kernel_name'], inplace=True)
        return df

class WAResultsCollector(WACollectorBase):
    """
    Collector for the ``Workload Automation`` test results.
    """

    NAME = 'results'

    @classmethod
    def _get_job_df(cls, job):
        df = pd.DataFrame.from_records(
            {
                **metric.classifiers,
                'metric': metric.name,
                'value': metric.value,
                'unit': metric.units or '',
            }
            for metric in job.metrics
        )
        return cls._add_job_info(job, df)

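# Hypothetical sketch, not part of the original module: the 'results'
# dataframe is in "database" format, one metric value per row, tagged with the
# job columns added by _add_job_info() (iteration, workload, id, plus any WA
# classifiers).
def _example_results_df(path='wa/output/path'):
    df = WAOutput(path)['results'].df
    print(df[['metric', 'value', 'unit', 'workload', 'iteration']].head())
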
class WAArtifactCollectorBase(WACollectorBase):
    """
    ``Workload Automation`` artifact collector base class.
    """
    _ARTIFACT_NAME = None

    @abc.abstractmethod
    def _get_artifact_df(self, path):
        """
        Process the artifact at the given path and return a
        :class:`pandas.DataFrame`.
        """

    def _get_job_df(self, job):
        path = job.get_artifact_path(self._ARTIFACT_NAME)
        df = self._get_artifact_df(path)
        return self._add_job_info(job, df)

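# Hypothetical sketch of a custom leaf collector; the names are illustrative
# and not part of LISA. A subclass only needs NAME, the artifact to look up,
# and a way to turn that artifact file into a "database"-format dataframe.
# Since WAOutput discovers leaf subclasses of WACollectorBase, merely defining
# the class registers it.
class _ExampleCSVCollector(WAArtifactCollectorBase):
    NAME = 'example-csv'
    _ARTIFACT_NAME = 'example-output'

    def _get_artifact_df(self, path):
        df = pd.read_csv(path)
        # One value per row, uniquely identified by the 'metric' tag column
        return df.melt(var_name='metric')
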
class WAEnergyCollector(WAArtifactCollectorBase):
    """
    WA collector for the ``energy_measurement`` augmentation.

    **Example**::

        def postprocess(df):
            df = df.pivot_table(
                values='value',
                columns='metric',
                index=['sample', 'iteration', 'workload'],
            )
            df = pd.DataFrame({
                'CPU_power': (
                    df['A55_power'] +
                    df['A76_1_power'] +
                    df['A76_2_power']
                ),
            })
            df['unit'] = 'Watt'
            df = df.reset_index()
            df = df.melt(
                id_vars=['sample', 'iteration', 'workload', 'unit'],
                var_name='metric',
            )
            return df

        WAOutput('wa/output/path').get_collector(
            'energy',
            df_postprocess=postprocess,
        ).df
    """
    NAME = 'energy'
    _ARTIFACT_NAME = 'energy_instrument_output'

    _ARTIFACT_METRIC_SUFFIX_UNIT = defaultdict(
        # Default is an empty string
        str,
        {
            'power': 'Watt',
            'voltage': 'V',
        },
    )

    def _get_artifact_df(self, path):
        df = pd.read_csv(path)

        # Record the CSV line as sample number so each measurement is
        # uniquely identified by a metric and a sample number
        df.index.name = 'sample'
        df.reset_index(inplace=True)

        df = df.melt(id_vars=['sample'], var_name='metric')
        suffix_unit = self._ARTIFACT_METRIC_SUFFIX_UNIT
        df['unit'] = df['metric'].apply(
            lambda x: suffix_unit[x.rsplit('_', 1)[-1]]
        )
        return df

    def get_stats(self, **kwargs):
        return super().get_stats(
            # Aggregate over pairs (sample, iteration)
            agg_cols=['sample', 'iteration'],
            **kwargs,
        )

def _stub_trace_to_df(trace):
    raise ValueError('trace_to_df was not specified for the WATraceCollector')

class WATraceCollector(WAArtifactCollectorBase):
    """
    WA collector for the ``trace`` augmentation.

    :param trace_to_df: Function used by the collector to convert the
        :class:`lisa.trace.Trace` to a :class:`pandas.DataFrame`.
    :type trace_to_df: collections.abc.Callable

    :Variable keyword arguments: Forwarded to :class:`lisa.trace.Trace`.

    **Example**::

        def trace_idle_analysis(trace):
            cpu = 0
            df = trace.ana.idle.df_cluster_idle_state_residency([cpu])
            df = df.reset_index()
            df['cpu'] = cpu

            # Melt the column 'time' into lines, so that the dataframe is in
            # "database" format: each value is uniquely identified by "tag"
            # columns
            return df.melt(
                var_name='metric',
                value_vars=['time'],
                id_vars=['idle_state'],
            )

        WAOutput('wa/output/path').get_collector(
            'trace',
            trace_to_df=trace_idle_analysis,
        ).df
    """
    NAME = 'trace'
    _ARTIFACT_NAME = 'trace-cmd-bin'
    _PURE_GET_JOB_DF = False

    def __init__(self, wa_output, trace_to_df=_stub_trace_to_df, **kwargs):
        self._trace_to_df = trace_to_df
        self._trace_kwargs = kwargs
        super().__init__(wa_output, df_postprocess=None)

    def _get_artifact_df(self, path):
        trace = Trace(path, **self._trace_kwargs)
        return self._trace_to_df(trace)

    @property
    def traces(self):
        """
        :class:`lisa.utils.LazyMapping` that maps job names & iteration
        numbers to their corresponding :class:`lisa.trace.Trace`.
        """
        return LazyMapping({
            f'{job.label}-{job.iteration}': lru_cache()(
                lambda k, job=job: Trace(
                    job.get_artifact_path('trace-cmd-bin'),
                    **self._trace_kwargs,
                )
            )
            for job in self.wa_output.jobs
        })

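# Hypothetical sketch, not part of the original module: the 'traces' mapping
# is lazy, so a trace is only parsed the first time its "<label>-<iteration>"
# key is accessed, and then cached.
def _example_iterate_traces(path='wa/output/path'):
    collector = WAOutput(path).get_collector(
        'trace',
        # Placeholder, since only the 'traces' property is used here
        trace_to_df=lambda trace: None,
    )
    for key in collector.traces:
        print(key, collector.traces[key])
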
class WAJankbenchCollector(WAArtifactCollectorBase):
    """
    WA collector for the jankbench frame timings.

    The collector framework will return a single :class:`pandas.DataFrame`
    with the results from every jankbench job in :class:`lisa.stats.Stats`
    format (i.e. the returned dataframe is arranged such that each reported
    metric is on a separate row).

    The metrics reported are:

    * ``total_duration``: Time in milliseconds to complete the frame.
    * ``jank_frame``: Boolean indicator of a missed frame deadline. ``1`` is
      a jank frame, ``0`` is not.
    * ``name``: Subtest name, provided by the Jankbench app.
    * ``frame_id``: Monotonically increasing frame number, starting from
      ``1`` for each subtest iteration.

    An example plotter matching the old-style output can be found in the
    jupyter notebook working directory at
    :file:`ipynb/wltests/WAOutput-JankbenchDemo.ipynb`.

    If you have existing code expecting a more direct translation of the
    original sqlite database format, you can massage the collected dataframe
    back into a closer resemblance of the original source database with this
    sequence of pandas operations::

        wa_output = WAOutput('wa/output/path')
        df = wa_output['jankbench'].df
        db_df = df.pivot(
            index=['iteration', 'id', 'kernel', 'frame_id'],
            columns=['variable'],
        )
        db_df = db_df['value'].reset_index()
        db_df.columns.name = None
        # db_df now looks more like the original format
    """
    NAME = 'jankbench'
    _EXPECTED_WORKLOAD_NAME = 'jankbench'
    _ARTIFACT_NAME = 'jankbench-results'

    def _get_artifact_df(self, path):
        with contextlib.closing(sqlite3.connect(path)) as con:
            raw_df = pd.read_sql_query("SELECT total_duration, jank_frame, name, _id as frame_id from ui_results", con)

        df = raw_df.melt(
            id_vars=['frame_id'],
            value_vars=['total_duration', 'jank_frame'],
        )
        # Supply units - everything is ms time, except jank frames
        df['unit'] = 'ms'
        df.loc[df['variable'] == 'jank_frame', 'unit'] = ''
        return df

    def get_stats(self, **kwargs):
        return super().get_stats(
            # Aggregate over pairs (iteration, frame_id)
            agg_cols=['iteration', 'frame_id'],
            **kwargs,
        )

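# Hypothetical sketch, not part of the original module: jankbench statistics
# aggregate over (iteration, frame_id), giving per-metric distributions
# across all recorded frames.
def _example_jankbench_stats(path='wa/output/path'):
    stats = WAOutput(path)['jankbench'].get_stats()
    stats.plot_stats(filename='jankbench.html')
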
class WASysfsExtractorCollector(WAArtifactCollectorBase):
    """
    WA collector for the ``sysfs-extractor`` augmentation.

    **Example**::

        def pixel6_energy_meter(df):
            # Keep only the CPUs' meters
            df = df[df.value.str.contains('S4M_VDD_CPUCL0|S3M_VDD_CPUCL1|S2M_VDD_CPUCL2')]
            df[['variable', 'value']] = df.value.str.split(', ', expand=True)

            def _clean_variable(variable):
                if 'S4M_VDD_CPUCL0' in variable:
                    return 'little-energy'
                if 'S3M_VDD_CPUCL1' in variable:
                    return 'mid-energy'
                if 'S2M_VDD_CPUCL2' in variable:
                    return 'big-energy'
                return ''

            df['variable'] = df['variable'].apply(_clean_variable)
            df['value'] = df['value'].astype(int)
            df['unit'] = "bogo-ujoules"

            # Add a total energy variable
            df = pd.concat([
                df,
                pd.DataFrame(data={
                    'variable': 'total-energy',
                    'value': [df['value'].sum()]
                })
            ])
            df.ffill(inplace=True)

            return df

        df = WAOutput('.').get_collector(
            'sysfs-extractor',
            path='/sys/bus/iio/devices/iio:device0/energy_value',
            df_postprocess=pixel6_energy_meter
        ).df
    """
    NAME = 'sysfs-extractor'

    def __init__(self, wa_output, path, type='diff', **kwargs):
        allowed_types = ['diff', 'before', 'after']
        if type not in allowed_types:
            raise ValueError(f'{self.__class__.__qualname__} type must be one of {allowed_types}')

        self._ARTIFACT_NAME = f'{path} [{type}]'
        # TODO: WA's sysfs-extractor augmentation can actually use a path to
        # collect several files, not only one...
        self._filename = os.path.basename(path)
        super().__init__(wa_output, **kwargs)

    def _get_artifact_df(self, path):
        # The path WA gives us is actually a dirname
        path = os.path.join(path, self._filename)

        # Expects a sysfs/procfs file, e.g.
        # $ cat /sys/bus/iio/devices/iio:device0/energy_value
        # t=199784
        # CH0(T=199784)[S10M_VDD_TPU], 965182
        # CH1(T=199784)[VSYS_PWR_MODEM], 23570587
        # CH2(T=199784)[VSYS_PWR_RFFE], 2850053
        # CH3(T=199784)[S2M_VDD_CPUCL2], 88055221
        # CH4(T=199784)[S3M_VDD_CPUCL1], 38098419
        # CH5(T=199784)[S4M_VDD_CPUCL0], 98955128
        # CH6(T=199784)[S5M_VDD_INT], 6657870
        # CH7(T=199784)[S1M_VDD_MIF], 29268952
        with open(path, 'r') as f:
            raw_file = f.readlines()
        raw_file = [line.strip() for line in raw_file]

        df = pd.DataFrame(data={
            'variable': self._filename,
            'value': raw_file,
            'unit': '',
        })
        return df