# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" Functions Analysis Module """
import json
import os
from operator import itemgetter, attrgetter, mul
from functools import reduce
from itertools import chain
from collections.abc import Mapping
from enum import IntEnum
import pandas as pd
from pandas.api.types import is_numeric_dtype
import numpy as np
import holoviews as hv
from lisa.utils import FrozenDict, memoized, unzip_into
from lisa.datautils import df_merge
from lisa.analysis.base import TraceAnalysisBase, AnalysisHelpers
from lisa.analysis.load_tracking import LoadTrackingAnalysis
from lisa.trace import MissingTraceEventError, requires_one_event_of
from lisa.conf import ConfigKeyError
from lisa.stats import Stats
from lisa.pelt import PELT_SCALE
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
class FunctionsAnalysis(TraceAnalysisBase):
    """
    Support for ftrace events-based kernel functions profiling and analysis
    """

    # Name under which this analysis is registered on the trace analysis
    # proxy (see TraceAnalysisBase)
    name = 'functions'
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
    def df_resolve_ksym(self, df, addr_col, name_col='func_name', addr_map=None, exact=True):
        """
        Resolve the kernel function names.

        .. note:: If the ``addr_col`` is not of a numeric dtype, it will be
            assumed to be function names already and the content will be
            copied to ``name_col``.

        :param df: Dataframe to augment
        :type df: pandas.DataFrame

        :param addr_col: Name of the column containing a kernel address.
        :type addr_col: str

        :param name_col: Name of the column to create with symbol names
        :type name_col: str

        :param addr_map: If provided, the mapping of kernel addresses to
            symbol names. If missing, the symbols addresses from the
            :class:`lisa.platforms.platinfo.PlatformInfo` attached to the
            trace will be used.
        :type addr_map: dict(int, str)

        :param exact: If ``True``, an exact symbol address is expected. If
            ``False``, symbol addresses are sorted and paired to form
            intervals, which are then used to infer the name. This is suited
            to resolve an instruction pointer that could point anywhere
            inside of a function (but before the starting address of the next
            function).
        :type exact: bool
        """
        trace = self.trace
        # Shallow copy so the caller's dataframe is not mutated when the name
        # column is added
        df = df.copy(deep=False)
        # Names already resolved, we can just copy the address column to the
        # name one
        if not is_numeric_dtype(df[addr_col].dtype):
            df[name_col] = df[addr_col]
            return df
        if addr_map is None:
            # May raise ConfigKeyError if the platform info does not carry
            # the symbol addresses (handled by callers such as _df_with_ksym)
            addr_map = trace.plat_info['kernel']['symbols-address']
        if exact:
            # Direct address -> name lookup; unknown addresses become NaN
            df[name_col] = df[addr_col].map(addr_map)
        # Not exact means the function addresses will be used as ranges, so
        # we can find in which function any instruction pointer value is
        else:
            # Sort by address, so that each consecutive pair of addresses
            # constitutes a range of addresses belonging to a given function.
            addr_list = sorted(
                addr_map.items(),
                key=itemgetter(0)
            )
            bins, labels = zip(*addr_list)
            # "close" the last bucket with the highest value possible of that column
            max_addr = np.iinfo(df[addr_col].dtype).max
            bins = list(bins) + [max_addr]
            name_i = pd.cut(
                df[addr_col],
                bins=bins,
                # Since our labels are not unique, we cannot pass it here
                # directly. Instead, use an index into the labels list
                labels=range(len(labels)),
                # Include the left boundary and exclude the right one
                include_lowest=True,
                right=False,
            )
            # Map the interval index back to the symbol name.
            # NOTE(review): addresses strictly below the lowest symbol
            # address yield NaN bins here, which would make labels[x] fail --
            # presumably such addresses do not occur in practice; confirm.
            df[name_col] = name_i.apply(lambda x: labels[x])
        return df
def _df_with_ksym(self, event, *args, **kwargs):
df = self.trace.df_event(event)
try:
return self.df_resolve_ksym(df, *args, **kwargs)
except ConfigKeyError:
self.logger.warning(f'Missing symbol addresses, function names will not be resolved: {e}')
return df
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
@requires_one_event_of('funcgraph_entry', 'funcgraph_exit')
@TraceAnalysisBase.df_method
def df_funcgraph(self, event):
"""
Return augmented dataframe of the event with the following column:
* ``func_name``: Name of the calling function if it could be
resolved.
:param event: One of:
* ``entry`` (``funcgraph_entry`` event)
* ``exit`` (``funcgraph_exit`` event)
:type event: str
"""
event = f'funcgraph_{event}'
return self._df_with_ksym(event, 'func', 'func_name', exact=False)
    @df_funcgraph.used_events
    @LoadTrackingAnalysis.df_cpus_signal.used_events
    def _get_callgraph(self, tag_df=None, thread_root_functions=None):
        """
        Build a :class:`_CallGraph` out of the funcgraph entry/exit events,
        interleaved with CPU capacity updates and optional tag events.
        """
        entry_df = self.df_funcgraph(event='entry').copy(deep=False)
        entry_df['event'] = _CallGraph._EVENT.ENTRY
        exit_df = self.df_funcgraph(event='exit').copy(deep=False)
        exit_df['event'] = _CallGraph._EVENT.EXIT
        # Attempt to get the CPU capacity signal to normalize the results
        capacity_cols = ['__cpu', 'event', 'capacity']
        try:
            capacity_df = self.ana.load_tracking.df_cpus_signal('capacity')
        except MissingTraceEventError:
            # No capacity events available: merge an empty frame so the rest
            # of the pipeline stays unchanged
            capacity_df = pd.DataFrame(columns=capacity_cols)
        else:
            capacity_df = capacity_df.copy(deep=False)
            capacity_df['__cpu'] = capacity_df['cpu']
            capacity_df['event'] = _CallGraph._EVENT.SET_CAPACITY
            capacity_df = capacity_df[capacity_cols]
        # Set a reasonable initial capacity
        try:
            orig_capacities = self.trace.plat_info['cpu-capacities']['orig']
        except KeyError:
            pass
        else:
            orig_capacities_df = pd.DataFrame.from_records(
                (
                    # Use negative, per-CPU-distinct timestamps so these
                    # synthetic rows sort before the real trace events.
                    # NOTE(review): CPU0 gets timestamp 0, which could tie
                    # with a real event at t=0 -- confirm this cannot happen.
                    (-1 * cpu, cpu, _CallGraph._EVENT.SET_CAPACITY, cap)
                    for cpu, cap in orig_capacities.items()
                ),
                columns=['Time', '__cpu', 'event', 'capacity'],
                index='Time',
            )
            capacity_df = pd.concat((orig_capacities_df, capacity_df))
        to_merge = [entry_df, exit_df, capacity_df]
        if tag_df is not None:
            # Fold all the tag columns into a single "tags" column holding a
            # dict per row, keeping __cpu alongside
            cpu = tag_df['__cpu']
            tag_df = tag_df.drop(columns=['__cpu'])
            tag_df = pd.DataFrame(dict(
                tags=tag_df.apply(pd.Series.to_dict, axis=1),
                __cpu=cpu,
            ))
            tag_df['event'] = _CallGraph._EVENT.SET_TAG
            to_merge.append(tag_df)
        df = df_merge(to_merge)
        return _CallGraph.from_df(
            df,
            thread_root_functions=thread_root_functions
        )
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
@_get_callgraph.used_events
def df_calls(self, tag_df=None, thread_root_functions=None, normalize=True):
"""
Return a :class:`pandas.DataFrame` with a row for each function call,
along some metrics:
* ``cum_time``: cumulative time spent in that function. This
includes the time spent in all children too.
* ``self_time``: time spent in that function only. This
excludes the time spent in all children.
:param tag_df: Dataframe containing the tag event, which is used to tag
paths in the callgraph. The ``__cpu`` column is mandatory in order
to know which CPU is to be tagged at any index. Other colunms will
be used as tag keys. Tags are inherited from from both parents and
children. This allows a leaf function to emit an event and use it
for the whole path that lead to there. Equally, if a function emits
a tag, all the children of this call will inherit the tag too. This
allows a top-level function to tag a whole subtree at once.
:type tag_df: pandas.DataFrame
:param thread_root_functions: Functions that are considered to be a
root of threads. When they appear in the callgraph, the profiler
will consider the current function to be preempted and will not
register the call as a child of it and will avoid to count it in
the cumulative time.
:type thread_root_functions: list(str) or None
:param normalize: Normalize metrics according to the current CPU
capacity so that they appear to have run on the fastest CPU at
maximum frequency. This allows merging calls regardless of their
origin (CPU and frequency).
.. note:: Normalization only currently takes into account the
capacity of the CPU when the function is entered. If it changes
during execution, the result will be somewhat wrong.
:type normalize: bool
.. note:: Calls during which the current function name changes are not
accounted for. They are typically a sign of functions that did not
properly return, for example functions triggering a context switch
and returning to userspace.
"""
graph = self._get_callgraph(
tag_df=tag_df,
thread_root_functions=thread_root_functions,
)
metrics = _CallGraphNode._METRICS
def get_metric(node, metric):
val = node[metric]
if normalize:
return (node.cpu_capacity / PELT_SCALE) * val
else:
return val
return pd.DataFrame.from_records(
(
(
node.entry_time, node.cpu, node.func_name, FrozenDict(node.tags), node.tagged_name,
*(
get_metric(node, metric)
for metric in metrics
)
)
for node in graph.all_nodes
),
columns=['Time', 'cpu', 'function', 'tags', 'tagged_name'] + metrics,
index='Time',
)
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
@df_calls.used_events
def compare_with_traces(self, others, normalize=True, **kwargs):
"""
Compare the :class:`~lisa.trace.Trace` it's called on with the other
traces passed as ``others``. The reference is the trace it's called on.
:returns: a :class:`lisa.stats.Stats` object just like
:meth:`profile_stats`.
:param others: List of traces to compare against.
:type others: list(lisa.trace.Trace)
:Variable keyword arguments: Forwarded to :meth:`profile_stats`.
"""
ref = self.trace
traces = [ref] + list(others)
paths = [
trace.trace_path
for trace in traces
]
common_prefix_len = len(os.path.commonprefix(paths))
common_suffix_len = len(os.path.commonprefix(list(map(lambda x: str(reversed(x)), paths))))
def get_name(trace):
name = trace.trace_path[common_prefix_len:common_suffix_len]
if not name:
if trace is ref:
name = 'ref'
else:
name = str(traces.index(trace))
return name
def get_df(trace):
df = self.df_calls(normalize=normalize)
df = df.copy(deep=False)
df['trace'] = get_name(trace)
return df
df = df_merge(map(get_df, traces))
ref_group = {
'trace': get_name(ref)
}
return self._profile_stats_from_df(df, ref_group=ref_group, **kwargs)
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
    @df_calls.used_events
    def profile_stats(self, tag_df=None, normalize=True, ref_function=None, ref_tags=None, **kwargs):
        """
        Create a :class:`lisa.stats.Stats` out of profiling information of
        the trace.

        :param tag_df: Dataframe of tags, forwarded to :meth:`df_calls`
        :type tag_df: pandas.DataFrame or None

        :param normalize: Normalize execution time according to CPU capacity,
            forwarded to :meth:`df_calls`
        :type normalize: bool

        :param metric: Name of the metric to use for statistics. Can be one
            of:

            * ``self_time``: Time spent in the function, not accounting for
              time spent in children
            * ``cum_time``: Total time spent in the function, including the
              time spent in children.

            Defaults to ``self_time``.
        :type metric: str

        :param functions: Restrict the statistics to the given list of
            function.
        :type functions: list(str) or None

        :param ref_function: Function to compare to.
        :type ref_function: str or None

        :param ref_tags: Function tags to compare to. Ignored if
            ``ref_function is None``.
        :type ref_tags: dict(str, set(object)) or None

        :param cpus: List of CPUs where the functions were called to take
            into account. If left to ``None``, all CPUs are considered.
        :type cpus: list(int) or None

        :param per_cpu: If ``True``, the per-function statistics are
            separated for each CPU they ran on. This is useful if the
            frequency was fixed and the only variation in speed was coming
            from the CPU it ran on.
        :type per_cpu: bool or None

        :param tags: Restrict the statistics to the function tagged with the
            given tag values. If a function has multiple values for a given
            tag and one of the value is in ``tags``, the function is
            selected.
        :type tags: dict(str, object)

        :Variable keyword arguments: Forwarded to :class:`lisa.stats.Stats`.

        .. note:: Recursive calls are treated as if they were inlined in
            their callers. This means that the count of calls will be
            counting the toplevel calls only, and that the ``self_time`` for
            a recursive function is directly linked to how much time each
            level consumes multiplied by the number of levels. ``cum_time``
            will also be tracked on the top-level call only to provide a more
            accurate result.
        """
        df = self.df_calls(tag_df=tag_df, normalize=normalize)
        if ref_function:
            ref_tags = ref_tags or {}
            # The reference group uses the same "name (tags)" rendering as
            # the "f" column produced by _profile_stats_from_df()
            ref_group = {
                'f': _CallGraphNode.format_name(ref_function, ref_tags)
            }
        else:
            ref_group = None
        return self._profile_stats_from_df(df, ref_group=ref_group, **kwargs)
    @staticmethod
    def _profile_stats_from_df(df, metric='self_time', functions=None, per_cpu=True, cpus=None, tags=None, **kwargs):
        """
        Build a :class:`lisa.stats.Stats` out of a :meth:`df_calls`-style
        dataframe, optionally filtered by function names, CPUs and tags.
        """
        metrics = _CallGraphNode._METRICS
        # Get rid of the other value columns to avoid treating them as
        # tags
        other_metrics = set(metrics) - {metric}
        if functions:
            df = df[df['function'].isin(functions)]
        if cpus is not None:
            df = df[df['cpu'].isin(cpus)]
        if tags:
            # Select all rows that are a subset of the given tags
            def select_tag(row_tags):
                return all(
                    val in row_tags.get(tag, [])
                    for tag, val in tags.items()
                )
            df = df[df['tags'].apply(select_tag)]
        # Shallow copy before adding/dropping columns so the caller's
        # dataframe is untouched
        df = df.copy(deep=False)
        # Use tagged_name for display
        df['f'] = df['tagged_name']
        to_drop = list(other_metrics) + ['tags', 'function', 'tagged_name']
        # Calls are already uniquely identified by their timestamp, so grouping
        # per CPU is optional
        if not per_cpu:
            to_drop.append('cpu')
        df = df.drop(columns=to_drop)
        df['unit'] = 's'
        index_name = df.index.name
        df = df.reset_index()
        return Stats(
            df,
            agg_cols=[index_name],
            value_col=metric,
            **kwargs,
        )
class _CallGraph:
class _EVENT(IntEnum):
"""
To be used as events for the dataframe passed to
:meth:`from_df`.
"""
ENTRY = 1
"""Enter the given function"""
EXIT = 2
"""Exit the given function"""
SET_TAG = 3
"""
Tag the current call graph path (parents and
children) with the given value
"""
SET_CAPACITY = 4
"""
Set the capacity of the current CPU. Values are between 0 and
:attr:`lisa.pelt.PELT_SCALE`.
"""
def __init__(self, cpu_nodes):
self.cpu_nodes = cpu_nodes
@property
def all_nodes(self):
return chain.from_iterable(
node.indirect_children
for node in self.cpu_nodes.values()
)
    @classmethod
    def from_df(cls, df, thread_root_functions=None, ts_cols=('calltime', 'rettime')):
        """
        Build a :class:`_CallGraph` from a :class:`pandas.DataFrame` with the
        following columns:

            * ``event``: One of :class:`_CallGraph._EVENT` enumeration.
            * ``func_name``: Name of the function for ``entry`` and ``exit``
              events.
            * ``tags``: ``dict(str, object)`` of tags for ``tag`` event.

        :param thread_root_functions: Functions that are considered to be a
            root of threads. When they appear in the callgraph, the profiler
            will consider the current function to be preempted and will not
            register the call as a child of it and will avoid to count it in
            the cumulative time.
        :type thread_root_functions: list(str) or None

        :param ts_cols: Name of the columns for the
            :attr:`_CallGraph._EVENT.EXIT` rows that contain timestamps for
            entry and exit. If they are provided, they will be used instead
            of the index.
        :type ts_cols: tuple(str) or None
        """
        thread_root_functions = set(thread_root_functions) if thread_root_functions else set()

        def make_visitor():
            # Monotonic counter handing out unique logical thread IDs
            _max_thread = -1

            def make_thread():
                nonlocal _max_thread
                _max_thread += 1
                return _max_thread

            # Synthetic root node: all the top-level calls observed on the
            # CPU become its children
            root_node = _CallGraphNode(
                func_name=None,
                parent=None,
                cpu=None,
                cpu_capacity=None,
                logical_thread=make_thread(),
            )
            # Node currently being executed while replaying the event stream
            curr_node = root_node
            # This is expected to be overriden right away by a SET_CAPACITY
            # event
            curr_capacity = PELT_SCALE
            event_enum = cls._EVENT

            def visit(row):
                # Stateful row visitor: replays the event stream in order,
                # maintaining the current position in the call stack
                nonlocal curr_node, curr_capacity
                curr_event = row['event']
                if curr_event == event_enum.ENTRY:
                    func_name = row['func_name']
                    cpu = row['__cpu']
                    # If we got preempted by a function that is considered to
                    # be part of different logical thread (e.g. the toplevel
                    # function of an ISR), create a new ID
                    if func_name in thread_root_functions:
                        logical_thread = make_thread()
                    # Otherwise, just inherit it from the parent
                    else:
                        logical_thread = curr_node.logical_thread
                    child = _CallGraphNode(
                        func_name=func_name,
                        cpu=cpu,
                        parent=curr_node,
                        cpu_capacity=curr_capacity,
                        # row.name is the dataframe index value, i.e. the
                        # timestamp of the event
                        entry_time=row.name,
                        logical_thread=logical_thread,
                    )
                    curr_node._children.append(child)
                    curr_node = child
                elif curr_event == event_enum.EXIT:
                    # We are trying to exit the root, which is probably the sign of
                    # a missing entry event (could have been cropped out of the
                    # trace). We therefore just ignore it.
                    if curr_node is not root_node:
                        # That node is unusable for stats, since the function
                        # used to enter the call is not the same one as for the
                        # exit. This usually means that the kernel returned to
                        # userspace in between.
                        if row['func_name'] != curr_node.func_name:
                            curr_node.valid_metrics = False
                        if ts_cols is None:
                            curr_node.exit_time = row.name
                        else:
                            entry_ts, exit_ts = ts_cols
                            # The timestamp columns are converted to seconds
                            # -- the 1e-9 factor presumably converts from
                            # nanoseconds; confirm with the funcgraph event
                            # format.
                            curr_node.entry_time = row[entry_ts] * 1e-9
                            curr_node.exit_time = row[exit_ts] * 1e-9
                        # Pop back to the caller
                        curr_node = curr_node.parent
                elif curr_event == event_enum.SET_TAG:
                    tags = row['tags']
                    curr_node.set_tags(tags)
                elif curr_event == event_enum.SET_CAPACITY:
                    curr_capacity = row['capacity']
                else:
                    raise ValueError(f'Unknown event "{curr_event}"')

            def finalize(df):
                # Fixup the exit time if there were missing exit events
                if curr_node is not root_node:
                    last_time = df.index[-1]
                    for node in chain([curr_node], curr_node.parents):
                        node.exit_time = last_time
                        node.valid_metrics = False
                # The synthetic root spans all of its children
                root_children = root_node.children
                if root_children:
                    root_node.entry_time = min(map(attrgetter('entry_time'), root_children))
                    root_node.exit_time = max(map(attrgetter('exit_time'), root_children))
                else:
                    root_node.entry_time = 0
                    root_node.exit_time = 0

            return (root_node, visit, finalize)

        def build_graph(subdf):
            root_node, visitor, finalizer = make_visitor()
            # apply() is used for row-wise iteration; the visitor works by
            # side effect on the graph being built
            subdf.apply(visitor, axis=1)
            finalizer(subdf)
            return root_node

        # One independent graph per CPU
        return cls(
            cpu_nodes = {
                cpu: build_graph(subdf)
                for cpu, subdf in df.groupby('__cpu', observed=True, group_keys=False)
            }
        )
class _CallGraphNode(Mapping):
    """
    Represent a function call extracted from some profiling information.

    Implements the :class:`collections.abc.Mapping` interface: indexing a
    node with a metric name (see ``_METRICS``) computes that metric.
    """
    # __slots__ to keep per-node memory low, since a trace can contain a
    # large number of calls
    __slots__ = [
        'func_name',
        'cpu',
        'cpu_capacity',
        '_tags',
        '_children',
        'parent',
        'logical_thread',
        'entry_time',
        'exit_time',
        'valid_metrics',
        # Allow taking weak references to nodes
        '__weakref__',
    ]

    # Metric names accepted by __getitem__, kept sorted so that derived
    # dataframe columns have a stable order
    _METRICS = sorted((
        'cum_time',
        'self_time',
    ))
def __init__(self, func_name, parent, logical_thread, cpu, cpu_capacity, entry_time=None, exit_time=None, valid_metrics=True):
self.func_name = func_name
self.cpu = cpu
self.cpu_capacity = cpu_capacity
self.parent = parent
self.logical_thread = logical_thread
self._children = []
self._tags = {}
self.entry_time = entry_time
self.exit_time = exit_time
self.valid_metrics = valid_metrics
    def __hash__(self):
        # Each node is a unique call: hash and compare by identity, even if
        # the same function appears several times in the graph
        return id(self)

    def __eq__(self, other):
        return self is other
    @property
    @memoized
    def _expanded_children(self):
        """
        Children list with recursive calls "inlined": any child that is part
        of a recursion chain on this node's function is replaced by its own
        (recursively expanded) children, reparenting them onto this node.
        """
        def visit(node):
            children = node._children
            children_visit = map(visit, children)
            is_recursive, children_expansion = unzip_into(2, children_visit)
            # Check if we are part of any recursive chain
            is_recursive = any(is_recursive) or node.func_name == self.func_name
            # If we are part of a recursion chain, expand all of our children
            # so that they are reparented into our caller
            if is_recursive:
                expansion = list(chain.from_iterable(children_expansion))
            else:
                expansion = [node]
            return (is_recursive, expansion)
        # visit() returns (is_recursive, expansion); only the expansion of
        # this node is of interest here
        return visit(self)[1]
    @property
    @memoized
    def children(self):
        """
        Children that belong to the same logical thread, i.e. actual callees
        rather than preempting calls, with recursive calls expanded (see
        :meth:`_expanded_children`).
        """
        return [
            child
            for child in self._expanded_children
            if not self._is_preempted_by(child)
        ]

    @property
    def _preempting_children(self):
        # Complement of children: calls that interrupted this one (i.e. that
        # belong to a different logical thread)
        return {
            child
            for child in self._expanded_children
            if self._is_preempted_by(child)
        }
def _is_preempted_by(self, node):
return self.logical_thread != node.logical_thread
def _str(self, idt):
idt_str = idt * ' '
if self.children:
children = ':\n' + '\n'.join(child._str(idt + 1) for child in self.children)
else:
children = ''
return f'{idt_str}{self.func_name}, self={self["self_time"]}s cum={self["cum_time"]}s tags={self.tags}{children}'
def __str__(self):
return self._str(0)
@property
def tagged_name(self):
return self.format_name(self.func_name, self.tags)
@staticmethod
def format_name(func_name, tags):
tags = tags or {}
tags = ', '.join(
f'{tag}={"|".join(map(str, vals))}'
for tag, vals in sorted(tags.items())
)
tags = f' ({tags})' if tags else ''
return f'{func_name}{tags}'
    @memoized
    def __getitem__(self, key):
        """
        Compute the given metric (see ``_METRICS``) for this call, in
        seconds.
        """
        if not self.valid_metrics:
            # Entry/exit could not be reliably paired, so any timing derived
            # from this node would be wrong
            return np.NaN
        delta = self.exit_time - self.entry_time
        if key == 'self_time':
            return delta - sum(
                node.exit_time - node.entry_time
                # Subtract the time spent in all the children, including the
                # ones that preempted us
                for node in self._expanded_children
            )
        elif key == 'cum_time':
            # Define cum_time in terms of self_time, so that preempting
            # children are properly accounted for recursively
            return self['self_time'] + sum(
                node['cum_time']
                for node in self.children
            )
        else:
            raise KeyError(f'Unknown metric "{key}"')
    def __iter__(self):
        # Mapping interface: iterating a node yields the metric names
        return iter(self._METRICS)

    def __len__(self):
        return len(self._METRICS)
    @property
    def _inherited_tags(self):
        """
        Tags inherited from the rest of the call path: all the ancestors and
        all the (transitive) children of this node, merged together.
        """
        def merge_tags(tags1, tags2):
            # Union the value sets for tags appearing in both mappings
            common_keys = tags1.keys() & tags2.keys()
            new = {
                tag: tags1[tag] | tags2[tag]
                for tag in common_keys
            }
            # Then take the non-conflicting tags as-is
            for tags in (tags1, tags2):
                new.update({
                    tag: tags[tag]
                    for tag in tags.keys() - common_keys
                })
            return new
        # Since merge_tags() is commutative (merge_tags(a, b) == merge_tags(b,
        # a)), we don't need any specific ordering on the parents
        nodes = chain(self.parents, self.indirect_children)
        tags = reduce(merge_tags, map(attrgetter('_tags'), nodes), {})
        return tags
@property
@memoized
def tags(self):
return dict(
(key, frozenset(vals))
for key, vals in {
**self._inherited_tags,
**self._tags,
}.items()
)
@property
def parents(self):
parent = self.parent
if parent is not None:
yield parent
yield from parent.parents
@property
def indirect_children(self):
for child in self.children:
yield child
yield from child.indirect_children
def set_tags(self, tags):
for tag, val in tags.items():
self._tags.setdefault(tag, set()).add(val)
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
class JSONStatsFunctionsAnalysis(AnalysisHelpers):
    """
    Support for kernel functions profiling and analysis

    :param stats_path: Path to JSON function stats as returned by devlib
        :meth:`devlib.collector.ftrace.FtraceCollector.get_stats`
    :type stats_path: str
    """

    # Name under which this analysis is registered
    name = 'functions_json'
def __init__(self, stats_path):
self.stats_path = stats_path
# Opening functions profiling JSON data file
with open(self.stats_path) as f:
stats = json.load(f)
# Build DataFrame of function stats
frames = {}
for cpu, data in stats.items():
frames[int(cpu)] = pd.DataFrame.from_dict(data, orient='index')
# Build and keep track of the DataFrame
self._df = pd.concat(list(frames.values()),
keys=list(frames.keys()))
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
def get_default_plot_path(self, **kwargs):
return super().get_default_plot_path(
default_dir=os.path.dirname(self.stats_path),
**kwargs,
)
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
def df_functions_stats(self, functions=None):
"""
Get a DataFrame of specified kernel functions profile data
For each profiled function a DataFrame is returned which reports stats
on kernel functions execution time. The reported stats are per-CPU and
includes: number of times the function has been executed (hits),
average execution time (avg), overall execution time (time) and samples
variance (s_2).
By default returns a DataFrame of all the functions profiled.
:param functions: the name of the function or a list of function names
to report
:type functions: list(str)
"""
df = self._df
if functions:
return df.loc[df.index.get_level_values(1).isin(functions)]
else:
return df
# NOTE: stray "[docs]" Sphinx link artifact (HTML extraction residue), commented out
@AnalysisHelpers.plot_method
def plot_profiling_stats(self, functions: str=None, metrics: str='avg'):
"""
Plot functions profiling metrics for the specified kernel functions.
For each speficied metric a barplot is generated which report the value
of the metric when the kernel function has been executed on each CPU.
By default all the kernel functions are plotted.
:param functions: the name of list of name of kernel functions to plot
:type functions: str or list(str)
:param metrics: the metrics to plot
avg - average execution time
time - total execution time
:type metrics: list(str)
"""
df = self.df_functions_stats(functions)
# Check that all the required metrics are acutally availabe
available_metrics = df.columns.tolist()
if not set(metrics).issubset(set(available_metrics)):
msg = f'Metrics {(set(metrics) - set(available_metrics))} not supported, available metrics are {available_metrics}'
raise ValueError(msg)
def plot_metric(metric):
if metric.upper() == 'AVG':
ylabel = 'Average completion time [us]'
if metric.upper() == 'TIME':
ylabel = 'Total execution time [us]'
data = df[metric.casefold()].unstack()
return hv.Bars(data).options(
title=title,
ylabel=ylabel,
xlabel='CPU',
invert_axes=True,
)
return reduce(mul, map(plot_metric, metrics)).options(
title='Execution time stats',
)
# vim :set tabstop=4 shiftwidth=4 expandtab textwidth=80