Source code for lisa.energy_meter

# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015-2020, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import abc
import json
import os
import os.path
import time
import shutil

from collections import namedtuple
from collections.abc import Mapping
from subprocess import Popen, PIPE, STDOUT
import subprocess
from time import sleep
import typing

import numpy as np
import pandas as pd
import psutil

import devlib

from lisa.utils import Loggable, get_subclasses, ArtifactPath, HideExekallID, deprecate
from lisa.datautils import series_integrate
from lisa.conf import (
    SimpleMultiSrcConf, KeyDesc, TopLevelKeyDesc, Configurable,
)

# Default energy measurements for each board
EnergyReport = namedtuple('EnergyReport',
                          ['channels', 'report_file', 'data_frame'])

_deprecate_emeter = deprecate(
    'LISA energy meters are deprecated, please use devlib instruments or contribute the instrument to devlib',
    deprecated_in='2.0',
    removed_in='4.0',
)


[docs] class EnergyMeter(Loggable, Configurable): """ Abstract Base Class of energy meters. """ def __init__(self, target, res_dir=None): self._target = target res_dir = res_dir if res_dir else target.get_res_dir( name=f'EnergyMeter-{self.name}', symlink=False, ) self._res_dir = res_dir
[docs] @classmethod def from_conf(cls, target, conf, res_dir=None): """ Build an instance of :class:`EnergyMeter` from a configuration object. :param target: Target to use :type target: lisa.target.Target :param conf: Configuration object to use :param res_dir: Result directory to use :type res_dir: str or None """ # Select the right subclass according to the type of the configuration # object we are given for subcls in get_subclasses(cls) | {cls}: try: conf_cls = subcls.CONF_CLASS except AttributeError: continue if isinstance(conf, conf_cls): chosen_cls = subcls break else: chosen_cls = cls chosen_cls.get_logger(f'{chosen_cls.name} energy meter configuration:\n{conf}') kwargs = chosen_cls.conf_to_init_kwargs(conf) kwargs.update( target=target, res_dir=res_dir, ) chosen_cls.check_init_param(**kwargs) return chosen_cls(**kwargs)
[docs] @abc.abstractmethod def name(): pass
[docs] @abc.abstractmethod def sample(self): """ Get a sample from the energy meter """
[docs] @abc.abstractmethod def reset(self): """ Reset the energy meter """
[docs] @abc.abstractmethod def report(self): """ Get total energy consumption since last :meth:`reset` """
[docs] class HWMonConf(SimpleMultiSrcConf, HideExekallID): """ Configuration class for :class:`HWMon`. {generated_help} {yaml_example} """ STRUCTURE = TopLevelKeyDesc('hwmon-conf', 'HWMon Energy Meter configuration', ( # TODO: find a better help and maybe a better type KeyDesc('channel-map', 'Channels to use', [Mapping]), ))
[docs] @_deprecate_emeter class HWMon(EnergyMeter): """ HWMon energy meter {configurable_params} """ CONF_CLASS = HWMonConf name = 'hwmon' def __init__(self, target, channel_map, res_dir=None): super().__init__(target, res_dir) logger = self.logger # Energy readings self.readings = {} if not self._target.is_module_available('hwmon'): raise RuntimeError('HWMON devlib module not enabled') # Initialize HWMON instrument logger.info('Scanning for HWMON channels, may take some time...') self._hwmon = devlib.HwmonInstrument(self._target) # Decide which channels we'll collect data from. # If the caller provided a channel_map, require that all the named # channels exist. # Otherwise, try using the big.LITTLE core names as channel names. # If they don't match, just collect all available channels. available_sites = [c.site for c in self._hwmon.get_channels('energy')] self._channels = channel_map if self._channels: # If the user provides a channel_map then require it to be correct. if not all(s in available_sites for s in list(self._channels.values())): raise RuntimeError( f"Found sites {sorted(available_sites)} but channel_map contains {sorted(self._channels.values())}") elif self._target.big_core: bl_sites = [self._target.big_core.upper(), self._target.little_core.upper()] if all(s in available_sites for s in bl_sites): logger.info('Using default big.LITTLE hwmon channels') self._channels = dict(zip(['big', 'LITTLE'], bl_sites)) if not self._channels: logger.info('Using all hwmon energy channels') self._channels = {site: site for site in available_sites} # Configure channels for energy measurements channels = sorted(self._channels.values()) logger.debug(f'Enabling channels: {channels}') self._hwmon.reset(kinds=['energy'], sites=channels) # Logging enabled channels logger.info('Channels selected for energy sampling: {}'.format( ', '.join(channel.label for channel in self._hwmon.active_channels) ))
[docs] def sample(self): logger = self.logger samples = self._hwmon.take_measurement() for s in samples: site = s.channel.site value = s.value if site not in self.readings: self.readings[site] = { 'last': value, 'delta': 0, 'total': 0 } continue self.readings[site]['delta'] = value - self.readings[site]['last'] self.readings[site]['last'] = value self.readings[site]['total'] += self.readings[site]['delta'] logger.debug(f'SAMPLE: {self.readings}') return self.readings
[docs] def reset(self): self.sample() for site in self.readings: self.readings[site]['delta'] = 0 self.readings[site]['total'] = 0 self.logger.debug(f'RESET: {self.readings}')
[docs] def report(self, out_dir, out_file='energy.json'): # Retrive energy consumption data nrg = self.sample() # Reformat data for output generation clusters_nrg = {} for channel, site in self._channels.items(): if site not in nrg: raise RuntimeError( f'hwmon channel "{channel}" not available. Selected channels: {list(nrg.keys())}' ) nrg_total = nrg[site]['total'] self.logger.debug(f'Energy [{site:>16}]: {nrg_total:.6f}') clusters_nrg[channel] = nrg_total # Dump data as JSON file nrg_file = os.path.join(out_dir, out_file) with open(nrg_file, 'w') as ofile: json.dump(clusters_nrg, ofile, sort_keys=True, indent=4) return EnergyReport(clusters_nrg, nrg_file, None)
[docs] class _DevlibContinuousEnergyMeter(EnergyMeter): """ :meta public: Common functionality for devlib Instruments in CONTINUOUS mode """
[docs] def reset(self): self._instrument.start()
[docs] def report(self, out_dir, out_energy='energy.json', out_samples='samples.csv'): self._instrument.stop() df = self._read_csv(out_dir, out_samples) df = self._build_timeline(df) if df.empty: raise RuntimeError('No energy data collected') channels_nrg = self._compute_energy(df) # Dump data as JSON file nrg_file = os.path.join(out_dir, out_energy) with open(nrg_file, 'w') as ofile: json.dump(channels_nrg, ofile, sort_keys=True, indent=4) return EnergyReport(channels_nrg, nrg_file, df)
def _read_csv(self, out_dir, out_samples): csv_path = os.path.join(out_dir, out_samples) csv_data = self._instrument.get_data(csv_path) with open(csv_path) as f: # Each column in the CSV will be headed with 'SITE_measure' # (e.g. 'BAT_power'). Convert that to a list of ('SITE', 'measure') # tuples, then pass that as the `names` parameter to read_csv to get # a nested column index. None of devlib's standard measurement types # have '_' in the name so this use of rsplit should be fine. exp_headers = [c.label for c in csv_data.channels] headers = f.readline().strip().split(',') if set(headers) != set(exp_headers): raise ValueError( 'Unexpected headers in CSV from devlib instrument. ' f'Expected {sorted(headers)}, found {sorted(exp_headers)}' ) columns = [tuple(h.rsplit('_', 1)) for h in headers] # Passing `names` means read_csv doesn't expect to find headers in # the CSV (i.e. expects every line to hold data). This works because # we have already consumed the first line of `f`. df = pd.read_csv(f, names=columns) return df def _build_timeline(self, df): sample_period = 1. / self._instrument.sample_rate_hz df.index = np.linspace(0, sample_period * len(df), num=len(df)) return df @staticmethod def _compute_energy(df): channels_nrg = {} for site, measure in df: if measure == 'power': channels_nrg[site] = series_integrate(df[site]['power'], method='trapz') return channels_nrg
[docs] class AEPConf(SimpleMultiSrcConf, HideExekallID): """ Configuration class for :class:`AEP`. {generated_help} {yaml_example} """ STRUCTURE = TopLevelKeyDesc('aep-conf', 'AEP Energy Meter configuration', ( KeyDesc('channel-map', 'Channels to use', [Mapping]), KeyDesc('resistor-values', 'Resistor values', [typing.Sequence[float]]), KeyDesc('labels', 'List of labels', [typing.Sequence[str]]), KeyDesc('device-entry', 'TTY device', [typing.Sequence[str]]), ))
[docs] @_deprecate_emeter class AEP(_DevlibContinuousEnergyMeter): """ Arm Energy Probe energy meter {configurable_params} """ name = 'aep' CONF_CLASS = AEPConf def __init__(self, target, resistor_values, labels=None, device_entry='/dev/ttyACM0', res_dir=None): super().__init__(target, res_dir) logger = self.logger # Configure channels for energy measurements self._instrument = devlib.EnergyProbeInstrument( self._target, resistor_values=resistor_values, labels=labels, device_entry=device_entry, ) # Configure channels for energy measurements logger.debug('Enabling channels') self._instrument.reset() # Logging enabled channels logger.info(f'Channels selected for energy sampling: {self._instrument.active_channels}') logger.debug(f'Results dir: {self._res_dir}')
[docs] class MonsoonConf(SimpleMultiSrcConf, HideExekallID): """ Configuration class for :class:`Monsoon`. {generated_help} {yaml_example} """ STRUCTURE = TopLevelKeyDesc('monsoon-conf', 'Monsoon Energy Meter configuration', ( KeyDesc('channel-map', 'Channels to use', [Mapping]), KeyDesc('monsoon-bin', 'monsoon binary path', [str]), KeyDesc('tty-device', 'TTY device to use', [str]), ))
[docs] @_deprecate_emeter class Monsoon(_DevlibContinuousEnergyMeter): """ Monsoon Solutions energy meter {configurable_params} """ name = 'monsoon' CONF_CLASS = MonsoonConf def __init__(self, target, monsoon_bin=None, tty_device=None, res_dir=None): super().__init__(target, res_dir) self._instrument = devlib.MonsoonInstrument(self._target, monsoon_bin=monsoon_bin, tty_device=tty_device) self._instrument.reset()
[docs] class ACMEConf(SimpleMultiSrcConf, HideExekallID): """ Configuration class for :class:`ACME`. {generated_help} {yaml_example} """ STRUCTURE = TopLevelKeyDesc('acme-conf', 'ACME Energy Meter configuration', ( KeyDesc('channel-map', 'Channels to use', [Mapping]), KeyDesc('host', 'Hostname or IP address of the ACME board', [str]), KeyDesc('iio-capture-bin', 'path to iio-capture binary', [str]), ))
[docs] @_deprecate_emeter class ACME(EnergyMeter): """ BayLibre's ACME board based EnergyMeter {configurable_params} """ name = 'acme' CONF_CLASS = ACMEConf REPORT_DELAY_S = 2.0 """ iio-capture returns an empty string if killed right after its invocation, so we have to enforce a delay between reset() and report() """ def __init__(self, target, # pylint: disable=dangerous-default-value channel_map={'CH0': 0}, host='baylibre-acme.local', iio_capture_bin='iio-capture', res_dir=None ): super().__init__(target, res_dir) logger = self.logger self._iiocapturebin = iio_capture_bin self._hostname = host self.reset_time = None # Make a copy to be sure to never modify the default value self._channels = dict(channel_map) self._iio = {} logger.info('ACME configuration:') logger.info(f' binary: {self._iiocapturebin}') logger.info(f' device: {self._hostname}') logger.info(' channels: {}'.format(', '.join( self._str(channel) for channel in self._channels ))) # Check if iio-capture binary is available try: subprocess.call([self._iiocapturebin, '-h'], stdout=PIPE, stderr=STDOUT) except FileNotFoundError as e: logger.error(f'iio-capture binary {self._iiocapturebin} not available') raise FileNotFoundError('Missing iio-capture binary') from e
[docs] def sample(self): raise NotImplementedError('Not available for ACME')
def _iio_device(self, channel): return f'iio:device{self._channels[channel]}' def _str(self, channel): return f'{channel} ({self._iio_device(channel)})'
[docs] def reset(self): """ Reset energy meter and start sampling from channels specified in the target configuration. """ logger = self.logger # Terminate already running iio-capture instance (if any) wait_for_termination = 0 for proc in psutil.process_iter(): if self._iiocapturebin not in proc.cmdline(): continue for channel in self._channels: if self._iio_device(channel) in proc.cmdline(): logger.debug(f'Killing previous iio-capture for {self._iio_device(channel)}') logger.debug(proc.cmdline()) proc.kill() wait_for_termination = 2 # Wait for previous instances to be killed sleep(wait_for_termination) # Start iio-capture for all channels required for channel in self._channels: ch_id = self._channels[channel] # Setup CSV file to collect samples for this channel csv_file = ArtifactPath.join(self._res_dir, f'samples_{channel}.csv') # Start a dedicated iio-capture instance for this channel self._iio[ch_id] = Popen(['stdbuf', '-i0', '-o0', '-e0', self._iiocapturebin, '-n', self._hostname, '-o', '-c', '-f', str(csv_file), self._iio_device(channel)], stdout=PIPE, stderr=STDOUT, universal_newlines=True) # Wait some time before to check if there is any output sleep(1) # Check that all required channels have been started for channel in self._channels: ch_id = self._channels[channel] self._iio[ch_id].poll() if self._iio[ch_id].returncode: logger.error(f'Failed to run {self._iiocapturebin} for {self._str(channel)}') logger.warning(f'Make sure there are no iio-capture processes connected to {self._hostname} and device {self._str(channel)}') out, _ = self._iio[ch_id].communicate() logger.error(f'Output: {out.strip()}') self._iio[ch_id] = None raise RuntimeError('iio-capture connection error') logger.debug(f'Started {self._iiocapturebin} on {self._str(channel)}...') self.reset_time = time.monotonic()
[docs] def report(self, out_dir, out_energy='energy.json'): """ Stop iio-capture and collect sampled data. :param out_dir: Output directory where to store results :type out_dir: str :param out_file: File name where to save energy data :type out_file: str """ delta = time.monotonic() - self.reset_time if delta < self.REPORT_DELAY_S: sleep(self.REPORT_DELAY_S - delta) logger = self.logger channels_nrg = {} channels_stats = {} for channel, ch_id in self._channels.items(): if self._iio[ch_id] is None: continue self._iio[ch_id].poll() if self._iio[ch_id].returncode: # returncode not None means that iio-capture has terminated # already, so there must have been an error out, _ = self._iio[ch_id].communicate() logger.error(f'{self._iiocapturebin} terminated for {self._str(channel)}: {out}') self._iio[ch_id] = None continue # kill process and get return self._iio[ch_id].terminate() out, _ = self._iio[ch_id].communicate() self._iio[ch_id].wait() self._iio[ch_id] = None # iio-capture return "energy=value", add a simple format check if '=' not in out: logger.error(f'Bad output format for {self._str(channel)}: {out}') continue else: logger.debug(f'{self._str(channel)}: {out}') # Build energy counter object nrg = {} for kv_pair in out.split(): key, val = kv_pair.partition('=')[::2] nrg[key] = float(val) channels_stats[channel] = nrg logger.debug(self._str(channel)) logger.debug(nrg) src = os.path.join(self._res_dir, f'samples_{channel}.csv') shutil.move(src, out_dir) # Add channel's energy to return results channels_nrg[f'{channel}'] = nrg['energy'] # Dump energy data nrg_file = os.path.join(out_dir, out_energy) with open(nrg_file, 'w') as ofile: json.dump(channels_nrg, ofile, sort_keys=True, indent=4) # Dump energy stats nrg_stats_file = os.path.splitext(out_energy)[0] + \ '_stats' + os.path.splitext(out_energy)[1] nrg_stats_file = os.path.join(out_dir, nrg_stats_file) with open(nrg_stats_file, 'w') as ofile: json.dump(channels_stats, ofile, sort_keys=True, indent=4) return EnergyReport(channels_nrg, nrg_file, None)
[docs] class Gem5EnergyMeterConf(SimpleMultiSrcConf, HideExekallID): """ Configuration class for :class:`Gem5EnergyMeter`. {generated_help} {yaml_example} """ STRUCTURE = TopLevelKeyDesc('gem5-energy-meter-conf', 'Gem5 Energy Meter configuration', ( KeyDesc('channel-map', 'Channels to use', [Mapping]), ))
[docs] @_deprecate_emeter class Gem5EnergyMeter(_DevlibContinuousEnergyMeter): name = 'gem5' CONF_CLASS = Gem5EnergyMeterConf def __init__(self, target, channel_map, res_dir=None): super().__init__(target, res_dir) power_sites = list(channel_map.values()) self._instrument = devlib.Gem5PowerInstrument(self._target, power_sites)
[docs] def reset(self): self._instrument.reset() self._instrument.start()
def _build_timeline(self, df): # Power measurements on gem5 are performed not only periodically but also # spuriously on OPP changes. Let's use the time channel provided by the # gem5 power instrument to build the timeline accordingly. for site, measure in df: if measure == 'time': meas_dur = df[site]['time'] break timeline = np.zeros(len(meas_dur)) # The time channel gives the elapsed time since previous measurement for i in range(1, len(meas_dur)): timeline[i] = meas_dur[i] + timeline[i - 1] df.index = timeline return df
# vim :set tabstop=4 shiftwidth=4 expandtab textwidth=80