# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from enum import Enum
import itertools
import warnings
import typing
from numbers import Number
from operator import itemgetter
from collections import namedtuple
import re
import functools
import textwrap

import numpy as np
import pandas as pd
import holoviews as hv
import bokeh.models
import polars as pl

from lisa.analysis.base import TraceAnalysisBase
from lisa.utils import memoized, kwargs_forwarded_to, deprecate, order_as
from lisa.datautils import df_filter_task_ids, series_rolling_apply, series_refit_index, df_refit_index, df_deduplicate, df_split_signals, df_add_delta, df_window, df_update_duplicates, df_combine_duplicates, SignalDesc
from lisa.trace import requires_events, will_use_events_from, may_use_events, CPU, MissingTraceEventError, OrTraceEventChecker
from lisa.notebook import _hv_neutral, plot_signal, _hv_twinx
from lisa._typeclass import FromString


class TaskID(namedtuple('TaskID', ('pid', 'comm'))):
    """
    Unique identifier of a logical task in a :class:`lisa.trace.Trace`.

    :param pid: PID of the task. ``None`` indicates the PID is not important.
    :type pid: int

    :param comm: Name of the task. ``None`` indicates the name is not
        important. This is useful to describe tasks like PID0, which can have
        multiple names associated.
    :type comm: str
    """

    # Prevent creation of a __dict__. This allows a more compact
    # representation
    __slots__ = []

    def __init__(self, *args, **kwargs):
        # pylint: disable=unused-argument
        super().__init__()

        # This happens when the number of saved PID/comm entries in the trace
        # is too low
        if self.comm == '<...>':
            raise ValueError('Invalid comm name "<...>", please increase saved_cmdlines_nr value on FtraceCollector')

    def __str__(self):
        if self.pid is not None and self.comm is not None:
            out = f'{self.pid}:{self.comm}'
        else:
            out = str(self.comm if self.comm is not None else self.pid)

        return f'[{out}]'

    _STR_PARSE_REGEX = re.compile(r'\[?([0-9]+):([a-zA-Z0-9_-]+)\]?')

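# Illustrative usage of TaskID (the PID and comm values below are
# hypothetical, not taken from any real trace):
#
#     TaskID(pid=1234, comm='myapp')        # fully-specified task
#     TaskID(pid=1234, comm=None)           # match on PID only
#     str(TaskID(pid=1234, comm='myapp'))   # '[1234:myapp]'
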
class _TaskIDFromStringInstance(FromString, types=TaskID):
    """
    Instance of :class:`lisa._typeclass.FromString` for :class:`TaskID` type.
    """
    @classmethod
    def from_str(cls, string):
        # pylint: disable=unexpected-keyword-arg, no-value-for-parameter
        try:
            pid = int(string)
            comm = None
        except ValueError:
            match = cls._STR_PARSE_REGEX.match(string)
            if match:
                pid = int(match.group(1))
                comm = match.group(2)
            else:
                pid = None
                comm = string

        return cls(pid=pid, comm=comm)

    @classmethod
    def get_format_description(cls, short):
        if short:
            return 'task ID'
        else:
            return textwrap.dedent("""
            Can be any of:
               * a PID
               * a task name
               * a PID (first) and a name (second): pid:name
            """).strip()


class _TaskIDSeqFromStringInstance(FromString, types=(typing.List[TaskID], typing.Sequence[TaskID])):
    """
    Instance of :class:`lisa._typeclass.FromString` for lists of
    :class:`TaskID`.
    """
    @classmethod
    def from_str(cls, string):
        """
        The format is a comma-separated list of :class:`TaskID`.
        """
        from_str = FromString(TaskID).from_str
        return [
            from_str(string.strip())
            for string in string.split(',')
        ]

    @classmethod
    def get_format_description(cls, short):
        return 'comma-separated TaskIDs'

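# A minimal sketch of how the typeclass instances above are meant to be used
# (the input strings are hypothetical):
#
#     FromString(TaskID).from_str('1234')        # TaskID(pid=1234, comm=None)
#     FromString(TaskID).from_str('1234:bash')   # TaskID(pid=1234, comm='bash')
#     FromString(TaskID).from_str('bash')        # TaskID(pid=None, comm='bash')
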
class StateInt(int):
    """
    A tweaked int for :class:`lisa.analysis.tasks.TaskState`
    """
    def __new__(cls, value, char="", doc=""):
        new = super().__new__(cls, value)
        new.char = char
        new.__doc__ = doc
        return new
    def __or__(self, other):
        # Combine the single-letter codes with a "|" separator, mirroring the
        # value-level OR
        char = self.char
        if other.char:
            char = f'{char}|{other.char}' if char else other.char

        return type(self)(
            int(self) | int(other),
            char=char,
        )
    # This is needed for some obscure reason (maybe a bug in the std library?)
    # In any case, if we don't provide that, Enum's metaclass will "sabotage"
    # pickling, and by doing so, will set cls.__module__ = '<unknown>'
    __reduce__ = int.__reduce__

class TaskState(StateInt, Enum):
    """
    Represents the task state as visible in sched_switch

    * Values are extracted from include/linux/sched.h
    * Chars are extracted from fs/proc/array.c:get_task_state()
    """
    # pylint-suppress: bad-whitespace
    TASK_RUNNING = 0x0000, "R", "Running"
    TASK_INTERRUPTIBLE = 0x0001, "S", "Sleeping"
    TASK_UNINTERRUPTIBLE = 0x0002, "D", "Disk sleep"
    # __ has a special meaning in Python so let's not do that
    TASK_STOPPED = 0x0004, "T", "Stopped"
    TASK_TRACED = 0x0008, "t", "Tracing stop"

    EXIT_DEAD = 0x0010, "X", "Dead"
    EXIT_ZOMBIE = 0x0020, "Z", "Zombie"
    # Apparently not visible in traces
    # EXIT_TRACE = (EXIT_ZOMBIE[0] | EXIT_DEAD[0])

    TASK_PARKED = 0x0040, "P", "Parked"
    TASK_DEAD = 0x0080, "I", "Idle"
    TASK_WAKEKILL = 0x0100
    TASK_WAKING = 0x0200, "W", "Waking"  # LISA-only char definition
    TASK_NOLOAD = 0x0400
    TASK_NEW = 0x0800
    TASK_STATE_MAX = 0x1000

    # LISA synthetic states

    # Used to differentiate runnable (R) vs running (A)
    TASK_ACTIVE = 0x2000, "A", "Active"
    TASK_RENAMED = 0x2001, "N", "Renamed"
    # Used when the task state is unknown
    TASK_UNKNOWN = -1, "U", "Unknown"

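    # A quick sketch of how members behave, derived from the definitions
    # above:
    #
    #     TaskState.TASK_INTERRUPTIBLE.char      # 'S'
    #     int(TaskState.TASK_UNINTERRUPTIBLE)    # 2
    #     TaskState.TASK_ACTIVE.__doc__          # 'Active'
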
    @classmethod
    def list_reported_states(cls):
        """
        List the states that can be reported in a ``sched_switch`` trace

        See include/linux/sched.h:TASK_REPORT
        """
        return [state for state in cls if 0 <= state <= cls.TASK_DEAD]
    # Could use IntFlag instead once we move to Python 3.6
    @classmethod
    @memoized
    def sched_switch_str(cls, value):
        """
        Get the task state string that would be used in a ``sched_switch``
        event

        :param value: The task state value
        :type value: int

        Tries to emulate what is done in
        include/trace/events/sched.h:TRACE_EVENT(sched_switch)
        """
        def find_states(value, states):
            return [
                state.char
                for state in states
                if value & state.value
            ]

        reported_states = cls.list_reported_states()
        res = '|'.join(find_states(value, reported_states))
        res = res if res else cls.TASK_RUNNING.char

        # Flag the presence of unreportable states with a "+"
        unreportable_states = [
            state for state in cls
            if state.value >= 0 and state not in reported_states
        ]
        if find_states(value, unreportable_states):
            res += '+'

        return res
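    # For instance, following the state values defined above:
    #
    #     TaskState.sched_switch_str(0x0000)   # 'R'
    #     TaskState.sched_switch_str(0x0001)   # 'S'
    #     TaskState.sched_switch_str(0x0401)   # 'S+' (TASK_NOLOAD is unreportable)
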
    @classmethod
    def from_sched_switch_str(cls, string):
        """
        Build a :class:`StateInt` from a string as it would be used in
        ``sched_switch`` event's ``prev_state`` field.

        :param string: String to parse.
        :type string: str
        """
        state = 0
        for _state in cls:
            # Skip states without a char, since the empty string is a
            # substring of everything
            if _state.char and _state.char in string:
                state |= _state

        return state

class TasksAnalysis(TraceAnalysisBase):
    """
    Support for Tasks signals analysis.

    :param trace: input Trace object
    :type trace: lisa.trace.Trace
    """

    name = 'tasks'

    @memoized
    def _get_task_maps(self):
        """
        Give the mapping from PID to task names, and the opposite.

        The names or PIDs are listed in appearance order.
        """
        trace = self.trace.get_view(df_fmt='polars-lazyframe')

        mapping_df_list = []

        def _load(event, name_col, pid_col):
            df = trace.df_event(event)
            grouped = df.group_by(name_col, pid_col)

            # Get timestamp of the first occurrence of each key/value
            # combination
            mapping_df = grouped.first().select(
                'Time',
                pid=pl.col(pid_col),
                # Ensure we have a Categorical dtype, otherwise we might not
                # be able to successfully concatenate a String and Categorical
                # column
                name=pl.col(name_col).cast(pl.Categorical),
            )
            mapping_df_list.append(mapping_df)

        missing = []

        def load(event, *args, **kwargs):
            try:
                _load(event, *args, **kwargs)
            except MissingTraceEventError as e:
                missing.append(e.missing_events)

        load('task_rename', 'oldcomm', 'pid')
        load('task_rename', 'newcomm', 'pid')
        load('sched_switch', 'prev_comm', 'prev_pid')
        load('sched_switch', 'next_comm', 'next_pid')

        if not mapping_df_list:
            missing = OrTraceEventChecker.from_events(events=missing)
            raise MissingTraceEventError(missing, available_events=trace.available_events)

        df = pl.concat(mapping_df_list).sort('Time')
        df = df.unique(
            subset=['name', 'pid'],
            keep='first',
            maintain_order=True,
        )
        df = df.select('name', 'pid')
        with pl.StringCache():
            df = df.collect()

        def finalize(df, key_col):
            # Aggregate the values for each key and convert to python types
            return dict(df.rows_by_key(key_col))

        name_to_pid = finalize(df, 'name')
        pid_to_name = finalize(df, 'pid')

        return (name_to_pid, pid_to_name)

    @property
    def _task_name_map(self):
        return self._get_task_maps()[0]

    @property
    def _task_pid_map(self):
        return self._get_task_maps()[1]
    def get_task_name_pids(self, name, ignore_fork=True):
        """
        Get the PIDs of all tasks with the specified name.

        The same PID can have different task names, mainly because once a
        task is generated it inherits the parent name and then its name is
        updated to represent what the task really is.

        :param name: task name
        :type name: str

        :param ignore_fork: Hide the PIDs of tasks that initially had ``name``
            but were later renamed. This is common for shell processes for
            example, which fork a new task, inheriting the shell name, and
            then being renamed with the final "real" task name
        :type ignore_fork: bool

        :return: a list of PIDs of tasks whose name matches the required one.
        """
        pids = self._task_name_map[name]

        if ignore_fork:
            pids = [
                pid
                for pid in pids
                # Only keep the PID if its last name was the name we are
                # looking for.
                if self._task_pid_map[pid][-1] == name
            ]

        return pids
    def get_task_pid_names(self, pid):
        """
        Get all the names of the task(s) with the specified PID, in
        appearance order.

        The same PID can have different task names, mainly because once a
        task is generated it inherits the parent name and then its name is
        updated to represent what the task really is.

        :param pid: task PID
        :type pid: int

        :return: the names of the tasks whose PID matches the required one,
            the last time they ran in the current trace
        """
        return self._task_pid_map[pid]
    @deprecate('This function raises exceptions when faced with ambiguity instead of giving the choice to the user',
        deprecated_in='2.0',
        removed_in='4.0',
        replaced_by=get_task_pid_names,
    )
    def get_task_by_pid(self, pid):
        """
        Get the name of the task with the specified PID.

        The same PID can have different task names, mainly because once a
        task is generated it inherits the parent name and then its name is
        updated to represent what the task really is.

        This API works under the assumption that a task name is updated at
        most once, and it always reports the name the task had the last time
        it was scheduled for execution in the current trace.

        :param pid: task PID
        :type pid: int

        :return: the name of the task whose PID matches the required one,
            the last time it ran in the current trace
        """
        name_list = self.get_task_pid_names(pid)

        if len(name_list) > 2:
            raise RuntimeError(f'The PID {pid} had more than two names in its life: {name_list}')

        return name_list[-1]
    def get_task_ids(self, task, update=True):
        """
        Similar to :meth:`get_task_id` but returns a list with all the
        combinations, instead of raising an exception.

        :param task: Either the task name, the task PID, or a tuple ``(pid, comm)``
        :type task: int or str or tuple(int, str)

        :param update: If a partially-filled :class:`TaskID` is passed (one of
            the fields set to ``None``), returns a complete :class:`TaskID`
            instead of leaving the ``None`` fields.
        :type update: bool
        """
        def comm_to_pid(comm):
            try:
                pid_list = self._task_name_map[comm]
            # The maps are dicts, so a missing entry raises KeyError
            except KeyError:
                # pylint: disable=raise-missing-from
                raise ValueError(f'trace does not have any task named "{comm}"')

            return pid_list

        def pid_to_comm(pid):
            try:
                comm_list = self._task_pid_map[pid]
            except KeyError:
                # pylint: disable=raise-missing-from
                raise ValueError(f'trace does not have any task PID {pid}')

            return comm_list

        if isinstance(task, str):
            task_ids = [
                TaskID(pid=pid, comm=task)
                for pid in comm_to_pid(task)
            ]
        elif isinstance(task, Number):
            task_ids = [
                TaskID(pid=task, comm=comm)
                for comm in pid_to_comm(task)
            ]
        else:
            pid, comm = task
            if pid is None and comm is None:
                raise ValueError('TaskID needs to have at least one of PID or comm specified')

            if update and (pid is None or comm is None):
                non_none = pid if comm is None else comm
                task_ids = self.get_task_ids(non_none)
            else:
                task_ids = [TaskID(pid=pid, comm=comm)]

        return task_ids
    def get_task_id(self, task, update=True):
        """
        Helper that resolves a task PID or name to a :class:`TaskID`.

        :param task: Either the task name, the task PID, or a tuple ``(pid, comm)``
        :type task: int or str or tuple(int, str)

        :param update: If a partially-filled :class:`TaskID` is passed (one of
            the fields set to ``None``), returns a complete :class:`TaskID`
            instead of leaving the ``None`` fields.
        :type update: bool

        :raises ValueError: If the input matches multiple tasks in the trace.
            See :meth:`get_task_ids` to get all the ambiguous alternatives
            instead of an exception.
        """
        task_ids = self.get_task_ids(task, update=update)
        if len(task_ids) > 1:
            raise ValueError(f'More than one TaskID matching: {task_ids}')

        return task_ids[0]
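    # Illustrative resolution flow (task names and PIDs are hypothetical):
    #
    #     ana = trace.ana.tasks
    #     ana.get_task_ids('bash')     # [TaskID(pid=1, comm='bash'), TaskID(pid=2, comm='bash')]
    #     ana.get_task_id(1)           # TaskID(pid=1, comm='bash'), ValueError if ambiguous
    #     ana.get_task_id((1, None))   # completed to TaskID(pid=1, comm='bash') since update=True
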
    @deprecate(deprecated_in='2.0', removed_in='4.0', replaced_by=get_task_id)
    def get_task_pid(self, task):
        """
        Helper that takes either a name or a PID and always returns a PID

        :param task: Either the task name or the task PID
        :type task: int or str or tuple(int, str)
        """
        return self.get_task_id(task).pid
    def get_tasks(self):
        """
        Get a dictionary of all the tasks in the Trace.

        :return: a dictionary mapping each PID to the corresponding list of
            task names
        """
        return self._task_pid_map
    @property
    @memoized
    def task_ids(self):
        """
        List of all the :class:`TaskID` in the trace, sorted by PID.
        """
        return [
            TaskID(pid=pid, comm=comm)
            for pid, comms in sorted(self._task_pid_map.items(), key=itemgetter(0))
            for comm in comms
        ]
    @requires_events('sched_switch')
    def cpus_of_tasks(self, tasks):
        """
        Return the list of CPUs where the ``tasks`` executed.

        :param tasks: Task names or PIDs or ``(pid, comm)`` to look for.
        :type tasks: list(int or str or tuple(int, str))
        """
        trace = self.trace
        df = trace.df_event('sched_switch')[['next_pid', 'next_comm', '__cpu']]

        task_ids = [self.get_task_id(task, update=False) for task in tasks]
        df = df_filter_task_ids(df, task_ids, pid_col='next_pid', comm_col='next_comm')

        cpus = df['__cpu'].unique()
        return sorted(cpus)
    def _get_task_pid_name(self, pid):
        """
        Get the last name the given PID had.
        """
        return self.get_task_pid_names(pid)[-1]

###############################################################################
# DataFrame Getter Methods
###############################################################################
    @TraceAnalysisBase.df_method
    @requires_events('sched_wakeup')
    def df_tasks_wakeups(self):
        """
        The number of wakeups per task

        :returns: a :class:`pandas.DataFrame` with:

          * Task PIDs as index
          * A ``wakeups`` column (The number of wakeups)
        """
        df = self.trace.df_event('sched_wakeup')

        wakeups = df.groupby('pid', observed=True, sort=False, group_keys=False).count()["comm"]
        df = pd.DataFrame(wakeups).rename(columns={"comm": "wakeups"})
        df["comm"] = df.index.map(self._get_task_pid_name)

        return df
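    # Sketch of typical use (the threshold value is arbitrary):
    #
    #     df = trace.ana.tasks.df_tasks_wakeups()
    #     busy = df[df['wakeups'] > 1000]   # tasks that woke up more than 1000 times
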
    @TraceAnalysisBase.df_method
    @df_tasks_wakeups.used_events
    def df_top_wakeup(self, min_wakeups=100):
        """
        Tasks which wake up more frequently than a specified threshold.

        :param min_wakeups: minimum number of wakeups
        :type min_wakeups: int
        """
        df = self.df_tasks_wakeups()

        # Compute number of samples above threshold
        df = df[df.wakeups > min_wakeups]
        df = df.sort_values(by="wakeups", ascending=False)

        return df
    @TraceAnalysisBase.df_method
    @requires_events('sched_switch')
    def df_rt_tasks(self, min_prio=100):
        """
        Tasks with RT priority

        .. note:: priorities use scheduler values, so the lower the value,
            the higher the task priority:

            * RT Priorities: [ 0..100]
            * FAIR Priorities: [101..120]

        :param min_prio: minimum priority
        :type min_prio: int

        :returns: a :class:`pandas.DataFrame` with:

          * Task PIDs as index
          * A ``prio`` column (The priority of the task)
          * A ``comm`` column (The name of the task)
        """
        df = self.trace.df_event('sched_switch')

        # Keep tasks whose priority value is below the threshold, i.e. whose
        # priority is at least as high as the threshold
        df = df[df.next_prio <= min_prio]

        # Filter columns of interest
        rt_tasks = df[['next_pid', 'next_prio']]
        rt_tasks = rt_tasks.drop_duplicates()

        # Order by priority
        rt_tasks.sort_values(
            by=['next_prio', 'next_pid'], ascending=True, inplace=True)
        rt_tasks.rename(
            columns={'next_pid': 'pid', 'next_prio': 'prio'}, inplace=True)

        rt_tasks.set_index('pid', inplace=True)
        rt_tasks['comm'] = rt_tasks.index.map(self._get_task_pid_name)

        return rt_tasks
    @requires_events('sched_switch', 'sched_wakeup')
    @will_use_events_from('task_rename')
    @may_use_events('sched_wakeup_new')
    @TraceAnalysisBase.df_method
    def _df_tasks_states(self, tasks=None):
        """
        Compute tasks states for all tasks.

        :param tasks: If specified, states of these tasks only will be
            yielded. The :class:`lisa.analysis.tasks.TaskID` must have a
            ``pid`` field specified, since the task state is per-PID.
        :type tasks: list(lisa.analysis.tasks.TaskID) or list(int)
        """
        ######################################################
        # A) Assemble the sched_switch and sched_wakeup events
        ######################################################

        dtypes = dict(
            state=pl.Int64,
            comm=pl.Categorical,
        )

        def filters_comm(task):
            try:
                return task.comm is not None
            except AttributeError:
                return isinstance(task, str)

        def state(value):
            return pl.lit(value, dtypes['state'])

        # Add the rename events if we are interested in the comm of tasks
        add_rename = any(map(filters_comm, tasks or []))

        trace = self.trace.get_view(
            df_fmt='polars-lazyframe',
            signals=[
                SignalDesc('sched_switch', ['prev_pid', 'prev_comm']),
                SignalDesc('sched_switch', ['next_pid', 'next_comm']),
                SignalDesc('sched_wakeup', ['pid', 'comm']),
                SignalDesc('sched_wakeup_new', ['pid', 'comm']),
                SignalDesc('task_rename', ['pid']),
            ],
            compress_signals_init=True,
            events=[
                'sched_switch',
                'sched_wakeup',
                'sched_wakeup_new',
                *(['task_rename'] if add_rename else [])
            ]
        )

        def get_df(event):
            df = trace.df_event(event)
            if event == 'sched_switch':
                df = df.with_columns(
                    pl.col('prev_state').cast(dtypes['state']),
                    pl.col('prev_comm').cast(dtypes['comm']),
                    pl.col('next_comm').cast(dtypes['comm']),
                )
            elif event in ('sched_wakeup', 'sched_wakeup_new'):
                df = df.with_columns(
                    pl.col('comm').cast(dtypes['comm']),
                )
            return df

        wk_df = get_df('sched_wakeup')
        sw_df = get_df('sched_switch')

        try:
            wkn_df = get_df('sched_wakeup_new')
        except MissingTraceEventError:
            pass
        else:
            wk_df = pl.concat([wk_df, wkn_df], how='diagonal_relaxed')

        wk_df = wk_df.select(["Time", "pid", "comm", "target_cpu", "__cpu"])
        wk_df = wk_df.with_columns(
            curr_state=state(TaskState.TASK_WAKING)
        )

        prev_sw_df = sw_df.select(["Time", "__cpu", "prev_pid", "prev_state", "prev_comm"])
        next_sw_df = sw_df.select(["Time", "__cpu", "next_pid", "next_comm"])

        prev_sw_df = prev_sw_df.rename({
            "prev_pid": "pid",
            "prev_state": "curr_state",
            "prev_comm": "comm",
        })

        next_sw_df = next_sw_df.with_columns(
            curr_state=state(TaskState.TASK_ACTIVE)
        )
        next_sw_df = next_sw_df.rename({
            'next_pid': 'pid',
            'next_comm': 'comm'
        })

        all_sw_df = pl.concat([prev_sw_df, next_sw_df], how='diagonal_relaxed')

        if add_rename:
            rename_df = get_df('task_rename').rename({
                'oldcomm': 'comm',
            })
            rename_df = rename_df.select(['Time', 'pid', 'comm'])
            rename_df = rename_df.with_columns(
                curr_state=state(TaskState.TASK_RENAMED),
            )

            all_sw_df = pl.concat([all_sw_df, rename_df], how='diagonal_relaxed')

        # Integer values are preferred here, otherwise the whole column
        # is converted to float64
        # FIXME: should we just use null here ?
        all_sw_df = all_sw_df.with_columns(target_cpu=pl.lit(-1, pl.Int32))

        df = pl.concat([all_sw_df, wk_df], how='diagonal_relaxed')
        df = df.sort('Time')
        df = df.rename({'__cpu': 'cpu'})

        # Restrict the set of data we will process to a given set of tasks
        if tasks is not None:
            def resolve_task(task):
                """
                Get a TaskID for each task, and only update an existing TaskID
                if it lacks a PID field, since that's what we care about in
                this function.
                """
                try:
                    do_update = task.pid is None
                except AttributeError:
                    do_update = False

                return self.get_task_id(task, update=do_update)

            tasks = list(map(resolve_task, tasks))
            df = df_filter_task_ids(df, tasks)

        df = df.with_columns(
            next_state=pl.col('curr_state').shift(
                -1, fill_value=state(TaskState.TASK_UNKNOWN)
            ).over(pl.col('pid')),
            duration_delta=pl.col('Time').diff().shift(-1).over(pl.col('pid')),
        )
        df = df.with_columns(
            delta=pl.col('duration_delta').dt.total_nanoseconds() / 1e9,
        )
        return df

    @staticmethod
    def _reorder_tasks_states_columns(df):
        order = ['Time', 'pid', 'comm', 'target_cpu', 'cpu', 'curr_state', 'next_state', 'delta']
        return df.select(order_as(list(df.columns), order))
    @_df_tasks_states.used_events
    @TraceAnalysisBase.df_method
    def df_tasks_states(self):
        """
        DataFrame of all tasks state update events

        :returns: a :class:`pandas.DataFrame` with:

          * A ``cpu`` column (the CPU where the event took place)
          * A ``pid`` column (the PID of the task)
          * A ``comm`` column (the name of the task)
          * A ``target_cpu`` column (the CPU where the task has been
            scheduled). Will be ``NaN`` for non-wakeup events
          * A ``curr_state`` column (the current task state, see
            :class:`~TaskState`)
          * A ``delta`` column (the duration for which the task will remain
            in this state)
          * A ``next_state`` column (the next task state)

        .. warning:: Since a ``sched_switch`` event multiplexes the update of
            two PIDs at the same time, the resulting dataframe would contain
            duplicated indices, breaking some Pandas functions. In order to
            avoid that, the duplicated timestamps are updated with the minimum
            increment possible to remove duplication.
        """
        df = self._df_tasks_states(df_fmt='polars-lazyframe')
        return self._reorder_tasks_states_columns(df)
    @TraceAnalysisBase.df_method
    @_df_tasks_states.used_events
    def df_task_states(self, task, stringify=False):
        """
        DataFrame of a task's state update events

        :param task: The task's name or PID or tuple ``(pid, comm)``
        :type task: int or str or tuple(int, str)

        :param stringify: Include stringified :class:`TaskState` columns
        :type stringify: bool

        :returns: a :class:`pandas.DataFrame` with:

          * A ``cpu`` column (the CPU where the event took place)
          * A ``target_cpu`` column (the CPU where the task has been
            scheduled). Will be ``-1`` for non-wakeup events
          * A ``curr_state`` column (the current task state, see
            :class:`~TaskState`)
          * A ``next_state`` column (the next task state)
          * A ``delta`` column (the duration for which the task will remain
            in this state)
        """
        df = self._df_tasks_states(tasks=[task], df_fmt='polars-lazyframe')
        df = df.drop(["pid", "comm"])

        if stringify:
            df = self.stringify_df_task_states(
                df,
                ["curr_state", "next_state"],
                inplace=True
            )

        return self._reorder_tasks_states_columns(df)
    @classmethod
    def stringify_task_state_series(cls, series):
        """
        Stringify a series containing :class:`TaskState` values

        :param series: The series
        :type series: pandas.Series

        The common use case for this will be to pass a dataframe column::

            df["state_str"] = stringify_task_state_series(df["state"])
        """
        def stringify_state(state):
            # Same logic as in the sched_switch format string
            if state & 0xff:
                try:
                    return TaskState(state).char
                except ValueError:
                    return TaskState.sched_switch_str(state)
            else:
                return TaskState.sched_switch_str(state)

        return series.apply(stringify_state)
    @classmethod
    def stringify_df_task_states(cls, df, columns, inplace=False):
        """
        Add stringified :class:`TaskState` columns to a DataFrame

        :param df: The DataFrame to operate on
        :type df: pandas.DataFrame

        :param columns: The columns to stringify
        :type columns: list

        :param inplace: Do the modification on the original DataFrame
        :type inplace: bool
        """
        if isinstance(df, pd.DataFrame):
            df = df if inplace else df.copy()

            for col in columns:
                df[f"{col}_str"] = cls.stringify_task_state_series(df[col])

            return df
        elif isinstance(df, pl.LazyFrame):
            mapping = {
                int(state): state.char
                for state in TaskState.list_reported_states()
            }

            def fixup(df, col):
                str_col = (pl.col(col) & 0xff).replace(mapping, default=None)
                str_col = (
                    pl.when(str_col.is_null() & (pl.col(col) > 0))
                    .then(pl.col(col).map_elements(TaskState.sched_switch_str))
                    .otherwise(str_col)
                )
                return df.with_columns(
                    str_col.alias(f'{col}_str')
                )

            return functools.reduce(fixup, columns, df)
        else:
            raise TypeError(f'Cannot handle dataframe of type {df.__class__}')
    @TraceAnalysisBase.df_method
    @_df_tasks_states.used_events
    def df_tasks_runtime(self):
        """
        DataFrame of the time each task spent in TASK_ACTIVE
        (:class:`TaskState`)

        :returns: a :class:`pandas.DataFrame` with:

          * PIDs as index
          * A ``comm`` column (the name of the task)
          * A ``runtime`` column (the time that task spent running)

        .. note:: This function only tracks time spent by each PID. The
            reported name is the last name associated with the PID in
            chronological order.
        """
        df = self._df_tasks_states(df_fmt='polars-lazyframe')

        df = df.group_by('pid').agg(
            comm=pl.col('comm').last(),
            runtime=pl.col('delta').filter(pl.col('curr_state') == TaskState.TASK_ACTIVE).sum()
        )
        return df
    @TraceAnalysisBase.df_method
    @df_task_states.used_events
    def df_task_total_residency(self, task):
        """
        DataFrame of a task's execution time on each CPU

        :param task: the task to report runtimes for
        :type task: int or str or tuple(int, str)

        :returns: a :class:`pandas.DataFrame` with:

          * CPU IDs as index
          * A ``runtime`` column (the time the task spent being active)
        """
        df = self.df_task_states(task)

        # Get the correct delta for the window we want.
        df = df_add_delta(df, window=self.trace.window, col='runtime')
        df = df[df['curr_state'] == TaskState.TASK_ACTIVE]

        # Sum the time the task spent on each CPU
        by_cpu = df.groupby('cpu', observed=True, sort=False, group_keys=False)
        residency_df = by_cpu['runtime'].sum().to_frame()

        # Add runtime for CPUs that did not appear in the window
        residency_df = residency_df.reindex(
            residency_df.index.union(range(self.trace.cpus_count))
        )
        return residency_df.fillna(0).sort_index()
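    # Illustrative use (the task name is hypothetical):
    #
    #     per_cpu = trace.ana.tasks.df_task_total_residency('myapp')
    #     per_cpu['runtime'].idxmax()   # CPU the task ran on the longest
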
    @df_task_total_residency.used_events
    def df_tasks_total_residency(self, tasks=None, ascending=False, count=None):
        """
        DataFrame of tasks execution time on each CPU

        :param tasks: List of tasks to report, all trace tasks by default
        :type tasks: list(int or str or tuple(int, str))

        :param ascending: Set ``True`` to order by ascending task runtime,
            ``False`` by default
        :type ascending: bool

        :param count: Maximum number of tasks to report
        :type count: int
        """
        if tasks is None:
            task_ids = self.task_ids
        else:
            task_ids = itertools.chain.from_iterable(
                self.get_task_ids(task)
                for task in tasks
            )

        def get_task_df(task):
            try:
                df = self.ana.tasks.df_task_total_residency(task)
            except MissingTraceEventError:
                raise
            # Not all tasks may be available, e.g. tasks outside the
            # _TraceView window
            except Exception:
                return None
            else:
                return df.T.rename(index={'runtime': str(task)})

        res_df = pd.concat(
            df
            for df in map(get_task_df, task_ids)
            if df is not None
        )

        res_df['Total'] = res_df.sum(axis=1)
        res_df.sort_values(by='Total', ascending=ascending, inplace=True)
        if count is not None:
            res_df = res_df.head(count)

        return res_df
    @TraceAnalysisBase.df_method
    @df_task_states.used_events
    def df_task_activation(self, task, cpu=None, active_value=1, sleep_value=0, preempted_value=np.nan):
        """
        DataFrame of a task's active time on a given CPU

        :param task: the task to report activations of
        :type task: int or str or tuple(int, str)

        :param cpu: the CPUs to look at. If ``None``, all CPUs will be used.
        :type cpu: int or None

        :param active_value: the value to use in the series when task is
            active.
        :type active_value: float

        :param sleep_value: the value to use in the series when task is
            sleeping.
        :type sleep_value: float

        :param preempted_value: the value to use in the series when task is
            preempted (runnable but not actually executing).
        :type preempted_value: float

        :returns: a :class:`pandas.DataFrame` with:

          * A timestamp as index
          * An ``active`` column, containing ``active_value`` when the task is
            running, ``sleep_value`` when sleeping, and ``preempted_value``
            otherwise.
          * A ``cpu`` column with the CPU the task was running on.
          * A ``duration`` column containing the duration of the current sleep
            or activation.
          * A ``duty_cycle`` column containing the duty cycle in ``[0...1]``
            of the task, updated at each pair of activation and sleep.
        """
        df = self.df_task_states(task)

        def f(state):
            if state == TaskState.TASK_ACTIVE:
                return active_value
            # TASK_RUNNING happens when a task is preempted (so it's not
            # TASK_ACTIVE anymore but still runnable)
            elif state == TaskState.TASK_RUNNING:
                # Return NaN regardless of preempted_value, since some below
                # code relies on that
                return np.nan
            else:
                return sleep_value

        if cpu is not None:
            df = df[df['cpu'] == cpu]

        df = df.copy()

        # TASK_WAKING can just be removed. The delta will then be computed
        # without it, which means the time spent in WAKING state will be
        # accounted into the previous state.
        df = df[df['curr_state'] != TaskState.TASK_WAKING]

        df['active'] = df['curr_state'].map(f)
        df = df[['active', 'cpu']]

        # Only keep the first occurrence of each group of adjacent
        # duplicates, since we get events when the signal changes
        df = df_deduplicate(df, consecutives=True, keep='first')

        # Once we removed the duplicates, we can compute the time spent while
        # sleeping or activating
        df_add_delta(df, col='duration', inplace=True)

        if not np.isnan(preempted_value):
            df['active'] = df['active'].fillna(preempted_value)

        # Merge consecutive activations' durations. They could have been
        # split in two by a bit of preemption, and we don't want that to
        # affect the duty cycle.
        df_combine_duplicates(df, cols=['active'], func=lambda df: df['duration'].sum(), output_col='duration', inplace=True)

        # Make a dataframe where the rows corresponding to preempted time are
        # removed, unless preempted_value is set to non-NA
        preempt_free_df = df.dropna().copy()

        sleep = preempt_free_df[preempt_free_df['active'] == sleep_value]['duration']
        active = preempt_free_df[preempt_free_df['active'] == active_value]['duration']
        # Pair an activation time with its following sleep time
        sleep = sleep.reindex(active.index, method='bfill')
        duty_cycle = active / (active + sleep)

        df['duty_cycle'] = duty_cycle.ffill()

        return df
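    # Illustrative use (the task name and CPU are hypothetical):
    #
    #     df = trace.ana.tasks.df_task_activation('myapp', cpu=0)
    #     df['duty_cycle'].iloc[-1]   # last observed duty cycle, in [0..1]
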
###############################################################################
# Plotting Methods
###############################################################################

    def _plot_markers(self, df, label):
        return hv.Scatter(df, label=label).options(marker='+').options(
            backend='bokeh',
            size=5,
        )

    def _plot_overutilized(self):
        try:
            return self.ana.status.plot_overutilized()
        except MissingTraceEventError:
            return _hv_neutral()
    @TraceAnalysisBase.plot_method
    @requires_events('sched_switch')
    def plot_task_residency(self, task: TaskID):
        """
        Plot on which CPUs the task ran over time

        :param task: Task to track
        :type task: int or str or tuple(int, str)
        """
        task_id = self.get_task_id(task, update=False)

        sw_df = self.trace.df_event("sched_switch")
        sw_df = df_filter_task_ids(sw_df, [task_id], pid_col='next_pid', comm_col='next_comm')

        def plot_residency():
            if "freq-domains" in self.trace.plat_info:
                # If we are aware of frequency domains, use one color per
                # domain
                for domain in self.trace.plat_info["freq-domains"]:
                    series = sw_df[sw_df["__cpu"].isin(domain)]["__cpu"]
                    series = series_refit_index(series, window=self.trace.window)

                    if series.empty:
                        return _hv_neutral()
                    else:
                        return self._plot_markers(
                            series,
                            label=f"Task running in domain {domain}"
                        )
            else:
                return self._plot_markers(
                    series_refit_index(sw_df['__cpu'], window=self.trace.window),
                    label=str(task),
                )

        return (
            plot_residency().options(ylabel='cpu') *
            self._plot_overutilized()
        ).options(
            title=f'CPU residency of task {task}'
        )
    @TraceAnalysisBase.plot_method
    @df_task_total_residency.used_events
    def plot_task_total_residency(self, task: TaskID):
        """
        Plot a task's total time spent on each CPU

        :param task: The task's name or PID or tuple ``(pid, comm)``
        :type task: str or int or tuple(int, str)
        """
        df = self.df_task_total_residency(task)

        return hv.Bars(df['runtime']).options(
            title=f"CPU residency of task {task}",
            xlabel='CPU',
            ylabel='Runtime (s)',
            invert_axes=True,
        )
    @TraceAnalysisBase.plot_method
    @df_tasks_total_residency.used_events
    def plot_tasks_total_residency(self, tasks: typing.Sequence[TaskID]=None, ascending: bool=False, count: int=None):
        """
        Plot the stacked total time spent by each task on each CPU

        :param tasks: List of tasks to plot, all trace tasks by default
        :type tasks: list(int or str or tuple(int, str))

        :param ascending: Set ``True`` to order plot by ascending task
            runtime, ``False`` by default
        :type ascending: bool

        :param count: Maximum number of tasks to report
        :type count: int
        """
        df = self.df_tasks_total_residency(tasks, ascending, count)
        df = df.copy(deep=False)
        df['task'] = df.index
        df.columns = list(map(str, df.columns))
        df = df.melt(id_vars=['task'], var_name='cpu', value_name='Runtime (s)')
        return hv.Bars(
            df, kdims=['cpu', 'task']
        ).options(
            stacked=True,
            invert_axes=True,
            title=f"Stacked CPU residency of [{len(df.index)}] selected tasks",
        ).sort('cpu')
    def _plot_cpu_heatmap(self, event, bins, xbins, cmap):
        """
        Plot some data in a heatmap-style 2d histogram
        """
        df = self.trace.df_event(event)
        df = df_window(df, window=self.trace.window, method='exclusive')
        x = df.index
        y = df['target_cpu']

        if xbins:
            warnings.warn('"xbins" parameter is deprecated and will be removed, use "bins" instead', DeprecationWarning)
            bins = xbins

        nr_cpus = self.trace.cpus_count
        hist = np.histogram2d(y, x, bins=[nr_cpus, bins])
        z, _, x = hist
        y = list(range(nr_cpus))

        return hv.HeatMap(
            (x, y, z),
            kdims=[
                # Manually set dimension name/label so that shared_axes works
                # properly.
                # Also makes the hover tooltip better.
                hv.Dimension('Time'),
                hv.Dimension('CPU'),
            ],
            vdims=[
                hv.Dimension(event),
            ]
        ).options(
            colorbar=True,
            xlabel='Time (s)',
            ylabel='CPU',
            # Viridis works both on bokeh and matplotlib
            cmap=cmap or 'Viridis',
            yticks=[
                (cpu, f'CPU{cpu}')
                for cpu in y
            ]
        )

    @TraceAnalysisBase.plot_method
    @requires_events("sched_wakeup")
    def _plot_tasks_X(self, event, name, target_cpus, window, per_sec):
        df = self.trace.df_event(event)

        if target_cpus:
            df = df[df['target_cpu'].isin(target_cpus)]

        series = series_rolling_apply(
            df["target_cpu"],
            lambda x: x.count() / (window if per_sec else 1),
            window,
            window_float_index=False,
            center=True
        )

        if per_sec:
            label = f"Number of task {name} per second ({window}s windows)"
        else:
            label = f"Number of task {name} within {window}s windows"

        series = series_refit_index(series, window=self.trace.window)
        series.name = name
        return plot_signal(series, name=label)
    @TraceAnalysisBase.plot_method
    def plot_tasks_wakeups(self, target_cpus: typing.Sequence[CPU]=None, window: float=1e-2, per_sec: bool=False):
        """
        Plot task wakeups over time

        :param target_cpus: If specified, only count wakeups targeting these
            CPUs.
        :type target_cpus: list(int) or None

        :param window: The rolling window size for wakeup counts.
        :type window: float

        :param per_sec: Display wakeups per second if True, else wakeup counts
            within the window
        :type per_sec: bool
        """
        return self._plot_tasks_X(
            event='sched_wakeup',
            name='wakeups',
            target_cpus=target_cpus,
            window=window,
            per_sec=per_sec
        )
    @TraceAnalysisBase.plot_method
    @requires_events("sched_wakeup")
    def plot_tasks_wakeups_heatmap(self, bins: int=100, xbins=None, colormap=None):
        """
        Plot tasks wakeups heatmap

        :param bins: Number of x-axis bins, i.e. in how many slices should
            time be arranged
        :type bins: int

        :param colormap: The name of a colormap:

            * matplotlib backend: https://matplotlib.org/stable/tutorials/colors/colormaps.html
            * bokeh backend: https://docs.bokeh.org/en/latest/docs/reference/palettes.html
        :type colormap: str
        """
        return self._plot_cpu_heatmap(
            event='sched_wakeup',
            bins=bins,
            xbins=xbins,
            cmap=colormap,
        ).options(
            title="Tasks wakeups over time",
        )
    @TraceAnalysisBase.plot_method
    @requires_events("sched_wakeup_new")
    def plot_tasks_forks(self, target_cpus: typing.Sequence[CPU]=None, window: float=1e-2, per_sec: bool=False):
        """
        Plot task forks over time

        :param target_cpus: If specified, only count forks targeting these
            CPUs.
        :type target_cpus: list(int) or None

        :param window: The rolling window size for fork counts.
        :type window: float

        :param per_sec: Display forks per second if True, else fork counts
            within the window
        :type per_sec: bool
        """
        return self._plot_tasks_X(
            event='sched_wakeup_new',
            name='forks',
            target_cpus=target_cpus,
            window=window,
            per_sec=per_sec
        )
    @TraceAnalysisBase.plot_method
    @requires_events("sched_wakeup_new")
    def plot_tasks_forks_heatmap(self, bins: int=100, xbins=None, colormap=None):
        """
        Plot the number of task forks over time as a heatmap.

        :param bins: Number of x-axis bins, i.e. in how many slices should
            time be arranged
        :type bins: int

        :param colormap: The name of a colormap:

            * matplotlib backend: https://matplotlib.org/stable/tutorials/colors/colormaps.html
            * bokeh backend: https://docs.bokeh.org/en/latest/docs/reference/palettes.html
        :type colormap: str
        """
        return self._plot_cpu_heatmap(
            event='sched_wakeup_new',
            bins=bins,
            xbins=xbins,
            cmap=colormap,
        ).options(
            title="Tasks forks over time",
        )
    # Use a class attribute so that there will be only one extra hover tool in
    # the toolbar rather than one per task when stacking them
    _BOKEH_TASK_HOVERTOOL = bokeh.models.HoverTool(
        description='Task activations tooltip',
        tooltips=[
            ('Task', '[@pid:@comm]'),
            ('CPU', '@cpu'),
            ('#', '$index'),
            ('Start', '@start'),
            ('Duration', '@duration'),
            ('Duty cycle', '@duty_cycle'),
        ]
    )

    @df_task_activation.used_events
    def _plot_tasks_activation(self, tasks, show_legend=None, cpu: CPU=None, alpha: float=None, overlay: bool=False, duration: bool=False, duty_cycle: bool=False, which_cpu: bool=False, height_duty_cycle: bool=False, best_effort=False):
        logger = self.logger

        def ensure_last_rectangle(df):
            # Make sure we will draw the last rectangle, which could be
            # critical for tasks that are never sleeping
            if df.empty:
                return df
            else:
                window = self.trace.window
                # Regenerate the durations so they match the boundaries of
                # the window
                df = df_add_delta(df, window=window, col='duration')
                return df

        def make_twinx(fig, **kwargs):
            return _hv_twinx(fig, **kwargs)

        if which_cpu:
            def make_rect_df(df):
                half_height = df['active'] / 2
                return pd.DataFrame(
                    dict(
                        Time=df.index,
                        CPU=df['cpu'] - half_height,
                        x1=df.index + df['duration'],
                        y1=df['cpu'] + half_height,
                    ),
                    index=df.index
                )
        else:
            def make_rect_df(df):
                if duty_cycle or duration:
                    max_val = max(
                        df[col].max()
                        for select, col in (
                            (duty_cycle, 'duty_cycle'),
                            (duration, 'duration')
                        )
                        if select
                    )
                    height_factor = max_val
                else:
                    height_factor = 1

                return pd.DataFrame(
                    dict(
                        Time=df.index,
                        CPU=0,
                        x1=df.index + df['duration'],
                        y1=df['active'] * height_factor,
                    ),
                    index=df.index,
                )

        def plot_extra(task, df):
            figs = []
            if duty_cycle:
                figs.append(
                    plot_signal(df['duty_cycle'], name=f'Duty cycle of {task}')
                )

            if duration:
                def plot_duration(active, label):
                    duration_series = df[df['active'] == active]['duration']
                    # Add blanks in the plot when the state is not the one we
                    # care about
                    duration_series = duration_series.reindex_like(df)
                    return plot_signal(duration_series, name=f'{label} duration of {task}')

                figs.extend(
                    plot_duration(active, label)
                    for active, label in (
                        (True, 'Activations'),
                        (False, 'Sleep')
                    )
                )

            return figs

        def check_df(task, df, empty_is_none):
            if df.empty:
                msg = f'Could not find events associated to task {task}'
                if empty_is_none:
                    logger.debug(msg)
                    return None
                else:
                    raise ValueError(msg)
            else:
                return ensure_last_rectangle(df)

        def get_task_data(task, df):
            df = df.copy()

            # Preempted == sleep for plots
            df['active'] = df['active'].fillna(0)
            if height_duty_cycle:
                df['active'] *= df['duty_cycle']

            data = make_rect_df(df[df['active'] != 0])
            if data.empty:
                return data
            else:
                name_df = self.trace.df_event('sched_switch')
                name_df = name_df[name_df['next_pid'] == task.pid]
                names = name_df['next_comm'].reindex(data.index, method='ffill')

                # If there was no sched_switch with next_pid matching
                # task.pid, we simply take the last known name of the task,
                # which could originate from another field or another event.
                #
                # Note: This prevents an <NA> value, which makes bokeh choke.
                last_comm = self.get_task_pid_names(task.pid)[-1]
                if last_comm not in names.cat.categories:
                    names = names.cat.add_categories([last_comm])
                names = names.fillna(last_comm)

                # Use a string for PID so that holoviews interprets it as a
                # categorical variable, rather than continuous. This is
                # important for correct color mapping
                data['pid'] = str(task.pid)
                data['comm'] = names
                data['start'] = data.index
                data['cpu'] = df['cpu']
                data['duration'] = df['duration']
                data['duty_cycle'] = df['duty_cycle']
                return data

        def plot_rect(data):
            if show_legend:
                opts = {}
            else:
                # If there is no legend, we are going to plot all the
                # rectangles at once, so we use colormapping to distinguish
                # the tasks
                opts = dict(
                    color='pid',
                    # Colormap from colorcet with a large number of colors,
                    # so it is suitable for plotting many tasks
                    cmap='glasbey_hv',
                )

            return hv.Rectangles(
                data,
                kdims=[
                    hv.Dimension('Time'),
                    hv.Dimension('CPU'),
                    hv.Dimension('x1'),
                    hv.Dimension('y1'),
                ]
            ).options(
                show_legend=show_legend,
                alpha=alpha,
                **opts,
            ).options(
                backend='bokeh',
                line_width=0,
                tools=[self._BOKEH_TASK_HOVERTOOL],
            )

        if alpha is None:
            if overlay or duty_cycle or duration:
                alpha = 0.2
            else:
                alpha = 1

        # For performance reasons, plot all the tasks as one hv.Rectangles
        # invocation when we get too many tasks
        if show_legend is None:
            if overlay:
                # TODO: twinx() breaks on hv.Overlay, so we are forced to use
                # a single hv.Rectangles in that case, meaning no useful
                # legend
                show_legend = False
            else:
                show_legend = len(tasks) < 5

        cpus_count = self.trace.cpus_count

        task_dfs = {
            task: check_df(
                task,
                self.df_task_activation(task, cpu=cpu),
                empty_is_none=best_effort,
            )
            for task in tasks
        }
        if best_effort:
            task_dfs = {
                task: df
                for task, df in task_dfs.items()
                if df is not None
            }
        tasks = sorted(task_dfs.keys())

        if show_legend:
            fig = hv.Overlay(
                [
                    plot_rect(get_task_data(task, df)).relabel(
                        f'Activations of {task.pid} (' +
                        ', '.join(
                            task_id.comm
                            for task_id in self.get_task_ids(task)
                        ) +
                        ')',
                    )
                    for task, df in task_dfs.items()
                ]
            ).options(
                legend_limit=len(tasks) * 100,
            )
        else:
            data = pd.concat(
                get_task_data(task, df)
                for task, df in task_dfs.items()
            )
            fig = plot_rect(data)

        if overlay:
            fig = make_twinx(
                fig,
                y_range=(-1, cpus_count),
                display=False
            )
        else:
            if which_cpu:
                fig = fig.options(
                    'Rectangles',
                    ylabel='CPU',
                    yticks=[
                        (cpu, f'CPU{cpu}')
                        for cpu in range(cpus_count)
                    ],
                ).redim(
                    y=hv.Dimension('y', range=(-0.5, cpus_count - 0.5))
                )
            elif height_duty_cycle:
                fig = fig.options(
                    'Rectangles',
                    ylabel='Duty cycle',
                )

        if duty_cycle or duration:
            if duty_cycle:
                ylabel = 'Duty cycle'
            elif duration:
                ylabel = 'Duration (s)'

            # TODO: twinx() on hv.Overlay does not work, so we unfortunately
            # have a scaling issue here
            fig = hv.Overlay(
                [fig] +
                [
                    fig
                    for task, df in task_dfs.items()
                    for fig in plot_extra(task, df)
                ]
            ).options(
                ylabel=ylabel,
            )

        return fig.options(
            title='Activations of {}'.format(
                ', '.join(map(str, tasks))
            ),
        )
    @TraceAnalysisBase.plot_method
    @_plot_tasks_activation.used_events
    @kwargs_forwarded_to(_plot_tasks_activation, ignore=['tasks', 'best_effort'])
    def plot_tasks_activation(self, tasks: typing.Sequence[TaskID]=None, hide_tasks: typing.Sequence[TaskID]=None, which_cpu: bool=True, overlay: bool=False, **kwargs):
        """
        Plot all tasks activations, in a style similar to kernelshark.

        :param tasks: Tasks to plot. If ``None``, all tasks in the trace will
            be used.
        :type tasks: list(TaskID) or None

        :param hide_tasks: Tasks to hide. Note that PID 0 (idle task) will
            always be hidden.
        :type hide_tasks: list(TaskID) or None

        :param alpha: transparency level of the plot.
        :type alpha: float

        :param overlay: If ``True``, adjust the transparency and plot
            activations on a separate hidden scale so existing scales are not
            modified.
        :type overlay: bool

        :param duration: Plot the duration of each sleep/activation.
        :type duration: bool

        :param duty_cycle: Plot the duty cycle of each pair of
            sleep/activation.
        :type duty_cycle: bool

        :param which_cpu: If ``True``, plot the activations on each CPU in a
            separate row like kernelshark does.
        :type which_cpu: bool

        :param height_duty_cycle: Height of each activation's rectangle is
            proportional to the duty cycle during that activation.
        :type height_duty_cycle: bool

        .. seealso:: :meth:`df_task_activation`
        """
        trace = self.trace

        hidden = set(itertools.chain.from_iterable(
            self.get_task_ids(task)
            for task in (hide_tasks or [])
        ))
        if tasks:
            best_effort = False
            task_ids = list(itertools.chain.from_iterable(
                map(self.get_task_ids, tasks)
            ))
        else:
            best_effort = True
            task_ids = self.task_ids

        full_task_ids = sorted(
            task
            for task in task_ids
            if (
                task not in hidden and
                task.pid != 0
            )
        )

        # Only consider the PIDs in order to:
        # * get the same color for the same PID during its whole life
        # * avoid potential issues around task renaming
        # Note: The task comm will still be displayed in the hover tool
        task_ids = [
            TaskID(pid=pid, comm=None)
            for pid in sorted(set(x.pid for x in full_task_ids))
        ]

        # TODO: Re-enable the CPU "lanes" once this bug is solved:
        # https://github.com/holoviz/holoviews/issues/4979
        if False and which_cpu and not overlay:
            # Add horizontal lines to delimit each CPU "lane" in the plot
            cpu_lanes = [
                hv.HLine(y - offset).options(
                    color='grey',
                    alpha=0.2,
                ).options(
                    backend='bokeh',
                    line_width=0.5,
                )
                for y in range(trace.cpus_count + 1)
                for offset in ((0.5, -0.5) if y == 0 else (0.5,))
            ]
        else:
            cpu_lanes = []

        title = 'Activations of ' + ', '.join(
            map(str, full_task_ids)
        )
        if len(title) > 50:
            title = 'Task activations'

        return self._plot_tasks_activation(
            tasks=task_ids,
            which_cpu=which_cpu,
            overlay=overlay,
            best_effort=best_effort,
            **kwargs
        ).options(
            title=title
        )
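    # Typical invocations (task names are hypothetical):
    #
    #     trace.ana.tasks.plot_tasks_activation()                 # all tasks
    #     trace.ana.tasks.plot_tasks_activation(tasks=['myapp'])  # one task
    #     trace.ana.tasks.plot_tasks_activation(duty_cycle=True, overlay=True)
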
    @TraceAnalysisBase.plot_method
    @plot_tasks_activation.used_events
    @kwargs_forwarded_to(plot_tasks_activation, ignore=['tasks'])
    @deprecate('Deprecated since it does not provide anything more than plot_tasks_activation',
        deprecated_in='2.0',
        removed_in='4.0',
        replaced_by=plot_tasks_activation,
    )
    def plot_task_activation(self, task: TaskID, **kwargs):
        """
        Plot task activations, in a style similar to kernelshark.

        :param task: the task to report activations of
        :type task: int or str or tuple(int, str)

        .. seealso:: :meth:`plot_tasks_activation`
        """
        return self.plot_tasks_activation(tasks=[task], **kwargs)

# vim :set tabstop=4 shiftwidth=4 expandtab textwidth=80