# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" Trace Parser Module """
import shutil
import uuid
import sys
import re
import gc
import math
import abc
import copy
import io
import os
import os.path
import json
import inspect
import shlex
import contextlib
import tempfile
from functools import wraps
from collections.abc import Set, Mapping, Sequence, Iterable
from operator import itemgetter, attrgetter
from numbers import Number, Integral, Real
import multiprocessing
import textwrap
import subprocess
import itertools
import functools
import fnmatch
import typing
from pathlib import Path
from difflib import get_close_matches
import weakref
import atexit
import threading
import warnings
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse
import types
import numpy as np
import pandas as pd
import pyarrow.lib
import pyarrow.parquet
import polars as pl
import devlib
from lisa.utils import Loggable, HideExekallID, memoized, lru_memoized, deduplicate, take, deprecate, nullcontext, measure_time, checksum, newtype, groupby, PartialInit, kwargs_forwarded_to, kwargs_dispatcher, ComposedContextManager, get_nested_key, unzip_into, order_as
from lisa.conf import SimpleMultiSrcConf, LevelKeyDesc, KeyDesc, TopLevelKeyDesc, Configurable
from lisa.datautils import SignalDesc, df_add_delta, df_deduplicate, df_window, df_window_signals, series_convert, df_update_duplicates, _polars_duration_expr, _df_to_polars, _df_to_pandas, _df_to, _polars_df_in_memory, Timestamp
from lisa.version import VERSION_TOKEN
from lisa._typeclass import FromString
from lisa._kmod import LISADynamicKmod
from lisa._assets import get_bin
def _deprecated_warn(msg, **kwargs):
    """
    Emit ``msg`` as a :exc:`DeprecationWarning`.

    :param msg: Text of the warning.
    :type msg: str

    :Variable keyword arguments: Forwarded to :func:`warnings.warn`.
    """
    warnings.warn(msg, category=DeprecationWarning, **kwargs)
def __getattr__(name):
    """
    Module-level attribute hook serving names that moved out of this module.

    ``TaskID`` is still resolved, with a :exc:`DeprecationWarning`, by
    importing it lazily from :mod:`lisa.analysis.tasks`.

    :param name: Name of the attribute looked up on the module.
    :type name: str

    :raises AttributeError: For any other name.
    """
    if name != 'TaskID':
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    _deprecated_warn('TaskID has been moved to lisa.analysis.tasks.TaskID, update your import accordingly')
    # Imported lazily, only when the deprecated name is actually requested
    from lisa.analysis.tasks import TaskID
    return TaskID
# Weak registry of the deallocators that must run at interpreter exit.
# Entries disappear on their own once the deallocator is garbage collected.
_DEALLOCATORS = weakref.WeakSet()
# Guards every access to _DEALLOCATORS
_DEALLOCATORS_LOCK = threading.RLock()


def _dealloc_all():
    """
    Run every deallocator currently registered in ``_DEALLOCATORS``.
    """
    with _DEALLOCATORS_LOCK:
        for pending in _DEALLOCATORS:
            pending.run()


atexit.register(_dealloc_all)
def _file_cleanup(paths):
    """
    Best-effort recursive removal of each directory tree in ``paths``.

    Any failure (missing path, permissions, ...) is deliberately ignored so
    that one entry that cannot be removed does not prevent cleaning up the
    others.

    :param paths: Paths to remove.
    :type paths: collections.abc.Iterable(str or pathlib.Path)
    """
    for target in paths:
        with contextlib.suppress(Exception):
            shutil.rmtree(target)
def _identity(x):
    """
    Return ``x`` unchanged.
    """
    return x


def _make_identity():
    """
    Return the module-level :func:`_identity` function.

    Being a named top-level function, it can be used as the reconstructor
    returned by a ``__reduce__`` implementation.
    """
    return _identity
class _Deallocator:
    """
    Run a cleanup callback at most once.

    :param f: Callback taking no parameter, releasing the resource.
    :type f: collections.abc.Callable

    :param on_del: If ``True``, the callback is run when the object is
        finalized (``__del__``).
    :type on_del: bool

    :param at_exit: If ``True``, the object is registered in the module-level
        ``_DEALLOCATORS`` set so that ``_dealloc_all()`` runs it.
    :type at_exit: bool
    """
    def __init__(self, f, on_del=True, at_exit=True):
        self.f = f
        self._on_del = on_del
        # The lock must exist *before* the instance is published in
        # _DEALLOCATORS, otherwise a concurrent _dealloc_all() could call
        # run() on an object that has no _lock attribute yet.
        self._lock = threading.Lock()

        if at_exit:
            with _DEALLOCATORS_LOCK:
                _DEALLOCATORS.add(self)

    def run(self):
        """
        Run the callback if it has not run already.

        The callback is cleared before being called, so it is never invoked
        twice even if it raises.
        """
        with self._lock:
            if (f := self.f) is not None:
                self.f = None
                f()

    def __del__(self):
        if self._on_del:
            self.run()

    # We never make a new copy. This is required so that only a single
    # deallocator ever handles a given resource.
    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        return self
class _LazyFrameOnDelete(_Deallocator):
    """
    Deallocator embedded in a :class:`polars.LazyFrame` plan as an identity
    ``map_batches()`` step, so that it is referenced by the frame it is
    attached to.
    """

    def __call__(self, x):
        # Identity: the object is only in the plan to be referenced by it
        return x

    def __reduce__(self):
        # Replace itself with an identity function upon deserialization, so
        # that we don't accidentally end up with 2 objects owning the same set
        # of paths.
        return (_make_identity, ())

    @classmethod
    def _attach(cls, df, f):
        """
        Return ``df`` extended with an identity step holding a deallocator
        for ``f``.
        """
        deallocator = cls(f)
        return df.map_batches(
            deallocator,
            streamable=True,
            validate_output_schema=False,
        )

    @classmethod
    def attach_file_cleanup(cls, df, paths):
        """
        Attach to ``df`` a deallocator removing ``paths``.

        :param df: LazyFrame to attach the cleanup to.
        :type df: polars.LazyFrame

        :param paths: Paths to remove. They are deduplicated and sorted. If
            there is none, ``df`` is returned as-is.
        :type paths: collections.abc.Iterable
        """
        unique_paths = sorted(set(paths))
        if not unique_paths:
            return df

        cleanup = functools.partial(_file_cleanup, unique_paths)
        return cls._attach(df, cleanup)
def _make_hardlink(src, dst):
    """
    Create ``dst`` as a hardlink to ``src``, doing nothing if ``dst`` already
    exists.
    """
    with contextlib.suppress(FileExistsError):
        os.link(src, dst)
def _logical_plan_resolve_paths(cache, plan, kind):
    """
    Rewrite the scan paths of a serialized LazyFrame plan so that it can be
    stored in, or reloaded from, the swap area of ``cache``.

    :param cache: Cache object. ``cache.swap_dir`` is read, and
        ``cache._hardlink_path()`` is called when ``kind='load'``.

    :param plan: Plan, as handled by :func:`_logical_plan_update_paths`.

    :param kind: ``'dump'`` replaces the location of files living under the
        swap folder by the ``PATH_IN_LISA_CACHE`` placeholder. ``'load'``
        replaces that placeholder by a fresh hardlink to the swap entry.
    :type kind: str

    :returns: A tuple ``(plan, hardlinks)`` with the updated plan and the set
        of hardlink base paths created while loading.

    :raises ValueError: If a path needs updating and ``kind`` is unknown.
    """
    placeholder = 'PATH_IN_LISA_CACHE'
    swap_dir = Path(cache.swap_dir).resolve()
    hardlinks_base = Path(uuid.uuid4().hex)
    hardlinks = set()

    def split_scheme(url):
        # Paths without an explicit scheme are regular files
        url = str(url)
        parsed = urlparse(url)
        scheme = parsed.scheme or 'file'
        return (scheme, parsed.path if scheme == 'file' else url)

    def dump_path(path):
        scheme, path = split_scheme(path)
        if scheme != 'file':
            return path

        path = Path(path)
        try:
            relative = path.relative_to(swap_dir)
        except ValueError:
            # Not in the swap folder, so leave it alone
            return path

        # Remove the "hardlinks" part of the path so we point at the file
        # in the cache
        if relative.parts[0] == 'hardlinks':
            relative = Path(relative.name)

        assert not relative.is_absolute()
        return placeholder / relative

    def load_path(path):
        scheme, path = split_scheme(path)
        if scheme != 'file':
            return path

        path = Path(path)
        # This path comes from somewhere else on the system so do not
        # rewrite it
        if path.parent.name != placeholder:
            return path

        name = path.name
        # Create a hardlink to the data so that the data backing the
        # LazyFrame we are reloading is guaranteed to stay around long
        # enough and will not be scrubbed away.
        hardlink_base, hardlink_path = cache._hardlink_path(
            hardlinks_base,
            name,
        )
        _make_hardlink(swap_dir / name, hardlink_path)
        hardlinks.add(hardlink_base)
        return hardlink_path

    def update_path(path):
        if kind == 'dump':
            return dump_path(path)
        elif kind == 'load':
            return load_path(path)
        else:
            raise ValueError(f'Unknown kind {kind}')

    updated = _logical_plan_update_paths(plan, update_path=update_path)
    return (updated, hardlinks)
def _logical_plan_update_paths(plan, update_path):
    """
    Return a copy of ``plan`` in which every path of every ``Scan`` node has
    been replaced by ``str(update_path(path))``.

    The input ``plan`` is left untouched, including its nested containers.

    :param plan: Nested structure of mappings and iterables, e.g. a JSON
        document loaded with :func:`json.loads`. Mappings with a ``Scan`` key
        have the ``paths`` list of that entry rewritten.

    :param update_path: Callable applied to each path.
    :type update_path: collections.abc.Callable
    """
    def fixup_scans(obj):
        if isinstance(obj, Mapping):
            try:
                scan = obj['Scan']
            except KeyError:
                fixup_scans(obj.values())
            else:
                scan['paths'] = [
                    str(update_path(path))
                    for path in scan['paths']
                ]
        # Strings are iterable, but they are leaves that never hold a scan
        elif isinstance(obj, str):
            return
        elif isinstance(obj, Iterable):
            for value in obj:
                fixup_scans(value)

    # A shallow copy is not enough: the scans are nested inside the plan and
    # updated in place, which would leak into the caller's object.
    plan = copy.deepcopy(plan)
    fixup_scans(plan)
    return plan
def _convert_df_from_parser(df, parser, cache):
    """
    Normalize a dataframe returned by a trace parser.

    The input is wrapped in a :class:`_ParsedDataFrame` (keeping its metadata
    if it already is one) and its data is converted with ``_df_to_polars()``.
    If the parser allows it (``parser._STEAL_FILES``), the files backing a
    :class:`polars.LazyFrame` are first moved into the swap area of ``cache``.

    :param df: Dataframe or :class:`_ParsedDataFrame` returned by the parser.

    :param parser: Parser that created ``df``. Only ``_STEAL_FILES`` is read.

    :param cache: Cache providing ``swap_dir``, ``insert_disk_only()`` and
        ``_hardlink_path()``.

    :returns: A :class:`_ParsedDataFrame` holding the converted data.
    """
    def to_polars(df):
        if isinstance(df, pd.DataFrame):
            df.index.name = 'Time'
        # NOTE(review): _df_to_polars() is presumably able to take polars
        # objects as well as pandas ones, since it is called on every input
        # -- confirm against lisa.datautils.
        df = _df_to_polars(df)
        return df

    def move_to_cache(df, cache):
        """
        Move data files backing a :class:`polars.LazyFrame` into the trace
        cache, so that freeing the temporary storage used by the trace parser
        does not break the returned objects.
        """
        def update_path(path, hardlinks_base, hardlinks):
            cache_path = cache.insert_disk_only(
                spec=dict(
                    # This unique key ensures we will never accidentally re-use
                    # that path for anything else.
                    key=uuid.uuid4().hex,
                ),
                compute_cost=None,
            )
            hardlink_base, hardlink_path = cache._hardlink_path(hardlinks_base, cache_path.name)
            # Move the original file under the hardlinks/ folder
            shutil.move(path, hardlink_path)
            # Hardlink the new path to a normal swap entry
            _make_hardlink(hardlink_path, cache_path)
            # Use the path under hardlinks/ so that if the swap entry gets
            # scrubed away, the existing LazyFrame will not break as they
            # will refer to the ones maintained by the trace.
            hardlinks.add(hardlink_base)
            return hardlink_path

        def fixup(df):
            if isinstance(df, pl.LazyFrame) and parser._STEAL_FILES:
                # We can only steal files if we have a swap to put it into
                try:
                    cache.swap_dir
                except (AttributeError, ValueError):
                    # If we cannot move the backing data to the swap folder, we
                    # are forced to just load the data eagerly as backing
                    # storage (e.g. tmp folder) will probably disappear
                    df = df.collect()
                else:
                    # Filled by update_path() while the plan is rewritten
                    hardlinks = set()
                    df = _lazyframe_rewrite(
                        df=df,
                        update_plan=functools.partial(
                            _logical_plan_update_paths,
                            update_path=functools.partial(
                                update_path,
                                hardlinks_base=uuid.uuid4().hex,
                                hardlinks=hardlinks
                            )
                        )
                    )
                    # TODO: Ideally we should only attach that to the LazyFrame
                    # right before leaking it in df_event, otherwise we end up
                    # serializing in the cache the deallocator as well. This is
                    # not a huge problem though, but it does mean that we will
                    # end up with 2 layers of "map_batches(identity)" on
                    # LazyFrames reloaded from the cache.
                    df = _LazyFrameOnDelete.attach_file_cleanup(df, hardlinks)
            return to_polars(df)

        return fixup(df)

    # Accept both bare dataframes and _ParsedDataFrame, preserving the
    # metadata of the latter
    df = _ParsedDataFrame.from_df(df)
    return df.with_df(
        move_to_cache(df.df, cache=cache)
    )
def _lazyframe_rewrite(df, update_plan):
    """
    Rebuild a :class:`polars.LazyFrame` from a modified version of its plan.

    :param df: LazyFrame to rewrite.
    :type df: polars.LazyFrame

    :param update_plan: Callable taking the plan decoded from JSON and
        returning the plan to deserialize.
    :type update_plan: collections.abc.Callable
    """
    assert isinstance(df, pl.LazyFrame)
    # TODO: once this is solved, we can just inspect the plan rather than
    # serialize()/deserialize() in JSON
    # https://github.com/pola-rs/polars/issues/9771
    serialized = df.serialize()
    rewritten = update_plan(json.loads(serialized))
    buffer = io.StringIO(json.dumps(rewritten))
    return pl.LazyFrame.deserialize(buffer)
CPU = newtype(int, 'CPU', doc='Alias to ``int`` used for CPU IDs')


class _CPUSeqFromStringInstance(FromString, types=(typing.List[CPU], typing.Sequence[CPU])):
    """
    :class:`~lisa._typeclass.FromString` instance for sequences of
    :class:`CPU`, delegating to the instance for sequences of ``int``.
    """
    from_str = FromString(typing.Sequence[int]).from_str

    @classmethod
    def get_format_description(cls, short):
        int_seq_instance = FromString(typing.Sequence[int])
        return int_seq_instance.get_format_description(short=short)
class _AllEvents(Iterable):
    """
    Type of the ``_ALL_EVENTS`` sentinel.

    It is recognized by identity, and iterating over it yields nothing.
    """
    def __iter__(self):
        return iter(())


# Sentinel standing for "all the events", compared with "is"
_ALL_EVENTS = _AllEvents()
class _ParsedDataFrame:
    """
    Dataframe paired with metadata.

    :param df: The dataframe.

    :Variable keyword arguments: Metadata stored in the ``meta`` dictionary,
        on top of the defaults ``mem_cacheable=True`` and
        ``swap_cacheable=True``.
    """
    def __init__(self, df, **meta):
        merged = dict(mem_cacheable=True, swap_cacheable=True)
        merged.update(meta)
        self.df = df
        self.meta = merged

    def with_df(self, df):
        """
        Return a new instance holding ``df`` and the same metadata.
        """
        return type(self)(df=df, **self.meta)

    @classmethod
    def from_df(cls, df, **meta):
        """
        Build an instance from either a bare dataframe or an existing
        instance, in which case ``meta`` overrides its metadata.
        """
        if not isinstance(df, cls):
            return cls(df, **meta)

        return cls(df.df, **{**df.meta, **meta})
[docs]
class TraceParserBase(abc.ABC, Loggable, PartialInit):
    """
    Abstract Base Class for trace parsers.

    :param events: Iterable of events to parse. An empty iterable can be
        passed, in which case some metadata may still be available. If
        ``_ALL_EVENTS`` is passed, the caller may subsequently call
        :meth:`parse_all_events`. ``None`` is treated as an empty iterable.
    :type events: collections.abc.Iterable(str) or None

    :param temp_dir: Directory the parser is given for its temporary files.
    :type temp_dir: str or pathlib.Path

    :param needed_metadata: Set of metadata name to gather in the parser.
    :type needed_metadata: collections.abc.Iterable(str)

    :param path: Unused by the base class.
    :type path: str or None

    The parser will be used as a context manager whenever it is queried for
    either events dataframes. Querying for metadata could happen immediately
    after object creation, but without expectation of success. Expensive
    metadata should only be computed when the object is used as a context
    manager. Note that the same parser object might be used as a context
    manager multiple times in its lifetime.
    """

    _STEAL_FILES = False
    """
    If ``True``, files backing a :class:`polars.LazyFrame` will be stolen by
    :class:`~lisa.trace.Trace` to add them to the cache.
    """

    METADATA_KEYS = [
        'time-range',
        'symbols-address',
        'cpus-count',
        'available-events',
        'trace-id',
    ]
    """
    Possible metadata keys
    """

    def __init__(self, events, temp_dir, needed_metadata=None, path=None):
        # pylint: disable=unused-argument
        self._requested_metadata = set(needed_metadata or [])
        # Subclasses default "events" to None, so it must be accepted here
        # rather than blowing up in set(None).
        if events is _ALL_EVENTS:
            self._requested_events = events
        else:
            self._requested_events = set(events if events is not None else [])
        self._temp_dir = Path(temp_dir)

    def get_parser_id(self):
        """
        Get the unique ID of that parser. Any parameter affecting the output
        dataframes or metadata must be somehow part of that ID, so that the
        cache is not accidentally hit with stale data.
        """
        # We rely on having a PartialInit object here, since TraceParserBase
        # inherits from it.
        assert isinstance(self, PartialInit)

        cls = type(self)
        # Sorted so that the ID does not depend on the order of the keyword
        # arguments
        id_ = '+'.join(
            f'{attr}={val!r}'
            for attr, val in sorted(self.__dict__['_kwargs'].items())
        )
        id_ = id_.encode('utf-8')
        id_ = checksum(io.BytesIO(id_), 'md5')
        return f'{cls.__module__}.{cls.__qualname__}-{id_}'

    @abc.abstractmethod
    def parse_event(self, event):
        """
        Parse the given event from the trace and return a
        :class:`pandas.DataFrame` with the following columns:

            * ``Time`` index: floating point absolute timestamp in seconds. The
              index *must not* have any duplicated values.
            * One column per event field, with the appropriate dtype.
            * Columns prefixed with ``__``: Header of each event, usually
              containing the following fields:

                * ``__cpu``: CPU number the event was emitted from
                * ``__pid``: PID of the current process scheduled at the time
                  the event was emitted
                * ``__comm``: Task command name going with ``__pid`` at the
                  point the event was emitted

        :param event: name of the event to parse
        :type event: str

        :raises MissingTraceEventError: If the event cannot be parsed.

        .. note:: The caller is free to modify the index of the data, and it
            must not affect other dataframes.
        """

    def parse_all_events(self):
        """
        Parse all available events.

        :raises NotImplementedError: If the available events could not be
            listed or parsed. The original exception is attached as the
            cause.

        .. note:: A parser that does not support querying the
            ``available-events`` metadata may raise an exception. This might
            also lead to multilple scans of the trace in some implementations.
        """
        try:
            events = self.get_metadata('available-events')
            return self.parse_events(events)
        except Exception as e:
            raise NotImplementedError(f'{self.__class__.__qualname__} parser does not support parsing all events') from e

    def parse_events(self, events, best_effort=False, **kwargs):
        """
        Same as :meth:`parse_event` but taking a list of events as input, and
        returning a mapping of event names to :class:`pandas.DataFrame` for
        each.

        :param events: Event names to parse.
        :type events: list(str)

        :param best_effort: If ``True``, do not raise
            :exc:`MissingTraceEventError`, silently skip the event.
            Must default to ``False``.
        :type best_effort: bool

        :Variable keyword arguments: Forwarded to :meth:`parse_event`
        """
        def parse(event):
            try:
                return self.parse_event(event, **kwargs)
            except MissingTraceEventError:
                if best_effort:
                    return None
                else:
                    raise

        # Events skipped in best effort mode are left out of the result
        return {
            event: df
            for event, df in (
                (event, parse(event))
                for event in events
            )
            if df is not None
        }

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        return
[docs]
class MockTraceParser(TraceParserBase):
    """
    Mock parser that just returns the dataframes it was given.

    :param dfs: Dictionary of :class:`pandas.DataFrame` for each event that the
        parser can handle.
    :type dfs: dict(str, pandas.DataFrame)

    :param path: Useless for now, but it's part of the Trace API, and it will
        be used for the dataframe cache as well.
    :type path: str or None

    :param events: Unused.
    :type events: collections.abc.Iterable(str)

    :param time_range: Time range of the trace in seconds. If not specified,
        the min and max timestamp of all ``dfs`` will be extracted, but it can
        lead to wrong analysis results (especially for signals that are not
        updated very often).
    :type time_range: tuple(float, float)

    :Variable keyword arguments: Forwarded to :class:`TraceParserBase`

    As a subclass of :class:`lisa.utils.PartialInit`, its constructor supports
    being applied to a partial set of parameters, leaving the rest to the
    internals of :class:`lisa.trace.Trace`::

        dfs = {
            'sched_wakeup': pd.DataFrame.from_records(
                [
                    (0, 1, 1, 'task1', 'task1', 1, 1, 1),
                    (1, 2, 1, 'task1', 'task1', 1, 1, 2),
                    (2, 4, 2, 'task2', 'task2', 2, 1, 4),
                ],
                columns=('Time', '__cpu', '__pid', '__comm', 'comm', 'pid', 'prio', 'target_cpu'),
                index='Time',
            ),
        }
        trace = Trace(parser=MockTraceParser(dfs))
        print(trace.df_event('sched_wakeup'))
    """

    @kwargs_forwarded_to(TraceParserBase.__init__)
    def __init__(self, dfs, time_range=None, events=None, path=None, **kwargs):
        self.dfs = {
            event: _df_to_polars(df)
            for event, df in dfs.items()
        }
        # "path" is useless for now, but it's part of the Trace API, and it will
        # be used for the dataframe cache as well.
        #
        # "events" is unused by this parser. The base class needs an iterable
        # though, so the None default is turned into an empty list.
        super().__init__(
            events=[] if events is None else events,
            **kwargs
        )
        self._time_range = time_range

    @property
    def _available_events(self):
        return set(self.dfs.keys())

    def parse_event(self, event):
        """
        Return the dataframe registered for ``event``.

        :raises MissingTraceEventError: If no dataframe was provided for that
            event.
        """
        try:
            return self.dfs[event]
        except KeyError:
            raise MissingTraceEventError(
                [event],
                available_events=self._available_events
            )
[docs]
class TraceDumpError(Exception):
    """
    Exception containing errors forwarded from the trace-dump parser

    :param errors: Errors reported by the parser. They are deduplicated and
        sorted.
    :type errors: collections.abc.Iterable

    :param event: Name of the event the errors relate to, if any.
    :type event: str or None

    :param cmd: Command line that led to the errors, if known.
    :type cmd: collections.abc.Iterable or None
    """
    def __init__(self, errors, event=None, cmd=None):
        self.errors = sorted(set(errors))
        self.event = event
        self.cmd = cmd

    def __str__(self):
        errors = self.errors
        cmd = self.cmd
        cmd = ' '.join(map(shlex.quote, map(str, cmd))) if cmd else ''

        # Context displayed in front of the errors. The command is included
        # whether or not there is an event, and is kept apart from the event
        # name.
        prefix = ': '.join(
            str(part)
            for part in (cmd, self.event)
            if part
        )
        body = '\n'.join(map(str, errors))

        if not prefix:
            return body
        elif len(errors) > 1:
            # One error per line, indented under the prefix
            sep = '\n '
            body = body.replace('\n', sep)
            return f'{prefix}:{sep}{body}'
        else:
            return f'{prefix}: {body}'
[docs]
class TraceDumpTraceParser(TraceParserBase):
    """
    trace.dat parser shipped by LISA

    :param path: Path to the trace file.
    :type path: str or pathlib.Path

    :param events: Events to parse, see :class:`TraceParserBase`.

    :param needed_metadata: Metadata to gather, see :class:`TraceParserBase`.

    :Variable keyword arguments: Forwarded to :class:`TraceParserBase`
    """

    _STEAL_FILES = True

    @kwargs_forwarded_to(TraceParserBase.__init__)
    def __init__(self, path, events, needed_metadata=None, **kwargs):
        super().__init__(events=events, needed_metadata=needed_metadata, **kwargs)
        self._trace_path = str(Path(path).resolve())
        self._metadata = {}

    def __enter__(self):
        temp_dir = self._temp_dir
        events = self._requested_events
        path = self._trace_path
        # Only ask for what has not been collected already
        needed_metadata = (
            self._requested_metadata -
            self._metadata.keys()
        )

        meta = {}
        # time-range will not be available in the basic metadata, this requires
        # a full parse
        if events or events is _ALL_EVENTS:
            meta = self._make_parquets(
                path=path,
                events=events,
                temp_dir=temp_dir
            )
        elif needed_metadata:
            meta = self._make_metadata(
                path=path,
                temp_dir=temp_dir,
                needed_metadata=needed_metadata,
            )

        self._metadata.update(meta)
        return self

    def __exit__(self, *args, **kwargs):
        pass

    @classmethod
    def from_dat(cls, path, events, **kwargs):
        """
        Build a parser for the trace.dat file at ``path``.

        :Variable keyword arguments: Forwarded to the class constructor.
        """
        return cls(path=path, events=events, **kwargs)

    @classmethod
    def _run(cls, cli_args, temp_dir):
        """
        Run the ``trace-dump`` binary in ``temp_dir``.

        :param cli_args: Command line arguments, added after
            ``--errors-json``.
        :type cli_args: collections.abc.Iterable

        :param temp_dir: Working directory of the command, in which the
            errors file is looked up afterwards.
        :type temp_dir: pathlib.Path

        :returns: The standard output of the command, as :class:`bytes`.

        :raises subprocess.CalledProcessError: If the command failed.
        :raises TraceDumpError: If the errors file lists general errors. It is
            raised from the ``finally`` clause, so it replaces any other
            outcome of the command.
        """
        logger = cls.get_logger()
        trace_dump = get_bin('trace-dump')
        errors_path = 'errors.json'

        cmd = (
            trace_dump,
            '--errors-json', errors_path,
            *cli_args,
        )
        pretty_cmd = ' '.join(map(shlex.quote, map(str, cmd)))

        def log(stdout, stderr):
            # Logging must never fail on invalid bytes, as that would hide
            # the actual outcome of the command.
            stderr = stderr.decode(errors='replace')
            logger.debug(f'{pretty_cmd}:\n{stderr}')

        try:
            completed = subprocess.run(
                cmd,
                check=True,
                capture_output=True,
                cwd=temp_dir,
            )
        except subprocess.CalledProcessError as e:
            log(e.stdout, e.stderr)
            raise
        else:
            stdout = completed.stdout
            log(stdout, completed.stderr)
            return stdout
        finally:
            try:
                with open(temp_dir / errors_path) as f:
                    errors = json.load(f)
            except FileNotFoundError:
                pass
            else:
                # Die on general errors that are not associated with a specific
                # event
                if errors := errors['errors']:
                    raise TraceDumpError(errors, cmd=cmd)

    @classmethod
    def _process_metadata(cls, meta):
        """
        Convert in place the metadata decoded from JSON, and return it.

        Keys that are absent are simply skipped.
        """
        try:
            start, end = meta['time-range']
        except KeyError:
            pass
        else:
            start = Timestamp(start, unit='ns', rounding='down')
            end = Timestamp(end, unit='ns', rounding='up')
            meta['time-range'] = (start, end)

        try:
            meta['pid-comms'] = dict(meta['pid-comms'])
        except KeyError:
            pass

        # symbols-address is stored as a list of items in JSON, since JSON
        # objects can only have string keys.
        try:
            meta['symbols-address'] = dict(meta['symbols-address'])
        except KeyError:
            pass

        try:
            events_info = meta['events-info']
        except KeyError:
            pass
        else:
            # The parser reports all the events that appeared in the trace even
            # if they were not collected.
            meta['available-events'] = {
                desc['event']
                for desc in events_info
                if not desc.get('errors')
            }

        return meta

    @classmethod
    def _make_metadata(cls, path, temp_dir, needed_metadata):
        """
        Run ``trace-dump metadata`` on the trace at ``path`` for the given
        keys, and return the processed metadata.
        """
        stdout = cls._run(
            cli_args=(
                'metadata',
                '--trace', path,
                *(
                    arg
                    for key in sorted(set(needed_metadata or []))
                    for arg in ('--key', key)
                )
            ),
            temp_dir=temp_dir,
        )
        return cls._process_metadata(json.loads(stdout))

    @classmethod
    def _make_parquets(cls, events, path, temp_dir):
        """
        Run ``trace-dump parquet`` on the trace at ``path`` and return the
        processed content of the ``meta.json`` file found in ``temp_dir``.
        """
        # Let the parser parse all available events
        if events is _ALL_EVENTS:
            events = []
        else:
            events = [
                arg
                for event in sorted(set(events))
                # If we don't enable the source of the meta event, the
                # parser will reject the source and we will never get a
                # chance to get the meta event itself, so we need both.
                for _event in Trace.get_event_sources(event)
                for arg in ('--event', _event)
            ]

        cls._run(
            cli_args=(
                'parquet',
                '--trace', path,
                '--compression', 'lz4',
                '--unique-timestamps',
                *events,
            ),
            temp_dir=temp_dir,
        )

        with open(temp_dir / 'meta.json') as f:
            meta = json.load(f)

        meta = cls._process_metadata(meta)
        return meta

    @property
    @memoized
    def _event_descs(self):
        # Event descriptions of the metadata, indexed by event name
        return {
            desc['event']: desc
            for desc in self._metadata['events-info']
        }

    @property
    @memoized
    def _pid_comms(self):
        pid_comms = {
            0: '<idle>',
            **self._metadata['pid-comms']
        }

        class Mapper(dict):
            # This avoids getting NaN and is consistent with trace-cmd display
            def __missing__(self, _):
                return '<...>'

        return Mapper(pid_comms)

    def parse_event(self, event):
        """
        Return the data of ``event`` as a :class:`_ParsedDataFrame` wrapping
        a scan of the parquet file of that event.

        :param event: Name of the event.
        :type event: str

        :raises MissingTraceEventError: If there is no description for that
            event.
        :raises TraceDumpError: If errors were reported for that event.
        :raises FileNotFoundError: If no parquet file is listed for that
            event.
        """
        try:
            desc = self._event_descs[event]
        except KeyError:
            raise MissingTraceEventError([event])
        else:
            pid_comms = self._pid_comms
            temp_dir = Path(self._temp_dir)

            if errors := desc.get('errors', []):
                # Only raise a TraceDumpError if the event is contained within
                # the trace but we had problems parsing it out
                raise TraceDumpError(errors, event=event)
            else:
                if (path := desc.get('path')) is None:
                    raise FileNotFoundError(f'No parquet file for event "{event}"')
                else:
                    df = pl.scan_parquet(temp_dir / path)
                    df = self._fixup_df(
                        event=event,
                        df=df,
                        pid_comms=pid_comms,
                    )
                    return _ParsedDataFrame.from_df(
                        df=df,
                        swap_cacheable=True,
                        mem_cacheable=True,
                        nr_rows=desc['nr-rows'],
                    )

    def _fixup_df(self, event, df, pid_comms):
        """
        Rename the header columns, add ``__comm``, drop the unused header
        columns, order by ``Time`` and turn string/binary columns into
        categorical ones.
        """
        df = df.rename({
            'common_ts': 'Time',
            'common_pid': '__pid',
            'common_cpu': '__cpu',
        })
        df = df.with_columns([
            pl.col('Time').cast(pl.Duration("ns")),
            pl.col('__pid').replace(pid_comms).alias('__comm')
        ])
        df = df.drop('common_type', 'common_flags', 'common_preempt_count')

        # With these clocks the data is only flagged as sorted; with any
        # other clock it is actually sorted.
        monotonic_clocks = (
            'local',
            'global',
            'uptime',
            'x86-tsc',
            'mono',
            'mono_raw',
            'counter'
        )
        if self._metadata.get('trace-clock') in monotonic_clocks:
            df = df.set_sorted('Time')
        else:
            df = df.sort('Time')

        # Turn all string columns into categorical columns, since strings are
        # typically extremely repetitive
        categorical_cols = [
            col
            for col, dtype in df.schema.items()
            if isinstance(dtype, (pl.String, pl.Binary))
        ]
        if categorical_cols:
            df = df.with_columns([
                pl.col(col).cast(pl.Categorical)
                for col in categorical_cols
            ])

        return df
[docs]
class EventParserBase:
    """
    Base class for trace event parser.

    Required attributes or properties:

        * ``event``: name of the event
        * ``regex``: full regex to parse a line of the event
        * ``fields``: mapping of field names to :mod:`pandas` dtype to use for
          the :class:`pandas.DataFrame` column.
    """

    # To be used with the re.ASCII regex flag
    PARSER_REGEX_TERMINALS = {
        'identifier': r'\w+',
        'integer': r'\d+',
        'floating': r'\d+\.\d+',
        'blank': r' +',
    }
    """
    Snippets of regex to be used in building more complex regexes in textual trace parsers.

    .. note:: Meant to be used with the :data:`re.ASCII` flags.
    """

    def __init__(self, event, fields):
        self.event, self.fields = event, fields
[docs]
class TxtEventParser(EventParserBase):
    """
    Trace event parser for raw output of ``trace-cmd report -R trace.dat``.

    :param event: name of the event
    :type event: str

    :param fields: mapping of field name to :class:`pandas.DataFrame` column
        dtype to use for each.
    :type fields: dict(str, str)

    :param positional_field: Name of the positional field. If ``None``, no
        positional field will be parsed.
    :type positional_field: str or None

    :param greedy_field: Name of a greedy field that will consume the
        remainder of the line, no matter what the content is. This allows
        parsing events with a field containing a string formatted itself as
        an event
    :type greedy_field: str or None

    :param raw: If ``True``, `trace-cmd report` will be used in raw mode. This
        usually ensures compliance with the format, but may sometimes be a
        problem. For example ``const char*`` are displayed as an hex pointer
        in raw mode, which is not helpful.
    :type raw: bool

    Parses events with the following format::

        devlib:     <idle>-0     [001]    76.214046: sched_wakeup: something here: comm=watchdog/1 pid=15 prio=0 success=1 target_cpu=1
        \\____/     \\___________________________________________/ \\____________/ \\_________________________________________________/
        buffer                          header                        positional                          fields
        (optional)
    """

    def __init__(self, event, fields, positional_field=None, greedy_field=None, raw=True):
        super().__init__(event=event, fields=fields)
        pattern = self._get_regex(event, fields, positional_field, greedy_field)
        self.regex = re.compile(pattern, flags=re.ASCII)
        self.raw = raw

    @property
    def bytes_regex(self):
        """
        Same as ``regex`` but acting on :class:`bytes` instead of :class:`str`.
        """
        str_regex = self.regex
        bytes_pattern = str_regex.pattern.encode('ascii')
        return re.compile(bytes_pattern, flags=str_regex.flags)

    @classmethod
    def _get_fields_regex(cls, event, fields, positional_field, greedy_field):
        """
        Returns the regex to parse the fields part of the event line.

        :param event: name of the event
        :type event: str

        :param fields: Mapping of field names to dataframe column dtype
        :type fields: dict(str, str)

        :param positional_field: Name to give to the positional field column,
            or ``None`` if it should not be parsed.
        :type positional_field: str or None

        :param greedy_field: Name of a greedy field that will consume the
            remainder of the line, no matter what the content is. This allows
            parsing events with a field containing a string formatted itself as
            an event
        :type greedy_field: str or None
        """
        # pylint: disable=unused-argument
        terminals = cls.PARSER_REGEX_TERMINALS
        named_fields = fields.keys() - {positional_field}

        def field_regex(field):
            if field == greedy_field:
                template = r'{field}=(?P<{field}>.*)'
            else:
                # The value is matched lazily up to the blank separating it
                # from the next field (or the end of the line), so that the
                # values themselves are allowed to contain spaces
                template = r'{field}=(?P<{field}>.+?)(?:{blank}|$)'
            return template.format(field=re.escape(field), **terminals)

        if named_fields:
            alternatives = [field_regex(field) for field in named_fields]
            # Catch-all field that will consume any unknown field, allowing for
            # partial parsing (both for performance/memory consumption and
            # forward compatibility)
            alternatives.append(
                r'{identifier}=\S+?(?:{blank}|$)'.format(**terminals)
            )
            fields_regex = r'(?:{})+'.format('|'.join(alternatives))
        else:
            fields_regex = ''

        if positional_field:
            # If there are more fields to match, use the first ":" or spaces as
            # separator, otherwise just consume everything
            if fields_regex:
                fields_regex = fr' *:? *{fields_regex}'

            fields_regex = r'(?P<{pos}>.*?){fields}$'.format(
                pos=positional_field,
                fields=fields_regex,
                **terminals
            )

        return fields_regex

    @classmethod
    def _get_header_regex(cls, event):
        """
        Return the regex for the header of the event.

        :param event: Name of the event
        :type event: str
        """
        terminals = cls.PARSER_REGEX_TERMINALS
        regex_map = dict(
            __comm=r'.+',
            __pid=terminals['integer'],
            __cpu=terminals['integer'],
            __timestamp=terminals['floating'],
            __event=re.escape(event),
        )

        # We don't need to capture these ones as they have already been parsed
        # in the skeleton dataframe, and fixed up for __timestamp
        uncaptured = ('__timestamp', '__event')
        compos = {
            field: (
                regex
                if field in uncaptured else
                fr'(?P<{field}>{regex})'
            )
            for field, regex in regex_map.items()
        }

        template = r'(?:(?:^.*?:)|^){blank}{__comm}-{__pid}{blank}\[{__cpu}\]{blank}{__timestamp}:{blank}{__event}:'
        return template.format(**compos, blank=terminals['blank'])

    def _get_regex(self, event, fields, positional_field, greedy_field):
        """
        Return the full regex to parse the event line.

        This includes both the header and the fields part.
        """
        fields_regex = self._get_fields_regex(event, fields, positional_field, greedy_field)
        header_regex = self._get_header_regex(event)
        return r'{header} *{fields}'.format(
            header=header_regex,
            fields=fields_regex,
            **self.PARSER_REGEX_TERMINALS
        )
[docs]
class PrintTxtEventParser(TxtEventParser):
    """
    Event parser for the following events, displayed in non-raw format by
    ``trace-cmd``:

        * ``print``
        * ``bprint``
        * ``bputs``

    :param event: Name of the event.
    :type event: str

    :param func_field: Name of the column receiving what comes before the
        first ``:`` of the fields part.
    :type func_field: str

    :param content_field: Name of the column receiving the rest of the line.
    :type content_field: str

    .. note:: ``bputs`` and ``print`` could be parsed in raw format, but that
        would make them harder to parse (function resolution needed), and
        ``bprint`` is just impossible to parse in raw format, since the data to
        interpolate the format string with are not displayed by ``trace-cmd``.
    """

    def __init__(self, event, func_field, content_field):
        # These must be set before calling the base constructor, since it
        # builds the regex using _get_fields_regex()
        self._func_field = func_field
        self._content_field = content_field

        super().__init__(
            event=event,
            fields={
                func_field: 'string',
                # Use bytes so that we can easily parse meta events out of it
                # without conversion
                content_field: 'bytes',
            },
            raw=False,
        )

    def _get_fields_regex(self, event, fields, positional_field, greedy_field):
        func, content = self._func_field, self._content_field
        return fr'(?P<{func}>.*?): *(?P<{content}>.*)'
[docs]
class CustomFieldsTxtEventParser(TxtEventParser):
    """
    Subclass of :class:`TxtEventParser` to be used for funky formats.

    When the format of the textual event does not respect at all the raw
    ``trace-cmd`` format, and if raw format cannot be used (e.g. because of
    ``const char*`` fields), this class provides a way out. For example, this
    event can be parsed with this class, but would be impossible to parse
    using :class:`TxtEventParser`::

        # non-raw format lacks a field delimiter for the "reason"
        kworker/u16:6-262 [003] 177.417147: ipi_raise: target_mask=00000000,00000020 (Function call interrupts)
        # raw format, even less usable because of the const char* pointer not being resolved
        kworker/u16:6-262 [003] 177.417147: ipi_raise: target_cpus=ARRAY[20, 00, 00, 00, 00, 00, 00, 00] reason=0xffffff8c0774fe6b

    .. note:: Use :class:`TxtEventParser` if possible, since it provides a more
        optimized fields regex than what you are likely to come up with, and
        can deal with missing fields.

    :param event: Name of the event.
    :type event: str

    :param fields_regex: Regex to parse the fields part of the event occurence.
        Regex groups are used to delimit fields, e.g.
        ``r"field1=(?P<field1>[0-9]+)"`` would recognize ``"field1=42"`` as a
        ``field1`` column.
    :type fields_regex: str

    :param fields: Mapping of field names (group names in the regex) to dtype
        to use in the :class:`pandas.DataFrame`. This is passed to
        :func:`lisa.datautils.series_convert` so the accepted values are a bit
        wider than :mod:`pandas` dtypes.
    :type fields: dict(str, object)

    :param raw: If ``True``, the event will be parsed as raw by ``trace-cmd``.
        If you have ``const char*`` fields, this must be ``False`` in order to
        get the string instead of the pointer.
    :type raw: bool
    """

    def __init__(self, event, fields_regex, fields, raw):
        # Must be available before the base constructor builds the full regex
        self._fields_regex = fields_regex
        super().__init__(
            event=event,
            fields=fields,
            raw=raw,
        )

    def _get_fields_regex(self, event, fields, positional_field, greedy_field):
        # The user-provided regex is used verbatim
        return self._fields_regex
[docs]
class TxtTraceParserBase(TraceParserBase):
"""
Text trace parser base class.
:param lines: Iterable of text lines as :class:`bytes`.
:type lines: collections.abc.Iterable(bytes)
:param events: List of events that will be available using
:meth:`parse_event`. If not provided, all events will be considered.
.. note:: Restricting the set of events can speed up some operations.
:type events: list(str)
:param event_parsers: Pre-built event parsers. Missing event parsers will
be inferred from the fields parsed in the trace, which is costly and
can lead to using larger dtypes than necessary (e.g. ``int64`` rather
than ``uint16``).
.. seealso:: :class:`TxtEventParser`
:type event_parsers: list(EventParserBase)
:param default_event_parser_cls: Class used to build event parsers inferred from the trace.
:type default_event_parser_cls: type
:param pre_filled_metadata: Metadata pre-filled by the caller of the
constructor.
:type pre_filled_metadata: dict(str, object) or None
"""
_KERNEL_DTYPE = {
'timestamp': 'uint64',
'pointer': 'uint64',
'symbol': 'string',
'cpu': 'uint16',
'pid': 'uint32',
'signed_pid': 'int32',
'comm': 'string',
'cgroup_path': 'string',
# prio in [-1, 140]
'prio': 'int16',
'util': 'uint16',
}
"""
Dtypes for various columns that occur frequently in multiple events.
"""
HEADER_FIELDS = {
'__comm': 'string',
'__pid': 'uint32',
'__cpu': 'uint32',
'__timestamp': 'float64',
'__event': 'string',
}
"""
Pandas dtype of the header fields.
"""
DTYPE_INFERENCE_ORDER = ['int64', 'uint64', 'float64']
"""
When the dtype of a field is not provided by a user-defined parser, these
dtypes will be tried in order to convert the column from string to
something more appropriate.
.. note:: ``uint64`` allows testing for hexadecimal formatting of numbers.
"""
DEFAULT_EVENT_PARSER_CLS = None
"""
Class used to create event parsers when inferred from the trace.
"""
EVENT_DESCS = {}
"""
Mapping of event names to parser description as a dict.
Each event description can include the constructor parameters of the class
used as :attr:`DEFAULT_EVENT_PARSER_CLS`, which will be used to build event
parsers from the descriptions.
If an instance of :class:`EventParserBase` is passed instead of a dict, it
will be used as is.
"""
_RE_MATCH_CLS = re.Match
@kwargs_forwarded_to(TraceParserBase.__init__)
def __init__(self,
    lines,
    events=None,
    needed_metadata=None,
    event_parsers=None,
    default_event_parser_cls=None,
    pre_filled_metadata=None,
    **kwargs,
):
    # Scan ``lines`` a single time, eagerly parsing the requested events that
    # already have a parser and collecting what is needed to build parsers
    # for the remaining ones.
    needed_metadata = set(needed_metadata or [])
    super().__init__(events, needed_metadata=needed_metadata, **kwargs)
    self._pre_filled_metadata = pre_filled_metadata or {}

    events = set(events or [])

    # No event and no metadata other than the trace ID was requested: there
    # is nothing to extract from the lines, so leave them untouched.
    if not events and not (needed_metadata - {'trace-id'}):
        self._events_df = {}
        self._time_range = None
        self._skeleton_df = None
        self._available_events = None
        self._event_parsers = {}
        return

    parser_cls, known_parsers = self._resolve_event_parsers(event_parsers, default_event_parser_cls)

    # Only keep the parsers of the events that were actually requested
    selected_parsers = {}
    for name, parser in known_parsers.items():
        if name in events:
            selected_parsers[name] = parser

    # The raw fields are only worth collecting in the skeleton dataframe
    # when at least one requested event has no parser yet, since they are
    # then used to infer one. Skipping them saves memory and time.
    need_fields = (events != selected_parsers.keys())
    skeleton_regex = self._get_skeleton_regex(need_fields)

    self.logger.debug(f'Scanning the trace for metadata {needed_metadata} and events: {events}')
    parsed = self._eagerly_parse_lines(
        lines=lines,
        skeleton_regex=skeleton_regex,
        event_parsers=selected_parsers,
        events=events,
    )
    events_df, skeleton_df, time_range, available_events = parsed

    self._events_df = events_df
    self._time_range = time_range
    self._skeleton_df = skeleton_df
    self._available_events = available_events

    inferred_descs = self._get_event_descs(skeleton_df, events, selected_parsers)

    # The fields column was only required for the inference above, drop it
    # to reduce peak memory usage. It is absent when need_fields was False.
    try:
        self._skeleton_df = self._skeleton_df.drop(['__fields'])
    except KeyError:
        pass

    all_parsers = {
        name: parser_cls(event=name, **desc)
        for name, desc in inferred_descs.items()
    }
    # Pre-existing parsers win over the inferred ones, so that the user can
    # override autodetected events
    all_parsers.update(selected_parsers)
    self._event_parsers = all_parsers
@classmethod
def _resolve_event_parsers(cls, event_parsers, default_event_parser_cls):
    """
    Build the mapping of event names to parsers known ahead of time.

    :param event_parsers: Iterable of user-provided parsers (or ``None``).
        They override the class-provided ones for the same event.
    :param default_event_parser_cls: Class used to instantiate the
        dict-based descriptions of :attr:`EVENT_DESCS`. Falls back on
        :attr:`DEFAULT_EVENT_PARSER_CLS` when falsy.

    :returns: Tuple ``(parser class actually used, dict of event name to
        parser)``.
    """
    parser_cls = default_event_parser_cls or cls.DEFAULT_EVENT_PARSER_CLS

    descs = list(cls.EVENT_DESCS.items() or [])

    # Hack: also register every "lisa__<event>" description under the bare
    # "<event>" name. This keeps old traces parseable, from a time where the
    # events generated by LISA were not namespaced with that prefix.
    prefix = 'lisa__'
    unprefixed = [
        (name[len(prefix):], desc)
        for name, desc in descs
        if name.startswith(prefix)
    ]
    descs.extend(unprefixed)

    resolved = {}
    for name, desc in descs:
        if isinstance(desc, EventParserBase):
            # Ready-made parser, use it as-is
            resolved[name] = desc
        else:
            resolved[name] = parser_cls(event=name, **desc)

    # User-provided parsers take precedence over the class-provided ones
    for parser in event_parsers or []:
        resolved[parser.event] = parser

    return (parser_cls, resolved)
[docs]
@PartialInit.factory
@kwargs_forwarded_to(__init__, ignore=['lines'])
def from_string(cls, txt, path=None, **kwargs):
    """
    Create a parser out of the whole trace held in one multiline string.

    :param txt: Content of the trace. A :class:`str` is encoded as ASCII
        before being split into lines, :class:`bytes` are used as they are.
    :type txt: bytes or str

    :param path: Accepted but not used by this constructor.

    :Variable keyword arguments: Forwarded to ``__init__``
    """
    try:
        txt = txt.encode('ascii')
    except AttributeError:
        # No encode() method: the text was already provided as bytes
        pass

    return cls(lines=txt.splitlines(), **kwargs)
[docs]
@PartialInit.factory
@kwargs_forwarded_to(__init__, ignore=['lines'])
def from_txt_file(cls, path, **kwargs):
    """
    Create a parser out of a text file, given its path.

    The file is opened in binary mode and its lines are consumed by the
    constructor while the file is still open.

    :Variable keyword arguments: Forwarded to ``__init__``
    """
    with open(path, 'rb') as trace_file:
        parser = cls(lines=trace_file, **kwargs)
    return parser
@abc.abstractmethod
def _get_skeleton_regex(self, need_fields):
"""
Return a compiled :class:`bytes` regex used to recognize event lines.

The regex must provide the following named groups:

* ``__event``: name of the event
* ``__timestamp``: timestamp of the event occurrence
* ``__fields`` if ``need_fields == True``: a string containing all
the named fields of each event occurrence.

:param need_fields: ``True`` when the ``__fields`` group is required,
i.e. when some event parsers will have to be inferred from the trace.
:type need_fields: bool

.. note:: This regex is critical for performance as it will be used to
scan the whole trace.
"""
@staticmethod
def _make_df_from_data(regex, data, extra_cols=None):
    """
    Turn a list of row tuples into a :class:`polars.LazyFrame`.

    :param regex: Compiled regex whose ``groups()`` tuples make the
        beginning of each row. Its named groups provide the column names.
    :param data: List of rows, each made of the regex groups followed by
        one value for each name in ``extra_cols``.
    :param extra_cols: Names of the extra columns appended to each row.
    """
    extra_cols = extra_cols or []
    group_positions = regex.groupindex

    # Column names in the order of the regex groups, so that the groups()
    # tuple of a match can be used directly as the beginning of a row
    regex_cols = sorted(
        group_positions.keys(),
        key=lambda name: group_positions[name],
    )

    # A regex group named like an explicitly added extra column is renamed
    # to avoid a clash between the two
    columns = []
    for name in regex_cols:
        columns.append(f'__parsed_{name}' if name in extra_cols else name)
    columns += extra_cols

    frame = pl.DataFrame(
        data,
        orient='row',
        schema=columns,
        # Look at all the rows to settle on the best dtype
        infer_schema_length=None,
    )
    frame = frame.lazy()

    binary_as_string = [
        pl.col(name).cast(pl.String)
        for name, dtype in frame.schema.items()
        if isinstance(dtype, pl.Binary)
    ]
    frame = frame.with_columns(binary_as_string)

    # The timestamp goes first so that it is recognized as the index
    ordered = order_as(columns, ['__timestamp'])
    return frame.select(ordered)
def _eagerly_parse_lines(self, lines, skeleton_regex, event_parsers, events, time=None, time_unit='s'):
"""
Filter the lines to select the ones with events.
Also eagerly parse events from them to avoid the extra memory
consumption from line storage, and to speed up parsing by acting as a
pipeline on lazy lines stream.

:param lines: Iterable of lines. They are matched against bytes regexes
and tested against a bytes literal, so they are expected to be
:class:`bytes`.
:param skeleton_regex: Compiled bytes regex recognizing event lines, as
returned by ``_get_skeleton_regex()``.
:param event_parsers: Mapping of event name to parser, for the events
that can be parsed right away.
:param events: Names of the events of interest. Lines of events in that
set without a parser are stored for later parsing.
:param time: Optional iterable of timestamps, zipped with ``lines``. When
provided, the timestamp is not parsed out of the line.
:param time_unit: Unit passed to ``Timestamp`` for the returned time
range when ``time`` is provided.

:returns: Tuple ``(events_df, skeleton_df, (begin_time, end_time),
available_events)`` where ``events_df`` maps the eagerly parsed events to
their dataframe, and ``skeleton_df`` holds the lines of the other
requested events.

:raises DroppedTraceEventError: If a line not matching the skeleton regex
contains ``EVENTS DROPPED``.
:raises ValueError: If events were requested but no event line was found.
"""
# Recompile all regex so that they work on bytes rather than strings.
# This simplifies the rest of the code while allowing the raw output
# from a process to be fed
def encode(string):
return string.encode('ascii')
events = list(map(encode, events))
event_parsers = {
encode(event): parser
for event, parser in event_parsers.items()
}
# Only add an extra iterator and tuple unpacking if that is strictly
# necessary, as it comes with a performance cost
time_is_provided = time is not None
skel_search = skeleton_regex.search
if time_is_provided:
lines = zip(time, lines)
drop_filter = lambda line: not skel_search(line[1])
else:
drop_filter = lambda line: not skel_search(line)
# First, get rid of all the lines coming before the trace
lines = itertools.dropwhile(drop_filter, lines)
# Appending to lists is amortized O(1). Inside the list, we store
# tuples since they are:
# 1) the most compact Python representation of a product type
# 2) output directly by regex.search()
skeleton_data = []
# Requested events without a parser map to (None, None), so that calling
# the "search" function on them raises TypeError in the loop below.
events_data = {
**{event: (None, None) for event in events},
**{
event: (parser.bytes_regex.search, [])
for event, parser in event_parsers.items()
},
}
available_events = set()
begin_time = None
# NOTE(review): this initial value is never read, end_time is reassigned
# after the loop.
end_time = None
# THE FOLLOWING LOOP IS THE MOST PERFORMANCE-SENSITIVE PART OF THAT
# CLASS, APPLY EXTREME CARE AND BENCHMARK WHEN MODIFYING
# Best practices:
# - resolve all dotted names ahead of time
# - minimize the amount of local variables. Prefer anonymous
# expressions
# - Catch exceptions for exceptional cases rather than explicit check
# Pre-lookup methods out of the loop to speed it up
append = list.append
group = self._RE_MATCH_CLS.group
groups = self._RE_MATCH_CLS.groups
# NOTE(review): "inf" and "parse_time" below are not read anywhere else in
# this method - presumably leftovers, confirm before removing.
inf = math.inf
prev_time = Timestamp(0, unit='ns')
parse_time = '__timestamp' in skeleton_regex.groupindex.keys()
for line in lines:
if time_is_provided:
line_time, line = line
match = skel_search(line)
# When the line does not match, "match" is None and calling the unbound
# group() method on it raises TypeError
try:
event = group(match, '__event')
# The line did not match the skeleton regex, so skip it
except TypeError:
if b'EVENTS DROPPED' in line:
raise DroppedTraceEventError('The trace buffer got overridden by new data, increase the buffer size to ensure all events are recorded')
# Unknown line, could be coming e.g. from stderr
else:
continue
else:
if not time_is_provided:
line_time = Timestamp(
group(match, '__timestamp').decode('utf-8')
)
# Do a global deduplication of timestamps, across all
# events regardless of the one we will parse. This ensures
# stable results and joinable dataframes from multiple
# parser instance.
line_time = line_time.as_nanoseconds
# NOTE(review): the adjusted value works out to prev_time + 2 rather than
# prev_time + 1 - confirm the gap is intended.
if line_time <= prev_time:
line_time += prev_time - line_time + 2
prev_time = line_time
if begin_time is None:
begin_time = line_time
# If we can parse it right away, let's do it now
try:
search, data = events_data[event]
append(
data,
# Add the fixedup time
groups(search(line)) + (line_time,)
)
# If we don't have a parser for it yet (search == None),
# just store the line so we can infer its parser later.
# The same path is taken if the parser regex did not match the line,
# since groups() then gets None instead of a match.
except TypeError:
# Add the fixedup time and the full line for later
# parsing as well
append(
skeleton_data,
groups(match) + (line_time, line)
)
# We are not interested in that event, but we still remember the
# parseable events
except KeyError:
available_events.add(event)
# This should have been set on the first line.
# Note: we don't raise the exception if no events were asked for, to
# allow creating dummy parsers without any line
if begin_time is None and events:
raise ValueError('No lines containing events have been found')
available_events = {
event.decode('ascii')
for event in available_events
}
# NOTE(review): "line_time" is only bound inside the loop, so this looks
# like it fails if "lines" yielded nothing - confirm against callers.
end_time = line_time
available_events.update(
event.decode('ascii')
for event, (search, data) in events_data.items()
if data
)
events_df = {}
for event, parser in event_parsers.items():
try:
# Remove the tuple data from the dict as we go, to free memory
# before proceeding to the next event to smooth the peak memory
# consumption
_, data = events_data.pop(event)
except KeyError:
pass
else:
decoded_event = event.decode('ascii')
df = self._make_df_from_data(parser.regex, data, ['__timestamp'])
# Post-process immediately to shorten the memory consumption
# peak
df = self._postprocess_df(
decoded_event,
parser,
df,
)
events_df[decoded_event] = df
# Compute the skeleton dataframe for the events that have not been
# parsed already. It contains the event name, the time, and potentially
# the fields if they are needed
skeleton_df = self._make_df_from_data(skeleton_regex, skeleton_data, ['__timestamp', 'line'])
skeleton_df = skeleton_df.with_columns(
pl.col('__event').cast(pl.Categorical)
)
# Drop unnecessary columns that might have been parsed by the regex
to_keep = {'__timestamp', '__event', '__fields', 'line'}
skeleton_df = skeleton_df.select(sorted(to_keep & set(skeleton_df.columns)))
# This is very fast on a category dtype
available_events.update(
skeleton_df.select(
pl.col('__event').unique()
).collect()['__event']
)
# Externally provided timestamps are interpreted in the caller-provided
# unit, the ones handled by this method are in nanoseconds.
if time_is_provided:
begin_time = Timestamp(begin_time, unit=time_unit)
end_time = Timestamp(end_time, unit=time_unit)
else:
begin_time = Timestamp(begin_time, unit='ns')
end_time = Timestamp(end_time, unit='ns')
return (events_df, skeleton_df, (begin_time, end_time), available_events)
def _lazily_parse_event(self, event, parser, df):
    """
    Parse ``event`` out of the full lines stored in the skeleton dataframe.

    :param event: Name of the event to parse.
    :param parser: Event parser providing the regex applied to each line.
    :param df: Skeleton dataframe with ``__event``, ``__timestamp`` and
        ``line`` columns.

    :returns: The post-processed dataframe of the event.
    """
    # Restrict to the lines of that event, as no other line can match
    is_event = (pl.col('__event') == event)
    event_lines = df.filter(is_event)

    pattern = parser.bytes_regex.pattern.decode('utf-8')
    fields = pl.col('line').str.strip_chars().str.extract_groups(pattern)

    # extract_groups() gives a struct column, unnest() turns each regex
    # group into its own column
    parsed = event_lines.select((pl.col('__timestamp'), fields))
    parsed = parsed.unnest('line')

    return self._postprocess_df(event, parser, parsed)
@staticmethod
def _get_event_descs(df, events, event_parsers):
    """
    Infer a parser description for the events lacking a parser.

    :param df: Skeleton dataframe with ``__event`` and ``__fields``
        columns.
    :param events: Events of interest, or ``None`` for all the events.
    :param event_parsers: Mapping of event name to already-known parser.

    :returns: Mapping of event name to a description dict with
        ``positional_field`` and ``fields`` keys, the latter mapping each
        detected field name to ``None``.
    """
    known = event_parsers.keys()
    want_all = events is None

    # Every requested event already has a parser, nothing to infer
    if not want_all and set(events) == known:
        return {}

    # The inference is relatively expensive, so only look at the events we
    # do not know about yet
    if want_all:
        df = df.filter(
            pl.col('__event').is_in(known).not_()
        )
    else:
        events = set(events) - known
        df = df.filter(
            pl.col('__event').is_in(events)
        )

    fields_regex = r'({identifier})='.format(
        **TxtEventParser.PARSER_REGEX_TERMINALS
    )

    # Matching groups and extracting all occurrences at once is not
    # possible at the moment, so the trailing "=" is stripped afterwards:
    # https://github.com/pola-rs/polars/issues/11857
    strip_equal = pl.element().str.strip_chars_end('=')
    field_names = (
        pl.col('__fields')
        .cast(pl.String)
        .str.extract_all(fields_regex)
        .list.eval(strip_equal)
    )
    df = df.with_columns(field_names)

    # One row per event, holding the set of field names seen for it
    df = df.group_by('__event').agg(
        pl.col('__fields')
        .list.explode()
        .unique()
    )

    rows = df.select((
        pl.col('__event').cast(pl.String),
        pl.col('__fields'),
    )).collect().iter_rows()

    descs = {}
    for event, fields in rows:
        descs[event] = dict(
            positional_field=None,
            fields=dict.fromkeys(fields, None)
        )
    return descs
[docs]
def parse_event(self, event):
    """
    Return the in-memory dataframe of ``event``.

    :param event: Name of the event to parse.

    :raises MissingTraceEventError: If there is no parser for that event,
        or if the resulting dataframe is empty.
    """
    try:
        parser = self._event_parsers[event]
    except KeyError:
        raise MissingTraceEventError([event])

    # Use the eagerly parsed dataframe if there is one, otherwise parse the
    # event now from the lines kept in the skeleton dataframe
    try:
        lazy_df = self._events_df[event]
    except KeyError:
        lazy_df = self._lazily_parse_event(event, parser, self._skeleton_df)

    # Everything resides in memory anyway, so make that clear to the _Trace
    # infrastructure so that it does not accidentally try to serialize a
    # LazyFrame as a huge JSON.
    df = lazy_df.collect()

    # Text traces give no way of telling apart an event that never occurred
    # from an event that was not collected, so an empty dataframe is
    # reported as a missing event
    if not len(df):
        raise MissingTraceEventError([event])
    return df
@classmethod
def _postprocess_df(cls, event, parser, df):
    """
    Convert the text columns of a freshly parsed event dataframe.

    The dtype of each column is inferred from one non-null value, the
    ``__timestamp`` column is renamed to ``Time`` and a few events get a
    dedicated treatment of some of their fields.

    :param event: Name of the event the dataframe belongs to.
    :param parser: Parser of that event.
    :param df: Dataframe to process.
    :type df: polars.LazyFrame
    """
    assert isinstance(df, pl.LazyFrame)

    def first_non_null(frame, name):
        # First non-null value of the column, or None if there is no such
        # value
        sample = (
            frame
            .select(name)
            .filter(
                pl.col(name).is_null().not_()
            )
            .head(1)
            .collect()
        )
        try:
            value = sample.item()
        except ValueError:
            value = None
        return value

    def infer_schema(frame):
        # Build a one-row dataframe free from null values wherever
        # possible, so that the dtype can be inferred accurately.
        representatives = {}
        for name in frame.columns:
            representatives[name] = [first_non_null(frame, name)]
        sample = pl.DataFrame(representatives)

        # Ugly hack: round-trip that row through CSV, so that the schema is
        # inferred from text data since that is what we have.
        buffer = io.BytesIO()
        sample.write_csv(buffer)
        reparsed = pl.read_csv(buffer)

        inferred = {}
        for name, dtype in reparsed.schema.items():
            # Strings are typically very repetitive, so make them
            # categorical
            if isinstance(dtype, (pl.String, pl.Binary)):
                dtype = pl.Categorical
            inferred[name] = dtype
        return inferred

    # Polars fails to convert strings with trailing spaces (e.g. "42 " into
    # 42), so clean that up before any conversion
    stripped = [
        pl.col(name).str.strip_chars()
        for name, dtype in df.schema.items()
        if isinstance(dtype, pl.String)
    ]
    df = df.with_columns(stripped)

    casts = [
        pl.col(name).cast(dtype)
        for name, dtype in infer_schema(df).items()
    ]
    df = df.with_columns(casts)

    df = df.rename({'__timestamp': 'Time'})

    text_dtypes = (pl.String, pl.Binary, pl.Categorical)

    if event == 'sched_switch':
        if isinstance(df.schema['prev_state'], (pl.String, pl.Categorical)):
            # Import at the last moment to avoid a circular dependency
            # pylint: disable=import-outside-toplevel
            from lisa.analysis.tasks import TaskState
            state = pl.col('prev_state').map_elements(
                TaskState.from_sched_switch_str,
                return_dtype=pl.UInt64,
            )
            df = df.with_columns(state.cast(pl.UInt16))

    elif event in ('sched_overutilized', 'lisa__sched_overutilized'):
        df = df.with_columns(
            pl.col('overutilized').cast(pl.Boolean)
        )
        if isinstance(df.schema.get('span'), text_dtypes):
            # The span is a hexadecimal number with a "0x" prefix
            span = (
                pl.col('span')
                .cast(pl.String)
                .str.strip_chars()
                .str.strip_prefix('0x')
                .str.to_integer(base=16)
            )
            df = df.with_columns(span)

    elif event in ('thermal_power_cpu_limit', 'thermal_power_cpu_get_power'):
        # The in-kernel name is "cpumask". "cpus" is only an artifact of the
        # pretty printing format string of ftrace, that happens to be used
        # by a specific parser.
        df = df.rename({'cpus': 'cpumask'})

        if event == 'thermal_power_cpu_get_power' and isinstance(df.schema['load'], text_dtypes):
            # Parse b'{2 3 2 8}' into a list of integers
            load = (
                pl.col('load')
                .cast(pl.String)
                .str.strip_chars('{}')
                .str.split(' ')
                .list.eval(
                    pl.element()
                    .str.to_integer()
                )
            )
            df = df.with_columns(load)

    elif event in ('ipi_entry', 'ipi_exit'):
        df = df.with_columns(
            pl.col('reason').str.strip_chars('()')
        )

    return df
[docs]
class TxtTraceParser(TxtTraceParserBase):
    """
    Text trace parser for the raw output of ``trace-cmd report -R trace.dat``.

    :param lines: Iterable of text lines, as :class:`bytes` since they are
        matched against bytes regexes.
    :type lines: collections.abc.Iterable(bytes)

    :param events: List of events that will be available using
        :meth:`~TxtTraceParserBase.parse_event`. If not provided, all events
        will be considered.

        .. note:: Restricting the set of events can speed up some operations.
    :type events: list(str) or None

    :param event_parsers: Pre-built event parsers. Missing event parsers will
        be inferred from the fields parsed in the trace, which is costly and
        can lead to using larger dtypes than necessary (e.g. ``int64`` rather
        than ``uint16``).

        .. seealso:: :class:`TxtEventParser`
    :type event_parsers: list(EventParserBase)

    :param default_event_parser_cls: Class used to build event parsers
        inferred from the trace.
    :type default_event_parser_cls: type
    """
    DEFAULT_EVENT_PARSER_CLS = TxtEventParser

    # Alias required so that the name is available in the class body when
    # building EVENT_DESCS
    _KERNEL_DTYPE = TxtTraceParserBase._KERNEL_DTYPE

    EVENT_DESCS = {
        'print': PrintTxtEventParser(
            event='print',
            func_field='ip',
            content_field='buf',
        ),
        'bprint': PrintTxtEventParser(
            event='bprint',
            func_field='ip',
            content_field='buf',
        ),
        'bputs': PrintTxtEventParser(
            event='bputs',
            func_field='ip',
            content_field='str',
        ),
        'thermal_power_cpu_limit': dict(
            fields={
                'cpus': 'bytes',
                'freq': 'uint32',
                'cdev_state': 'uint64',
                'power': 'uint32',
            },
            # Allow parsing the cpus bitmask
            raw=False,
        ),
        'thermal_power_cpu_get_power': dict(
            fields={
                'cpus': 'bytes',
                'freq': 'uint32',
                'load': 'bytes',
                'dynamic_power': 'uint32',
            },
            # Allow parsing the cpus bitmask and load array
            raw=False,
        ),
        'cpuhp_enter': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'target': 'uint16',
                'idx': 'uint16',
                'fun': _KERNEL_DTYPE['symbol'],
            },
        ),
        'funcgraph_entry': dict(
            fields={
                'func': _KERNEL_DTYPE['pointer'],
                'depth': 'uint16',
            },
        ),
        'funcgraph_exit': dict(
            fields={
                'func': _KERNEL_DTYPE['pointer'],
                'depth': 'uint16',
                'overrun': 'bool',
                'calltime': 'uint64',
                'rettime': 'uint64',
            },
        ),
        'ipi_entry': dict(
            fields={
                'reason': 'string',
            },
            positional_field='reason',
            # const char* reason is not displayed properly in raw mode
            raw=False,
        ),
        'ipi_exit': dict(
            fields={
                'reason': 'string',
            },
            positional_field='reason',
            # const char* reason is not displayed properly in raw mode
            raw=False,
        ),
        'ipi_raise': CustomFieldsTxtEventParser(
            event='ipi_raise',
            fields_regex=r'target_mask=(?P<target_cpus>[0-9,]+) +\((?P<reason>[^)]+)\)',
            fields={
                'target_cpus': 'bytes',
                'reason': 'string',
            },
            raw=False,
        ),
        'sched_switch': dict(
            fields={
                'prev_comm': _KERNEL_DTYPE['comm'],
                'prev_pid': _KERNEL_DTYPE['pid'],
                'prev_prio': _KERNEL_DTYPE['prio'],
                'prev_state': 'uint16',
                'next_comm': _KERNEL_DTYPE['comm'],
                'next_pid': _KERNEL_DTYPE['pid'],
                'next_prio': _KERNEL_DTYPE['prio'],
            },
        ),
        'sched_wakeup': dict(
            fields={
                'comm': _KERNEL_DTYPE['comm'],
                'pid': _KERNEL_DTYPE['pid'],
                'prio': _KERNEL_DTYPE['prio'],
                'target_cpu': _KERNEL_DTYPE['cpu'],
                # This field does exist but it's useless nowadays as it has a
                # constant value of 1, so save some memory by just not parsing
                # it
                # 'success': 'bool',
            },
        ),
        'task_rename': dict(
            fields={
                'pid': _KERNEL_DTYPE['pid'],
                'oldcomm': _KERNEL_DTYPE['comm'],
                'newcomm': _KERNEL_DTYPE['comm'],
            },
        ),
        'cpu_frequency': dict(
            fields={
                'cpu_id': _KERNEL_DTYPE['cpu'],
                'state': 'uint32',
            },
        ),
        'cpu_idle': dict(
            fields={
                'cpu_id': _KERNEL_DTYPE['cpu'],
                # Technically, the value used in the kernel can go down to -1,
                # but it ends up stored in an unsigned type (why ? ...), which
                # means we actually get 4294967295 in the event, as an unsigned
                # value. Therefore, it's up to sanitization to fix that up ...
                'state': 'int64',
            },
        ),
        'sched_compute_energy': dict(
            fields={
                'comm': _KERNEL_DTYPE['comm'],
                'dst_cpu': _KERNEL_DTYPE['cpu'],
                'energy': 'uint64',
                'pid': _KERNEL_DTYPE['pid'],
                'prev_cpu': _KERNEL_DTYPE['cpu'],
            },
        ),
        'lisa__sched_cpu_capacity': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'capacity': _KERNEL_DTYPE['util'],
                'capacity_orig': _KERNEL_DTYPE['util'],
                'capacity_curr': _KERNEL_DTYPE['util'],
            },
        ),
        'lisa__sched_pelt_cfs': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'load': _KERNEL_DTYPE['util'],
                'path': _KERNEL_DTYPE['cgroup_path'],
                'rbl_load': _KERNEL_DTYPE['util'],
                'util': _KERNEL_DTYPE['util'],
                'update_time': _KERNEL_DTYPE['timestamp'],
            },
        ),
        'lisa__sched_pelt_se': dict(
            fields={
                'comm': _KERNEL_DTYPE['comm'],
                'cpu': _KERNEL_DTYPE['cpu'],
                'load': _KERNEL_DTYPE['util'],
                'path': _KERNEL_DTYPE['cgroup_path'],
                'pid': _KERNEL_DTYPE['signed_pid'],
                'rbl_load': _KERNEL_DTYPE['util'],
                'util': _KERNEL_DTYPE['util'],
                'update_time': _KERNEL_DTYPE['timestamp'],
            },
        ),
        'sched_migrate_task': dict(
            fields={
                'comm': _KERNEL_DTYPE['comm'],
                'dest_cpu': _KERNEL_DTYPE['cpu'],
                'orig_cpu': _KERNEL_DTYPE['cpu'],
                'pid': _KERNEL_DTYPE['pid'],
                'prio': _KERNEL_DTYPE['prio'],
            },
        ),
        'lisa__sched_overutilized': dict(
            fields={
                'overutilized': 'bool',
                'span': 'string',
            },
        ),
        'lisa__sched_pelt_dl': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'load': _KERNEL_DTYPE['util'],
                'rbl_load': _KERNEL_DTYPE['util'],
                'util': _KERNEL_DTYPE['util'],
            },
        ),
        'lisa__sched_pelt_irq': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'load': _KERNEL_DTYPE['util'],
                'rbl_load': _KERNEL_DTYPE['util'],
                'util': _KERNEL_DTYPE['util'],
            },
        ),
        'lisa__sched_pelt_rt': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'load': _KERNEL_DTYPE['util'],
                'rbl_load': _KERNEL_DTYPE['util'],
                'util': _KERNEL_DTYPE['util'],
            },
        ),
        'sched_process_wait': dict(
            fields={
                'comm': _KERNEL_DTYPE['comm'],
                'pid': _KERNEL_DTYPE['pid'],
                'prio': _KERNEL_DTYPE['prio'],
            },
        ),
        'lisa__sched_util_est_cfs': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'path': _KERNEL_DTYPE['cgroup_path'],
                'enqueued': _KERNEL_DTYPE['util'],
                'ewma': _KERNEL_DTYPE['util'],
                'util': _KERNEL_DTYPE['util'],
            },
        ),
        'lisa__sched_util_est_se': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'comm': _KERNEL_DTYPE['comm'],
                'pid': _KERNEL_DTYPE['signed_pid'],
                'path': _KERNEL_DTYPE['cgroup_path'],
                'enqueued': _KERNEL_DTYPE['util'],
                'ewma': _KERNEL_DTYPE['util'],
                'util': _KERNEL_DTYPE['util'],
            },
        ),
        'sched_wakeup_new': dict(
            fields={
                'comm': _KERNEL_DTYPE['comm'],
                'pid': _KERNEL_DTYPE['pid'],
                'prio': _KERNEL_DTYPE['prio'],
                'success': 'bool',
                'target_cpu': _KERNEL_DTYPE['cpu'],
            },
        ),
        'sched_waking': dict(
            fields={
                'comm': _KERNEL_DTYPE['comm'],
                'pid': _KERNEL_DTYPE['pid'],
                'prio': _KERNEL_DTYPE['prio'],
                'success': 'bool',
                'target_cpu': _KERNEL_DTYPE['cpu'],
            },
        ),
        'lisa__uclamp_util_cfs': dict(
            fields={
                'cpu': _KERNEL_DTYPE['cpu'],
                'uclamp_avg': _KERNEL_DTYPE['util'],
                'uclamp_max': _KERNEL_DTYPE['util'],
                'uclamp_min': _KERNEL_DTYPE['util'],
                'util_avg': _KERNEL_DTYPE['util'],
            },
        ),
        'lisa__uclamp_util_se': dict(
            fields={
                'comm': _KERNEL_DTYPE['comm'],
                'cpu': _KERNEL_DTYPE['cpu'],
                'pid': _KERNEL_DTYPE['pid'],
                'uclamp_avg': _KERNEL_DTYPE['util'],
                'uclamp_max': _KERNEL_DTYPE['util'],
                'uclamp_min': _KERNEL_DTYPE['util'],
                'util_avg': _KERNEL_DTYPE['util'],
            },
        ),
    }

    @PartialInit.factory
    @kwargs_forwarded_to(TxtTraceParserBase.__init__, ignore=['lines'])
    def from_dat(cls, path, events, needed_metadata=None, event_parsers=None, default_event_parser_cls=None, **kwargs):
        """
        Build an instance from a path to a trace.dat file created with
        ``trace-cmd``.

        :param path: Path to the ``trace.dat`` file.
        :param events: Events to make available, or ``None`` for all of them.
        :param needed_metadata: Metadata keys the caller is interested in.
            ``symbols-address`` and ``cpus-count`` are pre-filled from the
            output of ``trace-cmd report`` when requested and available.

        :Variable keyword arguments: Forwarded to ``__init__``

        .. note:: We unfortunately cannot use ``-F`` filter option to
            pre-filter on some events, since global timestamp deduplication has
            to happen. The returned dataframe must be stable, because it could
            be reused in another context (cached on disk), and the set of
            events in a :class:`Trace` object can be expanded dynamically.
        """
        bin_ = get_bin('trace-cmd')
        needed_metadata = set(needed_metadata or [])

        default_event_parser_cls, event_parsers = cls._resolve_event_parsers(event_parsers, default_event_parser_cls)
        event_parsers = event_parsers.values()

        pre_filled_metadata = {}

        if 'symbols-address' in needed_metadata:
            # Get the symbol addresses in that trace
            def parse(line):
                addr, sym = line.split(' ', 1)
                addr = int(addr, base=16)
                sym = sym.strip()
                return (addr, sym)

            symbols_address = dict(
                parse(line)
                for line in subprocess.check_output(
                    [bin_, 'report', '-N', '-f', '--', path],
                    stderr=subprocess.DEVNULL,
                    universal_newlines=True,
                ).splitlines()
            )

            # If we get only "0" as a key, that means kptr_restrict was in use and
            # no useable address is available
            if symbols_address.keys() != {0}:
                pre_filled_metadata['symbols-address'] = symbols_address

        if 'cpus-count' in needed_metadata:
            regex = re.compile(rb'cpus=(?P<cpus>\d+)')
            with subprocess.Popen(
                [bin_, 'report', '-N', '--', path],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            ) as p:
                # Only look for the first line matching the regex: filter()
                # with None keeps the truthy items, i.e. the actual matches,
                # and the rest of the output is left unread.
                match = next(filter(None, map(regex.search, p.stdout)), None)

            # No matching line simply means the metadata is not pre-filled
            if match is not None:
                # The group is matched on bytes, store the count as a number
                pre_filled_metadata['cpus-count'] = int(match.group('cpus'))

        kwargs.update(
            events=events,
            needed_metadata=needed_metadata,
            event_parsers=event_parsers,
            default_event_parser_cls=default_event_parser_cls,
            pre_filled_metadata=pre_filled_metadata,
        )

        cmd = cls._tracecmd_report(
            bin_=bin_,
            path=path,
            events=events,
            event_parsers=event_parsers,
            default_event_parser_cls=default_event_parser_cls,
            # We unfortunately need to parse every single line in order to
            # ensure each event has a unique timestamp in the trace, as pandas
            # cannot deal with duplicated timestamps. Having unique timestamps
            # inside an event dataframe is not enough as dataframes of
            # different events can be combined.
            filter_events=False,
        )

        # A fairly large buffer reduces the interaction overhead
        bufsize = 10 * 1024 * 1024
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=bufsize) as p:
            # Consume the lines as they come straight from the stdout object to
            # avoid the memory overhead of storing the whole output in one
            # gigantic string
            return cls(lines=p.stdout, **kwargs)

    @classmethod
    def _tracecmd_report(cls, bin_, path, events, event_parsers=None, default_event_parser_cls=None, filter_events=True):
        """
        Build the ``trace-cmd report`` command line for the given trace.

        :param bin_: Path to the ``trace-cmd`` binary.
        :param path: Path to the ``trace.dat`` file.
        :param events: Events of interest, or ``None`` for all the events
            listed by ``trace-cmd`` for that trace.
        :param event_parsers: Parsers, used to decide which events are
            reported in raw format.
        :param default_event_parser_cls: Forwarded to
            :meth:`_resolve_event_parsers`.
        :param filter_events: If ``True``, add a ``-F`` filter for each
            event unless all the events of the trace are selected.

        :returns: The command as a list of arguments.

        :raises FileNotFoundError: If ``path`` does not exist.
        """
        if not os.path.exists(path):
            raise FileNotFoundError(f'Unable to locate specified trace file: {path}')

        # Make sure we only ask to trace-cmd events that can exist, otherwise
        # it might bail out and give nothing at all, especially with -F
        kernel_events = {
            event.split(':', 1)[1]
            for event in subprocess.check_output(
                [bin_, 'report', '-N', '-E', '--', path],
                stderr=subprocess.DEVNULL,
                universal_newlines=True,
            ).splitlines()
            if not event.startswith('version =')
        }
        events = kernel_events if events is None else set(events)
        events &= kernel_events

        # Filtering is pointless if every event of the trace is selected
        filter_events &= (events != kernel_events)

        default_event_parser_cls, event_parsers = cls._resolve_event_parsers(event_parsers, default_event_parser_cls)

        def use_raw(event):
            try:
                parser = event_parsers[event]
            except KeyError:
                # If we don't have a known parser, use the raw output by
                # default, since it will be either the same as human readable,
                # or unparseable without a dedicated parser.
                return True
            else:
                return parser.raw

        if filter_events:
            filter_ = list(itertools.chain.from_iterable(
                ('-F', event)
                for event in events
            ))
        else:
            filter_ = []

        raw_events = list(itertools.chain.from_iterable(
            ('-r', event) if use_raw(event) else []
            for event in events
        ))

        cmd = [
            bin_,
            'report',
            # Do not load any plugin, so that we get fully reproducible results
            '-N',
            # Full accuracy on timestamp
            '-t',
            # Event filter
            *filter_,
            # All events in raw format
            *raw_events,
            '--', path
        ]
        return cmd

    def _get_skeleton_regex(self, need_fields):
        """
        Compiled bytes regex matching the timestamp and event name of a line
        of ``trace-cmd report`` output, plus the rest of the line as
        ``__fields`` if ``need_fields`` is true.
        """
        regex = r'\] +(?P<__timestamp>{floating}): *(?P<__event>{identifier}):'.format(**TxtEventParser.PARSER_REGEX_TERMINALS)
        if need_fields:
            regex += r' *(?P<__fields>.*)'

        return re.compile(regex.encode('ascii'), flags=re.ASCII)
[docs]
class SimpleTxtTraceParser(TxtTraceParserBase):
    """
    Base class of simple text trace parsers.

    :param lines: Lines of the text to parse.
    :type lines: collections.abc.Iterable(str)

    :param events: List of events that will be potentially parsed by
        :meth:`~TraceParserBase.parse_event`. If ``None``, all available
        events will be considered but that may lengthen the initial parsing
        stage.
    :type events: list(str)

    :param event_parsers: Optional list of event parsers providing fully
        customized event regex.

        .. note:: See :data:`EVENT_DESCS` for class-provided special case
            handling.
    :type event_parsers: list(EventParserBase)

    :param header_regex: Regex used to parse the header of each event. See
        :data:`HEADER_REGEX` documentation.
    :type header_regex: str

    .. note:: This class is easier to customize than :class:`TxtTraceParser`
        but may have higher processing time and peak memory usage.
    """

    EVENT_DESCS = {}
    """
    Mapping of event names to parser description as a dict.

    Each event description can include the following dict keys:

        * ``header_regex``: Regex to parse the event header. If not set, the
          header regex from the trace parser will be used.

        * ``fields_regex``: Regex to parse the fields part of the event (i.e.
          the part after the header). This is the most commonly modified
          setting to take into account special cases in event formatting.

        * ``fields``: Mapping of field names to :class:`pandas.DataFrame`
          column dtype. This allows using a smaller dtype or the use of a
          non-inferred dtype like ``boolean``.

        * ``positional_field``: Name of the positional field (coming before
          the named fields). If ``None``, the column will be suppressed in the
          parsed dataframe.
    """

    HEADER_REGEX = None
    """
    Default regex to use to parse event header.

    It must parse the following groups:

        * ``__timestamp``: the timestamp of the event
        * ``__event``: the name of the event
        * ``__cpu`` (optional): the CPU by which the event was emitted
        * ``__pid`` (optional): the currently scheduled PID at the point the
          event was emitted
        * ``__comm`` (optional): the currently scheduled task's name at the
          point the event was emitted

    .. note:: It must *not* capture the event fields, as it will be
        concatenated with the field regex of each event to parse full lines.
    """

    # Removing default_event_parser_cls restricts the API of __init__, so that
    # means that inherited alternative constructors such as from_txt_file would
    # need to be overridden to be strictly accurate too.
    @kwargs_forwarded_to(TxtTraceParserBase.__init__, ignore=['default_event_parser_cls'])
    def __init__(self, header_regex=None, **kwargs):
        self.header_regex = header_regex or self.HEADER_REGEX

        # The timestamp and event name are already parsed in the skeleton
        # dataframe, so turn their named groups into non-capturing ones to
        # avoid parsing them again for each event
        event_parser_header_regex = self.header_regex
        for field in ('__timestamp', '__event'):
            event_parser_header_regex = event_parser_header_regex.replace(
                fr'(?P<{field}>',
                r'(?:',
            )

        class SimpleTxtEventParser(TxtEventParser):
            def __init__(self, header_regex=event_parser_header_regex, fields_regex=None, **kwargs):
                self.fields_regex = fields_regex
                self.header_regex = header_regex
                super().__init__(**kwargs)

            def _get_header_regex(self, event):
                return self.header_regex

            def _get_fields_regex(self, *args, **kwargs):
                # A custom regex for the fields wins over the default one
                return self.fields_regex or super()._get_fields_regex(*args, **kwargs)

        super().__init__(
            default_event_parser_cls=SimpleTxtEventParser,
            **kwargs,
        )

    def _get_skeleton_regex(self, need_fields):
        # The whole header is parsed regardless of need_fields, which is
        # wasteful but provides a simpler interface
        pattern = self.header_regex + r' *(?P<__fields>.*)'
        return re.compile(pattern.encode('ascii'), flags=re.ASCII)
[docs]
class HRTxtTraceParser(SimpleTxtTraceParser):
    """
    Parser for text traces in human readable format (as opposed to the raw
    format).

    The expected input is what the kernel produces once the event records
    have been formatted with their format string. As a result, any event
    whose format string departs from the classic ``field=value`` layout needs
    a custom regex.

    .. note:: This parser is provided for convenience but is probably not
        complete. More specifically, it does not contain custom regex for all
        the events which format deviates from the raw format as output by
        ``trace-cmd report -R``.

        For a better supported format, see :class:`TxtTraceParser`.
    """

    # Alias required so that the name is available in the class body when
    # building EVENT_DESCS
    _KERNEL_DTYPE = SimpleTxtTraceParser._KERNEL_DTYPE

    EVENT_DESCS = {
        'sched_switch': {
            # The "==>" separator does not follow the field=value format
            'fields_regex': r'prev_comm=(?P<prev_comm>.+?) +prev_pid=(?P<prev_pid>\d+) +prev_prio=(?P<prev_prio>\d+) +prev_state=(?P<prev_state>[^ ]+) ==> next_comm=(?P<next_comm>.+?) +next_pid=(?P<next_pid>\d+) +next_prio=(?P<next_prio>\d+)',
            'fields': {
                'prev_comm': _KERNEL_DTYPE['comm'],
                'prev_pid': _KERNEL_DTYPE['pid'],
                'prev_prio': _KERNEL_DTYPE['prio'],
                'prev_state': 'string',
                'next_comm': _KERNEL_DTYPE['comm'],
                'next_pid': _KERNEL_DTYPE['pid'],
                'next_prio': _KERNEL_DTYPE['prio'],
            },
        },
        'tracing_mark_write': {
            'fields': {
                'buf': 'bytes',
            },
            'positional_field': 'buf',
        },
    }

    HEADER_REGEX = r'\s*(?P<__comm>.+)-(?P<__pid>\d+)[^[]*\[(?P<__cpu>\d*)\][^\d]+(?P<__timestamp>\d+\.\d+): +(?P<__event>\w+):'
[docs]
class SysTraceParser(HRTxtTraceParser):
    """
    Parse Google's systrace format.

    .. note:: This parser is based on :class:`HRTxtTraceParser` and is
        therefore provided for convenience but may lack some events custom
        field regex.
    """

    # Alternate constructor: every argument is forwarded unchanged to the
    # inherited ``from_txt_file()`` factory. ``wraps()`` makes this factory
    # expose the metadata (name aside, docstring and signature) of
    # ``HRTxtTraceParser.from_txt_file``.
    @PartialInit.factory
    @wraps(HRTxtTraceParser.from_txt_file)
    def from_html(cls, *args, **kwargs):
        return super().from_txt_file(*args, **kwargs)
class _InternalTraceBase(abc.ABC):
    """
    Functionality shared by :class:`Trace` and :class:`_TraceViewBase`.

    :Attributes:
        * ``start``: The timestamp of the first trace event.
        * ``end``: The timestamp of the last trace event.
        * ``basetime``: Absolute timestamp when the tracing started. This might
          differ from ``start`` as the latter can be affected by various
          normalization or windowing features.
        * ``endtime``: Absolute timestamp when the tracing stopped. It has
          similar characteristics as ``basetime``.
    """

    def __init__(self):
        pass

    @property
    def trace_state(self):
        """
        State of the trace object that might impact the output of dataframe
        getter functions like :meth:`lisa.trace.TraceBase.df_event`.

        It must be hashable and serializable to JSON, so that it can be
        recorded when analysis methods results are cached to the swap.
        """
        return None

    @property
    def time_range(self):
        """
        Duration of that trace.
        """
        duration = self.end - self.start
        return duration

    @property
    def window(self):
        """
        Same as ``(trace.start, trace.end)``.

        This is handy to pass to functions expecting a window tuple.
        """
        return self.start, self.end

    @property
    def available_events(self):
        """
        Container of the events available in this trace, as an
        :class:`_AvailableTraceEventsSet` bound to ``self``.
        """
        return _AvailableTraceEventsSet(self)

    def get_view(self, **kwargs):
        """
        Build a view of this trace. Keyword arguments are forwarded to
        :meth:`_TraceViewBase.make_view`.
        """
        new_view = _TraceViewBase.make_view(self, **kwargs)
        assert isinstance(new_view, _TraceViewBase)
        return new_view

    @abc.abstractmethod
    def _internal_df_event(self, event, **kwargs):
        """
        Internal function creating the :class:`polars.LazyFrame` for the given
        ``event`` and returning associated metadata.

        Unrecognized keyword arguments should be passed down to the parent
        trace view when that's relevant or simply ignored.
        """

    @abc.abstractmethod
    def _preload_events(self, events):
        """
        Preload the given ``events``. Must be implemented by subclasses.
        """

    def __getitem__(self, window):
        """
        ``trace[start:stop]`` is a shorthand for
        ``trace.get_view(window=(start, stop))``. A slice step is rejected.
        """
        if not isinstance(window, slice):
            raise TypeError("Cropping window must be an instance of slice")

        if window.step is not None:
            raise ValueError("Slice step is not supported")

        bounds = (window.start, window.stop)
        return self.get_view(window=bounds)

    @deprecate('Prefer adding delta once signals have been extracted from the event dataframe for correctness',
        deprecated_in='2.0',
        removed_in='4.0',
        replaced_by=df_add_delta,
    )
    def add_events_deltas(self, df, col_name='delta', inplace=True):
        """
        Store the time between each event in a new dataframe column

        This function assumes that at time [n] the event starts and at [n+1]
        the event stops, so the formula for the returned value is::

                  |        |            |
                  |        |            |
                  |        |            |
            ------+--------+------------+------
                [n-1]     [n]         [n+1]

            delta[n] = index[n+1] - index[n]

        :param df: The DataFrame to operate one
        :type df: pandas.DataFrame

        :param col_name: The name of the column to add
        :type col_name: str

        :param inplace: Whether to operate on the passed DataFrame, or to use
            a copy of it
        :type inplace: bool

        This method only really makes sense for events tracking an on/off state
        (e.g. overutilized, idle)
        """
        # Refuse to silently overwrite a column the caller already has
        already_there = col_name in df.columns
        if already_there:
            raise RuntimeError(f"Column {col_name} is already present in the dataframe")

        return df_add_delta(
            df,
            col=col_name,
            inplace=inplace,
            window=self.window,
        )

    @deprecate('This method has been deprecated and is an alias for "trace.ana.notebook.df_all_events()"',
        deprecated_in='2.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.notebook.NotebookAnalysis.df_all_event',
    )
    def df_all_events(self, *args, **kwargs):
        notebook = self.ana.notebook
        return notebook.df_all_events(*args, **kwargs)
# User-facing
[docs]
class TraceBase(_InternalTraceBase):
    """
    User-facing abstract interface of trace objects.

    On top of :class:`_InternalTraceBase`, it declares the public dataframe
    getter :meth:`df_event`, the abstract ``ana`` and ``analysis``
    properties, and a set of deprecated aliases that forward to ``self.ana``.
    """

    @abc.abstractmethod
    def df_event(self, event, **kwargs):
        """
        Get a dataframe containing all occurrences of the specified trace event
        in the parsed trace.

        :param event: Trace event name.

            In addition to actual events, the following formats for meta events
            are supported:

            * ``trace_printk@``: The event format is described by the
              ``bprint`` event format string, and the field values are decoded
              from the variable arguments buffer. Note that:

                * The field values *must* be in the buffer, i.e. the format
                  string is only used as the event format, no "literal value"
                  will be extracted from it.

                * The event *must* have fields. If not, ``trace_printk()``
                  will emit a bputs event that will be ignored at the moment.
                  We need to get a bprint event.

                * Field names *must* be unique.

              .. code-block:: C

                  // trace.df_event('trace_printk@myevent')
                  void foo(void) {
                      trace_printk("myevent: field1=%s field2=%i", "foo", 42);
                  }

            * ``userspace@``: the event is generated by userspace:

              .. code-block:: shell

                  # trace.df_event('userspace@myevent')
                  echo "myevent: field1=foo field2=42" > /sys/kernel/debug/tracing/trace_marker

              Note that the field names must be unique.

            .. note:: All meta event names are expected to be valid C language
                identifiers. Usage of other characters will prevent correct
                parsing.
        :type event: str

        :param signals: List of signals to fixup if ``signals_init == True``.
            If left to ``None``, :meth:`lisa.datautils.SignalDesc.from_event`
            will be used to infer a list of default signals.
        :type signals: list(SignalDesc)

        :param compress_signals_init: Give a timestamp very close to the
            beginning of the sliced dataframe to rows that are added by
            ``signals_init``. This allows keeping a very close time span
            without introducing duplicate indices.
        :type compress_signals_init: bool
        """
        pass

    def get_view(self, *args, **kwargs):
        """
        Get a view on a trace cropped time-wise to fit in ``window`` and with
        event dataframes post processed with ``process_df``.

        :param window: Crop the dataframe to include events that are inside the
            given window. This includes the event immediately preceding the
            left boundary if there is no exact timestamp match. This can also
            include more rows before the beginning of the window based on the
            ``signals`` required by the user. A ``None`` boundary will extend
            to the beginning/end of the trace.
        :type window: tuple(float or None, float or None) or None

        :param signals: List of :class:`lisa.datautils.SignalDesc` to use when
            selecting rows before the beginning of the ``window``. This allows
            ensuring that all the given signals have a known value at the beginning
            of the window.
        :type signals: list(lisa.datautils.SignalDesc) or None

        :param compress_signals_init: If ``True``, the timestamp of the events
            before the beginning of the ``window`` will be compressed to be
            either right before the beginning of the window, or at the exact
            timestamp of the beginning of the window (depending on the
            dataframe library chosen, since pandas cannot cope with more than
            one row for each timestamp).
        :type compress_signals_init: bool or None

        :param normalize_time: If ``True``, the beginning of the ``window``
            will become timestamp 0. If no ``window`` is used, the beginning of
            the trace is taken as T=0. This allows easier comparison of traces
            that were generated with absolute timestamps (e.g. timestamp
            related to the uptime of the system). It also allows comparing
            various slices of the same trace.
        :type normalize_time: bool or None

        :param events_namespaces: List of namespaces of the requested events.
            Each namespace will be tried in order until the event is found. The
            ``None`` namespace can be used to specify no namespace. The full
            event name is formed with ``<namespace>__<event>``.
        :type events_namespaces: list(str or None)

        :param events: Preload the given events when creating the view. This
            can be advantageous as a single instance of the parser will be
            spawned, so if the parser supports it, multiple events will be
            parsed in one trace traversal.
        :type events: list(str) or lisa.trace.TraceEventCheckerBase or None

        :param strict_events: If ``True``, will raise an exception if the
            ``events`` specified cannot be loaded from the trace. This allows
            failing early in trace processing.
        :type strict_events: bool or None

        :param process_df: Function called on each dataframe returned by
            :meth:`lisa.trace.TraceBase.df_event`. The parameters are as follow:

                1. Name of the event being queried.
                2. A :class:`polars.LazyFrame` of the event.

            It is expected to return a :class:`polars.LazyFrame` as well.
        :type process_df: typing.Callable[[str, polars.LazyFrame], polars.LazyFrame] or None

        :param df_fmt: Format of the dataframes returned by
            :meth:`lisa.trace.TraceBase.df_event`. One of:

                * ``"pandas"``: :class:`pandas.DataFrame`.
                * ``"polars-lazyframe"``: :class:`polars.LazyFrame`.
                * ``None``: defaults to ``"pandas"`` for
                  backward-compatibility.
        :type df_fmt: str or None

        :Variable arguments: Forwarded to the constructor of the view.
        """
        return super().get_view(*args, **kwargs)

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='2.0',
        removed_in='4.0',
        replaced_by='df_event',
    )
    def df_events(self, *args, **kwargs):
        return self.df_event(*args, **kwargs)

    # Analysis proxy. The deprecated aliases below rely on it exposing at
    # least ``tasks``.
    @property
    @abc.abstractmethod
    def ana(self):
        pass

    # Abstract as well; NOTE(review): presumably the long-form counterpart of
    # ``ana`` - confirm against the concrete implementations.
    @property
    @abc.abstractmethod
    def analysis(self):
        pass

    def show(self):
        """
        Open the parsed trace using the most appropriate native viewer.

        The native viewer depends on the specified trace format:

            - ftrace: open using kernelshark
            - systrace: open using a browser

        In both cases the native viewer is assumed to be available in the host
        machine.

        :raises ValueError: If ``self.trace_path`` is empty, i.e. no file is
            backing this trace.
        """
        path = self.trace_path
        if not path:
            raise ValueError('No trace file is backing this Trace instance')

        # ".dat" files go to kernelshark, anything else is handed over to the
        # desktop's default opener.
        if path.endswith('.dat'):
            cmd = 'kernelshark'
        else:
            cmd = 'xdg-open'

        # The path is quoted since the command goes through a shell
        return os.popen(f"{cmd} {shlex.quote(path)}")

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='3.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.tasks.TasksAnalysis.get_task_name_pids',
    )
    def get_task_name_pids(self, *args, **kwargs):
        return self.ana.tasks.get_task_name_pids(*args, **kwargs)

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='2.0',
        removed_in='4.0',
        replaced_by=get_task_name_pids,
    )
    def get_task_by_name(self, name):
        return self.get_task_name_pids(name, ignore_fork=True)

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='3.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.tasks.TasksAnalysis.get_task_pid_names',
    )
    def get_task_pid_names(self, *args, **kwargs):
        return self.ana.tasks.get_task_pid_names(*args, **kwargs)

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='3.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.tasks.TasksAnalysis.get_task_by_pid',
    )
    def get_task_by_pid(self, *args, **kwargs):
        return self.ana.tasks.get_task_by_pid(*args, **kwargs)

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='3.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.tasks.TasksAnalysis.get_task_ids',
    )
    def get_task_ids(self, *args, **kwargs):
        return self.ana.tasks.get_task_ids(*args, **kwargs)

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='3.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.tasks.TasksAnalysis.get_task_id',
    )
    def get_task_id(self, *args, **kwargs):
        return self.ana.tasks.get_task_id(*args, **kwargs)

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='3.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.tasks.TasksAnalysis.get_task_pid',
    )
    def get_task_pid(self, *args, **kwargs):
        return self.ana.tasks.get_task_pid(*args, **kwargs)

    @deprecate('This method has been deprecated and is an alias',
        deprecated_in='3.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.tasks.TasksAnalysis.get_tasks',
    )
    def get_tasks(self, *args, **kwargs):
        return self.ana.tasks.get_tasks(*args, **kwargs)

    @deprecate('This property has been deprecated and is an alias',
        deprecated_in='3.0',
        removed_in='4.0',
        replaced_by='lisa.analysis.tasks.TasksAnalysis.task_ids',
    )
    @property
    def task_ids(self):
        """
        List of all the :class:`lisa.analysis.tasks.TaskID` in the trace,
        sorted by PID.
        """
        return self.ana.tasks.task_ids
class _TraceViewBase(_InternalTraceBase):
    """
    Base class of trace views: a thin layer wrapping another trace (or view)
    stored in ``base_trace``.

    Any attribute not found on the view is looked up on ``base_trace``, and
    the default ``_preload_events()`` / ``_internal_df_event()`` simply
    delegate to it.
    """

    def __init__(self, trace):
        self.base_trace = trace
        super().__init__()

    def __getattr__(self, name):
        # Only invoked when regular attribute lookup fails: fall back on the
        # wrapped trace.
        return getattr(self.base_trace, name)

    @classmethod
    def make_view(cls, trace, *, window=None, signals=None, compress_signals_init=None, normalize_time=False, events_namespaces=None, events=None, strict_events=False, process_df=None, df_fmt=None, clear_base_cache=None):
        """
        Build the stack of views implementing the requested parameters on top
        of ``trace`` and return the outermost one.

        ``clear_base_cache`` is accepted for backward compatibility only and
        triggers a deprecation warning when set.
        """
        if clear_base_cache is not None:
            _deprecated_warn('"clear_base_cache" parameter has no effect anymore')

        # Most views are stacked regardless whether they were needed right away
        # in order to have them ready to handle the parameters they add to
        # df_event() (via _internal_df_event())
        stacked = _NamespaceTraceView(trace, namespaces=events_namespaces)

        # Preload events as early as possible in case other views need to
        # access some of the costly metadata that would be gathered when
        # preloading.
        stacked = _PreloadEventsTraceView(
            stacked,
            events=events,
            strict_events=strict_events
        )

        if normalize_time:
            window_cls = _NormalizedTimeTraceView
        else:
            window_cls = _WindowTraceView

        stacked = window_cls(
            stacked,
            window=window,
            signals=signals,
            compress_signals_init=compress_signals_init,
        )

        # The post-processing layer is the only optional one
        if process_df:
            stacked = _ProcessTraceView(stacked, process_df)

        return stacked

    @property
    def trace_state(self):
        # Combine the identity of this layer with the state of the wrapped
        # trace.
        layer_name = self.__class__.__qualname__
        return (layer_name, self.base_trace.trace_state)

    def _preload_events(self, *args, **kwargs):
        base = self.base_trace
        return base._preload_events(*args, **kwargs)

    def _internal_df_event(self, *args, **kwargs):
        base = self.base_trace
        return base._internal_df_event(*args, **kwargs)
class _WindowTraceView(_TraceViewBase):
    """
    A view on a :class:`Trace`.

    :param trace: The base trace.
    :type trace: Trace

    :param window: The time window to base this view on. If ``None``, the whole
        base trace will be selected.
    :type window: tuple(float, float) or None

    :param signals: Signals added to the ones requested when this view is the
        outermost windowing layer of a dataframe request.
    :type signals: list(lisa.datautils.SignalDesc) or None

    :param compress_signals_init: Default value of the
        ``compress_signals_init`` parameter used when windowing dataframes.
    :type compress_signals_init: bool or None

    :Attributes:
        * ``base_trace``: The original :class:`Trace` this view is based on.
        * ``ana``: The analysis proxy on the trimmed down :class:`Trace`.
        * ``start``: The timestamp of the first trace event in the view (>=
          ``window[0]``)
        * ``end``: The timestamp of the last trace event in the view (<=
          ``window[1]``)

    You can substitute an instance of :class:`Trace` with an instance of
    :class:`_WindowTraceView`. This means you can create a view of a trimmed down trace
    and run analysis code/plots that will only use data within that window, e.g.::

        trace = Trace(...)
        view = trace.get_view(window=(2, 4))

        # Alias for the above
        view = trace[2:4]

        # This will only use events in the (2, 4) time window
        df = view.ana.tasks.df_tasks_runtime()
    """

    def __init__(
        self,
        trace,
        *,
        window=None,
        signals=None,
        compress_signals_init=None,
    ):
        super().__init__(trace)
        self._window = window
        self._signals = set(signals or [])
        self._compress_signals_init = compress_signals_init

    # These attributes are absolutely critical for the performance of the view
    # stack, but pre-computing them is very harmful as it might trigger a
    # trace-parse to find the boundary timestamps before we need to parse a
    # dataframe, which can be very costly on a big trace.
    @property
    @memoized
    def start(self):
        """
        Left bound of the window, or the start of the base trace if the
        window has no left bound.
        """
        t_min, _ = self._window or (None, None)
        if t_min is None:
            return self.base_trace.start
        else:
            return t_min

    @property
    @memoized
    def end(self):
        """
        Right bound of the window, or the end of the base trace if the window
        has no right bound. The value is never smaller than ``start``.
        """
        _, t_max = self._window or (None, None)
        end = self.base_trace.end if t_max is None else t_max

        # Ensure we never end up with end < start. This must also apply when
        # falling back on the base trace end, e.g. if the window starts after
        # the end of the base trace.
        return max(end, self.start)

    def _fixup_window(self, window):
        """
        Clip ``window`` to this view's own boundaries and normalize it.

        :returns: A ``(start, end)`` tuple where a bound is ``None`` when it
            equals the matching bound of the base trace, or ``None`` when no
            windowing is needed at all.
        """
        _start = self.start
        _end = self.end

        # Add missing window and fill-in None values
        if window is None:
            start = _start
            end = _end
        else:
            start, end = window

        # Clip the window to our own window
        start = _start if start is None else max(start, _start)
        end = _end if end is None else min(end, _end)

        # If we are not restricting more than the base trace, we don't want to
        # apply any windowing
        if start == self.base_trace.start:
            start = None

        if end == self.base_trace.end:
            end = None

        window = (start, end)

        if window == (None, None):
            window = None

        return window

    @property
    def trace_state(self):
        return (
            super().trace_state,
            self.window,
            self._signals,
            self._compress_signals_init,
        )

    def _internal_df_event(self, event, *, df_fmt=None, _legacy_signals=None, _inner_window=False, window=None, compress_signals_init=None, signals_init=None, signals=None, **kwargs):
        """
        Get the dataframe of ``event`` from the base trace and crop it to this
        view's window.

        :returns: The ``(df, meta)`` tuple returned by the base trace, with
            ``df`` windowed.
        """
        # Explicit parameter first, then the view's default, then False
        compress_signals_init = self._compress_signals_init if compress_signals_init is None else compress_signals_init
        compress_signals_init = False if compress_signals_init is None else compress_signals_init
        # Legacy signal inference is only enabled by default for pandas
        _legacy_signals = df_fmt == 'pandas' if _legacy_signals is None else _legacy_signals

        if window is not None:
            _deprecated_warn('"window", "signals" and "signals_init" are deprecated parameter of Trace.df_event(). Instead, use the matching parameters on trace.get_view(...).df_event(...)')

        _window = self._fixup_window(window)
        _signals = self._fixup_signals(
            event=event,
            signals=signals,
            signals_init=signals_init,
            legacy_signals=_legacy_signals,
            inner_window=_inner_window,
        )

        df, meta = self.base_trace._internal_df_event(
            event,
            _inner_window=True,
            df_fmt=df_fmt,
            # We propagate the list signals to base traces. This will let some
            # events "leak into the window" if a window was set, but that is
            # necessary to reliably implement self-contained analysis.
            # Otherwise, an analysis function would have no way of asking for
            # some specific signals it needs to work. Even if the user manually
            # specified signals to be friendly to that function, it could still
            # break the day the function makes use of a new signal (e.g. using
            # a new equivalent signal the legacy user code could not know about
            # at the time of writing).
            signals=_signals,
            # This needs to be passed to the base trace, in case we are not the
            # view applying a windowing. This can happen if there is e.g. a
            # no-op _WindowTraceView() layer.
            compress_signals_init=compress_signals_init,
            **kwargs,
        )

        df = self._apply_window(
            df=df,
            window=_window,
            signals=_signals,
            compress_signals_init=compress_signals_init,
        )
        return (df, meta)

    def _fixup_signals(self, event, signals, signals_init, legacy_signals, inner_window):
        """
        Compute the set of signals to use when windowing ``event``.

        :returns: Set of signal descriptors that apply to ``event`` and have
            at least one field.

        :raises ValueError: If ``signals_init`` is given while
            ``legacy_signals`` is false.
        """
        if legacy_signals:
            # Warning-less backward compat for the default parameter value, so
            # that existing code will keep running without extra output
            if signals_init is None:
                signals_init = True
            # If the user specified manually, they will get the warning and be
            # asked to make a change that will ease transition to polars.
            else:
                _deprecated_warn('signals_init should not be used anymore. Instead, use trace.get_view(signals=[...]) with the list of signals you need', stacklevel=2)

            # Legacy behavior where signals are inferred from the event. We
            # only keep this in pandas for backward compat. New code based
            # on polars has to define its own signals, so we don't have to
            # maintain an ever-growing list of signals in LISA itself, and
            # possibly breaking user code when we add a new one.
            signals = list(SignalDesc._from_event(event) if signals is None else signals)
            signals = signals if signals_init else []
        else:
            signals = list(signals or [])
            if signals_init is not None:
                raise ValueError(f'"signals_init" parameter is only supported for df_fmt="pandas". Use the TraceBase.get_view(signals=...) parameter.')

        # If we are in an inner window, it is pointless to add our
        # self._signals since they will be discarded by the outer window layer
        # (e.g. if the outer layer asked for no signals, it will simply apply a
        # normal window and not make use of any extra data we would have
        # included)
        if inner_window:
            pass
        # Otherwise, the signals are coming from the user and we should add our
        # own, since the user will be able to see their effect
        else:
            signals = {*signals, *self._signals}

        # Only keep what is relevant to the event being processed
        signals = {
            signal_desc
            for signal_desc in signals
            if signal_desc.event == event and signal_desc.fields
        }
        return signals

    def _apply_window(self, df, window, signals, compress_signals_init):
        """
        Crop ``df`` to ``window``, using the signal-aware windowing if any
        ``signals`` are given. ``df`` is returned as-is if there is no window.
        """
        if window is None or window == (None, None):
            return df
        else:
            if signals:
                df = df_window_signals(
                    df,
                    window=window,
                    signals=signals,
                    compress_init=compress_signals_init,
                )
            else:
                df = df_window(df, window, method='pre')

            return df
class _ProcessTraceView(_TraceViewBase):
    """
    View post-processing every event dataframe with a user-provided
    ``process_df(event, df)`` callable.
    """

    def __init__(self, trace, process_df):
        super().__init__(trace)
        self._process_df = process_df

    @property
    def trace_state(self):
        # The callable likely will be a value that cannot be serialized to JSON
        # if it was user-provided. This will prevent caching as it should.
        return (super().trace_state, self._process_df)

    def _internal_df_event(self, event, **kwargs):
        df, meta = self.base_trace._internal_df_event(event, **kwargs)

        # A falsy callable (e.g. None) means no post-processing
        process = self._process_df
        if process:
            df = process(event, df)

        return (df, meta)
class _PreloadEventsTraceView(_TraceViewBase):
    """
    View preloading a set of events from the base trace when it is created.

    :param events: Events to preload: an iterable of event names, an event
        checker, ``_ALL_EVENTS`` or ``None``. A plain string is rejected.

    :param strict_events: If true, the requested events are checked against
        what was actually preloaded.
    """

    def __init__(self, trace, events=None, strict_events=False):
        super().__init__(trace)
        base = self.base_trace

        if isinstance(events, str):
            raise ValueError('Events passed to Trace(events=...) must be a list of strings, not a string.')

        all_requested = events is _ALL_EVENTS
        if not all_requested:
            events = AndTraceEventChecker.from_events(
                set(events or [])
            )

        if events or all_requested:
            preloaded = base._preload_events(events)

            # Now that everything is loaded, the sentinel can be turned into
            # the concrete list of events
            if all_requested:
                events = AndTraceEventChecker.from_events(set(base.available_events))

            if strict_events:
                events.check_events(preloaded)

        self._events = events

    @property
    def events(self):
        """
        Checker covering the events of this view plus the ones of the base
        trace, if it exposes an ``events`` attribute.
        """
        try:
            inherited = self.base_trace.events
        except AttributeError:
            inherited = set()
        else:
            inherited = inherited.get_all_events()

        merged = sorted({*self._events, *inherited})
        return AndTraceEventChecker.from_events(merged)
class _NamespaceTraceView(_TraceViewBase):
    """
    View resolving event names against a list of namespaces.

    An event requested on that view is looked up under each namespace in
    turn, the namespaced name being ``<namespace>__<event>`` (or
    ``<namespace><event>`` if the namespace already ends with ``_``). A
    ``None`` namespace stands for the bare event name.
    """

    def __init__(self, trace, namespaces):
        # Wrapping in a _PreloadEventsTraceView allows using
        # self.base_trace.events
        super().__init__(_PreloadEventsTraceView(trace))
        self._events_namespaces = namespaces or []
        self._preload_events(self.base_trace.events)

    def _preload_events(self, events):
        namespaces = self._events_namespaces

        # It is critical to not enter the namespace path if we don't have any
        # actual namespaces, otherwise the overhead of the loop will get
        # multiplicated at every layer and we end up with large overheads.
        if not namespaces:
            return super()._preload_events(events)

        if events is _ALL_EVENTS:
            preloaded = super()._preload_events(events)
            return preloaded if None in namespaces else set()

        # Map each namespaced candidate back to the name that was requested
        requested_of = {
            candidate: requested
            for requested in events
            for candidate in self._expand_namespaces(requested)
        }

        # Preload the events in a single batch. This is critical for
        # performance, otherwise we will spin up the parser multiple times.
        preloaded = self.base_trace._preload_events(requested_of.keys())

        # Return the requested name instead of the actual event preloaded so
        # that _PreloadEventsTraceView() can check for what it actually asked
        # for
        return {
            requested_of.get(candidate, candidate)
            for candidate in preloaded
        }

    @property
    def trace_state(self):
        return (super().trace_state, self._events_namespaces)

    @property
    @memoized
    def events_namespaces(self):
        """
        Namespaces of this view followed by the ones of the base trace
        (``(None,)`` if it has none), without duplicates.
        """
        try:
            inherited = self.base_trace.events_namespaces
        except AttributeError:
            inherited = (None,)

        combined = [*self._events_namespaces, *inherited]
        return deduplicate(combined, keep_last=False)

    @classmethod
    def _resolve_namespaces(cls, namespaces):
        # No namespace at all is the same as only the "None" namespace
        return namespaces or (None,)

    def _expand_namespaces(self, event, namespaces=None):
        """
        List the candidate names of ``event``, using ``namespaces`` if given
        and the namespaces of the view otherwise.
        """
        selected = self._resolve_namespaces(
            namespaces or self._events_namespaces
        )
        return self._do_expand_namespaces(event, selected)

    @classmethod
    def _do_expand_namespaces(cls, event, namespaces):
        namespaces = cls._resolve_namespaces(namespaces)

        # Meta events are never namespaced
        if _Trace._is_meta_event(event):
            return [event]

        def qualify(namespace):
            if not namespace:
                return event

            prefix = namespace if namespace.endswith('_') else f'{namespace}__'
            # Do not prefix twice an event that is already namespaced
            return event if event.startswith(prefix) else f'{prefix}{event}'

        candidates = [qualify(namespace) for namespace in namespaces]
        return deduplicate(candidates, keep_last=False)

    def _internal_df_event(self, event, namespaces=None, **kwargs):
        if namespaces is not None:
            _deprecated_warn('"namespaces" is a deprecated parameter of Trace.df_event(). Instead, use trace.get_view(events_namespaces=...).df_event(...)')

        if not (namespaces or self._events_namespaces):
            return super()._internal_df_event(event, **kwargs)

        candidates = self._expand_namespaces(event, namespaces) or [event]
        base = self.base_trace

        # Preload all the events as we likely will only get one match
        # anyway, so we can avoid spinning up a parser several times for
        # nothing.
        candidates = sorted(base._preload_events(candidates))

        # Raised as-is if no candidate could be preloaded, otherwise replaced
        # by the error of the last candidate that failed.
        last_excep = MissingTraceEventError([event])
        for candidate in candidates:
            try:
                return base._internal_df_event(candidate, **kwargs)
            except MissingTraceEventError as e:
                last_excep = e

        raise last_excep
class _NormalizedTimeTraceView(_TraceViewBase):
    """
    Windowed view in which timestamps are shifted so that the left bound of
    the window becomes 0.

    :param window: ``(start, end)`` window. If falsy, ``(trace.start, None)``
        is used. The left bound must not be ``None``.

    :Variable keyword arguments: Forwarded to ``get_view()`` on the
        time-shifted view.
    """

    def __init__(self, trace, window, **kwargs):
        error_msg = 'A window with non-None left bound must be provided with normalize_time=True'

        window = window or (trace.start, None)
        try:
            start, _ = window
        except ValueError:
            raise ValueError(error_msg)

        if start is None:
            raise ValueError(error_msg)

        self._offset = start

        # Crop first, then shift the timestamps, then apply the remaining
        # parameters on top of the shifted trace.
        cropped = trace.get_view(window=window)
        shifted = self._with_time_offset(cropped, start)
        configured = shifted.get_view(**kwargs)
        super().__init__(configured)

        self.start = 0
        self.end = self.base_trace.end - start

    @classmethod
    def _with_time_offset(cls, trace, start):
        """
        Return a view of ``trace`` in which ``start`` is subtracted from the
        ``Time`` column of every event dataframe.
        """
        # Round down to avoid ending up with negative Time for anything that
        # does not actually happen before the start
        offset = _polars_duration_expr(start, rounding='down')

        def time_offset(event, df):
            shifted_time = pl.col('Time') - offset
            return df.with_columns(shifted_time)

        return trace.get_view(process_df=time_offset)

    @property
    def basetime(self):
        base = self.base_trace
        return base.basetime - self._offset

    @property
    def endtime(self):
        base = self.base_trace
        return base.endtime - self._offset

    @property
    def trace_state(self):
        return (super().trace_state, self._offset)

    @property
    def normalize_time(self):
        return True
# One might be tempted to make that a subclass of collections.abc.Set: don't.
# The problem is that Set expects new instances to be created by passing an
# iterable to __init__(), but this container cannot be instantiated with
# arbitrary values: it is tied to a Trace.
class _AvailableTraceEventsSet:
"""
Smart container that uses demand event loading on the trace to check
whether an event is present or not.
This container can be iterated over to get the current available events,
and supports membership tests using ``in``.
"""
def __init__(self, trace):
self._trace = trace
def __contains__(self, event):
return self.contains(event)
def contains(self, event, namespaces=None):
trace = self._trace
view = trace.get_view(events_namespaces=namespaces)
try:
trace._internal_df_event(event=event)
except MissingTraceEventError:
return False
else:
return True
@property
def _available_events(self):
trace = self._trace
# Prime the trace metadata with what the events the parser reports.
# This will not include meta events.
try:
available = trace.get_metadata('available-events')
except MissingMetadataError:
available = []
available = set(available)
# List all the events we have succesfully parsed in the past. This will
# be updated in various places, so the result will not be stable.
try:
parseable = trace._cache.get_metadata('parseable-events')
except MissingMetadataError:
# This won't include meta-events, but better than nothing
return available
else:
return {
_event
for _event, _parseable in parseable.items()
if _parseable
}
def __iter__(self):
return iter(self._available_events)
def __len__(self):
return len(self._available_events)
def __str__(self):
return str(self._available_events)
class _CacheDataDesc(Mapping):
"""
Cached data descriptor.
:param spec: Specification of the data as a key/value mapping.
This holds all the information needed to uniquely identify a
:class:`pandas.DataFrame` or :class:`pandas.Series` or
:class:`polars.LazyFrame` or :class:`polars.DataFrame`. It is used to
manage the cache and swap.
It implements the :class:`collections.abc.Mapping` interface, so
specification keys can be accessed directly like from a dict.
.. note:: Once introduced in a container, instances must not be modified,
directly or indirectly.
:Attributes:
* ``normal_form``: Normal form of the descriptor. Equality is
implemented by comparing this attribute.
"""
def __init__(self, spec, fmt):
if fmt == 'polars-lazyframe':
spec = {
'polars-version': pl.__version__,
**spec
}
self.fmt = fmt
self.spec = spec
self.normal_form = _CacheDataDescNF.from_spec(self.spec, fmt)
def __getitem__(self, key):
return self.spec[key]
def __iter__(self):
return iter(self.spec)
def __len__(self):
return len(self.spec)
def __repr__(self):
return '{}({})'.format(
self.__class__.__name__,
', '.join(
f'{key}={val!r}'
for key, val in self.__dict__.items()
)
)
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.normal_form == other.normal_form
else:
return False
def __hash__(self):
return hash(self.normal_form)
class _CacheDataDescNF:
"""
Normal form of :class:`_CacheDataDesc`.
The normal form of the descriptor allows removing any possible differences
in shape of values, and is serializable to JSON. The serialization is
allowed to destroy some information (type mainly), as long as it does make
two descriptors wrongly equal.
"""
def __init__(self, nf, fmt):
assert fmt != _CacheDataSwapEntry.META_EXTENSION
self._fmt = fmt
self._nf = nf
# Since it's going to be inserted in dict for sure, precompute the hash
# once and for all.
self._hash = hash(self._nf)
@classmethod
def from_spec(cls, spec, fmt):
"""
Build from a spec that can include any kind of Python objects.
"""
nf = tuple(sorted(
(key, cls._coerce(val))
for key, val in spec.items()
))
return cls(nf=nf, fmt=fmt)
@classmethod
def _coerce(cls, val):
"Coerce data to a normal form that must be hashable"
if isinstance(val, Integral):
val = int(val)
elif isinstance(val, Real):
val = float(val)
elif isinstance(val, (type(None), str, Number)):
pass
elif isinstance(val, Mapping):
val = tuple(
(cls._coerce(key), cls._coerce(val))
for key, val in sorted(val.items())
)
val = ('mapping', val)
elif isinstance(val, Set):
val = tuple(map(cls._coerce, sorted(val)))
val = ('set', val)
elif isinstance(val, Sequence):
val = tuple(map(cls._coerce, val))
val = ('sequence', val)
# In other cases save the name of the type along the value to make
# sure we are not going to compare apple and oranges in the future
else:
type_name = f'{val.__class__.__module__}.{val.__class__.__qualname__}'
val = (type_name, val)
return val
def __str__(self):
return str(self._nf)
def __eq__(self, other):
if isinstance(other, self.__class__):
return self._fmt == other._fmt and self._nf == other._nf
else:
return False
def __hash__(self):
return self._hash
def to_json_map(self):
return dict(
fmt=self._fmt,
data=self._nf,
)
@classmethod
def _coerce_json(cls, x):
"""
JSON converts the original tuples into lists, so we need to convert it
back.
"""
if isinstance(x, str):
return x
elif isinstance(x, Sequence):
return tuple(map(cls._coerce_json, x))
else:
return x
@classmethod
def from_json_map(cls, mapping):
"""
Build from a mapping that was created using :meth:`to_json_map`.
JSON does not preserve tuples for example, so they need to be converted
back.
"""
fmt = mapping['fmt']
data = dict(mapping['data'])
nf = tuple(sorted(
(key, cls._coerce_json(val))
for key, val in data.items()
))
return cls(nf=nf, fmt=fmt)
class _CannotWriteSwapEntry(Exception):
pass
class _CacheDataSwapEntry:
"""
Entry in the data swap area of :class:`Trace`.
:param cache_desc_nf: Normal form descriptor describing what the entry
contains.
:type cache_desc_nf: _CacheDataDescNF
:param name: Name of the entry. If ``None``, a random UUID will be
generated.
:type name: str or None
:param written: ``True`` if the swap entry is already written on-disk.
:type written: bool
"""
META_EXTENSION = 'meta'
"""
Extension used by the metadata file of the swap entry in the swap.
"""
def __init__(self, cache_desc_nf, name=None, written=False):
self.cache_desc_nf = cache_desc_nf
self.name = name or uuid.uuid4().hex
self.written = written
@property
def meta_filename(self):
"""
Filename of the metadata file in the swap.
"""
return f'{self.name}.{self.META_EXTENSION}'
@property
def fmt(self):
"""
Format of the swap entry.
"""
return self.cache_desc_nf._fmt
@property
def data_filename(self):
"""
Filename of the data file in the swap.
"""
return f'{self.name}.{self.fmt}'
def to_json_map(self):
"""
Return a mapping suitable for JSON serialization.
"""
desc = self.cache_desc_nf.to_json_map()
try:
# Use json.dumps() here to fail early if the descriptor cannot be
# dumped to JSON
desc = json.dumps(desc)
except TypeError as e:
raise _CannotWriteSwapEntry(e)
return {
'version-token': VERSION_TOKEN,
'name': self.name,
'encoded_desc': desc,
}
@classmethod
def from_json_map(cls, mapping, written=False):
"""
Create an instance with a mapping created using :meth:`to_json_map`.
"""
if mapping['version-token'] != VERSION_TOKEN:
raise _TraceCacheSwapVersionError('Version token differ')
desc = json.loads(mapping['encoded_desc'])
cache_desc_nf = _CacheDataDescNF.from_json_map(desc)
name = mapping['name']
return cls(cache_desc_nf=cache_desc_nf, name=name, written=written)
def to_path(self, path):
"""
Save the swap entry metadata to the given ``path``.
"""
data = self.to_json_map()
with open(path, 'w') as f:
json.dump(data, f)
f.write('\n')
@classmethod
def from_path(cls, path):
"""
Load the swap entry metadata from the given ``path``.
"""
with open(path) as f:
mapping = json.load(f)
return cls.from_json_map(mapping, written=True)
class _TraceCacheSwapVersionError(ValueError):
"""
Exception raised when the swap entry was created by another version of LISA
than the one loading it.
"""
class _TraceCache(Loggable):
"""
Cache of a :class:`Trace`.

:param max_mem_size: Maximum amount of memory to use in bytes. If the data
held in memory exceed that size, they will be evicted to the swap if
possible and if it would be faster to reload them from swap rather than
recomputing them. If there is no swap area, the data are just discarded.
:type max_mem_size: int or None

:param max_swap_size: Maximum amount of swap to use in bytes. When the
amount of data saved to the swap exceeds that threshold, older files
are discarded.
:type max_swap_size: int or None

:param swap_dir: Folder to use as swap area.
:type swap_dir: str or None

:param trace_path: Absolute path of the trace file.
:type trace_path: str or None

:param trace_id: Unique id of the trace file, to invalidate the cache
if the file changed.
:type trace_id: str or None

:param metadata: Metadata mapping to store in the swap area.
:type metadata: dict or None

:param swap_content: Initial content of the swap area.
:type swap_content: dict(_CacheDataDescNF, _CacheDataSwapEntry) or None

The cache manages both the :class:`pandas.DataFrame` and
:class:`pandas.Series` generated in memory and a swap area used to evict
them, and to reload them quickly. Some other data (typically JSON) can also
be stored in the cache by analysis methods.
"""
INIT_SWAP_COST = 1e-8
"""
Somewhat arbitrary number, must be small enough so that we write at
least one dataset to the cache, which will allow us getting a better
estimation. If the value is too high from the start, we will never
write anything, and the value will never have a chance to re-adjust.
"""
TRACE_META_FILENAME = 'trace.meta'
"""
Name of the trace metadata file in the swap area.
"""
DATAFRAME_SWAP_FORMAT = 'polars-lazyframe'
"""
Data storage format used to swap.
"""
def __init__(self, max_mem_size=None, trace_path=None, trace_id=None, swap_dir=None, max_swap_size=None, swap_content=None, metadata=None):
self._lock = threading.RLock()
self._cache = {}
self._swappable = {}
self._data_cost = {}
self._swap_content = swap_content or {}
self._cache_desc_swap_filename = {}
self.swap_cost = self.INIT_SWAP_COST
self._swap_dir = swap_dir
self.max_swap_size = max_swap_size if max_swap_size is not None else math.inf
self._swap_size = self._get_swap_size()
self.max_mem_size = max_mem_size if max_mem_size is not None else math.inf
self._data_mem_swap_ratio = 1
self._metadata = metadata or {}
self.trace_path = os.path.abspath(trace_path) if trace_path else trace_path
self._trace_id = trace_id
self._unique_id = uuid.uuid4().hex
# Limit to one worker, as we will likely take the self._lock anyway
self._thread_executor = ThreadPoolExecutor(max_workers=1)
@property
def swap_dir(self):
if (swap_dir := self._swap_dir) is None:
raise ValueError(f'swap_dir is not set')
else:
return swap_dir
@property
@memoized
def _hardlinks_base(self):
    """
    Resolved path of the ``hardlinks/<unique id>`` folder of this cache
    instance, located inside the swap folder.
    """
    base = Path(self.swap_dir, 'hardlinks', str(self._unique_id))
    return base.resolve()
@property
@memoized
def _hardlinks_base_deallocator(self):
    """
    :class:`_Deallocator` calling ``_file_cleanup`` on the hardlinks
    folder of this instance, created with ``on_del=False`` and
    ``at_exit=True``.
    """
    cleanup = functools.partial(
        _file_cleanup,
        paths=[self._hardlinks_base],
    )
    return _Deallocator(f=cleanup, on_del=False, at_exit=True)
def _hardlink_path(self, base, name):
    """
    Create the ``base`` sub-folder of the hardlinks folder and return a
    tuple ``(folder, folder / name)``.

    :param base: Name of the sub-folder to create.
    :param name: Name of the file inside that sub-folder.
    """
    folder = self._hardlinks_base / base
    folder.mkdir(parents=True, exist_ok=True)

    # Accessing the property ensures the deallocator has been created
    self._hardlinks_base_deallocator

    return (folder, folder / name)
def __del__(self):
    # Only attempt an rmdir(), which fails on a non-empty folder, so that
    # we do not sabotage existing LazyFrame that might still be alive.
    try:
        os.rmdir(self._hardlinks_base)
    except Exception:
        pass

    # Block until all workers are finished. Otherwise, e.g. removing the
    # swap area might fail because a worker is still creating the metadata
    # file in there.
    executor = self._thread_executor
    executor.shutdown()
def _parser_temp_path(self):
try:
swap_dir = self.swap_dir
except ValueError:
path = None
else:
path = Path(swap_dir) / 'temp'
path.mkdir(parents=True, exist_ok=True)
return tempfile.TemporaryDirectory(dir=path)
def update_metadata(self, metadata, blocking=True):
"""
Update the metadata mapping with the given ``metadata`` mapping and
write it back to the swap area.

:param metadata: Mapping merged into the existing metadata. Nothing is
merged if it is empty or ``None``.
:type metadata: dict or None

:param blocking: Forwarded to :meth:`to_swap_dir`.
:type blocking: bool
"""
if metadata:
# The metadata mapping is shared state, so it is updated under the lock
with self._lock:
self._metadata.update(metadata)
self.to_swap_dir(blocking=blocking)
def get_metadata(self, key):
"""
Get the value of the given metadata ``key``.
"""
try:
with self._lock:
return self._metadata[key]
except KeyError as e:
raise MissingMetadataError(key) from e
def to_json_map(self):
    """
    Returns a dictionary suitable for JSON serialization.

    The trace path is stored relative to the swap folder if there is one,
    as an absolute path otherwise.
    """
    with self._lock:
        stored_path = self.trace_path
        if stored_path:
            try:
                base = self.swap_dir
            except ValueError:
                stored_path = os.path.abspath(stored_path)
            else:
                stored_path = os.path.relpath(stored_path, base)

        mapping = {
            'version-token': VERSION_TOKEN,
            'metadata': self._metadata,
            'trace-path': stored_path,
            'trace-id': self._trace_id,
        }
        return mapping
def to_path(self, path, blocking=True):
    """
    Write the persistent state to the given ``path``.

    :param path: Path of the file to write.
    :type path: str

    :param blocking: If ``True``, the file is written before returning.
        Otherwise, the write is submitted to the thread executor.
    :type blocking: bool
    """
    def dump():
        content = self.to_json_map()
        with open(path, 'w') as out:
            json.dump(content, out)
            out.write('\n')

    with measure_time():
        if blocking:
            dump()
        else:
            self._thread_executor.submit(dump)
@classmethod
def _from_swap_dir(cls, swap_dir, trace_id, trace_path=None, metadata=None, **kwargs):
    """
    Build an instance from the content of an existing swap folder.

    The swap is considered invalid, and is wiped, when it was created for
    another trace file or when the recorded trace id differs from
    ``trace_id``.

    :param swap_dir: Swap folder to reload.
    :type swap_dir: str

    :param trace_id: Unique id of the trace file.
    :type trace_id: str or None

    :param trace_path: Path to the trace file, if any.
    :type trace_path: str or None

    :param metadata: Metadata taking precedence over the ones reloaded
        from the swap.
    :type metadata: dict or None

    :Variable keyword arguments: Forwarded to the class constructor.

    :raises _TraceCacheSwapVersionError: If the swap was created with
        another version token.
    """
    metapath = os.path.join(swap_dir, cls.TRACE_META_FILENAME)
    with open(metapath) as f:
        mapping = json.load(f)

    if mapping['version-token'] != VERSION_TOKEN:
        raise _TraceCacheSwapVersionError('Version token differ')

    swap_trace_path = mapping['trace-path']
    swap_trace_path = os.path.join(swap_dir, swap_trace_path) if swap_trace_path else None

    metadata = metadata or {}

    # The paths can only be compared when both are known: the swap might
    # have been created without any trace path, in which case
    # os.path.samefile() would raise a TypeError. The trace id is then the
    # only thing left to check.
    if trace_path and swap_trace_path and not os.path.samefile(swap_trace_path, trace_path):
        invalid_swap = True
    else:
        invalid_swap = (mapping['trace-id'] != trace_id)

    if invalid_swap:
        # Remove the invalid swap and create a fresh directory
        shutil.rmtree(swap_dir)
        os.makedirs(swap_dir)
        swap_content = None
    else:
        suffix = f'.{_CacheDataSwapEntry.META_EXTENSION}'
        swap_content = {}
        for filename in os.listdir(swap_dir):
            if not filename.endswith(suffix):
                continue
            try:
                swap_entry = _CacheDataSwapEntry.from_path(
                    os.path.join(swap_dir, filename)
                )
            # If there is any issue with that entry, just ignore it
            # pylint: disable=broad-except
            except Exception:
                continue
            else:
                swap_content[swap_entry.cache_desc_nf] = swap_entry

        # The metadata passed by the caller override the ones reloaded
        # from the swap
        metadata = {**mapping['metadata'], **metadata}

    return cls(swap_content=swap_content, swap_dir=swap_dir, metadata=metadata, trace_path=trace_path, trace_id=trace_id, **kwargs)
def to_swap_dir(self, blocking=True):
"""
Write the persistent state to the swap area if any, no-op otherwise.
"""
try:
swap_dir = self.swap_dir
except ValueError:
pass
else:
path = os.path.join(swap_dir, self.TRACE_META_FILENAME)
self.to_path(path, blocking=blocking)
@classmethod
def from_swap_dir(cls, swap_dir, **kwargs):
"""
Reload the persistent state from the given ``swap_dir``.
:Variable keyword arguments: Forwarded to :class:`_TraceCache`.
"""
if swap_dir:
try:
return cls._from_swap_dir(swap_dir=swap_dir, **kwargs)
except (FileNotFoundError, _TraceCacheSwapVersionError, json.decoder.JSONDecodeError):
pass
return cls(swap_dir=swap_dir, **kwargs)
def _estimate_data_swap_cost(self, data):
return self._estimate_data_swap_size(data) * self.swap_cost
def _estimate_data_swap_size(self, data):
return self._data_mem_usage(data) * self._data_mem_swap_ratio
def _update_ewma(self, attr, new, alpha=0.25, override=False):
with self._lock:
old = getattr(self, attr)
if override:
updated = new
else:
updated = (1 - alpha) * old + alpha * new
setattr(self, attr, updated)
def _update_data_swap_size_estimation(self, data, size):
mem_usage = self._data_mem_usage(data)
if mem_usage:
self._update_ewma('_data_mem_swap_ratio', size / mem_usage)
@staticmethod
def _data_mem_usage(data):
if data is None:
return 1
elif isinstance(data, (pd.DataFrame, pd.Series)):
mem = data.memory_usage()
try:
return mem.sum()
except AttributeError:
return mem
else:
return sys.getsizeof(data)
def _should_evict_to_swap(self, cache_desc, data):
# If we don't have any cost info, assume it is expensive to compute
with self._lock:
compute_cost = self._data_cost.get(cache_desc, math.inf)
swap_cost = self._estimate_data_swap_cost(data)
return swap_cost <= compute_cost
def _path_of_swap_entry(self, swap_entry):
return os.path.join(self.swap_dir, swap_entry.meta_filename)
def _cache_desc_swap_path(self, cache_desc, create=False):
cache_desc_nf = cache_desc.normal_form
if create and not self._is_written_to_swap(cache_desc):
self.insert(
cache_desc,
data=None,
compute_cost=None,
write_swap=True,
ignore_cost=True,
# We do not write the swap_entry meta file, so that the
# user can write the data file before the swap entry is
# added. This way, another process will not be tricked into
# believing the data is available whereas in fact it's in
# the process of being populated.
write_meta=False,
)
with self._lock:
swap_entry = self._swap_content[cache_desc_nf]
filename = swap_entry.data_filename
return os.path.join(self.swap_dir, filename)
def _update_swap_cost(self, data, swap_cost, mem_usage, swap_size):
# Take out from the swap cost the time it took to write the overhead
# that comes with the file format, assuming the cost is
# proportional to amount of data written in the swap.
if not swap_size:
swap_cost = 0
new_cost = swap_cost / mem_usage
override = self.swap_cost == self.INIT_SWAP_COST
# EWMA to keep a relatively stable cost
self._update_ewma('swap_cost', new_cost, override=override)
def _is_written_to_swap(self, cache_desc):
try:
with self._lock:
swap_entry = self._swap_content[cache_desc.normal_form]
except KeyError:
return False
else:
return swap_entry.written
@staticmethod
def _data_to_parquet(data, path, compression='lz4', **kwargs):
    """
    Write ``data`` to a parquet file at ``path``.

    :param data: Data to write. :class:`pandas.DataFrame`,
        :class:`polars.DataFrame` and :class:`polars.LazyFrame` are
        handled specifically, anything else is expected to provide a
        ``to_parquet()`` method.

    :param path: Path of the file to write.

    :param compression: Compression passed to the parquet writer.
    :type compression: str

    :Variable keyword arguments: Forwarded to the parquet writer.
    """
    kwargs['compression'] = compression

    if isinstance(data, pd.DataFrame):
        # Data must be convertible to bytes so we dump them as JSON
        encoded_attrs = json.dumps(data.attrs)
        table = pyarrow.Table.from_pandas(data)
        schema_metadata = dict(
            table.schema.metadata or {},
            lisa=encoded_attrs,
        )
        pyarrow.parquet.write_table(
            table.replace_schema_metadata(schema_metadata),
            path,
            **kwargs
        )
    elif isinstance(data, pl.DataFrame):
        data.write_parquet(path, **kwargs)
    elif isinstance(data, pl.LazyFrame):
        with pl.StringCache():
            try:
                data.sink_parquet(path, **kwargs)
            # Some LazyFrame cannot be sunk lazily to a parquet file
            except pl.InvalidOperationError:
                collected = data.collect()
                collected.write_parquet(path, **kwargs)
    else:
        data.to_parquet(path, **kwargs)
def _write_data(self, fmt, data, path):
if fmt == 'disk-only':
return
elif fmt == 'parquet':
self._data_to_parquet(data, path)
elif fmt == 'polars-lazyframe':
assert isinstance(data, pl.LazyFrame)
def to_parquet():
self._data_to_parquet(data, path)
def to_json(plan):
plan = json.loads(plan)
plan, _ = _logical_plan_resolve_paths(
self,
plan=plan,
kind='dump',
)
with open(path, 'wt') as f:
json.dump(plan, f)
# If the LazyFrame is actually backed by an in-memory DataFrame, we
# just execute it and dump it to parquet. Otherwise, that would
# force rendering the entire dataframe to JSON first which would
# not scale well at all.
if _polars_df_in_memory(data):
to_parquet()
else:
try:
plan = data.serialize()
# We failed to serialize the logical plan. This could happen
# because it contains references to UDF (e.g. a lambda passed
# to Expr.map_elements())
except ValueError:
to_parquet()
else:
to_json(plan)
elif fmt == 'json':
with open(path, 'wt') as f:
try:
json.dump(data, f, separators=(',', ':'))
except Exception as e:
raise ValueError(f'Does not know how to write data type {data.__class__} to the cache: {e}') from e
else:
raise ValueError(f'Does not know how to dump to disk format: {fmt}')
def _load_data(self, fmt, path):
"""
Load the data stored at ``path`` in the given format.

:param fmt: One of ``disk-only`` (``None`` is returned), ``parquet``,
``json`` or ``polars-lazyframe``.
:type fmt: str

:param path: Path of the file to load.

:raises ValueError: If the format is not supported.
"""
def load_parquet(path):
path = Path(path)
hardlink_base, hardlink_path = self._hardlink_path(
uuid.uuid4().hex,
path.name
)
# We make a hardlink to the cache entry that will not be
# subject to scrubbing. This way we ensure that we can keep the
# LazyFrame working even if another process decides to scrub
# the swap area.
_make_hardlink(path, hardlink_path)
try:
df = pl.scan_parquet(hardlink_path)
# Remove the hardlink folder on a best-effort basis before propagating
# the exception
except BaseException:
try:
shutil.rmtree(hardlink_base)
except Exception:
pass
raise
else:
# Ensure we actually trigger a file read, in case we are trying
# to interpret as parquet something that is not parquet
df.clear().collect()
df = _LazyFrameOnDelete.attach_file_cleanup(df, [hardlink_base])
return df
if fmt == 'disk-only':
data = None
elif fmt == 'parquet':
data = load_parquet(path)
elif fmt == 'json':
with open(path, 'rt') as f:
data = json.load(f)
elif fmt == 'polars-lazyframe':
# A "polars-lazyframe" file is tried as parquet first, and then as a
# JSON dump of a logical plan if polars raises a ComputeError.
try:
data = load_parquet(path)
except pl.ComputeError:
with open(path, 'r') as f:
plan = json.load(f)
plan, hardlinks = _logical_plan_resolve_paths(
self,
plan=plan,
kind='load',
)
plan = json.dumps(plan)
plan = io.StringIO(plan)
data = pl.LazyFrame.deserialize(plan)
data = _LazyFrameOnDelete.attach_file_cleanup(data, hardlinks)
else:
raise ValueError(f'File format not supported "{fmt}" at path: {path}')
return data
def _write_swap(self, cache_desc, data, write_meta=True):
"""
Write ``data`` to the swap area, if there is one and if the descriptor
is not already written there.

:param cache_desc: Descriptor of the data.
:type cache_desc: _CacheDataDesc

:param data: Data to write.

:param write_meta: If ``True``, the metadata file of the swap entry is
written as well.
:type write_meta: bool
"""
try:
swap_dir = self.swap_dir
# No swap area, nothing to do
except ValueError:
pass
else:
if self._is_written_to_swap(cache_desc):
return
cache_desc_nf = cache_desc.normal_form
# We may already have a swap entry if we used the None data
# placeholder. This would have allowed the user to reserve the swap
# data file in advance so they can write to it directly, instead of
# managing the data in the memory cache.
try:
with self._lock:
swap_entry = self._swap_content[cache_desc_nf]
except KeyError:
swap_entry = _CacheDataSwapEntry(cache_desc_nf)
data_path = os.path.join(swap_dir, swap_entry.data_filename)
# If that would make the swap dir too large, try to do some cleanup
if self._estimate_data_swap_size(data) + self._swap_size > self.max_swap_size:
self.scrub_swap()
def log_error(e):
self.logger.error(f'Could not write {cache_desc} to swap: {e}')
# Write the data file and measure how long it took, to update the write
# speed estimation
try:
with measure_time() as measure:
self._write_data(cache_desc.fmt, data, data_path)
# PyArrow fails to save dataframes containing integers > 64bits
except OverflowError as e:
log_error(e)
else:
# Update the swap entry on disk
if write_meta:
try:
swap_entry.to_path(
self._path_of_swap_entry(swap_entry)
)
# We have a swap entry that cannot be written to the swap,
# probably because the descriptor includes something that
# cannot be serialized to JSON.
except _CannotWriteSwapEntry as e:
self.logger.debug(f'Could not write {cache_desc} to swap: {e}')
swap_entry.written = False
return
else:
swap_entry.written = True
with self._lock:
self._swap_content[swap_entry.cache_desc_nf] = swap_entry
# Assume that reading from the swap will take as much time as
# writing to it. We cannot do better anyway, but that should
# mostly bias to keeping things in memory if possible.
swap_cost = measure.exclusive_delta
try:
data_swapped_size = os.stat(data_path).st_size
# No data file was created, e.g. with the "disk-only" format
except FileNotFoundError:
data_swapped_size = 0
mem_usage = self._data_mem_usage(data)
if mem_usage:
self._update_swap_cost(data, swap_cost, mem_usage, data_swapped_size)
with self._lock:
self._swap_size += data_swapped_size
self._update_data_swap_size_estimation(data, data_swapped_size)
self.scrub_swap()
def _get_swap_size(self):
try:
swap_dir = self.swap_dir
except ValueError:
return 1
else:
return sum(
dir_entry.stat().st_size
for dir_entry in os.scandir(swap_dir)
)
def scrub_swap(self):
"""
Scrub the swap area to remove old files if the storage size limit is exceeded.
"""
try:
swap_dir = self.swap_dir
except ValueError:
pass
else:
# TODO: Load the file information from __init__ by discovering the
# swap area's content to avoid doing it each time here
if self._swap_size > self.max_swap_size:
self._scrub_swap(swap_dir)
def _scrub_swap(self, swap_dir):
with self._lock:
def get_stat(dir_entry):
try:
return dir_entry.stat()
except FileNotFoundError:
return None
stats = {
dir_entry.name: stat
for dir_entry in os.scandir(swap_dir)
if (stat := get_stat(dir_entry)) is not None
}
swap_content = list(self._swap_content.values())
data_files = {
swap_entry.data_filename: swap_entry
for swap_entry in swap_content
}
# Get rid of stale files that are not referenced by any swap entry
metadata_files = {
swap_entry.meta_filename
for swap_entry in swap_content
}
metadata_files.add(self.TRACE_META_FILENAME)
non_stale_files = data_files.keys() | metadata_files | {'hardlinks', 'temp'}
stale_files = stats.keys() - non_stale_files
for filename in stale_files:
stats.pop(filename, None)
path = os.path.join(swap_dir, filename)
try:
os.unlink(path)
except Exception:
pass
def by_mtime(path_stat):
_, stat = path_stat
return stat.st_mtime
# Sort by modification time, so we discard the oldest caches
total_size = 0
discarded_swap_entries = set()
for filename, stat in sorted(stats.items(), key=by_mtime):
total_size += stat.st_size
if total_size > self.max_swap_size:
try:
swap_entry = data_files[filename]
# That was not a data file
except KeyError:
continue
else:
discarded_swap_entries.add(swap_entry)
# Update the swap content
for swap_entry in discarded_swap_entries:
stats.pop(swap_entry.data_filename, None)
self._clear_swap_entry(swap_entry)
with self._lock:
self._swap_size = sum(
stats[swap_entry.data_filename].st_size
for swap_entry in swap_content
if swap_entry.data_filename in stats
)
def _clear_cache_desc_swap(self, cache_desc):
with self._lock:
try:
swap_entry = self._swap_content[cache_desc.normal_form]
except KeyError:
pass
else:
self._clear_swap_entry(swap_entry)
def _clear_swap_entry(self, swap_entry):
try:
swap_dir = self.swap_dir
except ValueError:
pass
else:
with self._lock:
self._swap_content.pop(swap_entry.cache_desc_nf, None)
for filename in (swap_entry.meta_filename, swap_entry.data_filename):
path = os.path.join(swap_dir, filename)
try:
os.unlink(path)
except Exception:
pass
def fetch(self, cache_desc, insert=True):
"""
Fetch an entry from the cache or the swap.
:param cache_desc: Descriptor to look for.
:type cache_desc: _CacheDataDesc
:param insert: If ``True`` and if the fetch succeeds by loading the
swap, the data is inserted in the cache.
:type insert: bool
"""
try:
with self._lock:
return self._cache[cache_desc]
except KeyError as e:
# pylint: disable=raise-missing-from
try:
path = self._cache_desc_swap_path(cache_desc)
# If there is no swap, bail out
except (ValueError, KeyError):
raise KeyError('Could not find swap entry for cache_desc')
else:
try:
data = self._load_data(cache_desc.fmt, path)
except Exception as e:
# Rotten swap entry, we clear it. This may happen e.g. when
# reloading a LazyFrame that depended on another cache
# entry that has been scrubbed
self._clear_cache_desc_swap(cache_desc)
raise KeyError('Could not load swap entry content of cache_desc')
if insert:
# We have no idea of the cost of something coming from
# the cache
self.insert(cache_desc, data, write_swap=False, compute_cost=None)
return data
def insert(self, cache_desc, data, compute_cost=None, write_swap=False, ignore_cost=False, write_meta=True, swappable=None):
"""
Insert an entry in the cache.
:param cache_desc: Descriptor of the data to insert.
:type cache_desc: _CacheDataDesc
:param data: data to insert.
:type data: object
:param compute_cost: Time spent to compute the data in seconds.
:type compute_cost: float or None
:param write_swap: If ``True``, the data will be written to the swap as
well so it can be quickly reloaded. Note that it will be subject to
cost evaluation, so it might not result in anything actually
written.
:type write_swap: bool
:param ignore_cost: If ``True``, bypass the computation vs swap
cost comparison.
:type ignore_cost: bool
:param write_meta: If ``True``, the swap entry metadata will be written
on disk if the data are. Otherwise, no swap entry is written to disk.
:type write_meta: bool
:param swappable: If ``False``, the data will never be written to the
swap and will only be kept in memory. If ``None``, the swappability
will not change if it was already set, otherwise it will be set to
``True``.
:type swappable: bool or None
"""
with self._lock:
self._cache[cache_desc] = data
if swappable is not None:
self._swappable[cache_desc] = swappable
if compute_cost is not None:
self._data_cost[cache_desc] = compute_cost
self.write_swap(
cache_desc,
ignore_cost=ignore_cost,
write_meta=write_meta
)
self._scrub_mem()
def insert_disk_only(self, spec, compute_cost=None):
    """
    Insert a ``disk-only`` entry described by ``spec`` with ``None`` as
    data, and return the resolved path of its data file in the swap.

    :param spec: Spec used to build the cache descriptor.

    :param compute_cost: Forwarded to :meth:`insert`.
    :type compute_cost: float or None
    """
    desc = _CacheDataDesc(spec=spec, fmt='disk-only')
    self.insert(desc, data=None, compute_cost=compute_cost, write_swap=True)
    data_path = Path(self._cache_desc_swap_path(desc, create=True))
    return data_path.resolve()
def _scrub_mem(self):
"""
Evict entries from memory if the memory used by the cached data exceeds
``max_mem_size``.

Entries are ranked by a retention score, and the ones with the lowest
score are accounted for first. Once the accumulated memory usage exceeds
the limit, the remaining entries are evicted using :meth:`evict`.
"""
# No memory limit, nothing to do
if self.max_mem_size == math.inf:
return
mem_usage = sum(
self._data_mem_usage(data)
for data in self._cache.values()
)
if mem_usage > self.max_mem_size:
with self._lock:
# Make sure garbage collection occurred recently, to get the most
# accurate refcount possible
gc.collect()
refcounts = {
cache_desc: sys.getrefcount(data)
for cache_desc, data in self._cache.items()
}
min_refcount = min(refcounts.values())
# Low retention score means it's more likely to be evicted
def retention_score(cache_desc_and_data):
cache_desc, data = cache_desc_and_data
# If we don't know the computation cost, assume it can be evicted cheaply
compute_cost = self._data_cost.get(cache_desc, 0)
if not compute_cost:
score = 0
else:
swap_cost = self._estimate_data_swap_cost(data)
# If it's already written back, make it cheaper to evict since
# the eviction itself is going to be cheap
if self._is_written_to_swap(cache_desc):
swap_cost /= 2
if swap_cost:
score = compute_cost / swap_cost
else:
score = 0
# Assume that more references to an object implies it will
# stay around for longer. Therefore, it's less interesting to
# remove it from this cache and pay the cost of reading/writing it to
# swap, since the memory will not be freed anyway.
#
# Normalize to the minimum refcount, so that the _cache and other
# structures where references are stored are discounted for sure.
return (refcounts[cache_desc] - min_refcount + 1) * score
new_mem_usage = 0
# sorted() works on a snapshot of the items, so evicting entries while
# looping does not disturb the iteration
for cache_desc, data in sorted(self._cache.items(), key=retention_score):
new_mem_usage += self._data_mem_usage(data)
if new_mem_usage > self.max_mem_size:
self.evict(cache_desc)
def evict(self, cache_desc):
"""
Evict the given descriptor from memory.
:param cache_desc: Descriptor to evict.
:type cache_desc: _CacheDataDesc
If it would be cheaper to reload the data than to recompute them, they
will be written to the swap area.
"""
self.write_swap(cache_desc)
try:
with self._lock:
del self._cache[cache_desc]
except KeyError:
pass
def write_swap(self, cache_desc, ignore_cost=False, write_meta=True):
"""
Write the given descriptor to the swap area if that would be faster to
reload the data rather than recomputing it. If the descriptor is not in
the cache or if there is no swap area, ignore it.
:param cache_desc: Descriptor of the data to write to swap.
:type cache_desc: _CacheDataDesc
:param ignore_cost: If ``True``, bypass the compute vs swap cost comparison.
:type ignore_cost: bool
:param write_meta: If ``True``, the swap entry metadata will be written
on disk if the data are. Otherwise, no swap entry is written to disk.
:type write_meta: bool
"""
try:
with self._lock:
data = self._cache[cache_desc]
swappable = self._swappable.get(cache_desc, True)
except KeyError:
pass
else:
if (
swappable and
(
ignore_cost or
self._should_evict_to_swap(cache_desc, data)
)
):
self._write_swap(cache_desc, data, write_meta)
def write_swap_all(self, **kwargs):
"""
Attempt to write all cached data to the swap.
"""
with self._lock:
cache_descs = list(self._cache.keys())
for cache_desc in cache_descs:
self.write_swap(cache_desc, **kwargs)
def clear_event(self, event, raw=None):
"""
Clear cache entries referencing a given event.
:param event: Event to clear.
:type event: str
:param raw: If ``True``, only clear entries that refer to raw data. If
``False``, only clear entries that refer to non-raw data. If
``None``, ignore whether the descriptor is about raw data or not.
:type raw: bool or None
"""
with self._lock:
self._cache = {
cache_desc: data
for cache_desc, data in self._cache.items()
if not (
cache_desc.get('event') == event
and (
raw is None
or cache_desc.get('raw') == raw
)
)
}
def clear_all_events(self, raw=None):
"""
Same as :meth:`clear_event` but works on all events at once.
"""
with self._lock:
self._cache = {
cache_desc: data
for cache_desc, data in self._cache.items()
if (
# Cache entries can be associated to something else than events
'event' not in cache_desc or
# Either we care about raw and we check, or blanket clear
raw is None or
cache_desc.get('raw') == raw
)
}
class _Trace(Loggable, _InternalTraceBase):
def _select_userspace(self, source_event, meta_event, df):
    """
    Getter for the ``userspace`` meta events.

    If the ``ip`` column has a string, binary or categorical dtype, only
    the rows where it starts with ``tracing_mark_write`` are kept. The
    content to parse is in the ``buf`` column.
    """
    # pylint: disable=unused-argument,no-self-use

    # tracing_mark_write is the name of the kernel function invoked when
    # writing to: /sys/kernel/debug/tracing/trace_marker
    # That said, it's not the end of the world if we don't filter on that
    # as the meta event name is supposed to be unique anyway
    text_dtypes = (pl.String, pl.Binary, pl.Categorical)
    if isinstance(df.schema['ip'], text_dtypes):
        ip = pl.col('ip').cast(pl.String)
        df = df.filter(ip.str.starts_with('tracing_mark_write'))
    return (df, 'buf')
def _select_trace_printk(self, source_event, meta_event, df):
content_col = {
'bprint': 'buf',
}[source_event]
return (df, content_col)
# Each getter is stored as a plain function and is called with the
# (self, source_event, meta_event, df) signature of the selectors defined above.
# NOTE(review): the description below mentions pandas.DataFrame, but
# _select_userspace uses the polars API (df.schema, pl.col) - confirm which
# dataframe type the getters actually receive.
_META_EVENT_SOURCE = {
'userspace': {
'print': _select_userspace,
},
'trace_printk': {
'bprint': _select_trace_printk,
},
}
"""
Define the source of each meta event.
Meta events are events derived by parsing a field of another given real event.
Meta events are event with a name formed as ``prefix:event``.
This mapping has the format::
prefix:
source event: getter_func
... # Multiple sources are allowed
...
``getter_func`` takes the :class:`pandas.DataFrame` of the source event as
input and returns a tuple of ``(dataframe, line_column)`` where:
* ``dataframe`` is the source event dataframe, potentially filtered.
* ``line_column`` is the name of the column containing the lines to
parse.
"""
def __init__(self,
trace_path=None,
plat_info=None,
parser=None,
plots_dir=None,
max_mem_size=None,
swap_dir=None,
enable_swap=True,
max_swap_size=None,
):
"""
:param trace_path: Path to the trace file, converted to a string if
provided.
:type trace_path: str or None

:param plat_info: Platform information. A shallow copy is made if
provided, otherwise an empty ``PlatformInfo`` is created.

:param parser: Parser to use. If ``None``, it is selected from the
extension of ``trace_path``: ``.html`` selects
``SysTraceParser.from_html``, ``.txt`` selects
``HRTxtTraceParser.from_txt_file`` and anything else selects
``TraceDumpTraceParser.from_dat``.

:param plots_dir: Deprecated. If not provided, the folder containing the
trace is used when a trace path is available.

:param max_mem_size: Forwarded to :class:`_TraceCache`.

:param swap_dir: Swap folder. When the swap is enabled, the trace file
exists and this is ``None``, a ``.<trace basename>.lisa-swap`` folder
next to the trace is used.

:param enable_swap: Whether a swap area may be used.
:type enable_swap: bool

:param max_swap_size: Maximum swap size. When left to ``None``, a value
of 10 times the size of the trace file may be used.

:raises ValueError: If neither ``parser`` nor ``trace_path`` is provided.
"""
super().__init__()
self._lock = threading.RLock()
trace_path = str(trace_path) if trace_path else None
if enable_swap:
if trace_path and os.path.exists(trace_path):
if swap_dir is None:
basename = os.path.basename(trace_path)
swap_dir = os.path.join(
os.path.dirname(trace_path),
f'.{basename}.lisa-swap'
)
# If the swap folder cannot be created, carry on without swap
try:
os.makedirs(swap_dir, exist_ok=True)
except OSError:
swap_dir = None
if max_swap_size is None:
trace_size = os.stat(trace_path).st_size
# Use 10 times the size of the trace so that there is
# enough room to store large artifacts like a JSON dump of
# the trace
max_swap_size = trace_size * 10
else:
swap_dir = None
max_swap_size = None
self.trace_path = trace_path
self._parseable_events = {}
if parser is None:
if not trace_path:
raise ValueError('A trace path must be provided')
_, extension = os.path.splitext(trace_path)
if extension == '.html':
parser = SysTraceParser.from_html
elif extension == '.txt':
parser = HRTxtTraceParser.from_txt_file
else:
parser = TraceDumpTraceParser.from_dat
self._parser = parser
# The platform information used to run the experiments
if plat_info is None:
# Delay import to avoid circular dependency
# pylint: disable=import-outside-toplevel
from lisa.platforms.platinfo import PlatformInfo
plat_info = PlatformInfo()
else:
# Make a shallow copy so we can update it
plat_info = copy.copy(plat_info)
self._source_events_known = False
if plots_dir:
_deprecated_warn('Trace(plots_dir=...) parameter is deprecated', stacklevel=2)
elif not plots_dir and trace_path:
plots_dir = os.path.dirname(trace_path)
self.plots_dir = plots_dir
# No-op cache so that the cacheable metadata machinery does not fall
# over when querying the trace-id.
self._cache = _TraceCache()
trace_id = self._get_trace_id()
# The metadata gathered in the temporary cache are handed over to the
# actual cache
self._cache = _TraceCache.from_swap_dir(
trace_path=trace_path,
swap_dir=swap_dir,
max_swap_size=max_swap_size,
max_mem_size=max_mem_size,
trace_id=trace_id,
metadata=self._cache._metadata,
)
# Initial scrub of the swap to discard unwanted data, honoring the
# max_swap_size right from the beginning
self._cache.scrub_swap()
self._cache.to_swap_dir(blocking=False)
try:
self._parseable_events = self._cache.get_metadata('parseable-events')
except MissingMetadataError:
pass
# Preload metadata from the cache, to trigger any side effect when
# processing them. For example, this can help avoiding spinning the
# parser if the set of available events is already known.
self._preload_metadata_cache()
# Register what we currently have
self.plat_info = plat_info
# Update the platform info with the data available from the trace once
# the Trace is almost fully initialized
self.plat_info = plat_info.add_trace_src(self)
def _preload_metadata_cache(self):
    """
    Process every key of ``TraceParserBase.METADATA_KEYS`` with a getter
    that always fails, and return a mapping of the keys for which a
    non-``None`` value was obtained anyway.
    """
    def fail():
        raise ValueError('Fake metadata value')

    preloaded = {}
    for key in TraceParserBase.METADATA_KEYS:
        try:
            value = self._process_metadata(key=key, get=fail)
        except ValueError:
            continue
        if value is not None:
            preloaded[key] = value
    return preloaded
def _preload_events(self, events):
    """
    Preload the given events and return the set of events that could be
    loaded.

    :param events: Events to preload, or ``_ALL_EVENTS`` to preload
        everything.
    """
    if events is _ALL_EVENTS:
        return self._preload_all_events()

    checker = OptionalTraceEventChecker.from_events(events)
    # This creates a polars LazyFrame from the cache if available, so it's
    # cheap since we always cache the data coming from the parser.
    df_map = self._load_cache_raw_df(
        checker,
        allow_missing_events=True,
    )
    return set(df_map.keys())
@memoized
def _preload_all_events(self):
    """
    Parse everything with :meth:`_parse_all` and return the set of events
    for which a dataframe was obtained.
    """
    _, df_map = self._parse_all()
    return set(df_map.keys())
def _parse_all(self):
# Gather all the events and all the metadata of the trace, taking them from
# the cache when they are there and from the parser otherwise.
#
# Returns a (meta, df_map) tuple, where "meta" maps metadata keys to their
# processed value and "df_map" maps event names to their dataframe.

# Run every dataframe returned by the parser through
# _convert_df_from_parser().
def convert(parser, df_map):
return {
event: _convert_df_from_parser(
df,
parser=parser,
cache=self._cache,
)
for event, df in df_map.items()
}
# Merge what the parser produced with what was preloaded from the cache.
def finalize(preloaded_df_map, preloaded_meta, parser, df_map):
parsed_df_map = convert(parser, df_map)
# Only the events that were not preloaded are inserted in the cache.
self._insert_events({
event: df
for event, df in parsed_df_map.items()
if event not in preloaded_df_map
})
# For an event present in both maps, the freshly parsed dataframe wins.
df_map = {
**preloaded_df_map,
**parsed_df_map,
}
# "<" is a strict subset test: the parser is only queried for metadata when
# the preloaded keys are a proper subset of METADATA_KEYS.
if set(preloaded_meta.keys()) < set(TraceParserBase.METADATA_KEYS):
parsed_meta = parser.get_all_metadata()
# NOTE(review): the lambda closes over the loop variable "v". This relies
# on _process_metadata() calling "get" before the next iteration.
parsed_meta = {
k: self._process_metadata(key=k, get=lambda: v)
for k, v in parsed_meta.items()
if k not in preloaded_meta.keys()
}
else:
parsed_meta = {}
meta = {
**preloaded_meta,
**parsed_meta
}
return (meta, df_map)
# Fetch the raw dataframe of an event from the cache. Returns None when the
# cache lookup raises KeyError.
def load_from_cache(event):
cache_desc = self._make_raw_cache_desc(event)
try:
df = self._cache.fetch(cache_desc, insert=True)
except KeyError:
return None
else:
return _ParsedDataFrame.from_df(df)
# Use the parser only if some metadata or some events are still missing,
# otherwise return the preloaded data untouched.
def parse(preloaded_df_map, preloaded_meta, events):
needed_metadata = set(TraceParserBase.METADATA_KEYS) - set(preloaded_meta.keys())
if (needed_metadata or events or events is _ALL_EVENTS):
_finalize = functools.partial(
finalize,
preloaded_df_map=preloaded_df_map,
preloaded_meta=preloaded_meta,
)
if events is _ALL_EVENTS:
with self._get_parser(events=events, needed_metadata=needed_metadata) as parser:
try:
return _finalize(parser=parser, df_map=parser.parse_all_events())
# The parser cannot parse everything in one go: replace the _ALL_EVENTS
# sentinel by the explicit list of events the parser reports.
except NotImplementedError:
events = parser.get_metadata('available-events')
with self._get_parser(events=events, needed_metadata=needed_metadata) as parser:
# Use custom best-effort implementation that allows
# filtering out _all_ exceptions. We are here to parse as
# many events as we can, regardless of any error.
def parse_best_effort(event):
try:
return parser.parse_event(event)
except Exception:
return None
return _finalize(
parser=parser,
df_map={
event: df
for event in events
if (df := parse_best_effort(event)) is not None
}
)
else:
return (
preloaded_meta,
preloaded_df_map,
)
# Start from the metadata that is available without a parser.
preloaded_meta = self._preload_metadata_cache()
try:
events = preloaded_meta['available-events']
# The list of available events is not known yet, so nothing can be looked up
# in the cache by name and everything is requested from the parser.
except KeyError:
events = _ALL_EVENTS
preloaded_df_map = {}
else:
preloaded_df_map = {
event: df
for event in events
if (df := load_from_cache(event)) is not None
}
# Only the events that were not found in the cache remain to be parsed.
events = {
event
for event in events
if event not in preloaded_df_map
}
meta, df_map = parse(preloaded_df_map, preloaded_meta, events)
# NOTE(review): this comprehension builds a new dict with the same keys and
# values as df_map.
df_map = {
event: df
for event, df in df_map.items()
}
return (meta, df_map)
def get_metadata(self, key):
    """
    Return the value of the metadata ``key`` for this trace.

    This is the public entry point, it simply delegates to
    :meth:`_get_metadata`.

    :param key: Name of the metadata to get.

    .. seealso:: :meth:`TraceParserBase.get_metadata`
    """
    value = self._get_metadata(key=key)
    return value
def _get_metadata(self, key, parser=None, try_hard=False):
def get():
if parser is None:
@contextlib.contextmanager
def _cm():
with self._get_parser(needed_metadata={key}) as parser:
yield parser
cm = _cm()
else:
# If we got passed a parser, we leave the decision to use it as a
# context manager or not to the caller.
cm = parser if try_hard else nullcontext(parser)
with cm as _parser:
value = _parser.get_metadata(key)
return value
return self._process_metadata(get=get, key=key)
@staticmethod
def _meta_to_json(meta):
def process(key, value):
if key == 'available-events':
# Ensure we have a list so that it can be dumped to JSON
value = sorted(set(value))
elif key == 'trace-id':
value = str(value)
elif key == 'time-range':
start, end = value
# Ensure we round the float value of the window boundaries
# correctly so that all events are within that window.
# Otherwise we could have the first/last event out of that
# window due to wrong rounding.
start = Timestamp(start, rounding='down')
end = Timestamp(end, rounding='up')
# Return integer number of nanoseconds. This ensures we cache
# the nanosecond exact value rather than a float that might be
# imprecise.
value = (start.as_nanoseconds, end.as_nanoseconds)
elif key == 'symbols-address':
# Unzip the dict into a list of keys and list of values, since
# that will be cheaper to deserialize and serialize to JSON
value = tuple(unzip_into(
2,
# Turn to a list to allow JSON caching, since JSON cannot
# have non-string keys
dict(value).items()
))
return value
return {
key: process(key, value)
for key, value in meta.items()
}
@staticmethod
def _meta_from_json(meta):
def process(key, value):
"""
Process a value prepared with :meth:`_meta_to_json`
"""
if key == 'available-events':
value = set(value)
elif key == 'time-range':
start, end = value
assert isinstance(start, int)
assert isinstance(end, int)
# Ensure we round the float value of the window boundaries
# correctly so that all events are within that window.
# Otherwise we could have the first/last event out of that
# window due to wrong rounding.
start = Timestamp(start, unit='ns', rounding='down')
end = Timestamp(end, unit='ns', rounding='up')
value = (start, end)
elif key == 'symbols-address':
# Turn the list of items (to allow JSON storage) back to a dict.
addr, names = value
value = dict(zip(
map(int, addr),
names
))
return value
return {
key: process(key, value)
for key, value in meta.items()
}
def _process_metadata(self, key, get):
# Return the processed value of the metadata "key".
#
# The value is looked up in the cache first. When the cache raises
# MissingMetadataError, "get" is called (with no parameter) to obtain it.
# Exceptions raised by "get" are not caught here.

# Turn the JSON-friendly value back into its in-memory form and apply the
# side effects attached to some keys.
def process(key, value):
"""
Process a value prepared with :meth:`_meta_to_json`
"""
value = self._meta_from_json({key: value})[key]
if key == 'available-events':
with self._lock:
# Populate the list of available events, and inform the
# rest of the code that this list is definitive.
self._update_parseable_events({
event: True
for event in value
})
# Note that this will not take into account meta-events.
self._source_events_known = True
return value
# Call the getter and convert its result to the JSON-friendly form, which
# is the form stored in the cache.
def _get(key):
value = get()
return self._meta_to_json({key: value})[key]
# Try the cache first and only fall back on the getter if the cache has no
# value for that key.
def get_cacheable(key):
try:
value = self._cache.get_metadata(key)
except MissingMetadataError:
value = _get(key)
self._cache.update_metadata({key: value}, blocking=False)
return value
value = get_cacheable(key)
return process(key, value)
@classmethod
def _is_meta_event(cls, event):
sources = cls.get_event_sources(event)
return len(sources) > 1
@classmethod
def get_event_sources(cls, event):
    """
    Get the possible sources events of a given event.

    An event name is of the form ``<prefix>@<name>`` for meta events, the
    split being done on the first ``@``.

    :param event: Name of the event.
    :type event: str

    :returns: The sources of the event, ``event`` itself always coming
        first:

        * a list containing only ``event`` if the name contains no ``@``.
        * a tuple made of ``event`` followed by the sorted source events
          registered for the prefix in ``_META_EVENT_SOURCE``.
        * a tuple containing only ``event`` if the prefix is not
          registered in ``_META_EVENT_SOURCE``.
    """
    prefix, separator, _ = event.partition('@')
    if not separator:
        return [event]

    try:
        sources = sorted(cls._META_EVENT_SOURCE[prefix].keys())
    except KeyError:
        return (event,)

    # It is capital that "event" is the first item so we allow the parser
    # to handle it directly.
    return (event, *sources)
@property
# Memoization is necessary to ensure the parser always gets the same name
# on a given Trace instance when the parser is not a type.
@lru_memoized(first_param_maxsize=None, other_params_maxsize=None)
def trace_state(self):
    """
    Tuple ``(class qualified name, parser name)`` identifying the state of
    the trace.

    The parser name is, in that order of preference:

    * the result of ``get_parser_id()`` for instances of
      :class:`TraceParserBase`.
    * ``<module>.<qualname>`` for types and bound methods that are not
      defined locally or as a lambda.
    * a name made of the type of the parser and a random UUID otherwise.
    """
    parser = self._parser

    # The parser type will potentially change the exact content in raw
    # dataframes
    def describe(parser):
        if isinstance(parser, TraceParserBase):
            return parser.get_parser_id()

        has_stable_name = (
            isinstance(parser, (type, types.MethodType)) and
            all(
                marker not in parser.__qualname__
                for marker in ('<locals>', '<lambda>')
            )
        )
        if has_stable_name:
            return f'{parser.__module__}.{parser.__qualname__}'

        # If the parser is an instance of something, we cannot safely track
        # its state so just make a unique name for it
        parser_cls = type(parser)
        return f'{parser_cls.__module__}.{parser_cls.__qualname__}-instance:{uuid.uuid4().hex}'

    return (
        self.__class__.__qualname__,
        describe(parser),
    )
@property
@memoized
def cpus_count(self):
    """
    Number of CPUs of the platform the trace was collected on.

    The following sources are tried in order:

    * the ``cpus-count`` key of ``plat_info``.
    * the ``cpus-count`` metadata of the trace.
    * the highest value of the ``__cpu`` column found in the events of the
      trace, plus one.
    """
    try:
        return self.plat_info['cpus-count']
    except KeyError:
        try:
            count = self.get_metadata('cpus-count')
        # If we don't know the number of CPUs, check the trace for the
        # highest-numbered CPU that traced an event.
        except MissingMetadataError:
            # sched_switch should be enough if it's available. Otherwise,
            # fall back on all the events as last resort: this is pretty
            # costly, as it will trigger parsing of all events in the
            # trace.
            checked_events = (
                ['sched_switch']
                if 'sched_switch' in self.available_events
                else self.available_events
            )

            frames = (
                self._internal_df_event(event)[0]
                for event in checked_events
            )
            highest_cpus = (
                int(frame.select(pl.max('__cpu')).collect().item())
                for frame in frames
                if '__cpu' in frame.columns
            )
            count = max(highest_cpus) + 1
            self.logger.debug(f"Estimated CPU count from trace: {count}")

        return count
def _get_parser(self, events=tuple(), needed_metadata=None):
# Return a context manager that yields a parser for the trace.
#
# The parser is created by calling self._parser with the path of the trace,
# the events, the needed metadata and a temporary directory, and is then
# itself entered as a context manager.

# NOTE(review): "cache" is not used below, self._cache is accessed
# directly instead.
cache = self._cache
path = self.trace_path
# The _ALL_EVENTS sentinel is passed through untouched, anything else is
# turned into a set.
events = events if events is _ALL_EVENTS else set(events)
# A falsy needed_metadata (such as None) becomes an empty set.
needed_metadata = set(needed_metadata or [])
@contextlib.contextmanager
def cm():
with pl.StringCache(), self._cache._parser_temp_path() as temp_dir:
self.logger.debug(f'Spinning up trace parser {self._parser}: path={path}, events={events}, needed_metadata={needed_metadata}')
parser = self._parser(
path=path,
events=events,
needed_metadata=needed_metadata,
temp_dir=temp_dir,
)
with parser as parser:
yield parser
# While we are at it, gather a bunch of metadata. Since we did not
# explicitly asked for it, the parser will only give
# it if it was a cheap byproduct.
self._update_metadata(parser)
return cm()
def _update_metadata(self, parser):
    """
    Query ``parser`` for every key of ``TraceParserBase.METADATA_KEYS``
    through :meth:`_get_metadata`.

    The values are tentatively requested in case they are available: keys
    for which :exc:`MissingMetadataError` is raised are silently skipped.
    """
    for key in TraceParserBase.METADATA_KEYS:
        try:
            self._get_metadata(key, parser=parser)
        except MissingMetadataError:
            pass
def _update_parseable_events(self, mapping):
with self._lock:
update = {
k: v
for k, v in mapping.items()
if v != self._parseable_events.get(k)
}
if update:
self._parseable_events.update(update)
self._cache.update_metadata(
{
'parseable-events': self._parseable_events,
},
blocking=False,
)
return self._parseable_events
@property
def basetime(self):
    """
    First absolute timestamp available in the trace, i.e. the first item
    of the time range of the trace.
    """
    time_range = self._get_time_range()
    return time_range[0]
@property
def endtime(self):
    """
    Timestamp of when the tracing stopped, i.e. the second item of the
    time range of the trace.

    .. note:: With some parsers, that might be the timestamp of the last
        recorded event instead if the trace end timestamp was not recorded.
    """
    time_range = self._get_time_range()
    return time_range[1]
@memoized
def _get_time_range(self, parser=None):
    """
    Return the ``time-range`` metadata of the trace.

    :param parser: Optional parser forwarded to :meth:`_get_metadata`.
    """
    time_range = self._get_metadata('time-range', parser=parser)
    return time_range
@memoized
def _get_trace_id(self):
    """
    Return an identifier for the trace.

    The ``trace-id`` metadata is used when available. Otherwise, the ID is
    ``md5sum-<checksum of the trace file>`` when the trace has a path, and
    ``<unknown>`` when it has none. That fallback ID is then recorded in
    the cache metadata under the ``trace-id`` key.
    """
    try:
        return self._get_metadata('trace-id', try_hard=True)
    except MissingMetadataError:
        path = self.trace_path
        if path:
            with open(path, 'rb') as f:
                digest = checksum(f, 'md5')
            trace_id = f'md5sum-{digest}'
        else:
            trace_id = '<unknown>'

        self._cache.update_metadata(
            {
                'trace-id': trace_id,
            },
            blocking=False,
        )
        return trace_id
@memoized
def _make_raw_cache_desc(self, event):
    """
    Build the cache descriptor of the raw dataframe of ``event``.

    :param event: Name of the event.
    """
    return _CacheDataDesc(
        spec={
            # This is used when clearing the cache to know if a given entry
            # is related to a raw event or e.g. an analysis.
            'raw': True,
            'event': event,
            'trace_state': self.trace_state,
        },
        fmt=_TraceCache.DATAFRAME_SWAP_FORMAT,
    )
def _internal_df_event(self, event, write_swap=None, raw=None, **kwargs):
if write_swap is not None:
_deprecated_warn('write_swap parameter has no effect anymore')
if raw:
raise ValueError(f'raw=True is not supported anymore, dataframes are always post processed by parsers to be as close as possible to the ftrace event format')
# Make sure all raw descriptors are made the same way, to avoid
# missed sharing opportunities
cache_desc = self._make_raw_cache_desc(event)
try:
try:
df = self._cache.fetch(cache_desc, insert=True)
except KeyError:
df = self._load_cache_raw_df(TraceEventChecker(event))[event]
else:
df = _ParsedDataFrame.from_df(df)
except MissingTraceEventError as e:
e.available_events = self.available_events
raise e
else:
assert isinstance(df, _ParsedDataFrame)
df = df.df
# TODO: If and when this is solved, attach the name of the event to
# the LazyFrames:
# https://github.com/pola-rs/polars/issues/5117
meta = dict(event=event)
return df, meta
def _load_cache_raw_df(self, event_checker, allow_missing_events=False):
# Get the dataframes of all the events of "event_checker", from the cache
# when they are there and by parsing the trace otherwise.
#
# Returns a dict mapping event names to dataframes. Unless
# allow_missing_events is true, MissingTraceEventError is raised when the
# resulting set of events does not satisfy the checker.
events = event_checker.get_all_events()
# Get the raw dataframe from the cache if possible
def try_from_cache(event):
cache_desc = self._make_raw_cache_desc(event)
try:
df = self._cache.fetch(cache_desc, insert=True)
except KeyError:
return None
else:
return _ParsedDataFrame.from_df(df)
from_cache = {
event: try_from_cache(event)
for event in events
}
# Drop the events that were not found in the cache
from_cache = {
event: df
for event, df in from_cache.items()
if df is not None
}
# Load the remaining events from the trace directly
events_to_load = sorted(events - from_cache.keys())
# Cheap check to avoid spinning up the parser for nothing
with self._lock:
# Iterate over a copy since events_to_load is modified in the loop
for event in list(events_to_load):
# If we have discovered all the available events, there is no
# point in trying again.
# An event with no recorded parseable status is assumed to be parseable if
# the source events are not all known yet, or if it is a meta event.
default = (not self._source_events_known) or self._is_meta_event(event)
if not self._parseable_events.get(event, default):
if allow_missing_events:
events_to_load.remove(event)
else:
raise MissingTraceEventError([event])
df_from_trace = self._load_raw_df(events_to_load)
self._insert_events(df_from_trace)
# For an event present in both maps, the dataframe coming from the trace
# wins.
df_map = {**from_cache, **df_from_trace}
try:
event_checker.check_events(df_map.keys())
except MissingTraceEventError as e:
if not allow_missing_events:
# The message is passed as a template with {missing_events} and
# {available} placeholders, it is not formatted here.
raise MissingTraceEventError(
e.missing_events,
msg='Events not found in the trace: {missing_events}{available}'
) from e
return df_map
def _insert_events(self, df_map):
for event, df in df_map.items():
assert isinstance(df, _ParsedDataFrame)
if df.meta['mem_cacheable']:
cache_desc = self._make_raw_cache_desc(event)
self._cache.insert(
cache_desc,
df.df,
write_swap=True,
swappable=df.meta['swap_cacheable'],
# For raw dataframe, always write in the swap area if asked for
# since parsing cost is known to be high
ignore_cost=True,
)
def _parse_raw_events(self, events):
if not events:
return {}
def parse(parser, event):
try:
return parser.parse_event(event)
# We only catch MissingTraceEventError, so that we know the parser
# made a voluntary decision to state the event was not available,
# rather than just crashing in an unintended way. However, we will
# display the exceptions's __cause__ later on, which the parser can
# set to inform the user of any actual problem it encountered while
# parsing the data.
except MissingTraceEventError as e:
return None
with self._get_parser(events) as parser:
df_map = {
event: _convert_df_from_parser(
df=df,
parser=parser,
cache=self._cache,
)
for event in events
if (df := parse(parser, event)) is not None
}
return df_map
def _parse_meta_events(self, meta_events):
# Build the dataframes of meta events, named "<prefix>@<event>", by parsing
# the text lines extracted from their source events with
# MetaTxtTraceParser.
#
# Returns a dict mapping meta event names to _ParsedDataFrame. Meta events
# for which no dataframe could be obtained are absent from the result.
if not meta_events:
return {}
# Gather the information to parse the meta event
def make_spec(meta_event):
prefix, event = meta_event.split('@', 1)
# Mapping of source event name to the getter extracting the lines from it
data_getters = self._META_EVENT_SOURCE[prefix]
return (meta_event, event, data_getters)
meta_specs = list(map(make_spec, meta_events))
# Map each trimmed event name back to its meta event name
events_map = {
event: meta_event
for meta_event, event, _ in meta_specs
}
# Explode per source event
meta_specs = [
(meta_event, event, source_event, source_getter)
for (meta_event, event, data_getters) in meta_specs
for source_event, source_getter in data_getters.items()
]
# Trimmed event names for which no dataframe has been collected so far
def get_missing(df_map):
return events_map.keys() - df_map.keys()
df_map = {}
# Group all the meta events by their source event, so we process source
# dataframes one by one
for source_event, specs in groupby(meta_specs, key=itemgetter(2)):
try:
df, meta = self._internal_df_event(source_event)
# A source event that is not available is simply skipped
except MissingTraceEventError:
pass
else:
assert isinstance(df, pl.LazyFrame)
# Add all the header fields from the source dataframes
extra_fields = [x for x in df.columns if x.startswith('__')]
extra_df = df.select(('Time', *extra_fields))
with tempfile.TemporaryDirectory() as temp_dir:
for (meta_event, event, _source_event, source_getter) in specs: # pylint: disable=unused-variable
# The getter returns the dataframe to use along with the name of the
# column holding the lines to parse
source_df, line_field = source_getter(self, _source_event, event, df)
# If the lines are in a dtype we won't be able to
# handle, we won't add an entry to df_map, leading to a
# missing event
if not isinstance(source_df.schema[line_field], (pl.String, pl.Categorical, pl.Binary)):
continue
# Ensure we have bytes and not str
source_df = source_df.with_columns(
pl.col(line_field).cast(pl.String).cast(pl.Binary)
)
source_df = source_df.select((
# Use nanoseconds, so that we don't end up with
# timedelta() values that are only accurate down to
# the microsecond.
pl.col('Time').dt.total_nanoseconds(),
pl.col(line_field),
)).collect()
try:
parser = MetaTxtTraceParser(
lines=source_df[line_field],
time=source_df['Time'],
time_unit='ns',
events=[event],
temp_dir=temp_dir,
)
# An empty dataframe would trigger an exception from
# MetaTxtTraceParser, so we just skip over it and if the
# event cannot be found anywhere, we will raise an
# exception.
#
# Also, some android kernels hide trace_printk() strings
# which prevents from parsing anything at all ...
except ValueError:
continue
else:
try:
with parser as parser:
_df = parser.parse_event(event)
except MissingTraceEventError:
continue
# In case a meta-event is spread among multiple
# events, we get all the dataframes and concatenate
# them together
else:
_df = _convert_df_from_parser(
_df,
parser=parser,
cache=self._cache,
)
_df = _df.df
# Bring back the header fields of the source event, matched on the
# timestamp
_df = _df.join(
extra_df,
on='Time',
how='left',
)
# Reorder the columns using a stable sort on whether the name starts with
# "__", then pass the result to order_as() along with ['Time']
_df = _df.select(
order_as(
sorted(
_df.columns,
key=lambda col: 0 if col.startswith('__') else 1
),
['Time']
)
)
df_map.setdefault(event, []).append(_df)
# Nothing is missing anymore, so there is no need to look any further
if not get_missing(df_map):
break
# Collapse the list of dataframes collected for an event into a single one
def concat(df_list):
try:
df, = df_list
# The unpacking fails when the list does not hold exactly one dataframe
except ValueError:
df = pl.concat(df_list, how='diagonal_relaxed')
df = df.sort('Time')
return df
df_map = {
event: concat(df_list)
for event, df_list in df_map.items()
}
# On some parsers, meta events are treated as regular events so attempt
# to load them from there as well
for event in get_missing(df_map):
with contextlib.suppress(MissingTraceEventError):
df, meta = self._internal_df_event(event)
df_map[event] = df
# Key the result by the full meta event name rather than the trimmed one
return {
events_map[event]: _ParsedDataFrame.from_df(df)
for event, df in df_map.items()
}
def _load_raw_df(self, events):
events = set(events)
if not events:
return {}
meta_events = set(filter(self._is_meta_event, events))
# Pass the entire set of events to _parse_raw() first, in case the
# parser can handle meta events natively.
df_map = self._parse_raw_events(events)
meta_df_map = self._parse_meta_events(meta_events - df_map.keys())
df_map.update(meta_df_map)
# remember the events that we tried to parse and that turned out to not be available
self._update_parseable_events({
event: (event in df_map)
for event in events
})
return df_map
def has_events(self, events, namespaces=None):
    """
    Returns True if the specified event is present in the parsed trace,
    False otherwise.

    :param events: trace event name or list of trace events. Anything that
        is not already a :class:`TraceEventCheckerBase` is turned into an
        :class:`AndTraceEventChecker`.
    :type events: str or list(str) or TraceEventCheckerBase

    :param namespaces: Namespaces forwarded to
        :meth:`TraceEventCheckerBase.check_events`.
    """
    if isinstance(events, str):
        events = [events]

    checker = (
        events
        if isinstance(events, TraceEventCheckerBase)
        else AndTraceEventChecker.from_events(events=events)
    )

    try:
        checker.check_events(self.available_events, namespaces=namespaces)
    except MissingTraceEventError:
        return False

    return True
@property
def start(self):
    """
    Alias of :attr:`basetime`.
    """
    first_timestamp = self.basetime
    return first_timestamp
@property
def end(self):
    """
    Alias of :attr:`endtime`.
    """
    last_timestamp = self.endtime
    return last_timestamp
[docs]
class Trace(TraceBase):
    """
    This class provides a way to access event dataframes and ties
    together various low-level moving pieces to make that happen.

    :param trace_path: File containing the trace
    :type trace_path: str or None

    :param events: events to be parsed. Since events can be loaded on-demand,
        that is optional but still recommended to improve trace parsing speed.

        .. seealso:: :meth:`lisa.trace.TraceBase.df_event` for event formats accepted.
    :type events: TraceEventCheckerBase or list(str) or None

    :param events_namespaces: List of namespaces of the requested events. Each
        namespace will be tried in order until the event is found. The ``None``
        namespace can be used to specify no namespace. The full event name is
        formed with ``<namespace>__<event>``.
    :type events_namespaces: list(str or None)

    :param strict_events: When ``True``, all the events specified in ``events``
        have to be present, and any other events will be assumed to not be
        present. This allows early failure and avoid the cost of lazy detection
        of events in very large traces.
    :type strict_events: bool

    :param plat_info: Platform info describing the target that this trace was
        collected on.
    :type plat_info: lisa.platforms.platinfo.PlatformInfo

    :param normalize_time: Make the first timestamp in the trace 0 instead
        of the system timestamp that was captured when tracing.
    :type normalize_time: bool

    :param parser: Optional trace parser to use as a backend. It must
        implement the API defined by :class:`TraceParserBase`, and will be
        called as ``parser(path=trace_path, events=events,
        needed_metadata={'time-range', ...})`` with the events that should be
        parsed. Other parameters must either have default values, or be
        pre-assigned using partially-applied constructor (for subclasses of
        :class:`lisa.utils.PartialInit`) or :func:`functools.partial`. By
        default, ``.txt`` files will be assumed to be in human-readable format
        output directly by the kernel (or ``trace-cmd report`` without ``-R``).
        Support for this format is limited and some events might not be parsed
        correctly (or at least without custom event parsers).
    :type parser: object or None

    :param plots_dir: directory where to save plots
    :type plots_dir: str

    :param sanitization_functions: This parameter is only for backward
        compatibility with existing code, use
        :meth:`lisa.trace.TraceBase.get_view` with ``process_df`` parameter
        instead.
    :type sanitization_functions: object

    :param max_mem_size: Maximum memory usage to be used for dataframe cache.
        Note that the peak memory usage can exceed that, as the cache can not
        forcefully evict an object from memory (it can only drop references to it).
        When ``None``, use unlimited amount of memory.
    :type max_mem_size: int or None

    :param swap_dir: Swap directory used to store dataframes evicted from the
        cache. When ``None``, a hidden directory along the trace file is used.
    :type swap_dir: str or None

    :param enable_swap: If ``True``, the on-disk swap is enabled.
    :type enable_swap: bool

    :param max_swap_size: Maximum size of the swap directory. When ``None``,
        the max size is the size of the trace file.
    :type max_swap_size: int or None

    :Attributes:
        * ``start``: The timestamp of the first trace event in the trace
        * ``end``: The timestamp of the last trace event in the trace
        * ``time_range``: Maximum timespan for all collected events
        * ``window``: Convenience tuple of ``(start, end)``.
        * ``available_events``: Events available in the parsed trace, exposed
          as some kind of set-ish smart container. Querying for event might
          trigger the parsing of it.
        * ``ana``: The analysis proxy used as an entry point to run analysis
          methods on the trace. See :class:`lisa.analysis._proxy.AnalysisProxy`.

    :Supporting more events:
        Subclasses of :class:`TraceParserBase` can usually auto-detect the
        event format, but there may be a number of reasons to pass a custom
        event parser:

            * The event format produced by a given kernel differs from the
              description bundled with the parser, leading to incorrect parse
              (missing field).

            * The event cannot be parsed in raw format in case text output of
              ``trace-cmd`` is used, because of a ``const char*`` field displayed
              as a pointer for example.

              .. seealso:: For events not following the regular field syntax,
                use :class:`CustomFieldsTxtEventParser`

            * Automatic detection can take a heavy performance toll. This is
              why parsers needing descriptions will come with pre-defined
              description of most used events.

        Custom event parsers can be passed as extra parameters to the parser,
        which can be set manually::

            # Event parsers provided by TxtTraceParser can also be overridden
            # with user-provided ones in case they fail to parse what a given
            # kernel produced
            event_parsers = [
                TxtEventParser('foobar', fields={'foo': int, 'bar': 'string'}, raw=True)
            ]

            # Pre-fill the "event_parsers" parameter of
            # TxtTraceParser.from_dat() using a partial application.
            #
            # Note: you need to choose the parser appropriately for the
            # format of the trace, since the automatic filetype detection is
            # bypassed.
            parser = TxtTraceParser.from_dat(event_parsers=event_parsers)
            trace = Trace('foobar.dat', parser=parser)

    .. warning:: Custom event parsers that are not types or functions (such
        as partially-applied values) will tie the on-disk swap entries to
        the parser :class:`Trace` instance, incurring higher
        :class:`pandas.DataFrame` load time when a new :class:`Trace`
        object is created.
    """

    def _init(self, view, df_fmt):
        # Actual initialization, shared by __init__() and _with_view().
        # "pandas" is the dataframe format used when none is specified.
        self.__view = view
        self._df_fmt = df_fmt or 'pandas'

    def __init__(self, *args, df_fmt=None, **kwargs):
        view = self._view_from_user_kwargs(*args, **kwargs)
        self._init(view, df_fmt=df_fmt)

    def _with_view(self, view, df_fmt):
        """
        Build a new instance of the same class wrapping ``view``, without
        going through ``__init__``. ``df_fmt`` defaults to the format of
        ``self``.
        """
        new = super().__new__(self.__class__)
        new._init(
            view=view,
            df_fmt=df_fmt or self._df_fmt
        )
        return new

    @classmethod
    def _view_from_user_kwargs(cls,
        *args,
        normalize_time=False,
        strict_events=False,
        events=None,
        events_namespaces=('lisa', None),
        sanitization_functions=None,
        write_swap=None,
        **kwargs,
    ):
        """
        Create the underlying ``_Trace`` from the user-provided parameters
        and return a view of it.

        The view-related parameters are passed to ``get_view()``, all the
        other positional and keyword parameters are forwarded to ``_Trace``.
        """
        view_kwargs = {}

        if write_swap is not None:
            _deprecated_warn('write_swap parameter has no effect anymore, you can stop using it')

        if sanitization_functions is not None:
            _deprecated_warn('Custom sanitization functions are not supported anymore, use trace.get_view(process_df=...) instead.')

            def process_df(event, df):
                try:
                    f = sanitization_functions[event]
                except KeyError:
                    return df
                else:
                    # "view" is looked up when process_df() is called, by
                    # which time it has been assigned below.
                    return f(view, event, df, dict())

            view_kwargs.update(process_df=process_df)

        view_kwargs.update(
            normalize_time=normalize_time,
            events_namespaces=events_namespaces,
            strict_events=strict_events,
            events=events,
        )

        trace = _Trace(*args, **kwargs)
        view = trace.get_view(**view_kwargs)
        assert isinstance(view, _TraceViewBase)
        return view

    @property
    def _default_ana_params(self):
        # In the user-visible analysis, we want to change some defaults that
        # will improve the immediate experience, at the expense of good
        # composition. For example, with the defaults below a user calling a
        # plot method twice will get 2 toolbars, but it can still be disabled
        # manually. Since composition can sometimes suffer, the internal
        # analysis proxy and the default values on plot methods are set to less
        # friendly but more predictable defaults.
        return dict(
            # Default to displaying a toolbar in notebooks
            output=None,
            df_fmt=self._df_fmt,
        )

    # Memoize the analysis as the analysis themselves might memoize some
    # things, so we don't want to trash that.
    @property
    @memoized
    def ana(self):
        # Import here to avoid a circular dependency issue at import time
        # with lisa.analysis.base
        # pylint: disable=import-outside-toplevel
        from lisa.analysis._proxy import AnalysisProxy
        return AnalysisProxy(self, params=self._default_ana_params)

    @property
    @memoized
    def analysis(self):
        # Import here to avoid a circular dependency issue at import time
        # with lisa.analysis.base
        # pylint: disable=import-outside-toplevel
        from lisa.analysis._proxy import _DeprecatedAnalysisProxy

        # self.analysis is deprecated so we can transition to using holoviews
        # in all situations, even when the backend is matplotlib
        return _DeprecatedAnalysisProxy(self, params=self._default_ana_params)

    # Allow positional parameter for "window" for backward compat
    def get_view(self, window=None, *, df_fmt=None, **kwargs):
        kwargs['window'] = window
        view = self.__view.get_view(**kwargs)
        # Always preserve the same user-visible type so that view types are
        # 100% an implementation detail that does not leak.
        return self._with_view(
            view,
            df_fmt=df_fmt
        )

    def __getattr__(self, attr):
        # __getattr__ is only called when the regular lookup failed. If the
        # wrapped view itself is what is missing (e.g. the instance was
        # created with __new__ by copy or pickle and is not initialized
        # yet), "self.__view" below would call __getattr__ again and
        # recurse until RecursionError. Raise AttributeError instead so
        # that hasattr()/getattr() behave as expected.
        if attr == '_Trace__view':
            raise AttributeError(attr)
        return getattr(self.__view, attr)

    @property
    def trace_state(self):
        return (
            self.__view.trace_state,
            self._df_fmt,
        )

    def df_event(self, event, *, df_fmt=None, **kwargs):
        df_fmt = df_fmt or self._df_fmt
        df, meta = self.__view._internal_df_event(
            event,
            # Provide the information to the stack so that we can apply the
            # legacy signals in _WindowTraceView for pandas format
            df_fmt=df_fmt,
            **kwargs
        )
        df = _df_to(df, fmt=df_fmt)
        return df

    def _internal_df_event(self, *args, **kwargs):
        return self.__view._internal_df_event(*args, **kwargs)

    def _preload_events(self, *args, **kwargs):
        return self.__view._preload_events(*args, **kwargs)

    @classmethod
    @contextlib.contextmanager
    def from_target(cls, target, events=None, buffer_size=10240, filepath=None, **kwargs):
        """
        Context manager that can be used to collect a
        :class:`lisa.trace.TraceBase` directly from a
        :class:`lisa.target.Target` without needing to setup an
        :class:`FtraceCollector`.

        **Example**::

            from lisa.trace import Trace
            from lisa.target import Target

            target = Target.from_default_conf()

            with Trace.from_target(target, events=['sched_switch', 'sched_wakeup']) as trace:
                target.execute('echo hello world')
                # DO NOT USE trace object inside the `with` statement

            trace.ana.tasks.plot_tasks_total_residency(filepath='plot.png')

        :param target: Target to connect to.
        :type target: Target

        :param events: ftrace events to collect and parse in the trace.
        :type events: list(str)

        :param buffer_size: Size of the ftrace ring buffer.
        :type buffer_size: int

        :param filepath: If set, the trace file will be saved at that location.
            Otherwise, a temporary file is created and removed as soon as the
            parsing is finished.
        :type filepath: str or None

        :Variable keyword arguments: Forwarded to :class:`Trace`.
        """
        plat_info = target.plat_info
        needs_temp = filepath is None

        # Placeholder used until the real trace is available: any attribute
        # access on it raises.
        class _TraceNotSet:
            def __getattribute__(self, attr):
                raise RuntimeError('The trace instance can only be used after the end of the "with" statement.')

        # Object handed over to the user, forwarding everything to the real
        # trace once it has been created.
        class _TraceProxy(TraceBase):
            def __init__(self, path):
                self.__base_trace = _TraceNotSet()

                if path is not None:
                    # Delete the file once we are done accessing it
                    self.__deallocator = _Deallocator(
                        f=functools.partial(_file_cleanup, paths=[path]),
                        on_del=True,
                        at_exit=True,
                    )

            def __getattr__(self, attr):
                return getattr(self.__base_trace, attr)

            @property
            def ana(self):
                return self.__base_trace.ana

            @property
            def analysis(self):
                return self.__base_trace.analysis

            def df_event(self, *args, **kwargs):
                return self.__base_trace.df_event(*args, **kwargs)

            def _internal_df_event(self, *args, **kwargs):
                return self.__base_trace._internal_df_event(*args, **kwargs)

            def _preload_events(self, *args, **kwargs):
                return self.__base_trace._preload_events(*args, **kwargs)

        if needs_temp:
            @contextlib.contextmanager
            def cm_func():
                # delete=False: the file is removed by the deallocator of
                # the proxy rather than when the file object is closed.
                with tempfile.NamedTemporaryFile(suffix='.dat', delete=False) as temp:
                    yield temp.name

            cm = cm_func()
        else:
            cm = nullcontext(filepath)

        with cm as path:
            # Only temporary files are cleaned up by the proxy
            proxy = _TraceProxy(path if needs_temp else None)
            ftrace_coll = FtraceCollector(target, events=events, buffer_size=buffer_size, output_path=path)
            with ftrace_coll:
                yield proxy

            trace = cls(
                path,
                events=events,
                strict_events=True,
                plat_info=plat_info,
                **kwargs
            )

        # The attribute name is spelled in its mangled form since it is set
        # from outside of the _TraceProxy class body.
        # pylint: disable=attribute-defined-outside-init
        proxy._TraceProxy__base_trace = trace

    @classmethod
    def get_event_sources(cls, *args, **kwargs):
        return _Trace.get_event_sources(*args, **kwargs)
[docs]
class TraceEventCheckerBase(abc.ABC, Loggable, Sequence):
"""
ABC for events checker classes.
Event checking can be achieved using a boolean expression on expected
events.
:param check: Check that the listed events are present in the
``self.trace`` attribute of the instance on which the decorated
method is applied. If no such attribute is found, no check will be
done.
:type check: bool
"""
def __init__(self, check=True, checkers=None):
self.check = check
self.checkers = checkers or []
@abc.abstractmethod
def _select_events(self, check, event_set):
return set()
[docs]
def check_events(self, event_set, check_optional=False, namespaces=None):
    """
    Check that certain trace events are available in the given set of
    events.

    :param event_set: Set of available events.

    :param check_optional: If true, every
        :class:`OptionalTraceEventChecker` found in the checker is
        replaced by an :class:`AndTraceEventChecker` of its children
        before checking.
    :type check_optional: bool

    :param namespaces: Events namespaces to take into account.

    :return: Set of events selected by the checker in ``event_set``.

    :raises: MissingTraceEventError if some events are not available. The
        exception must be raised after inspecting children node and combine
        their missing events so that the resulting exception is accurate.
    """
    def make_mandatory(checker):
        if isinstance(checker, OptionalTraceEventChecker):
            return AndTraceEventChecker(checker.checkers)
        return checker

    checker = self.map(make_mandatory) if check_optional else self

    if isinstance(event_set, _AvailableTraceEventsSet):
        # Note that this will lead to selecting an event if one of its
        # namespaced names is available. This is expected, but the
        # check_events() returned set will also contain that non-namespaced
        # name, rather than the actual namespaced name that exists.
        view = event_set._trace.get_view(events_namespaces=namespaces)
        event_set = view.available_events
    else:
        checker = checker.expand_namespaces(namespaces)

    def is_available(event):
        return event in event_set

    return checker._select_events(check=is_available, event_set=event_set)
[docs]
@abc.abstractmethod
def get_all_events(self):
"""
Return a set of all events that are checked by this checker.
That may be a superset of events that are strictly required, when the
checker checks a logical OR combination of events for example.
"""
[docs]
@abc.abstractmethod
def map(self, f):
"""
Apply the given function to all the children and rebuild a new object
with the result.
"""
[docs]
def expand_namespaces(self, namespaces=None):
"""
Build a :class:`TraceEventCheckerBase` that will fixup the event names
to take into account the given namespaces.
"""
expand = lambda event: _NamespaceTraceView._do_expand_namespaces(event, namespaces=namespaces)
return self._expand_namespaces(expand)
def _expand_namespaces(self, expand):
def fixup(checker):
if isinstance(checker, TraceEventChecker):
event = checker.event
namespaced = expand(event)
# fnmatch patterns need to be AND-ed so that we can collect all
# the events matching that pattern, in all namespaces.
if checker._is_pattern:
return AndTraceEventChecker.from_events(namespaced)
else:
return OrTraceEventChecker.from_events(namespaced)
else:
return checker
return self.map(fixup)
[docs]
def __call__(self, f):
"""
Decorator for methods that require some given trace events
:param events: The list of required events
:type events: list(str or TraceEventCheckerBase)
The decorated method must operate on instances that have a ``self.trace``
attribute.
If some event requirements have already been defined for it (it has a
`used_events` attribute, i.e. it has already been decorated), these
will be combined with the new requirements using an
:class`AndTraceEventChecker`.
"""
def unwrap_down_to(obj):
return hasattr(obj, 'used_events')
try:
# we want to see through all other kinds of wrappers, down to the
# one that matters to us
unwrapped_f = inspect.unwrap(f, stop=unwrap_down_to)
used_events = unwrapped_f.used_events
except AttributeError:
checker = self
else:
# Update the existing checker inplace to avoid adding an extra
# level of wrappers.
checker = AndTraceEventChecker([self, used_events])
unwrapped_f.used_events = checker
return f
sig = inspect.signature(f)
if self.check and sig.parameters:
@wraps(f)
def wrapper(self, *args, **kwargs):
try:
trace = self.trace
# If there is no "trace" attribute, silently skip the check. This
# allows using the decorator for documentation and chaining purpose
# without having an actual trace to work on.
except AttributeError:
pass
else:
# Preload all the events that can be useful for that
# function. This will allow loading them in parallel,
# speeding up individual trace.df_event() calls
view = trace.get_view(events=checker.get_all_events())
checker.check_events(view.available_events)
return f(self, *args, **kwargs)
# If the decorated object takes no parameters, we cannot check anything
else:
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
# Set an attribute on the wrapper itself, so it can be e.g. added
# to the method documentation
wrapper.used_events = checker
return wrapper
@abc.abstractmethod
def __bool__(self):
pass
def __iter__(self):
return iter(sorted(self.get_all_events()))
def __getitem__(self, i):
return sorted(self.get_all_events())[i]
def __contains__(self, event):
return event in self.get_all_events()
def __len__(self, event):
return len(self.get_all_events())
[docs]
def __and__(self, other):
"""
Combine two event checkers into one that checks the presence of both.
.. seealso:: :class:`AndTraceEventChecker`
"""
return AndTraceEventChecker([self, other])
[docs]
def __or__(self, other):
"""
Combine two event checkers into one that checks the presence of either
of them.
.. seealso:: :class:`OrTraceEventChecker`
"""
return OrTraceEventChecker([self, other])
[docs]
def __matmul__(self, other):
"""
Combine two event checkers into an optional one.
.. seealso:: :class:`OptionalTraceEventChecker`
"""
return OptionalTraceEventChecker([self, other])
@abc.abstractmethod
def _str_internal(self, style=None, wrapped=True):
"""
Format the boolean expression that this checker represents.
:param style: When 'rst', a reStructuredText output is expected
:type style: str
:param wrapped: When True, the expression should be wrapped with
parenthesis so it can be composed with other expressions.
:type wrapped: bool
"""
[docs]
def doc_str(self):
"""
Top-level function called by Sphinx's autodoc extension to augment
docstrings of the functions.
"""
return f"\n * {self._str_internal(style='rst', wrapped=False)}"
def __str__(self):
return self._str_internal()
[docs]
class TraceEventChecker(TraceEventCheckerBase):
    """
    Checker for exactly one event.

    :param event: Name of the event to check for.
    :type event: str

    :param check: Check that the listed events are present in the
        ``self.trace`` attribute of the instance on which the decorated
        method is applied. If no such attribute is found, no check will be
        done.
    :type check: bool
    """

    def __init__(self, event, check=True):
        super().__init__(check=check)
        self.event = event

    def __bool__(self):
        # A single-event checker always has something to check.
        return True

    def get_all_events(self):
        return {self.event}

    def _select_events(self, check, event_set):
        name = self.event
        if not check(name):
            raise MissingTraceEventError(self, available_events=event_set)
        return {name}

    def _str_internal(self, style=None, wrapped=True):
        # Event names are rendered as literals in reStructuredText output.
        if style == 'rst':
            return '``{}``'.format(self.event)
        return '{}'.format(self.event)

    @property
    def _is_pattern(self):
        """
        ``True`` if the event name contains any of the ``*?[]`` characters.
        """
        name = self.event
        return any(char in name for char in ('*', '?', '[', ']'))

    def map(self, f):
        # Leaf node: there are no children to recurse into.
        return f(self)
[docs]
class EmptyTraceEventChecker(TraceEventCheckerBase):
    """
    Checker that requires no event at all.

    :param check: Check that the listed events are present in the
        ``self.trace`` attribute of the instance on which the decorated
        method is applied. If no such attribute is found, no check will be
        done.
    :type check: bool
    """

    def __init__(self, check=True):
        super().__init__(check=check)

    def __bool__(self):
        # Nothing is checked, so the checker is falsy.
        return False

    def get_all_events(self):
        no_events = set()
        return no_events

    def _select_events(self, check, event_set):
        # No event is ever selected and nothing can be missing.
        no_events = set()
        return no_events

    def _str_internal(self, style=None, wrapped=True):
        return '<no event>'

    def map(self, f):
        # Leaf node: there are no children to recurse into.
        return f(self)
[docs]
class AssociativeTraceEventChecker(TraceEventCheckerBase):
    """
    Common base of the checkers implementing an associative operator such as
    `and` and `or`.

    :param op_str: Operator name used when formatting the expression.
    :type op_str: str

    :param event_checkers: Children checkers.
    :type event_checkers: collections.abc.Iterable(TraceEventCheckerBase) or None

    :param check: Forwarded to :class:`TraceEventCheckerBase`.
    :type check: bool

    :param prefix_str: String prepended to the formatted expression.
    :type prefix_str: str
    """

    def __init__(self, op_str, event_checkers, check=True, prefix_str=''):
        super().__init__(check=check)

        # Flatten children of the exact same type, which is valid since the
        # operator is associative. ``isinstance`` is deliberately not used so
        # that checkers with possibly different semantics are not merged.
        own_type = type(self)
        flattened = []
        for child in event_checkers or []:
            if type(child) is own_type:
                flattened.extend(child.checkers)
            else:
                flattened.append(child)

        # Optional checkers have the same overall effect wherever they sit in
        # the expression tree, so they are gathered in a single node.
        optionals = [
            child
            for child in flattened
            if isinstance(child, OptionalTraceEventChecker)
        ]
        remaining = [
            child
            for child in flattened
            if child not in optionals
        ]
        if optionals:
            remaining.append(OptionalTraceEventChecker(optionals))

        def dedup_key(checker):
            # Single-event checkers are identified by their event name so the
            # same event does not appear twice at the same level.
            return checker.event if isinstance(checker, TraceEventChecker) else checker

        self.checkers = deduplicate(remaining, key=dedup_key)
        self.op_str = op_str
        self.prefix_str = prefix_str

    def __bool__(self):
        return any(bool(child) for child in self.checkers)

    def map(self, f):
        # Work on a shallow copy so that the original node is left untouched.
        rebuilt = f(copy.copy(self))
        rebuilt.checkers = [child.map(f) for child in rebuilt.checkers]
        return rebuilt

    @memoized
    def get_all_events(self):
        return set().union(*(
            child.get_all_events()
            for child in self.checkers
        ))

    @classmethod
    def from_events(cls, events, **kwargs):
        """
        Alternative constructor wrapping each ``str`` of ``events`` into a
        ``TraceEventChecker``, leaving existing checkers as they are.

        :param events: Iterable of events
        :type events: list(str or TraceEventCheckerBase)

        :Variable keyword arguments: Forwarded to the class constructor.
        """
        checkers = (
            item if isinstance(item, TraceEventCheckerBase) else TraceEventChecker(item)
            for item in events
        )
        return cls(checkers, **kwargs)

    def _str_internal(self, style=None, wrapped=True):
        separator = f' {self.op_str} '
        # Children are always wrapped so they compose unambiguously.
        parts = [
            child._str_internal(style=style, wrapped=True)
            for child in self.checkers
        ]
        expression = self.prefix_str + separator.join(parts)
        if wrapped and len(self.checkers) > 1:
            return '({})'.format(expression)
        return '{}'.format(expression)
[docs]
class OrTraceEventChecker(AssociativeTraceEventChecker):
    """
    Checker satisfied as soon as one of its children is satisfied.

    :param event_checkers: Event checkers to check for
    :type event_checkers: list(TraceEventCheckerBase)
    """

    def __init__(self, event_checkers=None, **kwargs):
        super().__init__('or', event_checkers, **kwargs)

    def _select_events(self, check, event_set):
        if not self.checkers:
            return set()

        missing = []
        for child in self.checkers:
            try:
                # The first satisfied child wins.
                return child._select_events(check=check, event_set=event_set)
            except MissingTraceEventError as e:
                missing.extend(e.missing_events)

        # No child was satisfied: report everything that was missing.
        raise MissingTraceEventError(
            type(self).from_events(missing),
            available_events=event_set,
        )
class _OptionalTraceEventCheckerBase(AssociativeTraceEventChecker):
    """
    Base of the checkers that never fail: children with missing events are
    ignored and the events selected by the other children are returned.
    """

    # Prefix of the formatted expression, set by subclasses.
    _PREFIX_STR = None

    def __init__(self, event_checkers=None, **kwargs):
        super().__init__(',', event_checkers, prefix_str=self._PREFIX_STR, **kwargs)

    def _select_events(self, check, event_set):
        found = set()
        for child in self.checkers:
            # A child with missing events simply contributes nothing.
            with contextlib.suppress(MissingTraceEventError):
                found.update(
                    child._select_events(check=check, event_set=event_set)
                )
        return found
[docs]
class OptionalTraceEventChecker(_OptionalTraceEventCheckerBase):
    """
    Checker that does not enforce anything, and only advertises events that
    may be used if they are present.

    :param event_checkers: Event checkers that may be used
    :type event_checkers: list(TraceEventCheckerBase)
    """

    _PREFIX_STR = 'optional: '
[docs]
class DynamicTraceEventChecker(_OptionalTraceEventCheckerBase):
    """
    Checker that does not enforce anything, and only advertises that one of
    the groups of events will be used.

    An API can then pick the group at runtime based on its parameters, while
    still conveying that the events are not really optional.

    :param event_checkers: Event checkers that may be used
    :type event_checkers: list(TraceEventCheckerBase)
    """

    _PREFIX_STR = 'one group of: '
[docs]
class AndTraceEventChecker(AssociativeTraceEventChecker):
    """
    Checker satisfied only when all of its children are satisfied.

    :param event_checkers: Event checkers to check for
    :type event_checkers: list(TraceEventCheckerBase)
    """

    def __init__(self, event_checkers=None, **kwargs):
        super().__init__('and', event_checkers, **kwargs)

    def _select_events(self, check, event_set):
        if not self.checkers:
            return set()

        selected = set()
        missing = []
        # Visit every child before failing, so that the exception lists all
        # the missing events rather than only the first ones encountered.
        for child in self.checkers:
            try:
                found = child._select_events(check=check, event_set=event_set)
            except MissingTraceEventError as e:
                missing.extend(e.missing_events)
            else:
                selected.update(found)

        if missing:
            raise MissingTraceEventError(
                type(self).from_events(missing),
                available_events=event_set,
            )
        return selected

    def doc_str(self):
        """
        Format the children as a bullet list, one item per child in the order
        of ``self.checkers``.
        """
        separator = '\n' + ' '
        bullets = [
            f"* {child._str_internal(style='rst', wrapped=False)}"
            for child in self.checkers
        ]
        return separator + separator.join(bullets)
[docs]
def requires_events(*events, **kwargs):
    """
    Build a decorator for methods that need all the given trace events.

    :param events: The list of required events
    :type events: list(str or TraceEventCheckerBase)

    :param check: Check that the listed events are present in the
        ``self.trace`` attribute of the instance on which the decorated method
        is applied. If no such attribute is found, no check will be done.
    :type check: bool
    """
    checker = AndTraceEventChecker.from_events(events, **kwargs)
    return checker
[docs]
def requires_one_event_of(*events, **kwargs):
    """
    Like :func:`requires_events`, but with a logical `OR` semantic: the result
    is built with :class:`OrTraceEventChecker`.
    """
    checker = OrTraceEventChecker.from_events(events, **kwargs)
    return checker
[docs]
def may_use_events(*events, **kwargs):
    """
    Like :func:`requires_events`, but only advertises events that may be used
    if present: the result is built with :class:`OptionalTraceEventChecker`.
    """
    checker = OptionalTraceEventChecker.from_events(events, **kwargs)
    return checker
[docs]
def will_use_events_from(*events, **kwargs):
    """
    Like :func:`requires_events`, but only advertises groups of events that
    will be used depending on some dynamic factor: the result is built with
    :class:`DynamicTraceEventChecker`.
    """
    checker = DynamicTraceEventChecker.from_events(events, **kwargs)
    return checker
[docs]
class DroppedTraceEventError(Exception):
    """
    Exception signalling that events were dropped from the ftrace ring buffer
    because it ran out of space.
    """
[docs]
class MissingTraceEventError(RuntimeError, ValueError):
    """
    Exception describing trace events that are required but not available.

    :param missing_events: The missing trace events
    :type missing_events: TraceEventCheckerBase or list(str)

    :param available_events: Events that are available, used to suggest close
        matches in the message.
    :type available_events: collections.abc.Iterable(str) or None

    :param msg: Message template, formatted with ``missing_events`` and
        ``available`` keys.
    :type msg: str
    """

    def __init__(self, missing_events, available_events=None, msg='Trace is missing the following required events: {missing_events}{available}'):
        # pylint: disable=super-init-not-called
        if not isinstance(missing_events, TraceEventCheckerBase):
            names = sorted(missing_events)
            # A single name gets a plain checker, anything else is AND-ed.
            if len(names) == 1:
                missing_events = TraceEventChecker(names[0])
            else:
                missing_events = AndTraceEventChecker.from_events(names)

        self._template = msg
        self.missing_events = missing_events
        # Forcibly turn into a list, to avoid carrying around an
        # _AvailableTraceEventsSet with its Trace instance
        self.available_events = sorted(available_events or [])

    def __str__(self):
        missing = self.missing_events
        available = self.available_events

        suffix = ''
        if available:
            # Keep the best match, if any, of each missing event.
            closest = set()
            for event in missing:
                matches = get_close_matches(event, available, n=1)
                if matches:
                    closest.add(matches[0])

            if closest:
                suffix = '. Closest available matches are: {}'.format(
                    ', '.join(sorted(closest)),
                )

        return self._template.format(missing_events=missing, available=suffix)
[docs]
class FtraceConf(SimpleMultiSrcConf, HideExekallID):
    """
    Configuration class of :class:`FtraceCollector`

    Available keys:
    {generated_help}
    {yaml_example}
    """
    STRUCTURE = TopLevelKeyDesc('ftrace-conf', 'FTrace configuration', (
        KeyDesc('events', 'FTrace events to trace', [typing.Sequence[str], TraceEventCheckerBase]),
        KeyDesc('events-namespaces', 'FTrace events namespaces to use. See Trace namespace constructor parameter.', [typing.Sequence[typing.Union[str, None]], None]),
        KeyDesc('functions', 'FTrace functions to trace', [typing.Sequence[str]]),
        KeyDesc('buffer-size', 'FTrace buffer size', [int]),
        KeyDesc('trace-clock', 'Clock used while tracing (see "trace_clock" in ftrace.txt kernel doc)', [str, None]),
        KeyDesc('saved-cmdlines-nr', 'Number of saved cmdlines with associated PID while tracing', [int]),
        KeyDesc('tracer', 'FTrace tracer to use', [str, None]),
        LevelKeyDesc('modules', 'Kernel modules settings', (
            KeyDesc('auto-load', 'Compile kernel modules and load them automatically based on the events that are needed.', [bool]),
        )),
    ))

    def add_merged_src(self, src, conf, optional_events=False, **kwargs):
        """
        Merge-in a configuration source.

        Each key of ``conf`` is combined with the value currently set in this
        configuration:

        * ``functions``: sorted union of both lists.
        * ``events``: both sides are namespace-expanded then AND-ed.
        * ``buffer-size`` and ``saved-cmdlines-nr``: maximum of both values.
        * ``trace-clock``, ``tracer`` and ``auto-load``: must be compatible,
          otherwise :exc:`KeyError` is raised.
        * ``events-namespaces``: reset to an empty list.

        :param src: Name of the merged source
        :type src: str

        :param conf: Conf to merge in
        :type conf: FtraceConf or dict(str, object)

        :param optional_events: If ``True``, the events brought by ``conf``
            will be wrapped in :class:`OptionalTraceEventChecker`. This avoids
            failing just because the user asked for extra events that are not
            present in the kernel.
        :type optional_events: bool

        :Variable keyword arguments: Forwarded to ``add_src``.

        :raises KeyError: If a key cannot be merged.
        """
        def get_key(conf, key):
            return conf.get_key(key, quiet=True)

        def get_key_default(conf, key, default):
            try:
                return get_key(conf, key)
            except KeyError:
                return default

        if not isinstance(conf, self.__class__):
            conf = self.__class__(conf=conf)

        def merge_conf(key, val, path):
            new = _merge_conf(key, val, path)
            try:
                existing = get_nested_key(self, path + [key], getitem=get_key)
            except KeyError:
                return (True, new)
            else:
                # Only add to the source if the result is different than what
                # is already set
                return (existing != new, new)

        def _merge_conf(key, val, path):

            def non_mergeable(key):
                # NOTE(review): the existing value is looked up by ``key``
                # alone, ignoring ``path``. For a nested key such as
                # "auto-load" this presumably never finds the existing value
                # -- confirm whether conflicts are meant to be detected there.
                if get_key_default(self, key, val) == val:
                    return val
                else:
                    raise KeyError(f'Cannot merge key "{key}": incompatible values specified: {self[key]} != {val}')

            if key == 'functions':
                return sorted(set(val) | set(get_key_default(self, key, [])))
            elif key == 'events-namespaces':
                # We already applied the namespaces to the events so the result
                # can be cleanly merged according to the original meaning.
                return []
            elif key == 'events':
                if not isinstance(val, TraceEventCheckerBase):
                    val = AndTraceEventChecker.from_events(val)
                if optional_events:
                    val = OptionalTraceEventChecker([val])

                # Merging has to take into account defaults, as we will then
                # set the namespace to be empty (None, )
                def get(conf, key):
                    try:
                        return get_key(conf, key)
                    except KeyError:
                        return conf.DEFAULT_SRC.get(key)

                val = val.expand_namespaces(
                    namespaces=get(conf, 'events-namespaces')
                )

                self_val = get_key_default(self, key, [])
                if not isinstance(self_val, TraceEventCheckerBase):
                    self_val = AndTraceEventChecker.from_events(self_val)
                self_val = self_val.expand_namespaces(
                    namespaces=get(self, 'events-namespaces')
                )

                return AndTraceEventChecker([val, self_val])
            elif key == 'buffer-size':
                return max(val, get_key_default(self, key, 0))
            elif key == 'trace-clock':
                return non_mergeable(key)
            elif key == 'saved-cmdlines-nr':
                return max(val, get_key_default(self, key, 0))
            elif key == 'tracer':
                return non_mergeable(key)
            elif key == 'modules':
                return merge_level(val, path + [key])
            elif key == 'auto-load':
                return non_mergeable(key)
            else:
                raise KeyError(f'Cannot merge key "{key}"')

        def merge_level(conf, path=None):
            # Use a None sentinel rather than a mutable default argument.
            path = [] if path is None else path
            return {
                k: v
                for k, (keep, v) in (
                    (k, merge_conf(k, v, path))
                    for k, v in conf.items()
                )
                if keep
            }

        merged = merge_level(conf)
        # Namespaces were expanded in events directly so we want to ensure they
        # will not undergo expansion again if there are cascading merges.
        merged['events-namespaces'] = []

        # We merge some keys with their current value in the conf
        return self.add_src(src, conf=merged, **kwargs)
[docs]
class CollectorBase(Loggable):
    """
    Base class wrapping a :class:`devlib.collector.CollectorBase`-based
    collector by composition. Attributes that are not found on the wrapper are
    looked up on the wrapped collector.

    :param collector: Wrapped collector. Its ``target`` attribute is used to
        install :attr:`TOOLS`.

    :param output_path: Default path used by :meth:`get_data`.
    """

    TOOLS = []
    """
    Sequence of tools to install on the target when using the collector.
    """

    NAME = None
    """
    Name of the collector class.
    """

    _COMPOSITION_ORDER = 0
    """
    Order in which context managers are composed. ``0`` will be used as the
    innermost context manager.
    """

    def __init__(self, collector, output_path=None):
        self._collector = collector
        self._output_path = output_path
        self._install_tools(collector.target)

    def _install_tools(self, target):
        target.install_tools(self.TOOLS)

    def __getattr__(self, attr):
        # Only called when regular lookup fails: delegate to the wrapped
        # collector.
        wrapped = self._collector
        return getattr(wrapped, attr)

    def __enter__(self):
        self._collector.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        suppress_exc = self._collector.__exit__(*args, **kwargs)
        # Best-effort data retrieval: get_data() raises ValueError when no
        # output path has been set.
        with contextlib.suppress(ValueError):
            self.get_data()
        return suppress_exc

    def get_data(self, path=None):
        """
        Counterpart of :meth:`devlib.collector.CollectorBase.get_data` taking
        the path as a parameter, so that the invalid state where no path has
        been set cannot be represented.

        :param path: Output path, defaulting to the ``output_path`` given to
            the constructor.

        :raises ValueError: If no path is available.
        """
        wrapped = self._collector
        destination = path or self._output_path
        if destination is None:
            raise ValueError('Path cannot be None')

        wrapped.set_output(destination)
        return wrapped.get_data()

    @deprecate(replaced_by=get_data, deprecated_in='2.0', removed_in='4.0')
    def get_trace(self, path):
        """
        Deprecated alias for :meth:`get_data`.
        """
        return self.get_data(path)
[docs]
class ComposedCollector(Mapping):
    """
    Compose multiple :class:`lisa.trace.CollectorBase` together.

    When used as a context manager, collectors will be nested. Individual
    collectors can be retrieved by using the instance as a mapping, using the
    collectors' ``NAME`` attribute as key.

    Collectors without a ``NAME`` attribute are still composed in the context
    manager but are not reachable through the mapping.

    .. note:: Only one collector of each type is allowed. This allows:

        * Getting back the collector instance using a fixed name.
        * Some collectors like :class:`lisa.trace.DmesgCollector` are not
          re-entrant

    :param collectors: Collectors to compose.
    :type collectors: collections.abc.Iterable

    :raises ValueError: If several collectors share the same ``NAME``.
    """

    _COMPOSITION_ORDER = 100

    def __init__(self, collectors):
        collectors = list(collectors)

        # Only collectors exposing a NAME take part in the uniqueness check
        # and in the mapping, so that NAME-less ones do not raise
        # AttributeError.
        named = [c for c in collectors if hasattr(c, 'NAME')]
        if len({c.NAME for c in named}) != len(named):
            raise ValueError('Collectors of the same type cannot be composed together')

        self._collectors = {
            c.NAME: c
            for c in named
        }
        self._cm = ComposedContextManager(collectors)

    def __enter__(self):
        self._cm.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        return self._cm.__exit__(*args, **kwargs)

    def __getitem__(self, key):
        return self._collectors[key]

    def __iter__(self):
        return iter(self._collectors)

    def __len__(self):
        return len(self._collectors)
[docs]
class FtraceCollector(CollectorBase, Configurable):
    """
    Thin wrapper around :class:`devlib.collector.ftrace.FtraceCollector`.

    .. note:: Events are expected to be provided by the target's kernel, but if
        they are not :class:`lisa._kmod.LISADynamicKmod` will build a
        kernel module to attempt to satisfy the missing events. This will
        typically require correct target setup, see
        :class:`lisa.target.TargetConf` ``kernel/src`` configurations.

    {configurable_params}
    """

    # Name of the collector; ComposedCollector uses NAME as the mapping key.
    NAME = 'ftrace'
    # Configuration class associated with this collector.
    CONF_CLASS = FtraceConf
    # Presumably maps an __init__ keyword argument name to the path of the
    # matching nested configuration key -- TODO confirm against Configurable.
    INIT_KWARGS_KEY_MAP = {
        'kmod_auto_load': ['modules', 'auto-load'],
    }
    # Tools installed on the target by CollectorBase._install_tools().
    TOOLS = ['trace-cmd']
    # CollectorBase documents 0 as the innermost context manager.
    _COMPOSITION_ORDER = 0
def __init__(self, target, *, events=None, functions=None, buffer_size=10240, output_path=None, autoreport=False, trace_clock=None, saved_cmdlines_nr=8192, tracer=None, kmod_auto_load=True, events_namespaces=('lisa', None), **kwargs):
    # Overview of the steps below:
    # 1. Compute the events available on the target and possibly provided by
    #    the LISA kernel module.
    # 2. Turn the requested events into a checker, expand meta events,
    #    namespaces and fnmatch patterns, then select the events to collect.
    # 3. If some selected events can only come from the kernel module, try to
    #    set it up (when kmod_auto_load is true) and fall back on the target's
    #    events otherwise.
    # 4. Re-check the final selection and create the devlib collector.
    kconfig = target.plat_info['kernel']['config']
    if not kconfig.get('FTRACE'):
        raise ValueError("The target's kernel needs CONFIG_FTRACE=y kconfig enabled")

    tracing_path = devlib.FtraceCollector.find_tracing_path(target)
    target_available_events, avoided = self._target_available_events(target, tracing_path)

    kmod = target.get_kmod(LISADynamicKmod)
    # Get the events possibly defined in the module. Note that it's a
    # superset of the events actually defined as this is based on pretty
    # crude filtering of the source files, rather than analyzing the events
    # actually defined in the .ko after it has been compiled.
    kmod_available_events = set(kmod.possible_events)

    # Events provided by the module are namespaced and therefore should
    # never overlap with the target, so we never want to provide it via the
    # module if it already exists on the target.
    kmod_available_events -= target_available_events

    available_events = target_available_events | kmod_available_events

    # Normalize the "events" parameter into a checker, whatever form it was
    # passed in.
    if events is None:
        events_checker = EmptyTraceEventChecker()
    elif isinstance(events, TraceEventCheckerBase):
        events_checker = events
    else:
        events_checker = AndTraceEventChecker.from_events(sorted(set(events)))

    def rewrite(checker):
        if isinstance(checker, TraceEventChecker):
            # Turn meta events into their source event
            return OrTraceEventChecker.from_events(
                Trace.get_event_sources(checker.event)
            )
        elif isinstance(checker, DynamicTraceEventChecker):
            # All the groups of a dynamic checker are requested
            return AndTraceEventChecker(checker.checkers)
        else:
            return checker

    def wildcard(checker):
        if isinstance(checker, TraceEventChecker) and checker._is_pattern:
            # Make fnmatch pattern optional
            return OptionalTraceEventChecker.from_events(
                fnmatch.filter(
                    available_events,
                    checker.event
                )
            )
        else:
            return checker

    # Meta events are collected before rewrite() replaces them with their
    # sources, and are added back to self.events further down.
    meta_events = {
        event
        for event in events_checker.get_all_events()
        if _Trace._is_meta_event(event)
    }

    events_checker = events_checker.map(rewrite)
    events_checker = events_checker.expand_namespaces(namespaces=events_namespaces)
    # Expand the wildcards after having expanded the namespaces.
    events_checker = events_checker.map(wildcard)
    self.logger.debug(f'Will try to collect events: {events_checker}')

    # Select the events, after having expanded the namespaces
    events = events_checker.check_events(available_events)

    functions = functions or []
    trace_clock = trace_clock or 'global'
    kwargs.update(
        target=target,
        events=sorted(events),
        functions=functions,
        buffer_size=buffer_size,
        autoreport=autoreport,
        trace_clock=trace_clock,
        saved_cmdlines_nr=saved_cmdlines_nr,
        tracer=tracer,
    )
    self.check_init_param(**kwargs)

    if functions and not kconfig.get('FUNCTION_TRACER'):
        raise ValueError(f"The target's kernel needs CONFIG_FUNCTION_TRACER=y kconfig enabled in order to trace functions: {functions}")

    # The function graph events select the matching tracer, unless the user
    # chose one explicitly.
    if 'funcgraph_entry' in events or 'funcgraph_exit' in events:
        tracer = 'function_graph' if tracer is None else tracer

    # If some events are not already available on that kernel, look them up
    # in custom modules
    needed_from_kmod = kmod_available_events & events

    kmod_defined_events = set()
    kmod_cm = None
    if needed_from_kmod:
        # If anything wrong happens, we will be restricted to the events
        # already available.
        events = events & target_available_events

        if kmod_auto_load:
            self.logger.info(f'Building kernel module to try to provide the following events that are not currently available on the target: {", ".join(sorted(needed_from_kmod))}')
            try:
                kmod_defined_events, provided, kmod_cm = self._get_kmod(
                    target,
                    target_available_events=target_available_events,
                    needed_events=needed_from_kmod,
                )
            except Exception as e:
                # The module could not be set up: this is fatal only if
                # required events are missing from the target.
                try:
                    events_checker.check_events(events)
                except MissingTraceEventError as e:
                    # Note: "e" is rebound by this clause, so the exception
                    # is chained from the MissingTraceEventError.
                    raise MissingTraceEventError(
                        e.missing_events,
                        available_events=target_available_events,
                        msg='Ftrace events are missing in the kernel: {missing_events}{available}. Kernel module build was attempted to provide missing events but failed'
                    ) from e

                # Missing optional events are only reported
                try:
                    events_checker.check_events(events, check_optional=True)
                except MissingTraceEventError as e:
                    self.logger.error(f'Could not build the kernel module to provide optional events: {e}. Use kmod_auto_load=False to disable automatic module build.')
            else:
                # Module build went well, we add the events effectively
                # provided by the module
                events = events | provided
        else:
            try:
                events_checker.check_events(events)
            except MissingTraceEventError as e:
                raise MissingTraceEventError(
                    e.missing_events,
                    available_events=target_available_events,
                    msg='Ftrace events are missing in the kernel: {missing_events}{available}. Use kmod_auto_load=True to enable automatic module build to provide these events.',
                )

    # Callable returning a context manager (or None), used by _make_cm()
    self._kmod_cm = kmod_cm

    ############################################
    # Final checks after we enabled all we could
    ############################################

    try:
        events_checker.check_events(events)
    except MissingTraceEventError as e:
        raise MissingTraceEventError(
            e.missing_events,
            available_events=target_available_events | kmod_defined_events,
            msg='Ftrace events are missing: {missing_events}{available}',
        )

    try:
        events_checker.check_events(events, check_optional=True)
    except MissingTraceEventError as e:
        e = MissingTraceEventError(
            e.missing_events,
            available_events=target_available_events | kmod_defined_events,
            msg='{missing_events}{available}',
        )
        self.logger.info(f'Optional events missing: {e}')

    if not events:
        raise ValueError('No ftrace events selected')

    self.events = sorted(events | meta_events)

    # Some events are "special" and cannot be disabled or enabled. We
    # therefore cannot pass them to trace-cmd.
    events -= avoided
    # trace-cmd fails if passed no events, which is an issue since we
    # cannot pass e.g. "print" event.
    if not events:
        events |= {'sched_switch'}

    events = sorted(events)
    # Context manager of the recording in progress, set by __enter__()
    self._cm = None

    # Install the tools before creating the collector, as devlib will check
    # for it
    self._install_tools(target)

    # Only pass true kernel events to devlib, as it will reject any other.
    kwargs.update(
        events=events,
        tracer=tracer,
    )

    # We need to install the kmod when initializing the devlib object, so
    # that the available events are accurate.
    with self._make_cm(record=False):
        collector = devlib.FtraceCollector(
            # Prevent devlib from pushing its own trace-cmd since we provide
            # our own binary
            no_install=True,
            tracing_path=tracing_path,
            strict=True,
            **kwargs
        )

    super().__init__(collector, output_path=output_path)
@classmethod
def _get_kmod(cls, target, target_available_events, needed_events):
    """
    Work out how the LISA kernel module can provide ``needed_events``.

    :param target: Target to get the kernel module from.

    :param target_available_events: Events already provided by the target.
    :type target_available_events: set(str)

    :param needed_events: Events expected from the kernel module.
    :type needed_events: set(str)

    :return: Tuple ``(defined_events, provided, cm)`` where
        ``defined_events`` is the set of events defined by the module,
        ``provided`` is the subset of ``needed_events`` it defines and
        ``cm`` is a callable wrapping ``kmod.run``, or ``None`` if the
        module provides none of the needed events.

    :raises ValueError: If the module is needed but defines events that are
        already provided by the kernel.
    """
    logger = cls.get_logger()
    kmod = target.get_kmod(LISADynamicKmod)
    defined_events = set(kmod.defined_events)
    possible_events = set(kmod.possible_events)

    logger.debug(f'Kernel module possible events: {possible_events}')
    logger.debug(f'Kernel module defined events: {defined_events}')

    needed = needed_events & defined_events
    if not needed:
        return (defined_events, set(), None)

    overlapping = defined_events & target_available_events
    if overlapping:
        # Sets are sorted so that the message is stable
        raise ValueError(f'Events defined in {kmod.src.mod_name} ({", ".join(sorted(needed))}) are needed but some events overlap with the ones already provided by the kernel: {", ".join(sorted(overlapping))}')

    return (
        defined_events,
        needed,
        functools.partial(
            kmod.run,
            kmod_params={
                'features': sorted(kmod._event_features(needed))
            }
        )
    )
@contextlib.contextmanager
def _make_cm(self, record=True):
    """
    Context manager running the kernel module context manager if there is
    one, then the recording of the parent class if ``record`` is true.
    """
    # Zero-argument super() is not usable inside the methods of the nested
    # class, so the proxy is captured here.
    parent = super()

    class _RecordCM:
        def __enter__(self):
            return parent.__enter__()

        def __exit__(self, *args, **kwargs):
            return parent.__exit__(*args, **kwargs)

    with contextlib.ExitStack() as stack:
        make_kmod_cm = self._kmod_cm
        if make_kmod_cm is not None:
            stack.enter_context(make_kmod_cm())

        if record:
            stack.enter_context(_RecordCM())

        yield
def __enter__(self):
    """
    Start the collection and return the collector itself, so it can be
    bound with ``with ... as``.
    """
    self._cm = self._make_cm()
    # _make_cm() yields nothing, so return self rather than its result.
    self._cm.__enter__()
    return self
def __exit__(self, *args, **kwargs):
    """
    Exit the context manager created by ``__enter__`` and return its result.
    ``self._cm`` is reset whether or not exiting raised.
    """
    try:
        return self._cm.__exit__(*args, **kwargs)
    finally:
        self._cm = None
[docs]
def get_data(self, *args, **kwargs):
    """
    Same as the parent class ``get_data``, but refuses to run when a kernel
    module context manager is set and no recording context is active.

    :raises ValueError: If called after the kernel module was unloaded.
    """
    kmod_unloaded = self._kmod_cm and not self._cm
    if kmod_unloaded:
        raise ValueError('FtraceCollector.get_data() cannot be called after the kernel module was unloaded.')
    return super().get_data(*args, **kwargs)
@staticmethod
def _target_available_events(target, tracing_path):
    """
    Read the events available on the target.

    :return: Tuple ``(available, avoided)`` where ``avoided`` is the content
        of the ``events/ftrace`` directory and ``available`` is the set of
        event names listed in ``available_events``, plus ``avoided``.
    """
    listing = target.read_value(target.path.join(tracing_path, 'available_events'))
    # trace-cmd start complains if given these events, even though they are
    # valid
    avoided = set(target.list_directory(target.path.join(tracing_path, 'events', 'ftrace')))

    # Each line is split on the first ":" and the part after it is kept
    available = {
        line.split(':', 1)[1]
        for line in listing.splitlines()
    }
    # These events are available, but we still cannot pass them to trace-cmd record
    available |= avoided
    return (available, avoided)
[docs]
@classmethod
@kwargs_forwarded_to(__init__)
def from_conf(cls, target, conf, **kwargs):
    """
    Alternative constructor creating an :class:`FtraceCollector` out of a
    :class:`FtraceConf`

    :param target: Target to use when collecting the trace
    :type target: lisa.target.Target

    :param conf: Configuration object
    :type conf: FtraceConf

    :Variable keyword arguments: Forwarded to ``__init__``.
    """
    logger = cls.get_logger()
    logger.info(f'Ftrace configuration:\n{conf}')

    init_kwargs = cls.conf_to_init_kwargs(conf)
    init_kwargs['target'] = target
    # Explicit keyword arguments take precedence over the configuration
    init_kwargs.update(kwargs)

    cls.check_init_param(**init_kwargs)
    return cls(**init_kwargs)
[docs]
@classmethod
@kwargs_forwarded_to(
    from_conf,
    ignore=['conf']
)
def from_user_conf(cls, target, base_conf=None, user_conf=None, merged_src='merged', **kwargs):
    """
    Alternative constructor creating an :class:`FtraceCollector` out of two
    :class:`FtraceConf`.

    ``base_conf`` is expected to contain the minimal configuration, and
    ``user_conf`` some additional settings that are used to augment the
    base configuration.

    :param target: Target to use when collecting the trace
    :type target: lisa.target.Target

    :param base_conf: Base configuration object, merged with ``user_conf``.
    :type base_conf: FtraceConf

    :param user_conf: User configuration object
    :type user_conf: FtraceConf

    :param merged_src: Name of the configuration source created by merging
        ``base_conf`` and ``user_conf``
    :type merged_src: str

    :Other keyword arguments: Forwarded to :meth:`from_conf`.
    """
    if not user_conf:
        user_conf = FtraceConf()
    if not base_conf:
        base_conf = FtraceConf()

    # Work on a copy of the base conf, since it may be shared by multiple
    # classes
    merged_conf = copy.copy(base_conf)

    # The events of the user configuration are merged in as optional events
    merged_conf.add_merged_src(
        src=merged_src,
        conf=user_conf,
        optional_events=True,
    )
    return cls.from_conf(target, merged_conf, **kwargs)
[docs]
class DmesgCollector(CollectorBase):
    """
    Wrapper around :class:`devlib.collector.dmesg.DmesgCollector`.

    The ``dmesg`` tool is installed automatically on the target upon creation,
    so we know what version is being used.
    """

    NAME = 'dmesg'
    TOOLS = ['dmesg']
    LOG_LEVELS = devlib.DmesgCollector.LOG_LEVELS
    _COMPOSITION_ORDER = 10

    @kwargs_dispatcher(
        {
            CollectorBase: 'init_kwargs',
            devlib.DmesgCollector: 'devlib_kwargs',
        },
        ignore=['collector'],
    )
    def __init__(self, target, init_kwargs, devlib_kwargs):
        # The keyword arguments are split by the decorator between the devlib
        # collector and the parent class constructor.
        devlib_collector = devlib.DmesgCollector(**devlib_kwargs)
        super().__init__(collector=devlib_collector, **init_kwargs)
# vim :set tabstop=4 shiftwidth=4 expandtab textwidth=80