Source code for lisa.datautils

# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2019, Arm Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import re
import functools
import operator
import math
import itertools
import warnings
import contextlib
import uuid
from operator import attrgetter
import decimal
from numbers import Number
import weakref
import threading

import polars as pl
import numpy as np
import pandas as pd
import pandas.api.extensions
import scipy.integrate
import scipy.signal
import pyarrow

from lisa.utils import TASK_COMM_MAX_LEN, groupby, deprecate, order_as


class Timestamp(float):
    """
    Nanosecond-precision timestamp. It inherits from ``float`` and as such can
    be manipulated as a floating point number of seconds.

    The ``as_nanoseconds`` attribute allows getting the exact timestamp
    regardless of the magnitude of the float, allowing for more precise
    computation.

    :param unit: Unit of the ``ts`` value being passed. One of ``"s"``,
        ``"ms"``, ``"us"`` and ``"ns"``.
    :type unit: str

    :param rounding: How to round the value when converting to float.
        Timestamps of large magnitude will suffer from the loss of least
        significant digits in their float value, which will not have
        nanosecond precision. The rounding determines if a value below or
        above the actual nanosecond-precision timestamp should be used. One of
        ``"up"`` or ``"down"``.
    :type rounding: str
    """
    __slots__ = ('as_nanoseconds', '_Timestamp__rounding')

    _MUL = dict(
        ns=decimal.Decimal('1'),
        us=decimal.Decimal('1e3'),
        ms=decimal.Decimal('1e6'),
        s=decimal.Decimal('1e9'),
    )
    _1NS = decimal.Decimal('1e-9')
    def __new__(cls, ts, unit='s', rounding='down'):
        if isinstance(ts, cls):
            return cls(ts.as_nanoseconds, unit='ns', rounding=rounding)
        else:
            ts = decimal.Decimal(ts)
            try:
                mul = cls._MUL[unit]
            except KeyError:
                raise ValueError(f'Unknown unit={unit}')

            ns = mul * ts
            if rounding == 'up':
                ns = math.ceil(ns)
            elif rounding == 'down':
                ns = math.floor(ns)
            else:
                raise ValueError(f'Unknown rounding={rounding}')

            s = ns * cls._1NS
            float_s = float(s)
            if float_s > s and rounding == 'down':
                float_s = float(np.nextafter(float_s, -math.inf))
            elif float_s < s and rounding == 'up':
                float_s = float(np.nextafter(float_s, math.inf))

            self = super().__new__(cls, float_s)
            self.as_nanoseconds = ns
            self.__rounding = rounding
            return self
    def _with_ns(self, ns):
        return self.__class__(
            ns,
            unit='ns',
            rounding=self.__rounding
        )

    def __hash__(self):
        return hash(self.as_nanoseconds)

    def __cmp(self, other, op):
        try:
            other = Timestamp(other)
        except OverflowError:
            # +inf/-inf
            return op(float(self), other)
        else:
            return op(self.as_nanoseconds, other.as_nanoseconds)

    def __le__(self, other):
        return self.__cmp(other, operator.le)

    def __ge__(self, other):
        return self.__cmp(other, operator.ge)

    def __lt__(self, other):
        return self.__cmp(other, operator.lt)

    def __gt__(self, other):
        return self.__cmp(other, operator.gt)

    def __eq__(self, other):
        try:
            return self.__cmp(other, operator.eq)
        except TypeError:
            return NotImplemented
        except Exception:
            return False

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        ns = Timestamp(other).as_nanoseconds
        return self._with_ns(self.as_nanoseconds + ns)

    def __sub__(self, other):
        ns = Timestamp(other).as_nanoseconds
        return self._with_ns(self.as_nanoseconds - ns)

    def __mul__(self, other):
        return self._with_ns(self.as_nanoseconds * other)

    def __mod__(self, other):
        return self._with_ns(self.as_nanoseconds % other)

    def __truediv__(self, other):
        return self._with_ns(self.as_nanoseconds / other)

    def __floordiv__(self, other):
        return self._with_ns(self.as_nanoseconds // other)

    def __abs__(self):
        return self._with_ns(abs(self.as_nanoseconds))
    def __neg__(self):
        return self._with_ns(-self.as_nanoseconds)
    def __pos__(self):
        return self

    def __invert__(self):
        return -self

    def to_polars_expr(self):
        return pl.duration(
            # https://github.com/pola-rs/polars/issues/11625
            nanoseconds=self.as_nanoseconds,
            # https://github.com/pola-rs/polars/issues/14751
            time_unit='ns'
        )
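

# Illustrative usage sketch, not part of the original lisa source: how
# Timestamp keeps nanosecond precision. The expected values follow directly
# from __new__() and __add__() as defined above.
def _example_timestamp():
    ts = Timestamp(1.5, unit='us')
    assert ts.as_nanoseconds == 1500

    # Arithmetic is carried out on the integer nanosecond value, so no
    # precision is lost even when the float representation would round.
    total = Timestamp(1, unit='s') + 0.5
    assert total.as_nanoseconds == 1_500_000_000
    return total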
def _dispatch(polars_f, pandas_f, data, *args, **kwargs): if isinstance(data, (pl.LazyFrame, pl.DataFrame, pl.Series)): return polars_f(data, *args, **kwargs) elif isinstance(data, (pd.DataFrame, pd.Series)): return pandas_f(data, *args, **kwargs) else: raise TypeError(f'Cannot find implementation for {data.__class__}') def _polars_duration_expr(duration, unit='s', rounding='down'): if duration is None: return duration elif isinstance(duration, pl.Expr): return duration else: duration = Timestamp( duration, unit=unit, rounding=rounding ) return duration.to_polars_expr() _MEM_LAZYFRAMES_LOCK = threading.Lock() _MEM_LAZYFRAMES = weakref.WeakValueDictionary() def _polars_declare_in_memory(df): with _MEM_LAZYFRAMES_LOCK: _MEM_LAZYFRAMES[id(df)] = df def _polars_df_in_memory(df): try: with _MEM_LAZYFRAMES_LOCK: _df = _MEM_LAZYFRAMES[id(df)] except KeyError: return False else: return _df is df def _polars_index_col(df): if 'Time' in df.columns: return 'Time' else: return df.columns[0] def _df_to_polars(df): in_memory = _polars_df_in_memory(df) if isinstance(df, pl.LazyFrame): index = _polars_index_col(df) dtype = df.schema[index] # This skips a useless cast, saving some time on the common path if index == 'Time': if dtype != pl.Duration('ns'): _index = pl.col(index) if dtype.is_float(): # Convert to nanoseconds df = df.with_columns(_index * 1_000_000_000) elif dtype.is_integer() or dtype.is_temporal(): pass else: raise TypeError(f'Index dtype not handled: {dtype}') df = df.with_columns( _index.cast(pl.Duration('ns')) ) # Make the index column the first one df = df.select(order_as(list(df.columns), [index])) # TODO: once this is solved, we can just inspect the plan and see if the # data is backed by a "DataFrameScan" instead of a "Scan" of a file: # https://github.com/pola-rs/polars/issues/9771 elif isinstance(df, pl.DataFrame): in_memory = True df = df.lazy() df = _df_to_polars(df) elif isinstance(df, pd.DataFrame): df = pl.from_pandas(df, include_index=True) df = _df_to_polars(df) else: raise ValueError(f'{df.__class__} not supported') if in_memory: _polars_declare_in_memory(df) return df def _df_to_pandas(df): if isinstance(df, pd.DataFrame): return df else: assert isinstance(df, pl.LazyFrame) index = _polars_index_col(df) if index == 'Time' and df.schema[index].is_temporal(): df = df.with_columns( pl.col(index).dt.total_nanoseconds() * 1e-9 ) df = df.collect() # Make sure we get nullable dtypes: # https://arrow.apache.org/docs/python/pandas.html dtype_mapping = { pyarrow.int8(): pd.Int8Dtype(), pyarrow.int16(): pd.Int16Dtype(), pyarrow.int32(): pd.Int32Dtype(), pyarrow.int64(): pd.Int64Dtype(), pyarrow.uint8(): pd.UInt8Dtype(), pyarrow.uint16(): pd.UInt16Dtype(), pyarrow.uint32(): pd.UInt32Dtype(), pyarrow.uint64(): pd.UInt64Dtype(), pyarrow.bool_(): pd.BooleanDtype(), pyarrow.string(): pd.StringDtype(), } df = df.to_pandas(types_mapper=dtype_mapping.get) df.set_index(index, inplace=True) # Nullable dtypes are still not supported everywhere, so cast back to a # non-nullable dtype in cases where there is no null value: # https://github.com/holoviz/holoviews/issues/6142 dtypes = { col: dtype.type for col in df.columns if getattr( (dtype := df[col].dtype), 'na_value', None ) is pd.NA and not df[col].isna().any() } df = df.astype(dtypes, copy=False) # Round trip polars -> pandas -> polars can be destructive as polars will # store timestamps at nanosecond precision in an integer. This will wipe # any sub-nanosecond difference in values, possibly leading to duplicate # timestamps. 
df.index = series_update_duplicates(df.index.to_series()) return df def _df_to(df, fmt): if fmt == 'pandas': return _df_to_pandas(df) elif fmt == 'polars-lazyframe': # Note that this is not always a no-op even if the input is already a # LazyFrame, so it's important this does not get "optimized away". return _df_to_polars(df) else: raise ValueError(f'Unknown format {fmt}')
class DataAccessor:
    """
    Proxy class that allows extending the :class:`pandas.DataFrame` API.

    **Example**::

        # Define and register a dataframe accessor
        @DataFrameAccessor.register_accessor
        def df_foobar(df, baz):
            ...

        df = pandas.DataFrame()

        # Use the accessor with the "lisa" proxy
        df.lisa.foobar(baz=1)
    """
    def __init__(self, data):
        self.data = data

    @classmethod
    def register_accessor(cls, f):
        """
        Decorator to register an accessor function.

        The accessor name will be the name of the function, without the
        ``series_`` or ``df_`` prefix.
        """
        name = re.sub(r'^(?:df|series)_(.*)', r'\1', f.__name__)
        cls.FUNCTIONS[name] = f
        return f

    def __getattr__(self, attr):
        try:
            f = self.FUNCTIONS[attr]
        except KeyError as e:
            raise AttributeError(f'Unknown method name: {attr}') from e

        meth = f.__get__(self.data, self.__class__)
        return meth

    def __dir__(self):
        attrs = set(super().__dir__())
        attrs |= self.FUNCTIONS.keys()
        return sorted(attrs)


@pandas.api.extensions.register_dataframe_accessor('lisa')
class DataFrameAccessor(DataAccessor):
    FUNCTIONS = {}


@pandas.api.extensions.register_series_accessor('lisa')
class SeriesAccessor(DataAccessor):
    FUNCTIONS = {}
@SeriesAccessor.register_accessor
def series_refit_index(series, start=None, end=None, window=None, method='inclusive', clip_window=True):
    """
    Slice a series using :func:`series_window` and ensure we have a value at
    exactly the specified boundaries, unless the signal started after the
    beginning of the required window.

    :param series: Series to act on
    :type series: pandas.Series

    :param start: First index value to find in the returned series.
    :type start: object

    :param end: Last index value to find in the returned series.
    :type end: object

    :param window: ``window=(start, end)`` is the same as ``start=start,
        end=end``. These parameter styles are mutually exclusive.
    :type window: tuple(float or None, float or None) or None

    :param method: Windowing method used to select the first and last values
        of the series using :func:`series_window`. Defaults to ``inclusive``,
        which is suitable for signals where all the value changes have a
        corresponding row without any fixed sample-rate constraints. If they
        have been downsampled, ``nearest`` might be a better choice.
    :type method: str

    .. note:: If ``end`` is past the end of the data, the last row will be
        duplicated so that we can have a start and end index at the right
        location, without moving the point at which the transition to the last
        value happened. This also allows plotting series with only one item
        using matplotlib, which would otherwise be impossible.

    :param clip_window: Passed down to :func:`series_window`.
    """
    window = _make_window(start, end, window)
    return _pandas_refit_index(series, window, method=method)


@DataFrameAccessor.register_accessor
def df_refit_index(df, start=None, end=None, window=None, method='inclusive'):
    """
    Same as :func:`series_refit_index` but acting on :class:`pandas.DataFrame`
    """
    window = _make_window(start, end, window)
    return _pandas_refit_index(df, window, method=method)


def _make_window(start, end, window):
    uses_separated = (start, end) != (None, None)
    if uses_separated:
        warnings.warn('start and end df_refit_index() parameters are deprecated, please use window=', DeprecationWarning, stacklevel=3)

    if window is not None and uses_separated:
        raise ValueError('window != None cannot be used along with start and end parameters')

    if window is None:
        return (start, end)
    else:
        return window
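

# Illustrative usage sketch, not part of the original lisa source: refitting
# a float-indexed series to a requested window. The expected index values
# follow from _pandas_refit_index() above.
def _example_series_refit_index():
    s = pd.Series([1, 2, 3], index=[0.1, 0.5, 0.9])
    refit = series_refit_index(s, window=(0.0, 1.0))
    # The start is never moved into the past (the signal only started at 0.1),
    # while the window end being past the data duplicates the last row and
    # re-indexes it to 1.0.
    assert list(refit.index) == [0.1, 0.5, 0.9, 1.0]
    return refit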
[docs] @DataFrameAccessor.register_accessor def df_split_signals(df, signal_cols, align_start=False, window=None): """ Yield subset of ``df`` that only contain one signal, along with the signal identification values. :param df: The dataframe to split. :type df: pandas.DataFrame :param signal_cols: Columns that uniquely identify a signal. :type signal_cols: list(str) :param window: Apply :func:`df_refit_index` on the yielded dataframes with the given window. :type window: tuple(float or None, float or None) or None :param align_start: If ``True``, same as ``window=(df.index[0], None)``. This makes sure all yielded signals start at the same index as the original dataframe. :type align_start: bool """ if not signal_cols: yield ({}, df) else: if align_start: if window is not None: raise ValueError('align_start=True cannot be used with window != None') window = (df.index[0], None) # Pandas chokes on common iterables like dict key views, so spoon feed # it a list signal_cols = list(signal_cols) # Avoid this warning: # FutureWarning: In a future version of pandas, a length 1 tuple will # be returned when iterating over a groupby with a grouper equal to a # list of length 1. Don't supply a list with a single grouper to avoid # this warning. if len(signal_cols) == 1: _signal_cols = signal_cols[0] else: _signal_cols = signal_cols for group, signal in df.groupby(_signal_cols, observed=True, sort=False, group_keys=False): # When only one column is looked at, the group is the value instead of # a tuple of values if isinstance(group, tuple) : cols_val = dict(zip(signal_cols, group)) else: cols_val = {signal_cols[0]: group} if window: signal = df_refit_index(signal, window=window, method='inclusive') yield (cols_val, signal)
def _pandas_refit_index(data, window, method): if data.empty: raise ValueError('Cannot refit the index of an empty dataframe or series') start, end = window if end is None: duplicate_last = False else: duplicate_last = end > data.index[-1] data = _pandas_window(data, window, method=method) if data.empty: return data # When the end is after the end of the data, duplicate the last row so we # can push it to the right as much as we want without changing the point at # which the transition to that value happened if duplicate_last: data = pd.concat([data, data.iloc[-1:]]) else: # Shallow copy is enough, we only want to replace the index and not the # actual data data = data.copy(deep=False) index = data.index.to_series() # Only advance the beginning of the data, never move it in the past. # Otherwise, we "invent" a value for the signal that did not existed, # leading to various wrong results. if start is not None and index.iloc[0] < start: index.iloc[0] = start if end is not None: index.iloc[-1] = end data.index = index return data
[docs] @DataFrameAccessor.register_accessor def df_squash(df, start, end, column='delta'): """ Slice a dataframe of deltas in [start:end] and ensure we have an event at exactly those boundaries. The input dataframe is expected to have a "column" which reports the time delta between consecutive rows, as for example dataframes generated by :func:`df_add_delta`. The returned dataframe is granted to have an initial and final event at the specified "start" ("end") index values, which values are the same of the last event before (first event after) the specified "start" ("end") time. Examples: Slice a dataframe to [start:end], and work on the time data so that it makes sense within the interval. Examples to make it clearer:: df is: Time len state 15 1 1 16 1 0 17 1 1 18 1 0 ------------- df_squash(df, 16.5, 17.5) => Time len state 16.5 .5 0 17 .5 1 df_squash(df, 16.2, 16.8) => Time len state 16.2 .6 0 :returns: a new df that fits the above description """ if df.empty: return df end = min(end, df.index[-1] + df[column].iloc[-1]) res_df = pd.DataFrame(data=[], columns=df.columns) if start > end: return res_df # There's a few things to keep in mind here, and it gets confusing # even for the people who wrote the code. Let's write it down. # # It's assumed that the data is continuous, i.e. for any row 'r' within # the trace interval, we will find a new row at (r.index + r.len) # For us this means we'll never end up with an empty dataframe # (if we started with a non empty one) # # What's we're manipulating looks like this: # (| = events; [ & ] = start,end slice) # # | [ | ] | # e0 s0 e1 s1 e2 # # We need to push e0 within the interval, and then tweak its duration # (len column). The mathemagical incantation for that is: # e0.len = min(e1.index - s0, s1 - s0) # # This takes care of the case where s1 isn't in the interval # If s1 is in the interval, we just need to cap its len to # s1 - e1.index prev_df = df.loc[:start] middle_df = df.loc[start:end] # Tweak the closest previous event to include it in the slice if not prev_df.empty and start not in middle_df.index: res_df = pd.concat([res_df, prev_df.tail(1)]) res_df.index = [start] e1 = end if not middle_df.empty: e1 = middle_df.index[0] res_df[column] = min(e1 - start, end - start) if not middle_df.empty: res_df = pd.concat([res_df, middle_df]) if end in res_df.index: # e_last and s1 collide, ditch e_last res_df = res_df.drop([end]) else: # Fix the delta for the last row delta = min(end - res_df.index[-1], res_df[column].iloc[-1]) res_df.at[res_df.index[-1], column] = delta return res_df
@DataFrameAccessor.register_accessor
def df_filter(df, filter_columns, exclude=False):
    """
    Filter the content of a dataframe.

    :param df: DataFrame to filter
    :type df: pandas.DataFrame

    :param filter_columns: Dict of ``{"column": value}`` that rows have to
        match to be selected.
    :type filter_columns: dict(str, object)

    :param exclude: If ``True``, the matching rows will be excluded rather
        than selected.
    :type exclude: bool
    """
    if filter_columns:
        key = functools.reduce(
            operator.and_,
            (
                df[col] == val
                for col, val in filter_columns.items()
            )
        )

        return df[~key if exclude else key]
    else:
        if exclude:
            return df
        else:
            return df_make_empty_clone(df)
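

# Illustrative usage sketch, not part of the original lisa source: selecting
# and excluding rows by column value with df_filter(). The column names here
# are made up for the example.
def _example_df_filter():
    df = pd.DataFrame({'cpu': [0, 1, 0], 'util': [10, 20, 30]})
    only_cpu0 = df_filter(df, {'cpu': 0})
    not_cpu0 = df_filter(df, {'cpu': 0}, exclude=True)
    assert len(only_cpu0) == 2 and len(not_cpu0) == 1
    return only_cpu0, not_cpu0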
[docs] def df_merge(df_list, drop_columns=None, drop_inplace=False, filter_columns=None): """ Merge a list of :class:`pandas.DataFrame`, keeping the index sorted. :param drop_columns: List of columns to drop prior to merging. This avoids ending up with extra renamed columns if some dataframes have column names in common. :type drop_columns: list(str) :param drop_inplace: Drop columns in the original dataframes instead of creating copies. :type drop_inplace: bool :param filter_columns: Dict of `{"column": value)` used to filter each dataframe prior to dropping columns. The columns are then dropped as they have a constant value. :type filter_columns: dict(str, object) """ df_list = list(df_list) drop_columns = drop_columns if drop_columns else [] if filter_columns: df_list = [ df_filter(df, filter_columns) for df in df_list ] # remove the column to avoid duplicated useless columns drop_columns.extend(filter_columns.keys()) # Since we just created dataframe slices, drop_inplace would give a # warning from pandas drop_inplace = False if drop_columns: def drop(df): filtered_df = df.drop(columns=drop_columns, inplace=drop_inplace) # when inplace=True, df.drop() returns None return df if drop_inplace else filtered_df df_list = [ drop(df) for df in df_list ] if any( not (df1.columns.intersection(df2.columns)).empty for (df1, df2) in itertools.combinations(df_list, 2) ): df = pd.concat(df_list) df.sort_index(inplace=True) return df else: df1, *other_dfs = df_list return df1.join(other_dfs, how='outer')
[docs] @DataFrameAccessor.register_accessor def df_delta(pre_df, post_df, group_on=None): """ pre_df and post_df containing paired/consecutive events indexed by time, df_delta() merges the two dataframes and adds a ``delta`` column containing the time spent between the two events. A typical usecase would be adding pre/post events at the entry/exit of a function. Rows from ``pre_df`` and ``post_df`` are grouped by the ``group_on`` columns. E.g.: ``['pid', 'comm']`` to group by task. Except columns listed in ``group_on``, ``pre_df`` and ``post_df`` must have columns with different names. Events that cannot be paired are ignored. :param pre_df: Dataframe containing the events that start a record. :type pre_df: pandas.DataFrame :param post_df: Dataframe containing the events that end a record. :type post_df: pandas.DataFrame :param group_on: Columns used to group ``pre_df`` and ``post_df``. E.g.: This would be ``['pid', 'comm']`` to group by task. :type group_on: list(str) :returns: a :class:`pandas.DataFrame` indexed by the ``pre_df`` dataframe with: * All the columns from the ``pre_df`` dataframe. * All the columns from the ``post_df`` dataframe. * A ``delta`` column (duration between the emission of a 'pre' event and its consecutive 'post' event). """ pre_df = pre_df.copy(deep=False) post_df = post_df.copy(deep=False) # Tag the rows to remember from which df they are coming from. pre_df["is_pre"] = True post_df["is_pre"] = False # Merge on columns common to the two dfs to avoid overlapping of names. on_col = sorted(pre_df.columns.intersection(post_df.columns)) # Merging on nullable types converts columns to object. # Merging on non-nullable types converts integer/boolean to float. # Thus, let the on_col non-nullable and converts the others to nullable. pre_df_cols = sorted(set(pre_df) - set(on_col)) post_df_cols = sorted(set(post_df) - set(on_col)) pre_df[pre_df_cols] = df_convert_to_nullable(pre_df[pre_df_cols]) post_df[post_df_cols] = df_convert_to_nullable(post_df[post_df_cols]) # Merge. Don't allow column renaming. df = pd.merge(pre_df, post_df, left_index=True, right_index=True, on=on_col, how='outer', suffixes=(False, False)) # Save and replace the index name by a tmp name to avoid a clash # with column names. index_name = df.index.name index_tmp_name = uuid.uuid4().hex df.index.name = index_tmp_name df.reset_index(inplace=True) # In each group, search for a faulty sequence (where pre/post events are # not interleaving, e.g. pre1->pre2->post1->post2). if group_on: grouped = df.groupby(group_on, observed=True, sort=False, group_keys=False) else: grouped = df if grouped['is_pre'].transform(lambda x: x == x.shift()).any(): raise ValueError('Unexpected sequence of pre and post event (more than one "pre" or "post" in a row)') # Create the 'delta' column and add the columns from post_df # in the rows coming from pre_df. new_columns = dict( delta=grouped[index_tmp_name].transform(lambda time: time.diff().shift(-1)), ) new_columns.update({col: grouped[col].shift(-1) for col in post_df_cols}) df = df.assign(**new_columns) df.set_index(index_tmp_name, inplace=True) df.index.name = index_name # Only keep the rows from the pre_df, they have all the necessary info. df = df.loc[df["is_pre"]] # Drop the rows from pre_df with not matching row from post_df. df.dropna(inplace=True) df.drop(columns=["is_pre"], inplace=True) return df
def _resolve_x(y, x):
    """
    Resolve the ``x`` series to use for derivative and integral operations
    """
    if x is None:
        x = pd.Series(y.index)
        x.index = y.index
    return x
[docs] @SeriesAccessor.register_accessor def series_derivate(y, x=None, order=1): """ Compute a derivative of a :class:`pandas.Series` with respect to another series. :return: A series of `dy/dx`, where `x` is either the index of `y` or another series. :param y: Series with the data to derivate. :type y: pandas.DataFrame :param x: Series with the `x` data. If ``None``, the index of `y` will be used. Note that `y` and `x` are expected to have the same index. :type x: pandas.DataFrame or None :param order: Order of the derivative (1 is speed, 2 is acceleration etc). :type order: int """ x = _resolve_x(y, x) for _ in range(order): y = y.diff() / x.diff() return y
[docs] @SeriesAccessor.register_accessor def series_integrate(y, x=None, sign=None, method='rect', rect_step='post'): """ Compute the integral of `y` with respect to `x`. :return: A scalar :math:`\\int_{x=A}^{x=B} y \\, dx`, where `x` is either the index of `y` or another series. :param y: Series with the data to integrate. :type y: pandas.DataFrame :param x: Series with the `x` data. If ``None``, the index of `y` will be used. Note that `y` and `x` are expected to have the same index. :type x: pandas.DataFrame or None :param sign: Clip the data for the area in positive or negative regions. Can be any of: - ``+``: ignore negative data - ``-``: ignore positive data - ``None``: use all data :type sign: str or None :param method: The method for area calculation. This can be any of the integration methods supported in :mod:`numpy` or `rect` :type param: str :param rect_step: The step behaviour for `rect` method :type rect_step: str *Rectangular Method* - Step: Post Consider the following time series data:: 2 *----*----*----+ | | 1 | *----*----+ | 0 *----*----+ 0 1 2 3 4 5 6 7 import pandas as pd a = [0, 0, 2, 2, 2, 1, 1] s = pd.Series(a) The area under the curve is: .. math:: \\sum_{k=0}^{N-1} (x_{k+1} - {x_k}) \\times f(x_k) \\\\ (2 \\times 3) + (1 \\times 2) = 8 - Step: Pre :: 2 +----*----*----* | | 1 | +----*----*----+ | 0 *----* 0 1 2 3 4 5 6 7 import pandas as pd a = [0, 0, 2, 2, 2, 1, 1] s = pd.Series(a) The area under the curve is: .. math:: \\sum_{k=1}^{N} (x_k - x_{k-1}) \\times f(x_k) \\\\ (2 \\times 3) + (1 \\times 3) = 9 """ x = _resolve_x(y, x) if sign == "+": y = y.clip(lower=0) elif sign == "-": y = y.clip(upper=0) elif sign is None: pass else: raise ValueError(f'Unsupported "sign": {sign}') if method == "rect": if len(x) <= 1: raise ValueError('Cannot integrate with less than 2 points') else: dx = x.diff() if rect_step == "post": dx = dx.shift(-1) return (y * dx).sum() # Make a DataFrame to make sure all rows stay aligned when we drop NaN, # which is needed by all the below methods df = pd.DataFrame({'x': x, 'y': y}).dropna() x = df['x'] y = df['y'] if method == 'trapz': return np.trapz(y, x) elif method == 'simps': return scipy.integrate.simps(y, x) else: raise ValueError(f'Unsupported integration method: {method}')
[docs] @SeriesAccessor.register_accessor def series_mean(y, x=None, **kwargs): r""" Compute the average of `y` by integrating with respect to `x` and dividing by the range of `x`. :return: A scalar :math:`\int_{x=A}^{x=B} \frac{y}{| B - A |} \, dx`, where `x` is either the index of `y` or another series. :param y: Series with the data to integrate. :type y: pandas.DataFrame :param x: Series with the `x` data. If ``None``, the index of `y` will be used. Note that `y` and `x` are expected to have the same index. :type x: pandas.DataFrame or None :Variable keyword arguments: Forwarded to :func:`series_integrate`. """ x = _resolve_x(y, x) integral = series_integrate(y, x, **kwargs) if len(y) > 1: mean = integral / (x.max() - x.min()) # If there is only one data item, the mean is equal to it. else: mean = integral return mean
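

# Illustrative usage sketch, not part of the original lisa source: averaging
# a step signal with series_mean(), which weights each value by the time it
# was held rather than by its number of samples.
def _example_series_mean():
    # Signal is 0 for 1s, then 2 for 2s (the last sample only closes the
    # window with the default method='rect', rect_step='post').
    s = pd.Series([0, 2, 2], index=[0.0, 1.0, 3.0])
    # The integral is 0*1 + 2*2 = 4, over a 3s range, so the mean is 4/3.
    assert abs(series_mean(s) - 4 / 3) < 1e-9
    return series_mean(s)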
[docs] @SeriesAccessor.register_accessor def series_window(series, window, method='pre', clip_window=True): """ Select a portion of a :class:`pandas.Series` :param series: series to slice :type series: :class:`pandas.Series` :param window: two-tuple of index values for the start and end of the region to select. :type window: tuple(object) :param clip_window: Only ``True`` value is now allwed: clip the requested window to the bounds of the index, otherwise raise exceptions if the window is too large. :type clip_window: bool :param method: Choose how edges are handled: * `inclusive`: When no exact match is found, include both the previous and next values around the window. * `exclusive`: When no exact match is found, only index values within the range are selected. This is the default pandas float slicing behavior. * `nearest`: Not supported with :mod:`polars` objects: when no exact match is found, take the nearest index value. * `pre`: When no exact match is found, take the previous index value. * `post`: When no exact match is found, take the next index value. .. note:: The index of `series` must be monotonic and without duplicates. """ if not clip_window: raise ValueError(f'Only clip_window=True is supported') return _pandas_window(series, window, method)
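

# Illustrative usage sketch, not part of the original lisa source: the effect
# of the windowing method when the window boundaries do not match any index
# value exactly.
def _example_series_window():
    s = pd.Series([10, 20, 30, 40], index=[0.0, 1.0, 2.0, 3.0])
    # 'pre' keeps the previous index value for each boundary ...
    pre = series_window(s, (0.5, 2.5), method='pre')
    assert list(pre.index) == [0.0, 1.0, 2.0]
    # ... while 'post' keeps the next one.
    post = series_window(s, (0.5, 2.5), method='post')
    assert list(post.index) == [1.0, 2.0, 3.0]
    return pre, post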
def _polars_window(data, window, method): # TODO: relax that assert isinstance(data, pl.LazyFrame) # TODO: maybe expose that as a param col = _polars_index_col(data) col = pl.col(col) start, stop = window def pre(): if start is None: filter_ = col <= stop else: if stop is None: filter_ = col > start else: filter_ = col.is_between( lower_bound=start, upper_bound=stop, closed='right', ) filter_ = filter_ | filter_.shift(-1) return filter_ def post(): if stop is None: filter_ = col >= start else: if start is None: filter_ = col < stop else: filter_ = col.is_between( lower_bound=start, upper_bound=stop, closed='left', ) filter_ = filter_ | filter_.shift(+1) return filter_ if start is None and stop is None: filter_ = True else: start = _polars_duration_expr(start, rounding='down') stop = _polars_duration_expr(stop, rounding='up') if method == 'exclusive': if start is None: filter_ = col <= stop elif stop is None: filter_ = col >= start else: filter_ = col.is_between( lower_bound=start, upper_bound=stop, closed='both', ) elif method == 'inclusive': filter_ = pre() | post() elif method == 'pre': filter_ = pre() elif method == 'post': filter_ = post() else: raise ValueError(f'Slicing method not supported: {method}') return data.filter(filter_) def _pandas_window(data, window, method): """ ``data`` can either be a :class:`pandas.DataFrame` or :class:`pandas.Series` .. warning:: This function assumes ``data`` has a sorted index. """ index = data.index if data.empty: return data start, end = window first = index[0] last = index[-1] # Fill placeholders if start is None: start = first if end is None: end = last # Window is on the left if start <= first and end <= first: start = first end = first # Window is on the right elif start >= last and end >= last: start = last end = last # Overlapping window else: if start <= first: start = first if end >= last: end = last window = (start, end) if None not in window and window[0] > window[1]: raise KeyError(f'The window starts after its end: {window}') if method == 'inclusive': method = ('ffill', 'bfill') elif method == 'exclusive': # Default slicing behaviour of pandas' float index is to be exclusive, # so we can use that knowledge to enable a fast path. if data.index.dtype.kind == 'f': return data[slice(*window)] method = ('bfill', 'ffill') elif method == 'nearest': method = ('nearest', 'nearest') elif method == 'pre': method = ('ffill', 'ffill') elif method == 'post': method = ('bfill', 'bfill') else: raise ValueError(f'Slicing method not supported: {method}') sides = ('left', 'right') window = [ _get_loc(index, x, method=method, side=side) if x is not None else None for x, method, side in zip(window, method, sides) ] window = window[0], (window[1] + 1) return data.iloc[slice(*window)] def _get_loc(index, x, method, side): """ Emulate :func:`pandas.Index.get_loc` behavior with the much faster :func:`pandas.Index.searchsorted`. .. warning:: Passing a non-sorted index will destroy performance. """ # Not a lot of use for nearest, so fall back on the slow but easy to use get_loc() # # Also, if the index is not sorted, we need to fall back on the slow path # as well. Checking is_monotonic is cheap so it's ok to do it here. 
if method == 'nearest' or not index.is_monotonic_increasing: return index.get_indexer([x], method=method)[0] else: if index.empty: raise KeyError(x) # get_loc() also raises an exception in these case elif method == 'ffill' and x < index[0]: raise KeyError(x) elif method == 'bfill' and x > index[-1]: raise KeyError(x) loc = index.searchsorted(x, side=side) try: val_at_loc = index[loc] # We are getting an index past the end. This is fine since we already # checked correct bounds before except IndexError: return loc - 1 if val_at_loc == x: return loc elif val_at_loc < x: return loc if method == 'ffill' else loc + 1 else: return loc - 1 if method == 'ffill' else loc
[docs] @DataFrameAccessor.register_accessor def df_window(df, window, method='pre', clip_window=True): """ Same as :func:`series_window` but acting on a :class:`pandas.DataFrame` """ if not clip_window: raise ValueError(f'Only clip_window=True is supported') return _dispatch( _polars_window, _pandas_window, df, window, method )
[docs] @DataFrameAccessor.register_accessor def df_make_empty_clone(df): """ Make an empty clone of the given dataframe. :param df: The template dataframe. :type df: pandas.DataFrame More specifically, the following aspects are cloned: * Column names * Column dtypes """ return df.iloc[0:0].copy(deep=True)
[docs] @DataFrameAccessor.register_accessor def df_window_signals(df, window, signals, compress_init=False, clip_window=True): """ Similar to :func:`df_window` with ``method='pre'`` but guarantees that each signal will have a values at the beginning of the window. :param window: two-tuple of index values for the start and end of the region to select. :type window: tuple(object) :param signals: List of :class:`SignalDesc` describing the signals to fixup. :type signals: list(SignalDesc) :param compress_init: When ``False``, the timestamps of the init value of signals (right before the window) are preserved. If ``True``, they are changed into values as close as possible to the beginning of the window. :type compress_init: bool :param clip_window: See :func:`df_window` .. seealso:: :func:`df_split_signals` """ if not clip_window: raise ValueError(f'Only clip_window=True is supported') return _dispatch( _polars_window_signals, _pandas_window_signals, df, window, signals, compress_init )
def _polars_window_signals(df, window, signals, compress_init): index = _polars_index_col(df) assert df.schema[index].is_temporal() start, stop = window start = _polars_duration_expr(start, rounding='down') stop = _polars_duration_expr(stop, rounding='up') if start is not None: if stop is None: post_filter = pl.col(index) >= start pre_filter = pl.lit(True) else: post_filter = pl.col(index).is_between( lower_bound=start, upper_bound=stop, closed='both' ) pre_filter = (pl.col(index) < stop) pre_filter &= ~post_filter post_df = df.filter(post_filter) pre_df = df.filter(pre_filter) signals_init = [ pre_df.group_by(fields).last() for signal in set(signals) if (fields := signal.fields) ] if signals_init: pre_df = pl.concat( signals_init, how='diagonal', ) if compress_init: first_row = post_df.select(index).head(1).collect() try: first_time = first_row[0] except IndexError: pass else: pre_df.with_columns(Time=pl.lit(first_time)) # We could have multiple signals for the same event, so we want to # avoid duplicate events occurrences. pre_df = pre_df.unique() pre_df = pre_df.sort(index) return pl.concat( [ pre_df, post_df, ], how='diagonal', ) df = _polars_window( df, window=window, method='pre', ) return df def _pandas_window_signals(df, window, signals, compress_init=False): def before(x): return x - 1e-9 windowed_df = df_window(df, window, method='pre') # Split the extra rows that the method='pre' gave in a separate dataframe, # so we make sure we don't end up with duplication in init_df extra_window = ( windowed_df.index[0], window[0], ) if extra_window[0] >= extra_window[1]: extra_df = df_make_empty_clone(df) else: extra_df = df_window(windowed_df, extra_window, method='pre') # This time around, exclude anything before extra_window[1] since it will be provided by extra_df try: # Right boundary is exact, so failure can only happen if left boundary # is after the end of the dataframe, or if the window starts after its # end. _window = (extra_window[1], windowed_df.index[-1]) windowed_df = df_window(windowed_df, _window, method='post') # The windowed_df did not contain any row in the given window, all the # actual data are in extra_df except KeyError: windowed_df = df_make_empty_clone(df) else: # Make sure we don't include the left boundary if windowed_df.index[0] == _window[0]: windowed_df = windowed_df.iloc[1:] def window_signal(signal_df): # Get the row immediately preceding the window start loc = _get_loc(signal_df.index, window[0], method='ffill', side='left') return signal_df.iloc[loc:loc + 1] # Get the value of each signal at the beginning of the window signal_df_list = [ window_signal(signal_df) for signal, signal_df in itertools.chain.from_iterable( df_split_signals(df, signal.fields, align_start=False) for signal in signals ) # Only consider the signal that are in the window. Signals that started # after the window are irrelevant. 
if not signal_df.empty and signal_df.index[0] <= window[0] ] if compress_init: def make_init_df_index(init_df): # Yield a sequence of numbers incrementing by the smallest amount # possible def smallest_increment(start, length): curr = start for _ in range(length): prev = curr while int(prev * 1e9) == int(curr * 1e9): curr = before(curr) yield curr # If windowed_df is empty, we take the last bit right before the # beginning of the window try: start = windowed_df.index[0] except IndexError: start = extra_df.index[-1] index = list(smallest_increment(start, len(init_df))) index = pd.Index(reversed(index), dtype='float64') return index else: def make_init_df_index(init_df): return init_df.index # Get the last row before the beginning the window for each signal, in # timestamp order init_df = pd.concat([extra_df] + signal_df_list) init_df.sort_index(inplace=True) # Remove duplicated indices, meaning we selected the same row multiple # times because it's part of multiple signals init_df = init_df.loc[~init_df.index.duplicated(keep='first')] init_df.index = make_init_df_index(init_df) return pd.concat([init_df, windowed_df])
[docs] @SeriesAccessor.register_accessor def series_align_signal(ref, to_align, max_shift=None): """ Align a signal to an expected reference signal using their cross-correlation. :returns: `(ref, to_align)` tuple, with `to_align` shifted by an amount computed to align as well as possible with `ref`. Both `ref` and `to_align` are resampled to have a fixed sample rate. :param ref: reference signal. :type ref: pandas.Series :param to_align: signal to align :type to_align: pandas.Series :param max_shift: Maximum shift allowed to align signals, in index units. :type max_shift: object or None """ if ref.isnull().any() or to_align.isnull().any(): raise ValueError('NaN needs to be dropped prior to alignment') # Select the overlapping part of the signals start = max(ref.index.min(), to_align.index.min()) end = min(ref.index.max(), to_align.index.max()) # Resample so that we operate on a fixed sampled rate signal, which is # necessary in order to be able to do a meaningful interpretation of # correlation argmax def get_period(series): return pd.Series(series.index).diff().min() period = min(get_period(ref), get_period(to_align)) num = math.ceil((end - start) / period) new_index = pd.Index(np.linspace(start, end, num), dtype='float64') to_align = to_align.reindex(new_index, method='ffill') ref = ref.reindex(new_index, method='ffill') # Compute the correlation between the two signals correlation = scipy.signal.signaltools.correlate(to_align, ref) # The most likely shift is the index at which the correlation is # maximum. correlation.argmax() can vary from 0 to 2*len(to_align), so we # re-center it. shift = correlation.argmax() - len(to_align) # Cap the shift value if max_shift is not None: assert max_shift >= 0 # Turn max_shift into a number of samples in the resampled signal max_shift = int(max_shift / period) # Adjust the sign of max_shift to match shift max_shift *= -1 if shift < 0 else 1 if abs(shift) > abs(max_shift): shift = max_shift # Compensate the shift return ref, to_align.shift(-shift)
[docs] @DataFrameAccessor.register_accessor def df_filter_task_ids(df, task_ids, pid_col='pid', comm_col='comm', invert=False, comm_max_len=TASK_COMM_MAX_LEN): """ Filter a dataframe using a list of :class:`lisa.analysis.tasks.TaskID` :param task_ids: List of task IDs to filter :type task_ids: list(lisa.analysis.tasks.TaskID) :param df: Dataframe to act on. :type df: pandas.DataFrame :param pid_col: Column name in the dataframe with PIDs. :type pid_col: str or None :param comm_col: Column name in the dataframe with comm. :type comm_col: str or None :param comm_max_len: Maximum expected length of the strings in ``comm_col``. The ``task_ids`` `comm` field will be truncated at that length before being matched. :param invert: Invert selection :type invert: bool """ return _dispatch( _polars_filter_task_ids, _pandas_filter_task_ids, df, task_ids=task_ids, pid_col=pid_col, comm_col=comm_col, invert=invert, comm_max_len=comm_max_len, )
def _pandas_filter_task_ids(df, task_ids, pid_col, comm_col, invert, comm_max_len): def make_filter(task_id): if pid_col and task_id.pid is not None: pid = (df[pid_col] == task_id.pid) else: pid = True if comm_col and task_id.comm is not None: comm = (df[comm_col] == task_id.comm[:comm_max_len]) else: comm = True return pid & comm tasks_filters = list(map(make_filter, task_ids)) if tasks_filters: # Combine all the task filters with OR tasks_filter = functools.reduce(operator.or_, tasks_filters) if invert: tasks_filter = ~tasks_filter return df[tasks_filter] else: return df if invert else df.iloc[0:0] def _polars_filter_task_ids(df, task_ids, pid_col, comm_col, invert, comm_max_len): def make_filter(task_id): if pid_col and task_id.pid is not None: pid = (pl.col(pid_col) == pl.lit(task_id.pid)) else: pid = pl.lit(True) if comm_col and task_id.comm is not None: comm = (pl.col(comm_col) == pl.lit(task_id.comm[:comm_max_len])) else: comm = pl.lit(True) return pid & comm tasks_filters = list(map(make_filter, task_ids)) # Combine all the task filters with OR tasks_filter = functools.reduce(operator.or_, tasks_filters, pl.lit(False)) if invert: tasks_filter = ~tasks_filter return df.filter(tasks_filter)
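

# Illustrative usage sketch, not part of the original lisa source: filtering
# trace events by task. It assumes lisa.analysis.tasks.TaskID accepts "pid"
# and "comm" keyword arguments, as suggested by the docstring above.
def _example_df_filter_task_ids():
    from lisa.analysis.tasks import TaskID

    df = pd.DataFrame({
        'pid': [1, 2, 2],
        'comm': ['task-a', 'task-b', 'task-b'],
        'util': [10, 20, 30],
    })
    # Keep only the rows belonging to pid=2/task-b, then invert the selection.
    selected = df_filter_task_ids(df, [TaskID(pid=2, comm='task-b')])
    others = df_filter_task_ids(df, [TaskID(pid=2, comm='task-b')], invert=True)
    assert len(selected) == 2 and len(others) == 1
    return selected, others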
[docs] @SeriesAccessor.register_accessor def series_local_extremum(series, kind): """ Returns a series of local extremum. :param series: Series to look at. :type series: pandas.Series :param kind: Kind of extremum: ``min`` or ``max``. :type kind: str """ if kind == 'min': comparator = np.less_equal elif kind == 'max': comparator = np.greater_equal else: raise ValueError(f'Unsupported kind: {kind}') ilocs = scipy.signal.argrelextrema(series.to_numpy(), comparator=comparator) return series.iloc[ilocs]
[docs] @SeriesAccessor.register_accessor def series_envelope_mean(series): """ Compute the average between the mean of local maximums and local minimums of the series. Assuming that the values are ranging inside a tunnel, this will give the average center of that tunnel. """ first_val = series.iat[0] # Remove constant values, otherwise they would be accounted in both max and # min, which can bias the result series = series_deduplicate(series, keep='first', consecutives=True) # If the series was constant, just return that constant if series.empty: return first_val else: maxs = series_local_extremum(series, kind='max') mins = series_local_extremum(series, kind='min') maxs_mean = series_mean(maxs) mins_mean = series_mean(mins) return (maxs_mean - mins_mean) / 2 + mins_mean
# Keep an alias in place for compatibility
@deprecate(replaced_by=series_envelope_mean, deprecated_in='2.0', removed_in='4.0')
def series_tunnel_mean(*args, **kwargs):
    return series_envelope_mean(*args, **kwargs)
[docs] @SeriesAccessor.register_accessor def series_rolling_apply(series, func, window, window_float_index=True, center=False): """ Apply a function on a rolling window of a series. :returns: The series of results of the function. :param series: Series to act on. :type series: pandas.Series :param func: Function to apply on each window. It must take a :class:`pandas.Series` as only parameter and return one value. :type func: collections.abc.Callable :param window: Rolling window width in seconds. :type window: float :param center: Label values generated by ``func`` with the center of the window, rather than the highest index in it. :type center: bool :param window_float_index: If ``True``, the series passed to ``func`` will be of type :class:`pandas.Index` (float64), in nanoseconds. Disabling is recommended if the index is not used by ``func`` since it will remove the need for a conversion. :type window_float_index: bool """ orig_index = series.index # Wrap the func to turn the index into nanosecond Float64Index if window_float_index: def func(s, func=func): # pylint: disable=function-redefined s.index = s.index.astype('int64') * 1e-9 return func(s) # Use a timedelta index so that rolling gives time-based results index = pd.to_timedelta(orig_index, unit='s') series = pd.Series(series.array, index=index) window_ns = int(window * 1e9) rolling_window = f'{window_ns}ns' values = series.rolling(rolling_window).apply(func, raw=False).values if center: new_index = orig_index - (window / 2) else: new_index = orig_index return pd.Series(values, index=new_index)
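

# Illustrative usage sketch, not part of the original lisa source: a 2-second
# rolling mean computed over a time-indexed series with series_rolling_apply().
def _example_series_rolling_apply():
    s = pd.Series([0.0, 1.0, 2.0, 3.0], index=[0.0, 1.0, 2.0, 3.0])
    # Each output value is the mean of the samples within the 2s window ending
    # at (and labelled with) that index. window_float_index=False is enough
    # here since the callback does not look at the index.
    rolled = series_rolling_apply(
        s,
        func=lambda win: win.mean(),
        window=2,
        window_float_index=False,
    )
    assert list(rolled) == [0.0, 0.5, 1.5, 2.5]
    return rolled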
def _pandas_find_unique_bool_vector(data, cols, all_col, keep): if keep == 'first': shift = 1 elif keep == 'last': shift = -1 elif keep is None: shift = 1 else: raise ValueError(f'Unknown keep value: {keep}') dedup_data = data[cols] if cols else data # Unique values will be True, duplicate False cond = dedup_data != dedup_data.shift(shift) cond = cond.fillna(True) if isinstance(data, pd.DataFrame): # (not (duplicate and duplicate)) # (not ((not unique) and (not unique))) # (not (not (unique or unique))) # (unique or unique) if all_col: cond = cond.any(axis=1) # (not (duplicate or duplicate)) # (not (duplicate or duplicate)) # (not ((not unique) or (not unique))) # (not (not (unique and unique))) # (unique and unique) else: cond = cond.all(axis=1) # Also mark as duplicate the first row in a run if keep is None: cond &= cond.shift(-1).fillna(True) return cond def _pandas_deduplicate(data, keep, consecutives, cols, all_col): if consecutives: return data.loc[_pandas_find_unique_bool_vector(data, cols, all_col, keep)] else: if not all_col: raise ValueError("all_col=False is not supported with consecutives=False") kwargs = dict(subset=cols) if cols else {} return data.drop_duplicates(keep=keep, **kwargs)
[docs] @SeriesAccessor.register_accessor def series_deduplicate(series, keep, consecutives): """ Remove duplicate values in a :class:`pandas.Series`. :param keep: Keep the first occurrences if ``first``, or the last if ``last``. :type keep: str :param consecutives: If ``True``, will only remove consecutive duplicates, for example:: s = pd.Series([1,2,2,3,4,2], index=[1,2,20,30,40,50]) s2 = series_deduplicate(s, keep='first', consecutives=True) assert (s2 == [1,2,3,4,2]).all() s3 = series_deduplicate(s, keep='first', consecutives=False) assert (s3 == [1,2,3,4]).all() :type consecutives: bool """ return _pandas_deduplicate(series, keep=keep, consecutives=consecutives, cols=None, all_col=True)
[docs] @DataFrameAccessor.register_accessor def df_deduplicate(df, keep, consecutives, cols=None, all_col=True): """ Same as :func:`series_deduplicate` but for :class:`pandas.DataFrame`. :param cols: Only consider these columns when looking for duplicates. By default, all columns are considered :type cols: list(str) or None :param all_col: If ``True``, remove a row when all the columns have duplicated value. Otherwise, remove the row if any column is duplicated. :type all_col: bool """ return _pandas_deduplicate(df, keep=keep, consecutives=consecutives, cols=cols, all_col=all_col)
[docs] @DataFrameAccessor.register_accessor def series_update_duplicates(series, func=None): """ Update a given series to avoid duplicated values. :param series: Series to act on. :type series: pandas.Series :param func: The function used to update the column. It must take a :class:`pandas.Series` of duplicated entries to update as parameters, and return a new :class:`pandas.Series`. The function will be called as long as there are remaining duplicates. If ``None``, the column is assumed to be floating point number of seconds and will be updated so that the no duplicated timestamps exist once translated to an integer number of nanoseconds. :type func: collections.abc.Callable or None """ if func: def preprocess(series): return series else: def func(series): return series + 1e-9 def preprocess(series): return (series * 1e9).astype('int64') def get_duplicated(series): # Keep the first, so we update the second duplicates locs = preprocess(series).duplicated(keep='first') return locs, series.loc[locs] # Update the values until there is no more duplication duplicated_locs, duplicated = get_duplicated(series) while not duplicated.empty: updated = func(duplicated) # Change the values at the points of duplication. Otherwise, take the # initial value series.loc[duplicated_locs] = updated duplicated_locs, duplicated = get_duplicated(series) return series
[docs] @DataFrameAccessor.register_accessor def df_update_duplicates(df, col=None, func=None, inplace=False): """ Same as :func:`series_update_duplicates` but on a :class:`pandas.DataFrame`. :param df: Dataframe to act on. :type df: pandas.DataFrame :param col: Column to update. If ``None``, the index is used. :type col: str or None :param func: See :func:`series_update_duplicates`. :type func: collections.abc.Callable or None :param inplace: If ``True``, the passed dataframe will be modified. :type inplace: bool """ use_index = col is None series = df.index.to_series() if use_index else df[col].copy() series = series_update_duplicates(series, func=func) df = df if inplace else df.copy() if use_index: df.index = series else: df[col] = series return df
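

# Illustrative usage sketch, not part of the original lisa source: making a
# duplicated timestamp index unique with df_update_duplicates(). The default
# update function nudges duplicated timestamps forward by 1ns.
def _example_df_update_duplicates():
    df = pd.DataFrame({'value': [1, 2, 3]}, index=[0.1, 0.1, 0.2])
    updated = df_update_duplicates(df)
    # The second 0.1 timestamp is bumped by 1e-9 so that all timestamps are
    # unique once expressed as an integer number of nanoseconds.
    assert updated.index.is_unique
    return updated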
[docs] @DataFrameAccessor.register_accessor def df_combine_duplicates(df, func, output_col, cols=None, all_col=True, prune=True, inplace=False): """ Combine the duplicated rows using ``func`` and remove the duplicates. :param df: The dataframe to act on. :type df: pandas.DataFrame :param func: Function to combine a group of duplicates. It will be passed a :class:`pandas.DataFrame` corresponding to the group and must return either a :class:`pandas.Series` with the same index as its input dataframe, or a scalar depending on the value of ``prune``. :type func: collections.abc.Callable :param prune: If ``True``, ``func`` will be expected to return a single scalar that will be used instead of a whole duplicated group. Only the first row of the group is kept, the other ones are removed. If ``False``, ``func`` is expected to return a :class:`pandas.Series` that will be used as replacement for the group. No rows will be removed. :type prune: bool :param output_col: Column in which the output of ``func`` should be stored. :type output_col: str :param cols: Columns to use for duplicates detection :type cols: list(str) or None :param all_cols: If ``True``, all columns will be used. :type all_cols: bool :param inplace: If ``True``, the passed dataframe is modified. :type inplace: bool """ init_df = df if inplace else df.copy() # We are going to add columns so make a copy df = df.copy(deep=False) # Find all rows where the active status is the same as the previous one duplicates_to_remove = ~_pandas_find_unique_bool_vector(df, cols, all_col, keep='first') # Then get only the first row in a run of duplicates first_duplicates = (~duplicates_to_remove) & duplicates_to_remove.shift(-1, fill_value=False) # We only kept them separate with keep='first' to be able to detect # correctly the beginning of a duplicate run to get a group ID, so now we # merge them duplicates = duplicates_to_remove | first_duplicates # Assign the group ID to each member of the group df.loc[first_duplicates, 'duplicate_group'] = first_duplicates.loc[first_duplicates].index df.loc[duplicates, 'duplicate_group'] = df.loc[duplicates, 'duplicate_group'].ffill() # For some reasons GroupBy.apply() will raise a KeyError if the index is a # Float64Index, go figure ... index = df.index df.reset_index(drop=True, inplace=True) # Apply the function to each group, and assign the result to the output # Note that we cannot use GroupBy.transform() as it currently cannot handle # NaN groups. output = df.groupby('duplicate_group', sort=False, as_index=True, group_keys=False, observed=True)[df.columns].apply(func) if not output.empty: init_df[output_col].update(output) # Ensure the column is created if it does not exists yet try: init_df[output_col] except KeyError: init_df[output_col] = np.NaN else: # Restore the index that we had to remove for apply() df.index = index try: fill = df[output_col] except KeyError: pass else: init_df[output_col] = df[output_col].fillna(fill) if prune: # Only keep the first row of each duplicate run if inplace: removed_indices = duplicates_to_remove[duplicates_to_remove].index init_df.drop(removed_indices, inplace=True) return None else: return init_df.loc[~duplicates_to_remove] else: if inplace: return None else: return init_df
[docs] @DataFrameAccessor.register_accessor def df_add_delta(df, col='delta', src_col=None, window=None, inplace=False): """ Add a column containing the delta of the given other column. :param df: The dataframe to act on. :type df: pandas.DataFrame :param col: The name of the column to add. :type col: str :param src_col: Name of the column to compute the delta of. If ``None``, the index is used. :type src_col: str or None :param window: Optionally, a window. It will be used to compute the correct delta of the last row. If ``inplace=False``, the dataframe will be pre-filtered using :func:`df_refit_index`. This implies that the last row will have a NaN delta, but will be suitable e.g. for plotting, and aggregation functions that ignore delta such as :meth:`pandas.DataFrame.sum`. :type window: tuple(float or None, float or None) or None :param inplace: If ``True``, ``df`` is modified inplace to add the column :type inplace: bool """ use_refit_index = window and not inplace if use_refit_index: df = df_refit_index(df, window=window) src = df[src_col] if src_col else df.index.to_series() delta = src.diff().shift(-1) # When use_refit_index=True, the last delta will already be sensible if not use_refit_index and window: _, end = window if end is not None: new_end = end - src.iloc[-1] new_end = new_end if new_end > 0 else 0 delta.iloc[-1] = new_end if not inplace: df = df.copy() df[col] = delta return df
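

# Illustrative usage sketch, not part of the original lisa source: computing
# the time spent in each row with df_add_delta().
def _example_df_add_delta():
    df = pd.DataFrame({'state': [0, 1, 0]}, index=[0.0, 1.5, 2.0])
    with_delta = df_add_delta(df)
    # Each delta is the distance to the next index value; the last row has no
    # successor so its delta is NaN (unless a window is provided).
    assert list(with_delta['delta'][:2]) == [1.5, 0.5]
    return with_delta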
def series_combine(series_list, func, fill_value=None):
    """
    Same as :meth:`pandas.Series.combine` on a list of series rather than just
    two.
    """
    return _pandas_combine(series_list, func, fill_value)


def df_combine(series_list, func, fill_value=None):
    """
    Same as :meth:`pandas.DataFrame.combine` on a list of dataframes rather
    than just two.
    """
    return _pandas_combine(series_list, func, fill_value)


def _pandas_combine(datas, func, fill_value=None):
    state = datas[0]
    for data in datas[1:]:
        state = state.combine(data, func=func, fill_value=fill_value)
    return state
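

# Illustrative usage sketch, not part of the original lisa source: combining
# more than two series element-wise with series_combine().
def _example_series_combine():
    s1 = pd.Series([1, 5, 3])
    s2 = pd.Series([4, 2, 6])
    s3 = pd.Series([0, 9, 1])
    # Element-wise maximum across the three series.
    combined = series_combine([s1, s2, s3], func=max)
    assert list(combined) == [4, 9, 6]
    return combined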
[docs] def series_dereference(series, sources, inplace=False, method='ffill'): """ Replace each value in ``series`` by the value at the corresponding index by the source indicated by ``series``'s value. :param series: Series of "pointer" values. :type series: pandas.Series :param sources: Dictionary with keys corresponding to ``series`` values. For each value of ``series``, a source will be chosen and its value at the current index will be used. If a :class:`pandas.DataFrame` is passed, the column names will be used as keys and the column series as values. .. note:: Unless ``series`` and the ``sources`` share the same index, the ``sources`` will be reindexed with ``ffill`` method. :type sources: collections.abc.Mapping or pandas.DataFrame :param inplace: If ``True``, modify the series inplace. :type inplace: bool :param method: ``sources`` is reindexed so that it shares the same index as ``series``. ``method`` is forwarded to :meth:`pandas.Series.reindex`. :type method: str """ def reindex(values): # Skip the reindex if they are in the same dataframe if values.index is not series.index: values = values.reindex(series.index, method=method) return values if isinstance(sources, pd.DataFrame): sources = reindex(sources) sources = { col: sources[col] for col in sources.columns } else: sources = { col: reindex(val) for col, val in sources.items() } for key, values in sources.items(): _series = series.mask(series == key, values, inplace=inplace) series = series if inplace else _series return series
[docs] def df_dereference(df, col, pointer_col=None, sources=None, inplace=False, **kwargs): """ Similar to :func:`series_dereference`. **Example**:: df = pd.DataFrame({ 'ptr': ['A', 'B'], 'A' : ['A1', 'A2'], 'B' : ['B1', 'B2'], }) df = df_dereference(df, 'dereferenced', pointer_col='ptr') # ptr A B dereferenced # 0 A A1 B1 A1 # 1 B A2 B2 B2 :param df: Dataframe to act on. :type df: pandas.DataFrame :param col: Name of the column to create. :type col: str :param pointer_col: Name of the column containing "pointer" values. Defaults to the same value as ``col``. :type pointer_col: str or None :param sources: Same meaning as in :func:`series_dereference`. If omitted, ``df`` is used. :type sources: collections.abc.Mapping or pandas.DataFrame :param inplace: If ``True``, the dataframe is modified inplace. :type inplace: bool :Variable keyword arguments: Forwarded to :func:`series_dereference`. """ pointer_col = pointer_col or col sources = df if sources is None else sources df = df if inplace else df.copy(deep=False) df[col] = series_dereference(df[pointer_col], sources, inplace=inplace, **kwargs) return df
[docs] class SignalDesc:
    """
    Define a signal to be used by various signal-oriented APIs.

    :param event: Name of the event that this signal is represented by.
    :type event: str

    :param fields: Fields that identify multiple signals multiplexed into one
        event. For example, a `frequency` signal would have a
        ``cpu_frequency`` event and a ``cpu_id`` field since ``cpu_frequency``
        multiplexes the signals for all CPUs.
    :type fields: list(str)
    """

    def __init__(self, event, fields):
        self.event = event
        self.fields = sorted(fields)

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        else:
            return False
[docs] def __hash__(self): return hash(self.event) ^ hash(tuple(self.fields))
[docs] @classmethod @deprecate(msg='No new signals will be added to this list, use explicit signal description where appropriate in the Trace API', deprecated_in='3.0', removed_in='4.0') def from_event(cls, *args, **kwargs): return cls._from_event(*args, **kwargs)
# Keep a warning-free private method for backward compat pandas code that # will one day be removed. @classmethod def _from_event(cls, event, fields=None): """ Return list of :class:`SignalDesc` for the given event. The hand-coded list is used first, and then some generic heuristics are used to detect per-cpu and per-task signals. """ # For backward compatibility, so that we still get signal descriptors # for traces before the events from the lisa module got renamed to # lisa__<event> from lisa.trace import _NamespaceTraceView events = _NamespaceTraceView._do_expand_namespaces(event, namespaces=('lisa', None)) for event in events: try: return cls._SIGNALS_MAP[event] except KeyError: continue if not fields: return [cls(event, fields=[])] else: fields = set(fields) # At most one set of each group will be taken default_field_sets = [ [ {'comm', 'pid'}, {'pid'}, {'comm'}, ], [ {'cpu'}, {'cpu_id'}, ], ] selected = [] for field_set_group in default_field_sets: # Select at most one field set per group for field_set in field_set_group: # if fields is a non-strict superset of field_set if fields >= field_set: selected.append(field_set) break return [ cls(event, fields=field_set) for field_set in selected ]
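# Illustrative sketch (editor's addition): a per-task signal description.
# Since fields are sorted by the constructor, field order does not matter for
# equality or hashing.
def _example_signal_desc():
    a = SignalDesc('sched_wakeup', fields=['pid', 'comm'])
    b = SignalDesc('sched_wakeup', fields=['comm', 'pid'])
    assert a == b and hash(a) == hash(b)
    return a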
[docs] @SeriesAccessor.register_accessor def series_convert(series, dtype, nullable=None): """ Convert a :class:`pandas.Series` with a best effort strategy. Nullable types may be used if necessary and possible, otherwise ``object`` dtype will be used. :param series: Series of another type than the target one. Strings are allowed. :type series: pandas.Series :param dtype: dtype to convert to. If it is a string (like ``"uint8"``), the following strategy will be used: 1. Convert to the given dtype 2. If it failed, try converting to an equivalent nullable dtype 3. If it failed, try to parse it with an equivalent Python object constructor, and then convert it to the dtype. 4. If an integer dtype was requested, parsing as hex string will be attempted too If it is a callable, it will be applied on the series, converting all values considered as nan by :func:`pandas.isna` into ``None`` values. The result will have ``object`` dtype. The callable has a chance to handle the conversion from nan itself. .. note:: In some cases, asking for an unsigned dtype might let through negative values, as there is no way to reliably distinguish between conversion failures reasons. :type dtype: str or collections.abc.Callable :param nullable: If: - ``True``, use the nullable dtype equivalent of the requested dtype. - ``None``, use the equivalent nullable dtype if there is any missing data, otherwise a non-nullable dtype will be used for lower memory consumption. :type nullable: bool or None """ nullable_dtypes = { 'int': 'Int64', 'int8': 'Int8', 'int16': 'Int16', 'int32': 'Int32', 'int64': 'Int64', 'uint': 'UInt64', 'uint8': 'UInt8', 'uint16': 'UInt16', 'uint32': 'UInt32', 'uint64': 'UInt64', 'bool': 'boolean', } if series.dtype.name == dtype and \ not (nullable and dtype in nullable_dtypes): # If there is a conversion to a nullable dtype, don't skip. return series def to_object(x): x = x.astype('object', copy=True) # If we had any pandas <NA> values, they need to be turned into None # first, otherwise pyarrow will choke on them x.loc[x.isna()] = None return x astype = lambda dtype: lambda x: x.astype(dtype, copy=False) make_convert = lambda dtype: lambda x: series_convert(x, dtype, nullable=nullable) basic = astype(dtype) class Tree(list): """ Tree of converters to guide what to do in case of failure """ def __init__(self, *items, name=None): items = [ item for item in items if item is not None ] super().__init__(items) self.name = name class Pipeline(Tree): """ Sequence of converters that succeed as a whole or fail as a whole """ def __call__(self, series): for x in self: series = x(series) return series class Alternative(Tree): """ Sequence of converters to try in order until one works """ def __call__(self, series): excep = ValueError('Empty alternative') for x in self: try: return x(series) except (TypeError, ValueError, OverflowError) as e: excep = e # Re-raise the last exception raised raise excep pipelines = Alternative(name='root') # If that is not a string with contextlib.suppress(AttributeError, TypeError): lower_dtype = dtype.lower() is_bool = ('bool' in lower_dtype) is_int = ('int' in lower_dtype) # types are callable too if callable(dtype): def convert(x): try: return dtype(x) except Exception: # pylint: disable=broad-except # Make sure None will be propagated as None. # note: We use an exception handler rather than checking first # in order to speed up the expected path where the conversion # won't fail. 
if pd.isna(x): return None else: raise # Use faster logic of pandas if possible, but not for bytes as it will # happily convert math.nan into b'nan' if dtype is not bytes: pipelines.append(basic) pipelines.append( # Otherwise fallback to calling the type directly lambda series: series.astype(object).apply(convert) ) # Then try with a nullable type. # Floats are already nullable so we don't need to do anything elif is_bool or is_int: # Bare nullable dtype # Already nullable if dtype[0].isupper(): nullable_type = dtype else: nullable_type = nullable_dtypes[dtype] to_nullable = astype(nullable_type) if nullable: # Only allow nullable dtype conversion. from_numeric = Alternative( to_nullable ) elif nullable is None: # (nullable == None): default behaviour, try both. from_numeric = Alternative( basic, to_nullable ) else: # Do not convert to nullable dtype unless the user specified one. from_numeric = Alternative( basic ) if is_int: parse = Alternative( from_numeric, # Maybe we were trying to parse some strings that turned out to # need to go through the Python int constructor to be parsed, # so do that first Pipeline( Alternative( # Parse as integer make_convert(int), # Parse as hex int make_convert(functools.partial(int, base=16)) ), Alternative( from_numeric, # Or just leave the output as it is if nothing else can be # done, as we already have 'object' of an integer type to_object, name='convert parser output', ), name='parse', ), ) elif is_bool: parse = Alternative( Pipeline( # Convert to int first, so that input like b'0' is # converted to int before being interpreted as a bool, # avoiding turning it into "True" make_convert(int), from_numeric, name='parse as int', ), # If that failed, just feed the input to Python's bool() # builtin, and then convert to the right dtype to avoid ending # up with "object" dtype and bool values Pipeline( make_convert(bool), from_numeric, name='parse as bool', ) ) else: assert False pipelines.append(parse) elif dtype == 'string': # Sadly, pandas==1.1.1 (and maybe later) series.astype('string') turns # b'hello' into "b'hello'" instead of "hello", so basic decoder becomes # unusable if ( series.dtype.name == 'object' and series.astype(object).apply(isinstance, args=(bytes,)).any() ): string_basic = None # Handle mixed dtypes str_basic = lambda x : x.astype(object).apply( lambda x: x.decode('ascii') if isinstance(x, bytes) else str(x), ) else: string_basic = basic str_basic = make_convert(str) # Faster than Series.str.decode() basic_decode = lambda x : x.astype(object).apply(bytes.decode, args=('ascii',)) # Significantly faster than Series.str.decode() def fast_decode(x): # Deduplicate the original values by turning into a category x = x.astype('category') cat = x.cat.categories.to_series() # Decode the deduplicated values. # # Since decoding is relatively expensive, doing it on fewer objects # is usually a win, especially since most strings are task names. # # This also has the advantage that the strings are deduplicated, # which is safe since they are immutable. 
This reduces the memory # used by the final series new_cat = basic_decode(cat) x = x.cat.rename_categories(new_cat) return astype('string')(x) pipelines.extend(( string_basic, # We need to attempt conversion from bytes before using Python str, # otherwise it will include the b'' inside the string fast_decode, # Since decode() is complex, let's have the basic version in case # categories have unexpected limitations basic_decode, # If direct conversion to "string" failed, we need to turn # whatever the type was to actual strings using the Python # constructor Pipeline( str_basic, Alternative( basic, # basic might fail on older version of pandas where # 'string' dtype does not exists to_object, name='convert parse output' ), name='parse' ) )) elif dtype == 'bytes': pipelines.append(make_convert(bytes)) else: # For floats, astype() works well and can even convert from strings and the like pipelines.append(basic) return pipelines(series)
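# Illustrative sketch (editor's addition): converting hex strings with a
# missing value to an integer dtype. On reasonably recent pandas versions,
# the direct astype() attempts fail, the int(x, base=16) parsing fallback is
# used, and the nullable "UInt16" dtype is picked because of the missing
# value.
def _example_series_convert():
    s = pd.Series(['0x10', '0x20', None])
    # Expected: [16, 32, <NA>] with "UInt16" dtype
    return series_convert(s, 'uint16')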
[docs] @DataFrameAccessor.register_accessor def df_convert_to_nullable(df): """ Convert the columns of the dataframe to their equivalent nullable dtype, when possible. :param df: The dataframe to convert. :type df: pandas.DataFrame :returns: The dataframe with converted columns. """ def _series_convert(column): return series_convert(column, str(column.dtype), nullable=True) return df.apply(_series_convert, raw=False)
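# Illustrative sketch (editor's addition): int64 and bool columns are
# converted to their nullable counterparts ("Int64" and "boolean"), which can
# hold pandas.NA. The column names and values are arbitrary.
def _example_df_convert_to_nullable():
    df = pd.DataFrame({
        'pid': [1, 2, 3],
        'running': [True, False, True],
    })
    return df_convert_to_nullable(df)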
[docs] @DataFrameAccessor.register_accessor
def df_find_redundant_cols(df, col, cols=None):
    """
    Find the columns that are redundant to ``col``, i.e. that can be computed
    as ``df[x] = df[col].map(dict(...))``.

    :param df: Dataframe to analyse.
    :type df: pandas.DataFrame

    :param col: Reference column.
    :type col: str

    :param cols: Columns to restrict the analysis to. If ``None``, all columns
        are used.
    :type cols: list(str) or None
    """
    grouped = df.groupby(col, observed=True, group_keys=False)
    cols = cols or (set(df.columns) - {col})

    return {
        _col: dict(map(
            lambda x: (x[0], x[1][0]),
            series.items()
        ))
        for _col, series in (
            (
                _col,
                grouped[_col].unique()
            )
            for _col in cols
            if (grouped[_col].nunique() == 1).all()
        )
    }
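# Illustrative sketch (editor's addition): "freq" takes a single value per
# "cpu" group, so it is reported along with its cpu -> freq mapping, while
# "util" is not. The values are arbitrary.
def _example_df_find_redundant_cols():
    df = pd.DataFrame({
        'cpu':  [0, 1, 0, 1],
        'freq': [500, 800, 500, 800],
        'util': [10, 20, 30, 40],
    })
    # Expected: {'freq': {0: 500, 1: 800}}
    return df_find_redundant_cols(df, 'cpu')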
# Defined outside SignalDesc as it references SignalDesc itself _SIGNALS = [ SignalDesc('sched_switch', ['next_comm', 'next_pid']), SignalDesc('sched_switch', ['prev_comm', 'prev_pid']), SignalDesc('sched_waking', ['target_cpu']), SignalDesc('sched_waking', ['comm', 'pid']), SignalDesc('sched_wakeup', ['target_cpu']), SignalDesc('sched_wakeup', ['comm', 'pid']), SignalDesc('sched_wakeup_new', ['target_cpu']), SignalDesc('sched_wakeup_new', ['comm', 'pid']), SignalDesc('cpu_idle', ['cpu_id']), SignalDesc('sched_cpu_capacity', ['cpu']), SignalDesc('cpu_frequency', ['cpu_id']), SignalDesc('userspace@cpu_frequency_devlib', ['cpu_id']), SignalDesc('sched_compute_energy', ['comm', 'pid']), SignalDesc('clk_set_rate', ['name']), SignalDesc('clk_enable', ['name']), SignalDesc('clk_disable', ['name']), SignalDesc('sched_pelt_se', ['comm', 'pid']), SignalDesc('sched_load_se', ['comm', 'pid']), SignalDesc('sched_util_est_se', ['comm', 'pid']), SignalDesc('sched_util_est_cfs', ['cpu']), SignalDesc('sched_pelt_cfs', ['path', 'cpu']), SignalDesc('sched_load_cfs_rq', ['path', 'cpu']), SignalDesc('sched_pelt_irq', ['cpu']), SignalDesc('sched_pelt_rt', ['cpu']), SignalDesc('sched_pelt_dl', ['cpu']), SignalDesc('uclamp_util_se', ['pid', 'comm']), SignalDesc('uclamp_util_cfs', ['cpu']), SignalDesc('sched_overutilized', []), SignalDesc('sched_process_wait', ['comm', 'pid']), SignalDesc('schedutil_em_boost', ['cpu']), SignalDesc('thermal_temperature', ['id']), SignalDesc('thermal_zone_trip', ['id']), ] """ List of predefined :class:`SignalDesc`. """ SignalDesc._SIGNALS_MAP = { event: list(signal_descs) for event, signal_descs in groupby(_SIGNALS, key=attrgetter('event')) } # vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab