Source code for lisa.analysis.latency

# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pandas as pd
import numpy as np
import holoviews as hv

from lisa.analysis.base import TraceAnalysisBase
from lisa.notebook import COLOR_CYCLE, _hv_neutral
from lisa.analysis.tasks import TaskState, TasksAnalysis, TaskID
from lisa.datautils import df_refit_index
from lisa.trace import MissingTraceEventError


[docs] class LatencyAnalysis(TraceAnalysisBase): """ Support for plotting Latency Analysis data :param trace: input Trace object :type trace: lisa.trace.Trace """ name = 'latency' LATENCY_THRESHOLD_ZONE_COLOR = COLOR_CYCLE[2] LATENCY_THRESHOLD_COLOR = COLOR_CYCLE[3] ############################################################################### # DataFrame Getter Methods ############################################################################### @TraceAnalysisBase.df_method @TasksAnalysis.df_task_states.used_events def _df_latency(self, task, name, curr_state, next_state): df = self.ana.tasks.df_task_states(task) df = df[ (df.curr_state == curr_state) & (df.next_state == next_state) ][["delta", "cpu", "target_cpu"]] df = df.rename(columns={'delta': name}, copy=False) return df
[docs] @_df_latency.used_events def df_latency_wakeup(self, task): """ DataFrame of a task's wakeup latencies :param task: The task's name or PID :type task: int or str or tuple(int, str) :returns: a :class:`pandas.DataFrame` with: * A ``wakeup_latency`` column (the wakeup latency at that timestamp) * A ``cpu`` column (the CPU where the event took place) * A ``target_cpu`` column (the CPU where the task has been scheduled) """ return self._df_latency( task, 'wakeup_latency', TaskState.TASK_WAKING, TaskState.TASK_ACTIVE )
[docs] @_df_latency.used_events def df_latency_preemption(self, task): """ DataFrame of a task's preemption latencies :param task: The task's name or PID :type task: int or str or tuple(int, str) :returns: a :class:`pandas.DataFrame` with: * A ``preempt_latency`` column (the preemption latency at that timestamp) * A ``cpu`` column (the CPU where the event took place) """ return self._df_latency( task, 'preempt_latency', TaskState.TASK_RUNNING, TaskState.TASK_ACTIVE )[['preempt_latency', 'cpu']]
[docs] @TraceAnalysisBase.df_method @TasksAnalysis.df_task_states.used_events def df_activations(self, task): """ DataFrame of a task's activations :param task: The task's name or PID :type task: int or str or tuple(int, str) :returns: a :class:`pandas.DataFrame` with: * An ``activation_interval`` column (the time since the last activation). """ wkp_df = self.ana.tasks.df_task_states(task) wkp_df = wkp_df[wkp_df.curr_state == TaskState.TASK_WAKING] index = wkp_df.index.to_series() activation_interval = (index.shift(-1) - index).shift(1) return pd.DataFrame({'activation_interval': activation_interval})
[docs] @TraceAnalysisBase.df_method @TasksAnalysis.df_task_states.used_events def df_runtimes(self, task): """ DataFrame of task's runtime each time the task blocks :param task: The task's name or PID :type task: int or str or tuple(int, str) :returns: a :class:`pandas.DataFrame` with: * The times where the task stopped running as an index * A ``curr_state`` column (the current task state, see :class:`lisa.analysis.tasks.TaskState`) * A ``running_time`` column (the cumulated running time since the last activation). """ df = self.ana.tasks.df_task_states(task) runtimes = [] spurious_wkp = False # Using df.apply() is risky for counting (can be called more than once # on the same row), so use a loop instead for index, row in df.iterrows(): runtime = runtimes[-1] if len(runtimes) else 0 if row.curr_state == TaskState.TASK_WAKING: # This is required to capture strange trace sequences where # a switch_in event is followed by a wakeup_event. # This sequence is not expected, but we found it in some traces. # Possible reasons could be: # - misplaced sched_wakeup events # - trace buffer artifacts # TO BE BETTER investigated in kernel space. # For the time being, we account this interval as RUNNING time, # which is what kernelshark does. if spurious_wkp: runtime += row.delta spurious_wkp = False else: # This is a new activation, reset the runtime counter runtime = 0 elif row.curr_state == TaskState.TASK_ACTIVE: # This is the spurious wakeup thing mentionned above if row.next_state == TaskState.TASK_WAKING: spurious_wkp = True runtime += row.delta runtimes.append(runtime) df["running_time"] = runtimes # The runtime column is not entirely correct - at a task's first # TASK_ACTIVE occurence, the running_time will be non-zero, even # though the task has not run yet. However, it's much simpler to # accumulate the running_time the way we do and shift it later. df.running_time = df.running_time.shift(1) df.running_time = df.running_time.fillna(0) return df[~df.curr_state.isin([ TaskState.TASK_ACTIVE, TaskState.TASK_WAKING ])][["curr_state", "running_time"]]
############################################################################### # Plotting Methods ############################################################################### def _plot_threshold(self, y, **kwargs): return hv.HLine( y, group='threshold', **kwargs, ).options( color=self.LATENCY_THRESHOLD_COLOR ).options( backend='bokeh', line_dash='dashed', ) def _plot_markers(self, df, label): return hv.Scatter(df, label=label).options(marker='+').options( backend='bokeh', size=5, ) def _plot_overutilized(self): try: return self.ana.status.plot_overutilized() except MissingTraceEventError: return _hv_neutral()
[docs] @TraceAnalysisBase.plot_method @df_latency_wakeup.used_events @df_latency_preemption.used_events def plot_latencies(self, task: TaskID, wakeup: bool=True, preempt: bool=True, threshold_ms: float=1): """ Plot the latencies of a task over time :param task: The task's name or PID :type task: int or str or tuple(int, str) :param wakeup: Whether to plot wakeup latencies :type wakeup: bool :param preempt: Whether to plot preemption latencies :type preempt: bool :param threshold_ms: The latency threshold to plot :type threshold_ms: int or float """ def make_fig(name, df_getter, label): df = df_getter(task) if df.empty: self.logger.warning(f"No data to plot for {name}") else: df = df_refit_index(df, window=self.trace.window) return self._plot_markers(df, label) return hv.Overlay( [ make_fig(name, df_getter, label) for do_plot, name, label, df_getter in ( (wakeup, 'wakeup', 'Wakeup', self.df_latency_wakeup), (preempt, 'preempt', 'Preemption', self.df_latency_preemption), ) if do_plot ] + [ self._plot_threshold( threshold_ms / 1e3, label=f"{threshold_ms}ms threshold", ) ] ).options( title=f'Latencies of task "{task}"', ylabel='Latency (s)', )
def _get_cdf(self, data, threshold): """ Build the "Cumulative Distribution Function" (CDF) for the given data """ index = data.sort_values() index.name = None series = pd.Series(np.linspace(0, 1, len(index)), index=index) series.name = data.name # Compute percentage of samples above/below the specified threshold below = float(max(series[:threshold])) above = 1 - below return series, above, below @df_latency_wakeup.used_events @df_latency_preemption.used_events def _get_latencies_df(self, task, wakeup, preempt): wkp_df = None prt_df = None if wakeup: wkp_df = self.df_latency_wakeup(task) wkp_df = wkp_df.rename(columns={'wakeup_latency': 'latency'}, copy=False) if preempt: prt_df = self.df_latency_preemption(task) prt_df = prt_df.rename(columns={'preempt_latency': 'latency'}, copy=False) if wakeup and preempt: df = pd.concat([wkp_df, prt_df]) else: df = wkp_df or prt_df return df
[docs] @TraceAnalysisBase.plot_method @_get_latencies_df.used_events def plot_latencies_cdf(self, task: TaskID, wakeup: bool=True, preempt: bool=True, threshold_ms: float=1): """ Plot the latencies Cumulative Distribution Function of a task :param task: The task's name or PID :type task: int or str or tuple(int, str) :param wakeup: Whether to plot wakeup latencies :type wakeup: bool :param preempt: Whether to plot preemption latencies :type preempt: bool :param threshold_ms: The latency threshold to plot :type threshold_ms: int or float """ df = self._get_latencies_df(task, wakeup, preempt) threshold_s = threshold_ms / 1e3 cdf, above, below = self._get_cdf(df['latency'], threshold_s) return ( hv.Curve(cdf, label='CDF') * self._plot_threshold( below, label=f"Latencies below {threshold_ms}ms", ) * hv.VSpan( 0, threshold_s, label=f"{threshold_ms}ms threshold zone", ).options( alpha=0.5, color=self.LATENCY_THRESHOLD_ZONE_COLOR, ) ).options( title=f'Latencies CDF of task "{task}"', xlabel="Latency (s)", ylabel="Latencies below the x value (%)", )
[docs] @TraceAnalysisBase.plot_method @_get_latencies_df.used_events def plot_latencies_histogram(self, task: TaskID, wakeup: bool=True, preempt: bool=True, threshold_ms: float=1, bins: int=64): """ Plot the latencies histogram of a task :param task: The task's name or PID :type task: int or str or tuple(int, str) :param wakeup: Whether to plot wakeup latencies :type wakeup: bool :param preempt: Whether to plot preemption latencies :type preempt: bool :param threshold_ms: The latency threshold to plot :type threshold_ms: int or float """ df = self._get_latencies_df(task, wakeup, preempt) threshold_s = threshold_ms / 1e3 name = f'Latencies histogram of task {task}' return ( hv.Histogram( np.histogram(df['latency'], bins=bins), label=name, ) * hv.VSpan( 0, threshold_s, label=f"{threshold_ms}ms threshold zone", ).options( color=self.LATENCY_THRESHOLD_ZONE_COLOR, alpha=0.5, ) ).options( xlabel='Latency (s)', title=name, )
[docs] @TraceAnalysisBase.plot_method @df_latency_wakeup.used_events @df_latency_preemption.used_events def plot_latency_bands(self, task: TaskID): """ Draw the task wakeup/preemption latencies as colored bands :param task: The task's name or PID :type task: int or str or tuple(int, str) """ wkl_df = self.df_latency_wakeup(task) prt_df = self.df_latency_preemption(task) def plot_bands(df, column, label): df = df_refit_index(df, window=self.trace.window) if df.empty: return _hv_neutral() return hv.Overlay( [ hv.VSpan( start, start + duration, label=label, ).options( alpha=0.5, ) for start, duration in df[[column]].itertuples() ] ) return ( plot_bands(wkl_df, "wakeup_latency", "Wakeup latencies") * plot_bands(prt_df, "preempt_latency", "Preemption latencies") )
[docs] @TraceAnalysisBase.plot_method @df_activations.used_events def plot_activations(self, task: TaskID): """ Plot the :meth:`lisa.analysis.latency.LatencyAnalysis.df_activations` of a task :param task: The task's name or PID :type task: int or str or tuple(int, str) """ wkp_df = self.df_activations(task) wkp_df = df_refit_index(wkp_df, window=self.trace.window) name = f'Activation intervals of task {task}' return ( self._plot_markers(wkp_df, name) * self._plot_overutilized() ).options( title=name, ylabel='Activation interval (s)', )
[docs] @TraceAnalysisBase.plot_method @df_runtimes.used_events def plot_runtimes(self, task: TaskID): """ Plot the :meth:`lisa.analysis.latency.LatencyAnalysis.df_runtimes` of a task :param task: The task's name or PID :type task: int or str or tuple(int, str) """ df = self.df_runtimes(task) df = df_refit_index(df, window=self.trace.window) name = f'Per-activation runtimes of task {task}' return ( self._plot_markers(df, name) * self._plot_overutilized() ).options( title=name, )
# vim :set tabstop=4 shiftwidth=4 expandtab textwidth=80