Source code for lisa.analysis.rta

# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2019, Arm Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import namedtuple

import pandas as pd
import holoviews as hv
import numpy as np

from lisa.analysis.base import AnalysisHelpers, TraceAnalysisBase
from lisa.datautils import df_filter_task_ids, df_window, df_split_signals
from lisa.trace import requires_events, requires_one_event_of, may_use_events, MissingTraceEventError
from lisa.utils import memoized, order_as
from lisa.analysis.tasks import TasksAnalysis, TaskID
from lisa.wlgen.rta import RTA, RTAConf
from lisa.notebook import plot_signal


RefTime = namedtuple("RefTime", ('kernel', 'user'))
"""
Named tuple pairing two timestamps of the same instant, used to synchronize
kernel and userspace (``rt-app``) time bases:

    * ``kernel``: timestamp as seen in the trace
    * ``user``: timestamp as reported by ``rt-app``
"""


PhaseWindow = namedtuple(
    "PhaseWindow",
    ('id', 'start', 'end', 'properties'),
)
"""
Named tuple describing one phase of an ``rt-app`` task:

    * ``id``: integer ID of the phase, or its name.
    * ``start``: timestamp at which the phase starts.
    * ``end``: timestamp at which the phase ends.
    * ``properties``: properties of the phase, extracted from a
      :mod:`lisa.wlgen.rta` profile.
"""


class RTAEventsAnalysis(TraceAnalysisBase):
    """
    Support for RTA events analysis.

    :param trace: input Trace object
    :type trace: lisa.trace.Trace
    """

    name = 'rta'

    RTAPP_USERSPACE_EVENTS = [
        'userspace@rtapp_main',
        'userspace@rtapp_task',
        'userspace@rtapp_loop',
        'userspace@rtapp_event',
        'userspace@rtapp_stats',
    ]
    """
    List of ftrace events rtapp is able to emit.
    """

    def _task_filtered(self, df, task=None):
        """
        Restrict ``df`` to the rows belonging to the given rt-app task.

        :param df: dataframe with ``__pid`` and ``__comm`` columns
        :type df: pandas.DataFrame

        :param task: the (optional) rt-app task to filter for. When falsy,
            ``df`` is returned unchanged.
        :type task: int or str or lisa.analysis.tasks.TaskID

        :raises ValueError: if ``task`` is not one of :attr:`rtapp_tasks`
        """
        if not task:
            return df

        task = self.trace.ana.tasks.get_task_id(task)

        if task not in self.rtapp_tasks:
            raise ValueError(f"Task [{task}] is not an rt-app task: {self.rtapp_tasks}")

        return df_filter_task_ids(
            df, [task],
            pid_col='__pid', comm_col='__comm',
        )

    @property
    @memoized
    @requires_one_event_of(*RTAPP_USERSPACE_EVENTS)
    def rtapp_tasks(self):
        """
        List of :class:`lisa.analysis.tasks.TaskID` of the ``rt-app`` tasks
        present in the trace.
        """
        task_ids = set()
        for event in self.RTAPP_USERSPACE_EVENTS:
            try:
                df = self.trace.df_event(event)
            except MissingTraceEventError:
                continue
            else:
                pairs = df[['__pid', '__comm']].drop_duplicates()
                # Iterate over rows explicitly rather than using
                # DataFrame.apply(axis=1): on an empty dataframe, apply()
                # hands back a DataFrame, and feeding that to set.update()
                # would add the column labels instead of TaskID instances.
                task_ids.update(
                    TaskID(pid=pid, comm=comm)
                    for pid, comm in pairs.itertuples(index=False, name=None)
                )
        return sorted(task_ids)

    ###########################################################################
    # DataFrame Getter Methods
    ###########################################################################

    ###########################################################################
    # rtapp_main events related methods
    ###########################################################################

    @requires_events('userspace@rtapp_main')
    def df_rtapp_main(self):
        """
        Dataframe of events generated by the rt-app main task.

        :returns: a :class:`pandas.DataFrame` with:

          * A ``__comm`` column: the actual rt-app trace task name
          * A ``__cpu``  column: the CPU on which the task was running at event
                                 generation time
          * A ``__pid``  column: the PID of the task
          * A ``data``   column: the data corresponding to the reported event
          * An ``event`` column: the event generated

        The ``event`` column can report these events:

          * ``start``: the start of the rt-app main thread execution
          * ``end``: the end of the rt-app main thread execution
          * ``clock_ref``: the time rt-app gets the clock to be used for logfile
            entries

        The ``data`` column reports:

          * the base timestamp used for logfile generated event for the
            ``clock_ref`` event
          * ``NaN`` for all the other events
        """
        return self.trace.df_event('userspace@rtapp_main')

    @property
    @memoized
    @df_rtapp_main.used_events
    def rtapp_window(self):
        """
        Return the time range the rt-app main thread executed.

        :returns: a tuple(start_time, end_time)
        """
        df = self.df_rtapp_main()
        return (
            df[df.event == 'start'].index[0],
            df[df.event == 'end'].index[0])

    @property
    @memoized
    @df_rtapp_main.used_events
    def rtapp_reftime(self):
        """
        Return the tuple representing the ``kernel`` and ``user`` timestamp.

        RTApp log events timestamps are generated by the kernel ftrace
        infrastructure. This method allows to know which trace timestamp
        corresponds to the rt-app generated timestamps stored in log files.

        :returns: a :class:`RefTime` reporting ``kernel`` and ``user``
                  timestamps.
        """
        df = self.df_rtapp_main()
        df = df[df['event'] == 'clock_ref']
        # The index holds the trace timestamp, the "data" column holds the
        # timestamp reported by rt-app for that same event
        return RefTime(df.index[0], df.data.iloc[0])

    ###########################################################################
    # rtapp_task events related methods
    ###########################################################################

    @TraceAnalysisBase.df_method
    @requires_events('userspace@rtapp_task')
    def df_rtapp_task(self, task=None):
        """
        Dataframe of events generated by each rt-app generated task.

        :param task: the (optional) rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :returns: a :class:`pandas.DataFrame` with:

          * A ``__comm`` column: the actual rt-app trace task name
          * A ``__cpu``  column: the CPU on which the task was running at event
                                 generation time
          * A ``__line`` column: the ftrace line number
          * A ``__pid``  column: the PID of the task
          * An ``event`` column: the event generated

        The ``event`` column can report these events:

          * ``start``: the start of the ``__pid``:``__comm`` task execution
          * ``end``: the end of the ``__pid``:``__comm`` task execution
        """
        df = self.trace.df_event('userspace@rtapp_task')
        return self._task_filtered(df, task)

    ###########################################################################
    # rtapp_loop events related methods
    ###########################################################################

    # @TraceAnalysisBase.df_method
    @requires_events('userspace@rtapp_loop')
    def df_rtapp_loop(self, task=None, wlgen_profile=None):
        """
        Dataframe of events generated by each rt-app generated task.

        :param task: the (optional) rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param wlgen_profile: Dictionary of rt-app task names to
            :class:`~lisa.wlgen.rta.RTAPhase` used to resolve phase name from
            the trace and will add a ``properties`` column with the
            :class:`~lisa.wlgen.rta.RTAPhase` properties.
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhase) or None

        :returns: a :class:`pandas.DataFrame` with:

          * A  ``__comm`` column: the actual rt-app trace task name
          * A  ``__cpu``  column: the CPU on which the task was running at event
                                  generation time
          * A  ``__line`` column: the ftrace line number
          * A  ``__pid``  column: the PID of the task
          * An ``event``  column: the generated event
          * A  ``phase``  column: the phases counter for each ``__pid``:``__comm``
                                  task, or the phase name when ``wlgen_profile``
                                  is provided
          * A  ``phase_loop``  column: the phase_loops's counter
          * A  ``thread_loop`` column: the thread_loop's counter
          * A  ``properties`` column: the phase properties, only when
            ``wlgen_profile`` is provided

        The ``event`` column can report these events:

          * ``start``: the start of the ``__pid``:``__comm`` related event
          * ``end``: the end of the ``__pid``:``__comm`` related event

        :raises ValueError: if ``wlgen_profile`` is provided and a phase number
            found in the trace does not match any phase of the profile
        """
        df = self.trace.df_event('userspace@rtapp_loop')
        df = self._task_filtered(df, task)

        if wlgen_profile:
            def get_task_entry(task_name):
                task = wlgen_profile[task_name]
                # Build a JSON configuration so that we get the actual number
                # of phases, after loopification at the task level
                conf = RTAConf.from_profile(
                    {task_name: task},
                    plat_info=self.trace.plat_info
                )
                nr_json_phases = len(conf['tasks'][task_name]['phases'])
                return (task, nr_json_phases)

            # Map TaskID to wlgen phases
            task_map = {
                task_id: get_task_entry(task_name)
                for task_name, task_ids in RTA.resolve_trace_task_names(
                    self.trace, wlgen_profile.keys()
                ).items()
                for task_id in task_ids
            }

            # Mapping that turns a lookup miss into an explicit error, so that
            # Series.map() fails loudly instead of silently producing NaN
            class _dict(dict):
                def __missing__(self, key):
                    raise ValueError(f'Unexpected phase number "{key}"')

            def f(df):
                # Called once per (__pid, __comm) group
                pid, comm = df.name
                task_id = TaskID(pid=pid, comm=comm)
                task, nr_json_phases = task_map[task_id]
                df = df.copy(deep=False)

                # Flatten (thread_loop, phase) into a single index in the
                # list of phases of the profile
                phase_nr = df['thread_loop'] * nr_json_phases + df['phase']

                def add_info(get):
                    return phase_nr.map(_dict({
                        i: get(phase)
                        for i, phase in enumerate(task.phases)
                    }))

                df['phase'] = add_info(lambda phase: phase.get('name'))
                df['properties'] = add_info(lambda phase: dict(phase.properties))
                return df

            df = df.groupby(['__pid', '__comm'], observed=True, group_keys=False).apply(f)

        return df

    # @TraceAnalysisBase.df_method
    @df_rtapp_loop.used_events
    def _get_rtapp_phases(self, event, task, wlgen_profile=None):
        """
        Dataframe with one row per phase, for the given kind of loop event.

        :param event: ``start`` or ``end``
        :type event: str

        :param task: the (optional) rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhase) or None

        :returns: a :class:`pandas.DataFrame` indexed by ``__comm``, ``__pid``
            and ``phase``, with a ``Time`` column and, when available, a
            ``properties`` column.
        """
        df = self.df_rtapp_loop(task, wlgen_profile=wlgen_profile)
        df = df[df.event == event]

        # Sort START/END phase loop event from newers/older and...
        if event == 'start':
            df = df[df.phase_loop == 0]
        elif event == 'end':
            df = df.sort_values(by=['phase_loop', 'thread_loop'],
                                ascending=False)
        # ... keep only the newest/oldest event
        grouped = df.groupby(
            ['__comm', '__pid', 'phase', 'event'],
            observed=True,
            sort=False,
            group_keys=False,
        )
        df = grouped.head(1)

        # Reorder the index and keep only required cols
        kept_cols = ['__comm', '__pid', 'phase', 'properties']
        kept_cols = order_as(
            set(kept_cols) & set(df.columns),
            order_as=kept_cols
        )
        df = (
            df.sort_index()[kept_cols]
            .reset_index()
            .set_index([col for col in kept_cols if col != 'properties'])
        )
        return df

    # @TraceAnalysisBase.df_method
    @df_rtapp_loop.used_events
    def df_phases(self, task, wlgen_profile=None):
        """
        Get phases actual start times and durations

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhase) or None

        :returns: A :class:`pandas.DataFrame` with index representing the
            start time of a phase and these columns:

                * ``phase``: the phase number or its name extracted from
                  ``wlgen_profile``.
                * ``duration``: the measured phase duration.
                * ``properties``: the properties mapping of the phase
                  extracted from ``wlgen_profile``.

            An empty :class:`pandas.DataFrame` is returned if no complete
            phase is found.
        """
        # Trace windowing can cut the trace anywhere, so we need to remove the
        # partial loops records to avoid confusion
        def filter_partial_loop(df):
            # Get rid of the end of a previous loop
            if not df.empty and df['event'].iloc[0] == 'end':
                df = df.iloc[1:]

            # Get rid of the beginning of a new loop at the end
            if not df.empty and df['event'].iloc[-1] == 'start':
                df = df.iloc[:-1]

            return df

        def get_duration(phase, df):
            start = df.index[0]
            end = df.index[-1]
            duration = end - start
            info = {
                'phase': phase,
                'duration': duration,
            }
            # The properties are the same all along for a given phase, so we
            # can just pick the first one
            try:
                properties = df.iloc[0]['properties']
            except KeyError:
                properties = {}

            info['properties'] = properties
            return (start, info)

        loops_df = self.df_rtapp_loop(task, wlgen_profile=wlgen_profile)

        phases_df_list = [
            (cols['phase'], filter_partial_loop(df))
            for cols, df in df_split_signals(loops_df, ['phase'])
        ]
        # Sort on the start timestamp only: comparing whole (start, info)
        # tuples would fall back to comparing the info dicts when two phases
        # share the same start, which raises TypeError.
        durations = sorted(
            (
                get_duration(phase, df)
                for phase, df in phases_df_list
                if not df.empty
            ),
            key=lambda item: item[0],
        )

        if durations:
            index, columns = zip(*durations)
            return pd.DataFrame(columns, index=index)
        else:
            return pd.DataFrame()

    @df_phases.used_events
    def task_phase_windows(self, task, wlgen_profile=None):
        """
        Yield the phases of the specified task.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param wlgen_profile: See :meth:`df_phases`.
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhaseBase) or None

        Yield :class:`PhaseWindow` reporting:

            * ``id`` : the phase ID or name
            * ``start`` : the phase start time
            * ``end`` : the phase end time
            * ``properties`` : the phase properties

        :return: Generator yielding :class:`PhaseWindow` with start and end
            timestamps.
        """
        for phase in self.df_phases(task, wlgen_profile=wlgen_profile).itertuples():
            start = phase.Index
            yield PhaseWindow(
                phase.phase,
                start,
                start + phase.duration,
                phase.properties,
            )

    @_get_rtapp_phases.used_events
    def df_rtapp_phases_start(self, task=None, wlgen_profile=None):
        """
        Dataframe of phases start times.

        :param task: the (optional) rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhaseBase) or None

        :returns: a :class:`pandas.DataFrame` with:

          * A  ``__comm`` column: the actual rt-app trace task name
          * A  ``__pid``  column: the PID of the task
          * A  ``phase``  column: the phases counter for each ``__pid``:``__comm``
                                  task

        The ``index`` represents the timestamp of a phase start event.
        """
        return self._get_rtapp_phases('start', task, wlgen_profile=wlgen_profile)

    @_get_rtapp_phases.used_events
    def df_rtapp_phases_end(self, task=None, wlgen_profile=None):
        """
        Dataframe of phases end times.

        :param task: the (optional) rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhaseBase) or None

        :returns: a :class:`pandas.DataFrame` with:

          * A  ``__comm`` column: the actual rt-app trace task name
          * A  ``__pid``  column: the PID of the task
          * A  ``phase``  column: the phases counter for each ``__pid``:``__comm``
                                  task

        The ``index`` represents the timestamp of a phase end event.
        """
        return self._get_rtapp_phases('end', task, wlgen_profile=wlgen_profile)

    @df_rtapp_phases_start.used_events
    def _get_task_phase(self, event, task, phase, wlgen_profile=None):
        """
        Timestamp of the ``start`` or ``end`` of a given phase of a task.

        :param event: ``start`` or ``end``
        :type event: str

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param phase: name of the phase (looked up by label) or its position
            (looked up by position, negative values allowed)
        :type phase: int or str

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhaseBase) or None

        :raises ValueError: if ``event`` is neither ``start`` nor ``end``
        """
        task = self.trace.ana.tasks.get_task_id(task)

        if event == 'start':
            df = self.df_rtapp_phases_start(task, wlgen_profile=wlgen_profile)
        elif event == 'end':
            df = self.df_rtapp_phases_end(task, wlgen_profile=wlgen_profile)
        else:
            raise ValueError(f'Unknown event "{event}", expected "start" or "end"')

        # Select the rows of that task, leaving "phase" as the only index level
        df = df.loc[task.comm, task.pid]
        if isinstance(phase, str):
            row = df.loc[phase]
        else:
            row = df.iloc[phase]

        return row.Time

    @_get_task_phase.used_events
    def df_rtapp_phase_start(self, task, phase=0, wlgen_profile=None):
        """
        Start of the specified phase for a given task.

        A negative phase value can be used to count backward from the last
        phase, e.g. -1 represents the last phase.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param phase: the ID (or name with ``wlgen_profile``) of the phase
            start to return.
        :type phase: int or str

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhaseBase) or None

        :returns: the requested task's phase start timestamp
        """
        return self._get_task_phase('start', task, phase, wlgen_profile=wlgen_profile)

    @_get_task_phase.used_events
    def df_rtapp_phase_end(self, task, phase=-1, wlgen_profile=None):
        """
        End of the specified phase for a given task.

        A negative phase value can be used to count backward from the last
        phase, e.g. -1 represents the last phase.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param phase: the ID (or name with ``wlgen_profile``) of the phase end
            to return.
        :type phase: int or str

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhaseBase) or None

        :returns: the requested task's phase end timestamp
        """
        return self._get_task_phase('end', task, phase, wlgen_profile=wlgen_profile)

    @memoized
    @df_rtapp_task.used_events
    def task_window(self, task):
        """
        Return the start and end time for the specified task.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :returns: a tuple(start_time, end_time)
        """
        task_df = self.df_rtapp_task(task)
        start_time = task_df[task_df.event == "start"].index[0]
        end_time = task_df[task_df.event == "end"].index[0]

        return (start_time, end_time)

    @memoized
    @df_rtapp_phases_start.used_events
    def task_phase_window(self, task, phase, wlgen_profile=None):
        """
        Return the window of a requested task phase.

        For the specified ``task`` it returns a tuple with the (start, end)
        time of the requested ``phase``. A negative phase number can be used to
        count phases backward from the last (-1) toward the first.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param phase: the ID (or name with ``wlgen_profile``) of the phase to
            consider.
        :type phase: int or str

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhaseBase) or None

        :returns: the window of the phase, with empty ``properties``.
        :rtype: PhaseWindow
        """
        phase_start = self.df_rtapp_phase_start(
            task, phase,
            wlgen_profile=wlgen_profile,
        )
        phase_end = self.df_rtapp_phase_end(
            task, phase,
            wlgen_profile=wlgen_profile,
        )

        return PhaseWindow(phase, phase_start, phase_end, {})

    @df_phases.used_events
    def task_phase_at(self, task, timestamp, wlgen_profile=None):
        """
        Return the :class:`PhaseWindow` for the specified task and timestamp.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param timestamp: the timestamp to get the phase for
        :type timestamp: int or float

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhaseBase) or None

        :returns: the :class:`PhaseWindow` of the phase corresponding to the
            specified timestamp.

        :raises KeyError: if the task has no phase, or if ``timestamp`` is
            before the first phase start or after the last phase end.
        """
        df = self.df_phases(task, wlgen_profile=wlgen_profile)

        # Without this check, an empty dataframe would fail below with an
        # unhelpful IndexError
        if df.empty:
            raise KeyError(f'No phase found for task {task}')

        def get_info(row):
            start = row.name
            end = start + row['duration']
            phase = row['phase']
            properties = row['properties']
            return (phase, start, end, properties)

        _, _, last_phase_end, _ = get_info(df.iloc[-1])
        _, first_phase_start, _, _ = get_info(df.iloc[0])

        if timestamp < first_phase_start:
            raise KeyError(f'timestamp={timestamp} is before the first phase start: {first_phase_start}')
        elif timestamp > last_phase_end:
            raise KeyError(f'timestamp={timestamp} is after last phase end: {last_phase_end}')

        # Pick the latest phase starting at or before the timestamp
        i = df.index.get_indexer([timestamp], method='ffill')[0]
        return PhaseWindow(*get_info(df.iloc[i]))

    ###########################################################################
    # rtapp_phase events related methods
    ###########################################################################

    @requires_events('userspace@rtapp_event')
    def df_rtapp_event(self, task=None):
        """
        Returns a :class:`pandas.DataFrame` of all the rt-app generated events.

        :param task: the (optional) rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :returns: a :class:`pandas.DataFrame` with:

          * A ``__comm`` column: the actual rt-app trace task name
          * A ``__pid``  column: the PID of the task
          * A ``__cpu``  column: the CPU on which the task was running at event
                                 generation time
          * A ``__line`` column: the ftrace line number
          * A ``type``   column: the type of the generated event
          * A ``desc``   column: the mnemonic type of the generated event
          * A ``id``     column: the ID of the resource associated to the event,
                                 e.g. the ID of the fired timer

        The ``index`` represents the timestamp of the event.
        """
        df = self.trace.df_event('userspace@rtapp_event')
        return self._task_filtered(df, task)

    ###########################################################################
    # rtapp_stats events related methods
    ###########################################################################

    @TraceAnalysisBase.df_method
    @requires_events('userspace@rtapp_stats')
    def _get_stats(self):
        """
        Dataframe of the ``userspace@rtapp_stats`` event, with an extra
        ``perf_index`` column.
        """
        # Shallow copy so that adding a column does not alter the dataframe
        # returned by the trace
        df = self.trace.df_event('userspace@rtapp_stats').copy(deep=False)
        # Add performance metrics column, performance is defined as:
        #             slack
        #   perf = -------------
        #          period - run
        df['perf_index'] = df['slack'] / (df['c_period'] - df['c_run'])

        return df

    @_get_stats.used_events
    def df_rtapp_stats(self, task=None):
        """
        Returns a :class:`pandas.DataFrame` of all the rt-app generated stats.

        :param task: the (optional) rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :returns: a :class:`pandas.DataFrame` with a set of columns
            representing the stats generated by rt-app after each loop.


        .. seealso:: the rt-app provided documentation:
            https://github.com/scheduler-tools/rt-app/blob/master/doc/tutorial.txt

            * A ``__comm`` column: the actual rt-app trace task name
            * A ``__pid``  column: the PID of the task
            * A ``__cpu``  column: the CPU on which the task was running at event
                                    generation time
            * A ``__line`` column: the ftrace line number
            * A ``type``   column: the type of the generated event
            * A ``desc``   column: the mnemonic type of the generated event
            * A ``id``     column: the ID of the resource associated to the event,
                                    e.g. the ID of the fired timer
            * A ``perf_index`` column: ``slack / (c_period - c_run)``, see
              :meth:`plot_perf`

        The ``index`` represents the timestamp of the event.
        """
        df = self._get_stats()
        return self._task_filtered(df, task)

    ###########################################################################
    # Plotting Methods
    ###########################################################################

    @AnalysisHelpers.plot_method
    @df_phases.used_events
    @may_use_events(TasksAnalysis.df_task_states.used_events)
    def plot_phases(self, task: TaskID, wlgen_profile=None):
        """
        Draw the task's phases colored bands

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param wlgen_profile: See :meth:`df_rtapp_loop`
        :type wlgen_profile: dict(str, lisa.wlgen.rta.RTAPhase) or None
        """
        phases_df = self.df_phases(task, wlgen_profile=wlgen_profile)

        # The CPUs used during each phase are only a nice-to-have in the
        # labels, so just omit them if the task states are not available
        try:
            states_df = self.ana.tasks.df_task_states(task)
        except MissingTraceEventError:
            def cpus_of_phase_at(t):
                return []
        else:
            def cpus_of_phase_at(t):
                end = t + phases_df['duration'][t]
                window = (t, end)
                df = df_window(states_df, window, method='pre')
                return sorted(int(x) for x in df['cpu'].unique())

        def make_band(row):
            t = row.name
            end = t + row['duration']
            phase = str(row['phase'])
            return (phase, t, end, cpus_of_phase_at(t))

        # Compute phases intervals
        bands = phases_df.apply(make_band, axis=1)

        def make_label(cpus, phase):
            if cpus:
                cpus = f" (CPUs {', '.join(map(str, cpus))})"
            else:
                cpus = ''
            return f'rt-app phase {phase}{cpus}'

        return hv.Overlay(
            [
                hv.VSpan(
                    start,
                    end,
                    label=make_label(cpus, phase)
                ).options(
                    alpha=0.1,
                )
                for phase, start, end, cpus in bands
            ]
        ).options(
            title=f'Task {task} phases'
        )

    @AnalysisHelpers.plot_method
    @df_rtapp_stats.used_events
    def plot_perf(self, task: TaskID):
        r"""
        Plot the performance index.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        The perf index is defined as:

        .. math::

            perf_index = \frac{slack}{c_period - c_run}

        where

            - ``c_period``: is the configured period for an activation
            - ``c_run``: is the configured run time for an activation, assuming to
                         run at the maximum frequency and on the maximum capacity
                         CPU.
            - ``slack``: is the measured slack for an activation

        The slack is defined as the difference between the activation deadline
        and the actual completion time of the activation.

        The deadline defines also the start of the next activation, thus in
        normal conditions a task activation is always required to complete
        before its deadline.

        The slack is thus a positive value if a task completes before its
        deadline. It's zero when a task completes an activation right at its
        deadline. It's negative when the completion is over the deadline.

        Thus, a performance index in [0..1] range represents activations
        completed within their deadlines. While, the more the performance index
        is negative the more the task is late with respect to its deadline.
        """
        task = self.trace.ana.tasks.get_task_id(task)
        return plot_signal(
            self.df_rtapp_stats(task)['perf_index'],
            name=f'{task} performance index',
        )

    @AnalysisHelpers.plot_method
    @df_rtapp_stats.used_events
    def plot_latency(self, task: TaskID):
        """
        Plot the Latency/Slack and Performance data for the specified task.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        .. seealso:: :meth:`plot_perf` for metrics definition.
        """
        task = self.trace.ana.tasks.get_task_id(task)
        return hv.Overlay(
            [
                plot_signal(
                    self.df_rtapp_stats(task)[col],
                    name=f'{task} {col} (us)',
                )
                for col in ('slack', 'wu_lat')
            ]
        ).options(
            title=f'Task {task} (start) Latency and (completion) Slack'
        )

    @AnalysisHelpers.plot_method
    @df_rtapp_stats.used_events
    def plot_slack_histogram(self, task: TaskID, bins: int=30):
        """
        Plot the slack histogram.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param bins: number of bins for the histogram.
        :type bins: int

        .. seealso:: :meth:`plot_perf` for the slack definition.
        """
        task = self.trace.ana.tasks.get_task_id(task)
        name = f'slack of {task} (us)'
        series = self.df_rtapp_stats(task)['slack']
        return hv.Histogram(np.histogram(series, bins=bins)).options(
            xlabel=name,
            title=name,
        )

    @AnalysisHelpers.plot_method
    @df_rtapp_stats.used_events
    def plot_perf_index_histogram(self, task: TaskID, bins: int=30):
        """
        Plot the perf index histogram.

        :param task: the rt-app task to filter for
        :type task: int or str or lisa.analysis.tasks.TaskID

        :param bins: number of bins for the histogram.
        :type bins: int

        .. seealso:: :meth:`plot_perf` for the perf index definition.
        """
        task = self.trace.ana.tasks.get_task_id(task)
        # The perf index is a ratio of two durations, so it carries no unit
        name = f'perf index of {task}'
        series = self.df_rtapp_stats(task)['perf_index']
        return hv.Histogram(
            np.histogram(series, bins=bins),
            label=name,
        ).options(
            xlabel=name,
            title=name,
        )
# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab