
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Frequency Analysis Module """

import itertools
import functools
import operator

import pandas as pd
import numpy as np
import holoviews as hv

from lisa.analysis.base import TraceAnalysisBase
from lisa.trace import requires_events, requires_one_event_of, CPU, MissingTraceEventError
from lisa.datautils import series_integrate, df_refit_index, series_refit_index, series_deduplicate, df_add_delta, series_mean, df_window, df_merge, SignalDesc
from lisa.notebook import plot_signal, _hv_neutral


class FrequencyAnalysis(TraceAnalysisBase):
    """
    Support for plotting Frequency Analysis data

    :param trace: input Trace object
    :type trace: lisa.trace.Trace
    """

    name = 'frequency'
    @requires_one_event_of('cpu_frequency', 'userspace@cpu_frequency_devlib')
    def df_cpus_frequency(self, signals_init=True):
        """
        Similar to ``trace.df_event('cpu_frequency')``, with
        ``userspace@cpu_frequency_devlib`` support.

        :param signals_init: If ``True``, an initial value for signals will be
            provided. This includes initial values taken outside the window
            boundaries and devlib-provided events.

        The ``userspace@cpu_frequency_devlib`` user event is merged in the
        dataframe if it provides earlier values for a CPU.
        """
        def rename(df):
            return df.rename(
                {
                    'cpu_id': 'cpu',
                    'state': 'frequency',
                },
                axis=1,
            )

        def check_empty(df, excep):
            if df.empty:
                raise excep
            else:
                return df

        try:
            df = self.trace.df_event(
                'cpu_frequency',
                signals=(
                    [SignalDesc('cpu_frequency', ['cpu_id'])]
                    if signals_init else
                    []
                )
            )
        except MissingTraceEventError as e:
            excep = e
            df = pd.DataFrame(columns=['cpu', 'frequency'])
        else:
            excep = None
            df = rename(df)

        if not signals_init:
            return check_empty(df, excep)

        try:
            devlib_df = self.trace.df_event('userspace@cpu_frequency_devlib')
        except MissingTraceEventError as e:
            return check_empty(df, e)
        else:
            devlib_df = rename(devlib_df)

        def groupby_cpu(df):
            return df.groupby('cpu', observed=True, sort=False, group_keys=False)

        # Get the initial values for each CPU
        def init_freq(df, from_devlib):
            df = groupby_cpu(df).head(1).copy()
            df['from_devlib'] = from_devlib
            return df

        init_df = init_freq(df, False)
        init_devlib_df = init_freq(devlib_df, True)

        # Get the first frequency for each CPU as given by devlib and cpufreq.
        init_df = pd.concat([init_df, init_devlib_df])
        init_df.sort_index(inplace=True)
        # Get the first value for each CPU
        first_df = groupby_cpu(init_df).head(1)
        # Only keep the ones coming from devlib, as the other ones are already
        # in the cpufreq df
        first_df = first_df[first_df['from_devlib'] == True]
        del first_df['from_devlib']

        df = pd.concat([first_df, df])
        df.sort_index(inplace=True)
        df.index.name = 'Time'

        return check_empty(df, None)
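    # Illustrative usage sketch (not part of the upstream module), assuming a
    # parsed trace exposing this analysis as ``trace.ana.frequency`` (the same
    # proxy this class uses internally via ``self.ana``):
    #
    #   df = trace.ana.frequency.df_cpus_frequency()
    #   per_cpu = {cpu: sub for cpu, sub in df.groupby('cpu', observed=True)}
    #
    # Each sub-dataframe is indexed by time and carries at least ``cpu`` and
    # ``frequency`` columns, with the devlib-provided value used as the
    # initial sample when it precedes the first cpufreq event for that CPU.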
    @df_cpus_frequency.used_events
    def df_cpu_frequency(self, cpu, **kwargs):
        """
        Same as :meth:`df_cpus_frequency` but for a single CPU.

        :param cpu: CPU ID to get the frequency of.
        :type cpu: int

        :Variable keyword arguments: Forwarded to :meth:`df_cpus_frequency`.
        """
        df = self.df_cpus_frequency(**kwargs)
        return df[df['cpu'] == cpu]
    @df_cpus_frequency.used_events
    def _check_freq_domain_coherency(self, cpus=None):
        """
        Check that all CPUs of a given frequency domain have the same frequency
        transitions.

        :param cpus: CPUs to take into account. All other CPUs are ignored.
            If `None`, all CPUs will be checked.
        :type cpus: list(int) or None
        """
        domains = self.trace.plat_info['freq-domains']
        if cpus is None:
            cpus = list(itertools.chain.from_iterable(domains))

        if len(cpus) < 2:
            return

        df = self.df_cpus_frequency()

        for domain in domains:
            # Restrict the domain to what we care about. Other CPUs may have
            # garbage data, but the caller is not going to look at it anyway.
            domain = set(domain) & set(cpus)
            if len(domain) < 2:
                continue

            # Get the frequency column for each CPU in the domain
            freq_columns = [
                # drop the index since we only care about the transitions, and
                # not when they happened
                df[df['cpu'] == cpu]['frequency'].reset_index(drop=True)
                for cpu in domain
            ]

            # Check that all columns are equal. If they are not, that means that
            # at least one CPU has a frequency transition that is different
            # from another one in the same domain, which is highly suspicious
            ref = freq_columns[0]
            for col in freq_columns:
                # If the trace started in the middle of a group of transitions,
                # ignore that transition by shifting and re-test
                if not (ref.equals(col) or ref[:-1].equals(col.shift()[1:])):
                    raise ValueError(f'Frequencies of CPUs in the freq domain {cpus} are not coherent')

    @TraceAnalysisBase.df_method
    @df_cpus_frequency.used_events
    @requires_events('cpu_idle')
    def _get_frequency_residency(self, cpus):
        """
        Get a DataFrame with per cluster frequency residency, i.e. amount of
        time spent at a given frequency in each cluster.

        :param cpus: A tuple of CPU IDs
        :type cpus: tuple(int)

        :returns: A :class:`pandas.DataFrame` with:

          * A ``total_time`` column (the total time spent at a frequency)
          * A ``active_time`` column (the non-idle time spent at a frequency)
        """
        freq_df = self.df_cpus_frequency()
        # Assumption: all CPUs in a cluster run at the same frequency, i.e. the
        # frequency is scaled per-cluster not per-CPU. Hence, we can limit the
        # cluster frequencies data to a single CPU.
        self._check_freq_domain_coherency(cpus)

        cluster_freqs = freq_df[freq_df.cpu == cpus[0]]

        # Compute TOTAL Time
        cluster_freqs = df_add_delta(cluster_freqs, col="total_time", window=self.trace.window)
        time_df = cluster_freqs[["total_time", "frequency"]].groupby(
            'frequency', observed=True, sort=False, group_keys=False
        ).sum()

        # Compute ACTIVE Time
        cluster_active = self.ana.idle.signal_cluster_active(cpus)

        # In order to compute the active time spent at each frequency we
        # multiply 2 square waves:
        # - cluster_active, a square wave of the form:
        #     cluster_active[t] == 1 if at least one CPU is reported to be
        #                            non-idle by CPUFreq at time t
        #     cluster_active[t] == 0 otherwise
        # - freq_active, square wave of the form:
        #     freq_active[t] == 1 if at time t the frequency is f
        #     freq_active[t] == 0 otherwise
        cluster_freqs = cluster_freqs.join(
            cluster_active.to_frame(name='active'), how='outer')
        cluster_freqs.ffill(inplace=True)

        # Compute total time by integrating the square wave
        time_df['active_time'] = pd.Series({
            freq: series_integrate(
                cluster_freqs['active'] * (cluster_freqs['frequency'] == freq)
            )
            for freq in cluster_freqs['frequency'].unique()
        })
        return time_df
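    # Minimal numeric sketch (not part of the upstream module) of the
    # "multiply two square waves, then integrate" step above, with made-up
    # timestamps and frequencies:
    #
    #   freq   = pd.Series([500, 500, 1000, 1000], index=[0.0, 1.0, 2.0, 3.0])
    #   active = pd.Series([1,   0,   1,    0],    index=[0.0, 1.0, 2.0, 3.0])
    #
    #   # active * (freq == 1000) is 1 only over [2.0, 3.0), so integrating
    #   # that square wave should give about 1.0 s of active time at 1000,
    #   # assuming series_integrate's default rectangular integration.
    #   series_integrate(active * (freq == 1000))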
    @_get_frequency_residency.used_events
    def df_cpu_frequency_residency(self, cpu):
        """
        Get per-CPU frequency residency, i.e. amount of time CPU `cpu` spent
        at each frequency.

        :param cpu: CPU ID
        :type cpu: int

        :returns: A :class:`pandas.DataFrame` with:

          * A ``total_time`` column (the total time spent at a frequency)
          * A ``active_time`` column (the non-idle time spent at a frequency)
        """
        if not isinstance(cpu, int):
            raise TypeError('Input CPU parameter must be an integer')

        return self._get_frequency_residency((cpu,))
    @_get_frequency_residency.used_events
    def df_domain_frequency_residency(self, cpu):
        """
        Get per-frequency-domain frequency residency, i.e. amount of time each
        domain spent at each frequency.

        :param cpu: Any CPU of the domain to analyse
        :type cpu: int

        :returns: A :class:`pandas.DataFrame` with:

          * A ``total_time`` column (the total time spent at a frequency)
          * A ``active_time`` column (the non-idle time spent at a frequency)
        """
        domains = [
            domain
            for domain in self.trace.plat_info['freq-domains']
            if cpu in domain
        ]
        if not domains:
            raise ValueError(f'The given CPU "{cpu}" does not belong to any domain')
        else:
            domain, = domains
            return self._get_frequency_residency(tuple(domain))
    @TraceAnalysisBase.df_method
    @df_cpu_frequency.used_events
    def df_cpu_frequency_transitions(self, cpu):
        """
        Compute number of frequency transitions of a given CPU.

        :param cpu: a CPU ID
        :type cpu: int

        :returns: A :class:`pandas.DataFrame` with:

          * A ``transitions`` column (the number of frequency transitions)
        """
        freq_df = self.df_cpu_frequency(cpu, signals_init=False)
        # Since we want to count the number of events appearing inside the
        # window, make sure we don't get anything outside it
        freq_df = df_window(
            freq_df,
            window=self.trace.window,
            method='exclusive',
        )
        cpu_freqs = freq_df['frequency']

        # Remove possible duplicates (example: when devlib sets trace markers
        # a cpu_frequency event is triggered that can generate a duplicate)
        cpu_freqs = series_deduplicate(cpu_freqs, keep='first', consecutives=True)
        transitions = cpu_freqs.value_counts()

        transitions.name = "transitions"
        transitions.sort_index(inplace=True)

        return pd.DataFrame(transitions)
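    # Illustrative usage sketch (not part of the upstream module), with
    # made-up frequencies: on a parsed trace, the per-frequency transition
    # counts for CPU0 would be obtained with something like:
    #
    #   transitions = trace.ana.frequency.df_cpu_frequency_transitions(cpu=0)
    #   print(transitions)
    #   #            transitions
    #   # frequency
    #   # 500000               4
    #   # 1000000              7
    #
    # The index holds the frequencies observed in the window, and the column
    # the number of times the CPU switched to each of them.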
    @TraceAnalysisBase.df_method
    @df_cpu_frequency_transitions.used_events
    def df_cpu_frequency_transition_rate(self, cpu):
        """
        Compute frequency transition rate of a given CPU.

        :param cpu: a CPU ID
        :type cpu: int

        :returns: A :class:`pandas.DataFrame` with:

          * A ``transitions`` column (the number of frequency transitions per second)
        """
        transitions = self.df_cpu_frequency_transitions(cpu)['transitions']
        return pd.DataFrame(dict(
            transitions=transitions / self.trace.time_range,
        ))
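    # As a made-up numeric example of the division above: 10 transitions to a
    # given frequency over a 5 s trace yield 10 / 5 = 2 transitions per second
    # for that frequency, ``self.trace.time_range`` being the trace duration
    # in seconds.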
    @df_cpu_frequency.used_events
    def get_average_cpu_frequency(self, cpu):
        """
        Get the average frequency for a given CPU

        :param cpu: The CPU to analyse
        :type cpu: int
        """
        df = self.df_cpu_frequency(cpu)
        freq = series_refit_index(df['frequency'], window=self.trace.window)
        return series_mean(freq)
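    # Note (not part of the upstream module): ``series_mean`` is meant to be a
    # time-weighted average, i.e. each frequency sample counts for as long as
    # it was in effect rather than once per occurrence. As a made-up example,
    # 1 s at 500000 followed by 3 s at 1000000 averages to
    # (1 * 500000 + 3 * 1000000) / 4 = 875000.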
    @TraceAnalysisBase.df_method
    @requires_events('clk_set_rate', 'clk_enable', 'clk_disable')
    def df_peripheral_clock_effective_rate(self, clk_name):
        # Note: the kernel still defines a "clock_*" variant for each of these,
        # but it's not actually used anywhere in the code. The new "clk_*"
        # events are the ones we are interested in.
        rate_df = self.trace.df_event('clk_set_rate')
        enable_df = self.trace.df_event('clk_enable').copy()
        disable_df = self.trace.df_event('clk_disable').copy()

        # Add 'state' for enable and disable events
        enable_df['state'] = 1
        disable_df['state'] = 0

        freq = rate_df[rate_df['name'] == clk_name]
        enables = enable_df[enable_df['name'] == clk_name]
        disables = disable_df[disable_df['name'] == clk_name]

        freq = df_merge((freq, enables, disables)).ffill()
        freq['start'] = freq.index
        df_add_delta(
            freq,
            col='len',
            src_col='start',
            window=self.trace.window,
            inplace=True
        )

        freq['effective_rate'] = np.where(
            freq['state'] == 0, 0, freq['rate']
        )
        return freq
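    # Illustrative usage sketch (not part of the upstream module): the
    # resulting dataframe has one row per clk_set_rate/clk_enable/clk_disable
    # event for the given clock, with ``effective_rate`` equal to ``rate``
    # while the clock is enabled and 0 while it is disabled:
    #
    #   clk_df = trace.ana.frequency.df_peripheral_clock_effective_rate('bus_clk')
    #   clk_df[['rate', 'state', 'effective_rate']]
    #
    # ``'bus_clk'`` is just a placeholder; the available names are whatever
    # the ``clk_*`` events in the trace report.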
    ###########################################################################
    # Plotting Methods
    ###########################################################################
    @TraceAnalysisBase.plot_method
    @df_cpu_frequency.used_events
    def plot_cpu_frequencies(self, cpu: CPU, average: bool=True):
        """
        Plot frequency for the specified CPU

        :param cpu: The CPU for which to plot frequencies
        :type cpu: int

        :param average: If ``True``, add a horizontal line which is the
            frequency average.
        :type average: bool

        If ``sched_overutilized`` events are available, the plots will also
        show the intervals of time where the system was overutilized.
        """
        logger = self.logger
        df = self.df_cpu_frequency(cpu)

        if "freqs" in self.trace.plat_info:
            frequencies = self.trace.plat_info['freqs'][cpu]
        else:
            logger.info(f"Estimating CPU{cpu} frequencies from trace")
            frequencies = sorted(list(df.frequency.unique()))
            logger.debug(f"Estimated frequencies: {frequencies}")

        avg = self.get_average_cpu_frequency(cpu)
        logger.info(
            "Average frequency for CPU{} : {:.3f} GHz".format(cpu, avg / 1e6))

        df = df_refit_index(df, window=self.trace.window)
        fig = plot_signal(df['frequency'], name=f'Frequency of CPU{cpu} (Hz)')

        if average and avg > 0:
            fig *= hv.HLine(avg, group='average').opts(color='red')

        plot_overutilized = self.ana.status.plot_overutilized
        if self.trace.has_events(plot_overutilized.used_events):
            fig *= plot_overutilized()

        return fig
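    # Illustrative usage sketch (not part of the upstream module): the plot
    # methods return holoviews objects, which render directly in a notebook
    # or can be saved explicitly, assuming a holoviews plotting backend has
    # been activated:
    #
    #   import holoviews as hv
    #   fig = trace.ana.frequency.plot_cpu_frequencies(cpu=0, average=True)
    #   hv.save(fig, 'cpu0_frequency.html')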
    @TraceAnalysisBase.plot_method
    @plot_cpu_frequencies.used_events
    def plot_domain_frequencies(self):
        """
        Plot frequency trend for all frequency domains.

        If ``sched_overutilized`` events are available, the plots will also
        show the intervals of time where the cluster was overutilized.
        """
        return functools.reduce(
            operator.add,
            (
                self.plot_cpu_frequencies(domain[0]).relabel(
                    f'Frequencies of domain CPUS {", ".join(map(str, domain))}'
                )
                for domain in self.trace.plat_info['freq-domains']
            )
        ).cols(1)
    @TraceAnalysisBase.plot_method
    @df_cpu_frequency_residency.used_events
    def plot_cpu_frequency_residency(self, cpu: CPU, pct: bool=False, domain_label: bool=False):
        """
        Plot per-CPU frequency residency.

        :param cpu: The CPU to generate the plot for
        :type cpu: int

        :param pct: Plot residencies in percentage
        :type pct: bool

        :param domain_label: If ``True``, the labels will mention all CPUs in
            the domain, rather than the CPU passed.
        :type domain_label: bool
        """
        residency_df = self.df_cpu_frequency_residency(cpu)

        total_df = residency_df.total_time
        active_df = residency_df.active_time

        if pct:
            total_df = total_df * 100 / total_df.sum()
            active_df = active_df * 100 / active_df.sum()

        ylabel = 'Time share (%)' if pct else 'Time (s)'
        opts = dict(
            xlabel='Frequency (Hz)',
            ylabel=ylabel,
            # Horizontal bar plots
            invert_axes=True,
        )

        if domain_label:
            domains = self.trace.plat_info['freq-domains']
            rev_domains = {
                cpu: sorted(domain)
                for domain in domains
                for cpu in domain
            }

            def make_label(kind):
                name = ', '.join(map(str, rev_domains[cpu]))
                return f'CPUs {name} {kind}frequency residency'
        else:
            def make_label(kind):
                return f'CPU{cpu} {kind}frequency residency'

        return (
            hv.Bars(total_df, label=make_label('total ')).opts(**opts) +
            hv.Bars(active_df, label=make_label('active ')).opts(**opts)
        ).cols(1).options(
            title=make_label('')
        )
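    # Illustrative usage sketch (not part of the upstream module): residency
    # bars in percentage for CPU0, labelled with its whole frequency domain:
    #
    #   fig = trace.ana.frequency.plot_cpu_frequency_residency(
    #       cpu=0, pct=True, domain_label=True,
    #   )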
    @TraceAnalysisBase.plot_method
    @plot_cpu_frequency_residency.used_events
    def plot_domain_frequency_residency(self, pct: bool=False):
        """
        Plot the frequency residency for all frequency domains.

        :param pct: Plot residencies in percentage
        :type pct: bool
        """
        return functools.reduce(
            operator.add,
            (
                self.plot_cpu_frequency_residency(
                    domain[0],
                    domain_label=True,
                    pct=pct,
                )
                for domain in self.trace.plat_info['freq-domains']
            )
        ).cols(1)
    @TraceAnalysisBase.plot_method
    @df_cpu_frequency_transitions.used_events
    def plot_cpu_frequency_transitions(self, cpu: CPU, pct: bool=False, domain_label: bool=False):
        """
        Plot frequency transitions count of the specified CPU

        :param cpu: The CPU to generate the plot for
        :type cpu: int

        :param pct: Plot frequency transitions in percentage
        :type pct: bool

        :param domain_label: If ``True``, the labels will mention all CPUs in
            the domain, rather than the CPU passed.
        :type domain_label: bool
        """
        df = self.df_cpu_frequency_transitions(cpu)

        if pct:
            df = df * 100 / df.sum()

        ylabel = 'Transitions share (%)' if pct else 'Transition count'

        if domain_label:
            domains = self.trace.plat_info['freq-domains']
            rev_domains = {
                cpu: sorted(domain)
                for domain in domains
                for cpu in domain
            }
            name = ', '.join(map(str, rev_domains[cpu]))
            title = f'Frequency transitions of CPUs {name}'
        else:
            title = f'Frequency transitions of CPU{cpu}'

        if not df.empty:
            return hv.Bars(df['transitions']).options(
                title=title,
                xlabel='Frequency (Hz)',
                ylabel=ylabel,
                invert_axes=True,
            )
        else:
            return _hv_neutral()
    @TraceAnalysisBase.plot_method
    @plot_cpu_frequency_transitions.used_events
    def plot_domain_frequency_transitions(self, pct: bool=False):
        """
        Plot frequency transitions count for all frequency domains

        :param pct: Plot frequency transitions in percentage
        :type pct: bool
        """
        return functools.reduce(
            operator.add,
            (
                self.plot_cpu_frequency_transitions(
                    cpu=domain[0],
                    domain_label=True,
                    pct=pct,
                )
                for domain in self.trace.plat_info['freq-domains']
            )
        ).cols(1)
    @TraceAnalysisBase.plot_method
    @df_peripheral_clock_effective_rate.used_events
    def plot_peripheral_frequency(self, clk_name: str, average: bool=True):
        """
        Plot the frequency of the specified peripheral clock

        :param clk_name: The clock name for which to plot frequency
        :type clk_name: str

        :param average: If ``True``, add a horizontal line which is the
            frequency average.
        :type average: bool
        """
        df = self.df_peripheral_clock_effective_rate(clk_name)
        freq = df['effective_rate']
        freq = series_refit_index(freq, window=self.trace.window)

        fig = plot_signal(freq, name=f'Frequency of {clk_name} (Hz)')

        if average:
            avg = series_mean(freq)
            if avg > 0:
                fig *= hv.HLine(avg, group='average').opts(color='red')

        return fig
# vim :set tabstop=4 shiftwidth=4 expandtab textwidth=80