# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2018, Arm Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
import copy
from collections.abc import Mapping
from collections import OrderedDict
import difflib
import inspect
import itertools
import textwrap
import logging
import re
import contextlib
import pprint
import os
import io
import functools
import threading
import weakref
import typing
from ruamel.yaml.comments import CommentedMap
import typeguard
import lisa
from lisa.utils import (
Serializable, Loggable, get_nested_key, set_nested_key, get_call_site,
is_running_sphinx, get_cls_name, HideExekallID, get_subclasses, groupby,
import_all_submodules,
)
from lisa._generic import check_type
[docs]
class DeferredValue:
    """
    Wrapper similar to :func:`functools.partial` allowing to defer computation
    of the value until the key is actually used.

    Once computed, the deferred value is replaced by the value that was
    computed. This is useful for values that are very costly to compute, but
    should be used with care as it means it will usually not be available in
    the offline :class:`lisa.platforms.platinfo.PlatformInfo` instances. This
    means that client code such as submodules of ``lisa.analysis`` will
    typically not have it available (unless :meth:`~MultiSrcConf.eval_deferred`
    was called) although they might need it.

    :param callback: Callable computing the value.
    :type callback: collections.abc.Callable

    :Variable arguments: Forwarded to ``callback``.
    :Variable keyword arguments: Forwarded to ``callback``.
    """

    def __init__(self, callback, *args, **kwargs):
        self.callback = callback
        self.args = args
        self.kwargs = kwargs
        # Reentrancy guard: True only while the callback is running
        self._is_computing = False

    def __call__(self, key_desc=None):
        """
        Compute and return the value by calling the callback.

        :param key_desc: Descriptor of the key being computed, only used to
            build the error message.
        :type key_desc: KeyDescBase or None

        :raises KeyComputationRecursionError: If the callback is reentered
            while it is already running.
        """
        # Make sure we don't reenter the callback, to avoid infinite loops.
        if self._is_computing:
            # Compare against None explicitly: some key descriptors are
            # mappings and would be falsy when empty.
            key = '<unknown>' if key_desc is None else key_desc.qualname
            raise KeyComputationRecursionError(f'Recursion error while computing deferred value for key: {key}', key)

        self._is_computing = True
        try:
            return self.callback(*self.args, **self.kwargs)
        finally:
            # Always release the guard, even if the callback raised
            self._is_computing = False

    def __str__(self):
        callback = self.callback
        # Not all callables have a __qualname__ (e.g. functools.partial), so
        # fall back on their repr() rather than failing to format.
        try:
            name = callback.__qualname__
        except AttributeError:
            name = repr(callback)
        return f'<lazy value of {name}>'
[docs]
class FilteredDeferredValue(DeferredValue):
    """
    Variant of :class:`lisa.conf.DeferredValue` for which a given sentinel
    value returned by the callback is interpreted as "no value".

    :param callback: Callable taking no parameter and computing the value.
    :type callback: collections.abc.Callable

    :param sentinel: Value compared (with ``==``) against the callback's
        result to detect the absence of value.
    :type sentinel: object
    """

    def __init__(self, callback, sentinel=None):
        @functools.wraps(callback)
        def _callback():
            result = callback()
            if result == sentinel:
                raise _DeferredInexistentError()
            return result

        # Cheat on the name to have better DeferredValue.__str__ output.
        # Not all callbacks have a __qualname__, in which case the wrapper
        # keeps its own.
        with contextlib.suppress(AttributeError):
            _callback.__qualname__ = callback.__qualname__

        super().__init__(callback=_callback)
[docs]
class DeferredExcep(DeferredValue):
    """
    Specialization of :class:`DeferredValue` that raises an exception when
    the value is requested.

    :param excep: Exception to raise when the value is used.
    :type excep: BaseException
    """

    def __init__(self, excep):
        self.excep = excep

        def _raise_excep():
            # Looked up on the instance at call time, so that a later
            # reassignment of the attribute is honored.
            raise self.excep

        super().__init__(callback=_raise_excep)

    def __str__(self):
        excep_cls = self.excep.__class__
        return f'<lazy {excep_cls.__qualname__} exception>'
[docs]
class TopLevelKeyError(ValueError):
    """
    Exception raised when no top-level key matches the expected one in the
    given configuration file.

    :param key: Expected top-level key, either as a string or as a sequence
        of levels.
    :type key: str or collections.abc.Sequence
    """

    def __init__(self, key):
        # Populate "args" so the exception can be copied and pickled
        # regardless of how "key" was passed (positional or keyword).
        super().__init__(key)
        self.key = key

    def __str__(self):
        key = self.key
        # The key is commonly passed as a sequence of levels: display it as a
        # slash-separated path rather than as the repr of a tuple.
        if not isinstance(key, str):
            try:
                key = '/'.join(key)
            except TypeError:
                pass
        return f'Key "{key}" needs to appear at the top level'
[docs]
class KeyDescBase(abc.ABC):
    """
    Base class for configuration files key descriptor.

    This allows defining the structure of the configuration file, in order
    to sanitize user input and generate help snippets used in various places.

    :param name: Name of the key, which must match ``_VALID_NAME_PATTERN``.
    :type name: str

    :param help: Help message describing the key.
    :type help: str
    """
    INDENTATION = 4 * ' '
    _VALID_NAME_PATTERN = r'^[a-zA-Z0-9-<>]+$'

    def __init__(self, name, help):
        # pylint: disable=redefined-builtin
        self._check_name(name)
        self.name = name
        self.help = help
        # Set by the enclosing level when this key is attached to it
        self.parent = None

    @classmethod
    def _check_name(cls, name):
        """
        Check that the given key name is valid.

        :raises ValueError: If the name does not match
            ``_VALID_NAME_PATTERN``.
        """
        # Use fullmatch() since "$" alone also matches right before a
        # trailing newline, which would let names like "foo\n" through.
        if not re.fullmatch(cls._VALID_NAME_PATTERN, name):
            raise ValueError(f'Invalid key name "{name}". Key names must match: {cls._VALID_NAME_PATTERN}')

    @property
    def qualname(self):
        """
        "Qualified" name of the key.

        This is a slash-separated path in the config file from the root to that
        key:
        <parent qualname>/<name>
        """
        return '/'.join(self.path)

    @property
    def path(self):
        """
        Path in the config file from the root to that key.

        .. note:: This includes the top-level key name, which must be removed
            before it's fed to :meth:`MultiSrcConf.get_nested_key`.
        """
        curr = [self.name]
        if self.parent is None:
            return curr
        return self.parent.path + curr

    @abc.abstractmethod
    def get_help(self, style=None, last=False):
        """
        Get a help message describing the key.

        :param style: When "rst", reStructuredText formatting may be applied
        :type style: str

        :param last: ``True`` if this is the last item in a list.
        :type last: bool
        """

    @abc.abstractmethod
    def validate_val(self, val):
        """
        Validate a value to be used for that key.

        :raises TypeError: When the value has the wrong type
        :raises ValueError: If the value does not comply with some other
            constraints. Note that constraints should ideally be encoded in the
            type itself, to make help message as straightforward as possible.
        """
[docs]
class KeyDesc(KeyDescBase):
    """
    Key descriptor describing a leaf key in the configuration.

    :param name: Name of the key

    :param help: Short help message describing the use of that key

    :param classinfo: sequence of allowed types for that key. As a special
        case, `None` is allowed in that sequence of types, even though it is
        not strictly speaking a type.
    :type classinfo: collections.abc.Sequence

    :param newtype: If specified, a type with the given name will be created
        for that key with that name. Otherwise, a camel-case name derived from
        the key name will be used: ``toplevel-key/sublevel/mykey`` will give a
        type named `SublevelMykey`. This class will be exposed as an attribute
        of the parent :class:`MultiSrcConf` (which is why the toplevel key is
        omitted from its name). A getter will also be created on the parent
        configuration class, so that the typed key is exposed to ``exekall``.
        If the key is not present in the configuration object, the getter will
        return ``None``.
    :type newtype: str or None

    :param deepcopy_val: If ``True``, the values will be deepcopied upon
        lookup. This prevents accidental modification of mutable types (like
        lists) by the user.
    :type deepcopy_val: bool
    """

    def __init__(self, name, help, classinfo, newtype=None, deepcopy_val=True):
        # pylint: disable=redefined-builtin
        super().__init__(name=name, help=help)
        # isinstance's style classinfo
        self.classinfo = tuple(classinfo)
        self._newtype = newtype
        self.deepcopy_val = deepcopy_val

    @property
    def newtype(self):
        """
        Name of the type created for that key: either the one given to the
        constructor, or a camel-case name derived from the key path with the
        top-level key omitted.
        """
        explicit = self._newtype
        if explicit:
            return explicit

        def camel(component):
            # Keep only characters allowed in identifiers
            kept = [c for c in component if c.isidentifier()]
            return ''.join(kept).title()

        # The top-level key is skipped, and each level is split on dashes
        return ''.join(
            camel(component)
            for level in self.path[1:]
            for component in level.split('-')
        )

    def validate_val(self, val):
        """
        Check that the value is an instance of one of the type specified in the
        ``self.classinfo``.

        If the value is not an instance of any of these types, then a
        :exc:`TypeError` is raised, listing all the accepted types.
        """
        # DeferredValue will be checked when they are computed
        if isinstance(val, DeferredValue):
            return

        key = self.qualname
        allowed = self.classinfo
        try:
            check_type(val, allowed)
        except TypeError as e:
            expected = ' or '.join(get_cls_name(cls) for cls in allowed)
            raise TypeError(f'Key "{key}" is an instance of {get_cls_name(type(val))}, but should be instance of {expected}: {e}. Help: {self.help}', key)

    def get_help(self, style=None, last=False):
        """
        Get a help message describing the key.

        :param style: ``"rst"`` gives a bullet list item, ``"yaml"`` gives the
            help text followed by a ``type:`` line, anything else gives a
            tree-like item.
        :type style: str or None

        :param last: ``True`` if this is the last item in a list.
        :type last: bool
        """
        tree_fmt = '{prefix}{key} ({classinfo}){prefixed_help}.'
        if style == 'yaml':
            prefix, key, fmt = '', '', '{key}{help}\ntype: {classinfo}'
        elif style == 'rst':
            prefix, key, fmt = '* ', self.name, tree_fmt
        else:
            prefix, key, fmt = ('└' if last else '├') + ' ', self.name, tree_fmt

        help_text = self.help
        if help_text:
            lines = textwrap.wrap(help_text, width=60)
            # If more than one line, output a paragraph on its own starting on
            # a new line
            multiline = len(lines) > 1
            if multiline:
                lines = [''] + lines
            # Continuation lines are aligned after the prefix
            joiner = '\n' + ' ' * len(prefix)
            help_ = joiner.join(lines)
            prefixed_help = (':' if multiline else ': ') + help_
        else:
            help_ = prefixed_help = ''

        type_names = ' or '.join(
            get_cls_name(key_cls, style=style, fully_qualified=False)
            for key_cls in self.classinfo
        )
        return fmt.format(
            prefix=prefix,
            key=key,
            classinfo=type_names,
            help=help_,
            prefixed_help=prefixed_help,
        )
[docs]
class ConfigKeyError(KeyError):
    """
    Exception raised when a key is not found in the config instance.

    :param msg: Message displayed when the exception is converted to a string.
    :type msg: str

    :param key: Key that was looked up.

    :param src: Source in which the key was looked up.
    """

    def __init__(self, msg, key=None, src=None):
        # pylint: disable=super-init-not-called
        self.msg, self.key, self.src = msg, key, src

    def __str__(self):
        # Bypass KeyError formatting, which would display the repr() of the
        # arguments instead of the message.
        return self.msg
[docs]
class MissingBaseKeyError(ConfigKeyError):
    """
    Exception raised when a derived key cannot be computed because one of the
    base keys it depends on has no value.
    """
[docs]
class DeferredValueComputationError(ConfigKeyError):
    """
    Raised when computing the value of a :class:`DeferredValue` led to an
    exception.

    :param msg: Message displayed when the exception is converted to a string.
    :type msg: str

    :param excep: Exception raised while computing the value.
    :type excep: BaseException

    :param key: Key that was looked up.

    :param src: Source in which the key was looked up.
    """

    def __init__(self, msg, excep, key=None, src=None):
        self.excep = excep
        super().__init__(msg, key, src)
[docs]
class KeyComputationRecursionError(ConfigKeyError, RecursionError):
    """
    Raised when the computation of a key is reentered: either
    :meth:`DerivedKeyDesc.compute_val` for a given key on a given
    configuration instance, or the callback of a :class:`DeferredValue`.
    """
class _DeferredInexistentError(Exception):
    """
    Raised when a :class:`lisa.conf.DeferredValue` is unable to compute its
    value. The machinery then treats the key as if no value had been provided
    for it.
    """
[docs]
class DerivedKeyDesc(KeyDesc):
    """
    Key descriptor describing a key derived from other keys

    Derived keys cannot be added from a source, since they are purely computed
    out of other keys. It is also not possible to change their source
    priorities. To achieve that, set the source priorities on the keys it is
    based on.

    :param base_key_paths: List of paths to the keys this key is derived from.
        The paths in the form of a list of string are relative to the current
        level. To reference a level above the current one, use the special key
        ``..``.
    :type base_key_paths: list(list(str))

    :param compute: Function used to compute the value of the key. It takes a
        dictionary of base keys specified in ``base_key_paths`` as only
        parameter and is expected to return the key's value.
    :type compute: collections.abc.Callable
    """

    def __init__(self, name, help, classinfo, base_key_paths, compute, newtype=None):
        # pylint: disable=redefined-builtin
        super().__init__(name=name, help=help, classinfo=classinfo, newtype=newtype)
        self._base_key_paths = base_key_paths
        self._compute = compute
        # id() of the configuration instances the key is currently being
        # computed for, used to detect reentrancy
        self._is_computing_in = set()

    @property
    def help(self):
        """
        Help message, prefixed with the sorted list of base keys.
        """
        bases = sorted(map(self._get_base_key_qualname, self._base_key_paths))
        return f"(derived from {', '.join(bases)}) " + self._help

    @help.setter
    def help(self, val):
        self._help = val

    @staticmethod
    def make_get_key(conf, **kwargs):
        """
        Build a ``getitem``-like function out of the ``get_key`` function of
        the class of ``conf``, with the given keyword arguments pre-applied.
        """
        getter = conf.__class__.get_key
        return functools.partial(getter, **kwargs)

    @classmethod
    def _get_base_key_val(cls, conf, path, eval_deferred=True):
        getitem = cls.make_get_key(conf, quiet=True, eval_deferred=eval_deferred)
        return get_nested_key(conf, path, getitem=getitem)

    @classmethod
    def _get_base_key_src(cls, conf, path):
        *level_path, leaf = path
        getitem = cls.make_get_key(conf, quiet=True)
        # Resolve the source on the level that directly holds the base key
        level = get_nested_key(conf, level_path, getitem=getitem)
        return level.resolve_src(leaf)

    def _resolve_key_path(self, key_path):
        # Traverse the path in opposite order so we can know if a component
        # needs to be removed based on the ".." components following it.
        pending = 0
        kept = []
        for component in reversed(self.parent.path + key_path):
            if component == '..':
                # Cancels the closest preceding regular component
                pending += 1
            elif pending:
                pending -= 1
            else:
                kept.append(component)

        kept.reverse()
        return kept

    def _get_base_key_qualname(self, key_path):
        return '/'.join(self._resolve_key_path(key_path))

    def _get_base_conf(self, conf, eval_deferred=True):
        base_conf = {}
        try:
            for key_path in self._base_key_paths:
                val = self._get_base_key_val(conf, key_path, eval_deferred=eval_deferred)
                set_nested_key(base_conf, key_path, val)
        except ConfigKeyError as e:
            key = self.qualname
            raise MissingBaseKeyError(
                f'Missing value for base key "{e.key}" in order to compute derived key "{key}": {e.msg}',
                key=key,
            ) from e

        return base_conf

    def get_non_evaluated_base_keys(self, conf):
        """
        Get the :class:`KeyDescBase` of base keys that have a :class:`DeferredValue`
        value.
        """
        structure = conf.STRUCTURE
        base_vals = {}
        for key_path in self._base_key_paths:
            # Strip the top-level key, which is not part of the structure path
            desc_path = self._resolve_key_path(key_path)[1:]
            key_desc = get_nested_key(structure, desc_path)
            base_vals[key_desc] = self._get_base_key_val(conf, key_path, eval_deferred=False)

        return [
            key_desc
            for key_desc, val in base_vals.items()
            if isinstance(val, DeferredValue)
        ]

    def can_be_computed(self, conf):
        """
        Return ``True`` if all the base keys have a value in ``conf``, without
        evaluating the deferred ones.
        """
        try:
            self._get_base_conf(conf, eval_deferred=False)
        except MissingBaseKeyError:
            return False
        return True

    def compute_val(self, conf, eval_deferred=True):
        """
        Compute the value of the key for the given configuration instance.

        :param conf: Configuration instance to get the base keys from.

        :param eval_deferred: If ``False`` and some base keys hold a
            :class:`DeferredValue`, a :class:`DeferredValue` wrapping the
            computation is returned instead of the value.
        :type eval_deferred: bool

        :raises KeyComputationRecursionError: If the computation is reentered
            for the same configuration instance.
        """
        conf_id = id(conf)
        if conf_id in self._is_computing_in:
            key = self.qualname
            raise KeyComputationRecursionError(f'Recursion error while computing derived key: {key}', key)

        self._is_computing_in.add(conf_id)
        try:
            # If there is non evaluated base, transitively return a closure
            # rather than computing now.
            if not eval_deferred and self.get_non_evaluated_base_keys(conf):
                val = DeferredValue(self.compute_val, conf=conf, eval_deferred=True)
            else:
                val = self._compute(self._get_base_conf(conf))
                self.validate_val(val)
        finally:
            self._is_computing_in.remove(conf_id)

        return val

    def get_src(self, conf):
        """
        Get a comma-separated description of the sources of the base keys, in
        the form ``<source>(<base key qualname>)``.
        """
        return ','.join(
            f'{self._get_base_key_src(conf, path)}({self._get_base_key_qualname(path)})'
            for path in self._base_key_paths
        )
[docs]
class LevelKeyDesc(KeyDescBase, Mapping):
    """
    Key descriptor defining a hierarchical level in the configuration.

    :param name: name of the key in the configuration

    :param help: Short help describing the use of the keys inside that level

    :param children: collections.abc.Sequence of :class:`KeyDescBase` defining the allowed keys
        under that level
    :type children: collections.abc.Sequence

    :param value_path: Relative path to a sub-key that will receive assignment
        to that level for non-mapping types. This allows turning a leaf key into a
        level while preserving backward compatibility, as long as:

        * The key did not accept mapping values, otherwise it would be
          ambiguous and is therefore rejected.

        * The old leaf key has a matching new leaf key, that is a sub-key
          of the new level key.

        In practice, that allows turning a single knob into a tree of settings.
    :type value_path: list(str) or None

    Children keys will get this key assigned as a parent when passed to the
    constructor.
    """

    def __init__(self, name, help, children, value_path=None):
        # pylint: disable=redefined-builtin
        super().__init__(name=name, help=help)

        # Make it easier to share children with another configuration class by
        # making them independent, so we will not accidentally override the
        # parent link when it would not be appropriate.
        self.children = list(map(copy.deepcopy, children))

        # Fixup parent for easy nested declaration
        for key_desc in self.children:
            key_desc.parent = self

        self.value_path = value_path

    @property
    def key_desc(self):
        """
        Leaf key descriptor designated by ``value_path``.

        :raises AttributeError: If no value path was defined.
        """
        path = self.value_path
        if path is None:
            raise AttributeError(f'{self} does not define a value path for direct assignment')
        else:
            return get_nested_key(self, path)

    def __getattr__(self, attr):
        # If the property raised an exception, __getattr__ is tried so we need
        # to fail explicitly in order to avoid infinite recursion
        if attr == 'key_desc':
            raise AttributeError('recursive key_desc lookup')

        # Instances being reconstructed by copy/pickle are probed (e.g. for
        # "__setstate__") before their attributes are restored. Delegating at
        # that point would make the "key_desc" property look up the missing
        # "value_path" attribute through __getattr__ again, recursing until
        # the interpreter's recursion limit is hit.
        if 'value_path' not in vars(self):
            raise AttributeError(attr)

        try:
            key_desc = self.key_desc
        except Exception as e:
            raise AttributeError(str(e)) from e
        else:
            # Delegate unknown attributes to the leaf key that receives
            # direct assignments to that level.
            return getattr(key_desc, attr)

    @property
    def _key_map(self):
        # Mapping of children names to their descriptor, rebuilt on every
        # access so that it always reflects the current children.
        return {
            key_desc.name: key_desc
            for key_desc in self.children
        }

    def __iter__(self):
        return iter(self._key_map)

    def __len__(self):
        return len(self._key_map)

    def __getitem__(self, key):
        self.check_allowed_key(key)
        return self._key_map[key]

    def check_allowed_key(self, key):
        """
        Checks that a given key is allowed under that level.

        :raises ConfigKeyError: If the key is not allowed. The message
            suggests the closest allowed key name when there is one.
        """
        # Build the map once, it is needed again on the error path
        key_map = self._key_map
        try:
            # Subscript rather than use "in", so that mappings implementing
            # __missing__ get a chance to accept the key.
            key_map[key]
        except KeyError:
            # pylint: disable=raise-missing-from
            try:
                closest_match = difflib.get_close_matches(
                    word=str(key),
                    possibilities=key_map.keys(),
                    n=1,
                )[0]
            except IndexError:
                closest_match = ''
            else:
                closest_match = f', maybe you meant "{closest_match}" ?'

            parent = self.qualname
            raise ConfigKeyError(
                f'Key "{key}" is not allowed in {parent}{closest_match}',
                key=key,
            )

    def validate_val(self, conf):
        """Validate a mapping to be used as a configuration source"""
        if not isinstance(conf, Mapping):
            key = self.qualname
            raise TypeError(f'Configuration of {key} must be a Mapping', key)
        for key, val in conf.items():
            self[key].validate_val(val)

    def get_help(self, style=None, last=False):
        """
        Get a help message describing the level and all its children.

        :param style: When "rst", reStructuredText formatting is applied.
        :type style: str or None

        :param last: ``True`` if this is the last item in a list.
        :type last: bool
        """
        idt = self.INDENTATION
        prefix = '*' if style == 'rst' else ('└' if last else '├')
        # Nasty hack: adding an empty reStructuredText comment between levels
        # of nested list avoids getting extra blank line between list items.
        # That prevents reStructuredText from thinking each item must be a
        # paragraph.
        suffix = '\n\n..\n\n' if style == 'rst' else '\n'
        suffix += idt
        help_ = '{prefix} {key}:{help}{suffix}'.format(
            prefix=prefix,
            suffix=suffix,
            key=self.name,
            help=' ' + self.help if self.help else '',
        )
        nl = '\n' + idt
        # Index of the last child, kept distinct from the "last" parameter
        last_idx = len(self.children) - 1
        help_ += nl.join(
            key_desc.get_help(
                style=style,
                last=(i == last_idx),
            ).replace('\n', nl)
            for i, key_desc in enumerate(self.children)
        )
        if style == 'rst':
            help_ += '\n\n..\n'

        return help_
class _KeyMap(dict):
    """
    Dictionary returning a fixed object for any key it does not contain.

    :param child: Object returned when looking up a missing key.

    :param content: Initial content, in any form accepted by :class:`dict`.
    """

    def __init__(self, child, content):
        self._child = child
        super().__init__(content)

    def __missing__(self, _):
        # Called by dict.__getitem__ for unknown keys. The fallback is
        # returned without being stored.
        return self._child
[docs]
class VariadicLevelKeyDesc(LevelKeyDesc):
    """
    Level key descriptor that allows configuration-source-defined sub-level keys.

    :param name: name of the key in the configuration

    :param help: Short help describing the use of the keys inside that level

    :param child: Variadic level. Its name will only be used for documentation
        purposes, the configuration instances will be able to hold any string.
    :type child: lisa.conf.LevelKeyDesc

    :param value_path: See :class:`lisa.conf.LevelKeyDesc`.
    :type value_path: list(str) or None

    This allows "instantiating" a whole sub-configuration for variable level
    keys.
    """

    def __init__(self, name, help, child, value_path=None):
        # pylint: disable=redefined-builtin
        super().__init__(
            name=name,
            help=help,
            children=[child],
            value_path=value_path,
        )

    @property
    def _child(self):
        # The one and only child, used as descriptor for every sub-level
        (child, *_) = self.children
        return child

    @property
    def _key_map(self):
        # Any key that is not the child's name maps to the child as well
        regular = super()._key_map
        return _KeyMap(self._child, regular)
[docs]
class DelegatedLevelKeyDesc(LevelKeyDesc):
    """
    Level key descriptor that imports the keys from another
    :class:`~lisa.conf.MultiSrcConfABC` subclass.

    :param conf: Configuration class to extract keys from.
    :type conf: MultiSrcConfABC

    :Variable keyword arguments: Forwarded to :class:`lisa.conf.LevelKeyDesc`.

    This allows embedding a configuration inside another one, mostly to be able
    to split a configuration class while preserving backward compatibility.

    .. note:: Only the children keys are taken from the passed level, other
        information such as ``value_path`` are ignored and must be set
        explicitly.
    """

    def __init__(self, name, help, conf, **kwargs):
        # pylint: disable=redefined-builtin

        # Work on a deep copy of the delegated structure, to ensure we will
        # not change the parent attribute of an existing structure.
        structure_copy = copy.deepcopy(conf.STRUCTURE)
        super().__init__(
            name=name,
            help=help,
            children=structure_copy.values(),
            **kwargs
        )
[docs]
class TopLevelKeyDescBase(LevelKeyDesc):
    """
    Top-level key descriptor, which defines the top-level key to use in the
    configuration files.

    :param levels: Levels of the top-level key, as a list of strings. Each item
        specifies a level in a mapping, so that multiple classes can share the same
        actual top-level without specific cooperation.
    :type levels: list(str)

    :Variable arguments: Forwarded to :class:`lisa.conf.LevelKeyDesc`.
    :Variable keyword arguments: Forwarded to :class:`lisa.conf.LevelKeyDesc`.

    This top-level key is omitted in all interfaces except for the
    configuration file, since it only reflects the configuration class
    """

    def __init__(self, levels, *args, **kwargs):
        levels = tuple(levels)

        # Each level is validated on its own using the parent class' check,
        # since the check is disabled in this class for the combined name.
        check_level = super()._check_name
        for level in levels:
            check_level(level)

        self.levels = levels
        super().__init__('/'.join(levels), *args, **kwargs)

    @classmethod
    def _check_name(cls, name):
        # The name is made of slash-separated levels, which would be rejected
        # by the regular check.
        pass

    def get_help(self, style=None, **kwargs):
        """
        Get a help message describing the key. The ``"yaml"`` style only
        gives the help of the top-level key itself.
        """
        if style == 'yaml':
            return self.help
        return super().get_help(style=style, **kwargs)
[docs]
class TopLevelKeyDesc(TopLevelKeyDescBase):
    """
    Regular top-level key descriptor, with only one level.

    :param name: Name of the top-level key, as a string.
    :type name: str

    :Variable arguments: Forwarded to :class:`TopLevelKeyDescBase`.
    :Variable keyword arguments: Forwarded to :class:`TopLevelKeyDescBase`.
    """

    def __init__(self, name, *args, **kwargs):
        # A single-level key is a one-item list of levels
        levels = [name]
        super().__init__(levels, *args, **kwargs)
[docs]
class NestedTopLevelKeyDesc(TopLevelKeyDescBase):
    """
    Top-level key descriptor made of an arbitrary number of levels.
    """
[docs]
class MultiSrcConfABC(Serializable, abc.ABC):
    """
    Abstract base class of configuration classes.

    It provides the conversions to and from YAML mappings hosted under the
    top-level key defined by the ``STRUCTURE`` class attribute, and sets up
    subclasses when they are created (see :meth:`__init_subclass__`).
    """
    # Maps the levels of the top-level key (as a tuple) to the class using it
    _REGISTERED_TOPLEVEL_KEYS = {}

    @abc.abstractmethod
    def to_map(self):
        """
        Export the configuration as a mapping.
        """
        raise NotImplementedError

    @classmethod
    @abc.abstractmethod
    def from_map(cls, mapping, add_default_src=True):
        """
        Create a new configuration instance out of a mapping.
        """
        raise NotImplementedError

    @classmethod
    def from_yaml_map(cls, path, add_default_src=True):
        """
        Allow reloading from a plain mapping, to avoid having to specify a tag
        in the configuration file. The content is hosted under the top-level
        key specified in ``STRUCTURE``.

        :param path: Path to the YAML file
        :type path: str

        :param add_default_src: Add a default source if available for that
            class.
        :type add_default_src: bool

        :raises ValueError: If the top-level object of the file is not a
            mapping.
        :raises TopLevelKeyError: If the top-level key of the class is not
            found in the file.

        .. note:: Only load YAML files from trusted source as it can lead to
            arbitrary code execution.
        """
        toplevel_keys = cls.STRUCTURE.levels

        def get_content(mapping):
            # An empty top-level key is treated as an empty configuration
            return get_nested_key(mapping, toplevel_keys) or {}

        def has_keys(keys, mapping):
            for key in keys:
                try:
                    mapping = mapping[key]
                # TypeError is raised when an intermediate value is not a
                # mapping, in which case the keys are not all there either.
                except (KeyError, TypeError):
                    return False
            return True

        mapping = cls._from_path(path, fmt='yaml')
        if not isinstance(mapping, Mapping):
            raise ValueError(f'Top-level object is expected to be a mapping but got: {mapping.__class__.__qualname__}')

        try:
            data = get_content(mapping)
        except KeyError:
            # pylint: disable=raise-missing-from
            raise TopLevelKeyError(toplevel_keys)

        # "unwrap" an extra layer of toplevel key, to play well with !include
        if len(data) == 1 and has_keys(toplevel_keys, data):
            data = get_content(data)

        return cls.from_map(data, add_default_src=add_default_src)

    @classmethod
    def from_yaml_map_list(cls, path_list, add_default_src=True):
        """
        Create a mapping of configuration classes to instance, by loading them
        from the list of paths using :meth:`from_yaml_map` and merging them.

        :param path_list: List of paths to YAML configuration files.
        :type path_list: list(str)

        :param add_default_src: See :meth:`from_yaml_map`.

        .. note:: When merging, the configuration coming from the rightmost
            path will win if it defines some keys that were also defined in another
            file. Each file will be mapped to a different sources, named after
            the basename of the file.

        .. note:: Only load YAML files from trusted source as it can lead to
            arbitrary code execution.
        """
        # Make sure that all modules from LISA are loaded, so that
        # get_subclasses will be accurate.
        import_all_submodules(lisa, best_effort=True)
        conf_cls_set = set(get_subclasses(cls, only_leaves=True))

        conf_list = []
        for conf_path in path_list:
            # Try to build as many configurations instances from all the files we
            # are given
            for conf_cls in conf_cls_set:
                try:
                    # Do not add the default source, to avoid overriding user
                    # configuration with the default one.
                    conf = conf_cls.from_yaml_map(conf_path, add_default_src=False)
                except TopLevelKeyError:
                    continue
                else:
                    conf_list.append((conf, conf_path))

        def keyfunc(conf_and_path):
            cls = type(conf_and_path[0])
            # Sort according to class qualified name since classes are not
            # comparable directly
            return (cls.__module__ + '.' + cls.__qualname__), cls

        # Then aggregate all the conf from each type, so they just act as
        # alternative sources.
        conf_map = {}
        for (_, conf_cls), conf_and_path_seq in groupby(conf_list, key=keyfunc):
            conf_and_path_list = list(conf_and_path_seq)

            # Get the default configuration, and stack all user-defined keys
            conf = conf_cls(add_default_src=add_default_src)
            for conf_src, conf_path in conf_and_path_list:
                src = os.path.basename(conf_path)
                conf.add_src(src, conf_src)

            conf_map[conf_cls] = conf

        return conf_map

    @property
    def as_yaml_map(self):
        """
        Give a mapping suitable for storing in a YAML configuration file.

        .. seealso:: :meth:`to_yaml_map` and :meth:`from_yaml_map`
        """
        def make_map(keys, data):
            # Nest the data under each level of the top-level key
            if keys:
                key, *keys = keys
                return {key: make_map(keys, data)}
            else:
                return data

        data = self.to_map()
        mapping = make_map(self.STRUCTURE.levels, data)
        return mapping

    def to_yaml_map(self, path):
        """
        Write a configuration file, with the key descriptions in comments.

        :param path: Path to the file to write to.
        :type path: str
        """
        return self._to_path(self.as_yaml_map, path, fmt='yaml')

    def to_yaml_map_str(self, **kwargs):
        """
        Return the content of the file that would be created by
        :meth:`to_yaml_map` in a string.

        :Variable keyword arguments: Forwarded to :meth:`to_yaml_map`
        """
        content = io.StringIO()
        self.to_yaml_map(content, **kwargs)
        return content.getvalue()

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """
        Set up a newly created subclass:

        * Format its docstring as a template with the generated help and a
          YAML example (non-abstract classes with a docstring only).
        * Ensure its top-level key does not conflict with another class.
        * Create the type and the getter associated with each leaf key of its
          ``STRUCTURE``.

        :raises RuntimeError: If the top-level key is already used by another
            class.
        """
        super().__init_subclass__(**kwargs)

        # Ignore abstract classes, since there can be no instance of them
        if not inspect.isabstract(cls):
            if cls.__doc__:
                doc = inspect.getdoc(cls)
                # Create a reStructuredText preformatted block when rendering
                # with Sphinx
                style = 'rst' if is_running_sphinx() else None
                generated_help = '\n' + cls.get_help(style=style)
                indent = '\n '
                try:
                    # Not all classes support these parameters
                    yaml_example = cls().to_yaml_map_str(
                        add_placeholder=True,
                        placeholder='_'
                    )
                except TypeError:
                    yaml_example = cls().to_yaml_map_str()

                if yaml_example:
                    yaml_example = ':Example YAML:\n\n.. code-block:: YAML\n' + indent + yaml_example.replace('\n', indent)

                cls.__doc__ = doc.format(
                    generated_help=generated_help,
                    yaml_example=yaml_example
                )

        # Create the types for the keys that specify it, along with the getters
        # to expose the values to exekall
        if hasattr(cls, 'STRUCTURE') and isinstance(cls.STRUCTURE, TopLevelKeyDescBase):

            # Ensure uniqueness of toplevel key
            toplevel_keys = tuple(cls.STRUCTURE.levels)

            def format_keys(keys):
                return '/'.join(keys)

            def eq_prefix_keys(key1, key2):
                # Two keys conflict if one is a prefix of the other
                len_ = min(len(key1), len(key2))
                return tuple(key1[:len_]) == tuple(key2[:len_])

            offending = [
                cls_
                for keys, cls_ in cls._REGISTERED_TOPLEVEL_KEYS.items()
                if eq_prefix_keys(toplevel_keys, keys)
            ]

            # If the offending class has the same name and was declared in the
            # same module, we ignore the conflict as this is probably arising
            # from an import error in that module, that lead to the module
            # being re-imported again (by another import statement).
            if offending and not all(
                (
                    cls_.__qualname__ == cls.__qualname__ and
                    cls_.__module__ == cls.__module__
                )
                for cls_ in offending
            ):
                raise RuntimeError(f'Class {cls.__qualname__} cannot reuse top level key "{format_keys(toplevel_keys)}" as it is already used by {", ".join(map(str, offending))}')
            else:
                cls._REGISTERED_TOPLEVEL_KEYS[toplevel_keys] = cls

            def flatten(structure):
                # Yield all the leaf key descriptors of the structure
                for key_desc in structure.values():
                    if isinstance(key_desc, LevelKeyDesc):
                        yield from flatten(key_desc)
                    else:
                        yield key_desc

            for key_desc in flatten(cls.STRUCTURE):
                if isinstance(key_desc, KeyDesc):
                    # Only look up "newtype" once we know this is a KeyDesc,
                    # since other descriptors are not required to provide it.
                    newtype_name = key_desc.newtype

                    # We need a helper to make sure "key_desc" is bound to the
                    # right object, otherwise it will be referred by name only
                    # and will always have the value during the last iteration
                    # of the loop
                    #
                    # pylint complains about make_metacls being unused,
                    # which is a bug:
                    # https://github.com/PyCQA/pylint/issues/4020
                    def make_metacls(key_desc):  # pylint: disable=unused-variable
                        # Implement __instancecheck__ on the metaclass allows
                        # isinstance(x, Newtype) to be true for any instance of any
                        # type given in KeyDesc.__init__(classinfo=...)
                        class NewtypeMeta(type):
                            def __instancecheck__(cls, x):
                                # check_type() reports a mismatch by raising
                                # TypeError, which must be turned into a
                                # boolean for isinstance() to behave.
                                try:
                                    check_type(x, key_desc.classinfo)
                                except TypeError:
                                    return False
                                else:
                                    return True

                        return NewtypeMeta

                    # Inherit from HideExekallID, since we don't want it to be
                    # shown in the exekall IDs.
                    class Newtype(HideExekallID, metaclass=make_metacls(key_desc)):
                        pass

                    Newtype.__name__ = newtype_name
                    Newtype.__qualname__ = f'{cls.__qualname__}.{newtype_name}'
                    Newtype.__module__ = cls.__module__
                    Newtype.__doc__ = key_desc.help
                    setattr(cls, newtype_name, Newtype)

                    def make_getter(cls, type_, key_desc):
                        def getter(self: cls) -> type_:
                            try:
                                return self.get_nested_key(key_desc.path[1:])
                            # We cannot afford to raise here, as the
                            # configuration instance might not hold a value for
                            # that key, but we still need to pass something to
                            # the user function.
                            except KeyError:
                                return None

                        getter_name = f'_get_typed_key_{type_.__name__}'
                        getter.__name__ = getter_name
                        getter.__qualname__ = f'{cls.__qualname__}.{getter_name}'
                        getter.__module__ = cls.__module__
                        return getter

                    newtype_getter = make_getter(cls, Newtype, key_desc)
                    setattr(cls, newtype_getter.__name__, newtype_getter)
class _HashableMultiSrcConf:
    """
    Hashable dummy wrapper around a :class:`MultiSrcConf`. Two wrappers are
    equal if and only if they wrap the very same configuration instance.

    This allows using configuration as keys for instance-oriented usages like
    indexing in dictionaries to hold instance-related information.

    .. warning:: Python does not implement ``__hash__`` for mutable containers
        for good reasons, make sure you understand why before using this class.
    """

    def __init__(self, conf):
        self.conf = conf

    def __hash__(self):
        # Identity-based hash, consistent with the identity-based __eq__
        return id(self.conf)

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self.conf is other.conf
class _SubLevelMap(dict):
    """
    Dictionary of the sub-level configurations of a configuration instance,
    created lazily upon first lookup.

    :param conf: Configuration instance owning the sub-levels.
    """

    def __init__(self, conf):
        self._conf = conf

        # Pre-hit existing known inner levels so that they will be listed when
        # converting to a plain dictionary. Leaf keys raise KeyError and are
        # simply skipped.
        for key in conf._structure.keys():
            with contextlib.suppress(KeyError):
                self[key]

    def __missing__(self, key):
        conf = self._conf
        key_desc = conf._structure[key]

        if not isinstance(key_desc, LevelKeyDesc):
            raise KeyError(f'No sublevel map for leaf key {key_desc.path} in {conf.__class__.__qualname__}')

        # Build the tree of objects for nested configuration mappings,
        # lazily so that we can accommodate VariadicLevelKeyDesc
        sublevel = conf._nested_new(
            key_desc_path=key_desc.path,
            src_prio=conf._src_prio,
            parent=conf,
        )
        self[key] = sublevel
        return sublevel
[docs]
class MultiSrcConf(MultiSrcConfABC, Loggable, Mapping):
"""
Base class providing layered configuration management.
:param conf: collections.abc.Mapping to initialize the configuration with. This must be
optional, in which case it is assumed the object will contain a default
base configuration.
:type conf: collections.abc.Mapping
:param src: Name of the source added when passing ``conf``
:param src: str
The class inherits from :class:`collections.abc.Mapping`, which means it
can be used like a readonly dict. Writing to it is handled by a different
API that allows naming the source of values that are stored.
Each configuration key can be either a leaf key, that holds a value, or
a level key that allows to defined nested levels. The allowed keys is set
by the ``STRUCTURE`` class attribute.
Each leaf key can hold different values coming from different named
sources. By default, the last added source will have the highest priority
and will be served when looking up that key. A different priority order can
be defined for a specific key if needed.
.. seealso:: :class:`KeyDescBase`
This base class will modify the docstring of subclasses, using it as an
``str.format`` template with the following placeholders:
* ``{generated_help}``: snippet of ResStructuredText containing the
list of allowed keys.
* ``{yaml_example}``: example snippet of YAML
It will also create the types specified using ``newtype`` in the
:class:`KeyDesc`, along with a getter to expose it to ``exekall``.
.. note:: Since the dosctring is interpreted as a template, "{" and "}"
characters must be doubled to appear in the final output.
.. attention:: The layout of the configuration is typically guaranteed to
be backward-compatible in terms of accepted shape of input, but layout
of the configuration might change. This means that the path to a given
key could change as long as old input is still accepted. Types of
values can also be widened, so third party code re-using config classes
from :mod:`lisa` might have to evolve along the changes of
configuration.
"""
[docs]
@abc.abstractmethod
def STRUCTURE():
    """
    Class attribute defining the structure of the configuration file, as an
    instance of :class:`TopLevelKeyDescBase`.
    """
DEFAULT_SRC = {}
"""
Source added automatically using :meth:`~lisa.conf.MultiSrcConf.add_src` under the name 'default'
when instances are built.
"""
def __init__(self, conf=None, src='user', add_default_src=True):
self._nested_init(
key_desc_path=[],
src_prio=[],
parent=None,
)
self.add_src(src, conf)
# Give some preset in the the lowest prio source
if self.DEFAULT_SRC and add_default_src:
self.add_src('default', self.DEFAULT_SRC, fallback=True)
[docs]
@classmethod
def get_help(cls, *args, **kwargs):
    """
    Return the help of the configuration structure.

    All parameters are forwarded to the ``get_help`` method of
    ``STRUCTURE``.
    """
    structure = cls.STRUCTURE
    return structure.get_help(*args, **kwargs)
def _nested_init(self, key_desc_path, src_prio, parent):
    """Called to initialize nested instances of the class for nested
    configuration levels.

    :param key_desc_path: Path in the structure of that level of
        configuration.
    :param src_prio: List of sources in priority order (1st item is highest
        prio). It is copied rather than stored as-is.
    :param parent: Parent instance of configuration.
    """
    self._key_desc_path = key_desc_path
    "Path in the structure of that level of configuration"

    # Make a copy to avoid sharing it with the parent
    self._src_prio = copy.copy(src_prio)
    "List of sources in priority order (1st item is highest prio)"

    self._src_override = {}
    "Map of keys to map of source to values"

    self._key_map = {}
    "Key/value map of leaf values"

    # Temporary empty mapping, replaced by a _SubLevelMap at the end of this
    # method.
    self._sublevel_map = {}
    "Key/sublevel map of nested configuration objects"

    self._parent = parent
    "Parent instance of configuration"

    self._as_hashable = _HashableMultiSrcConf(self)
    """
    Hashable proxy, mostly designed to allow instance-oriented lookup in
    mappings. DO NOT USE IT FOR OTHER PURPOSES. You have been warned.
    """

    # Created last: building the _SubLevelMap reads self._structure and
    # self._src_prio, which therefore must already be set.
    self._sublevel_map = _SubLevelMap(self)
@property
def _structure(self):
    """
    Key descriptor of this level, looked up in ``STRUCTURE`` using the path
    of this level.
    """
    # The leading item of the path is the top-level key, which must be
    # skipped for the lookup.
    return get_nested_key(self.STRUCTURE, self._key_desc_path[1:])
@classmethod
def _nested_new(cls, *args, **kwargs):
new = cls.__new__(cls)
new._nested_init(*args, **kwargs)
return new
def _copy(self, copyf):
    """
    Copy of the nested configuration tree, using ``copyf`` to duplicate the
    attributes of each level.

    :param copyf: Callable taking an object and returning a copy of it. It
        is applied to every instance attribute except ``_parent``,
        ``_sublevel_map`` and ``_as_hashable``, and to each sublevel.
    :type copyf: collections.abc.Callable

    :returns: A new instance of the same class as ``self``.
    """
    cls = type(self)
    # __init__ is bypassed, all the attributes are set below
    new = cls.__new__(cls)
    # This is either going to be fixed up by the caller of __copy__ if we
    # are in a recursive copy, or left as it is if that's either the root,
    # or the highest sublevel that was copied (in case someone copies a
    # part of a larger conf).
    new._parent = self._parent

    # Attributes handled explicitly in this method rather than through copyf
    not_copied = {'_parent', '_sublevel_map', '_as_hashable'}

    # make a shallow copy of the attributes so we don't end up sharing
    # metadata
    # NOTE(review): when copyf is copy.copy, only the outer ``_key_map``
    # dict is duplicated, so the per-key {source: value} dicts look shared
    # between the copy and the original -- confirm this is intended.
    new.__dict__.update(
        (key, copyf(val))
        for key, val in self.__dict__.items()
        if key not in not_copied
    )

    # Do the same with sublevels recursively, since we consider the nested
    # levels as one "meta object". It's not really a deepcopy either, since
    # we don't copy the values themselves.
    def copy_sublevel(sublevel):
        new_sublevel = copyf(sublevel)
        # Fixup the parent
        new_sublevel._parent = new
        return new_sublevel

    # NOTE(review): the copy gets a plain dict here, whereas _nested_init()
    # installs a _SubLevelMap -- confirm that lazily-created sublevels are
    # not needed on copies.
    new._sublevel_map = {
        key: copy_sublevel(sublevel)
        for key, sublevel in self._sublevel_map.items()
    }

    # The hashable proxy has to wrap the copy, not the original
    new._as_hashable = _HashableMultiSrcConf(new)

    return new
[docs]
def __copy__(self):
"""
Shallow copy of the nested configuration tree, without duplicating the
leaf values.
"""
return self._copy(copy.copy)
def __deepcopy__(self, memo):
return self._copy(lambda x: copy.deepcopy(x, memo))
[docs]
def to_map(self):
    """
    Export the configuration as a mapping.

    The effective values are stored under the ``conf`` key. The
    key-specific priority override lists are stored under the ``source``
    key when there are any, which is not preserved if directly passing
    that instance to ``dict()``.
    """
    # For each key, get the highest prio value
    exported = {'conf': self._get_effective_map()}

    overrides = self._get_nested_src_override()
    if overrides:
        exported['source'] = overrides

    return exported
[docs]
@classmethod
def from_map(cls, mapping, add_default_src=True):
    """
    Create a new configuration instance, using the output of :meth:`to_map`.

    :param mapping: Mapping with an optional ``conf`` key holding the
        values and an optional ``source`` key holding the priority
        overrides.
    :param add_default_src: Forwarded to the class constructor.
    """
    values, overrides = mapping.get('conf', {}), mapping.get('source', {})

    new = cls(values, add_default_src=add_default_src)
    new.force_src_nested(overrides)
    return new
[docs]
def add_src(self, src, conf, filter_none=False, fallback=False, inplace=True):
    """
    Add a source of configuration.

    :param src: Name of the source to add
    :type src: str

    :param conf: Nested mapping of key/values to overlay
    :type conf: collections.abc.Mapping

    :param filter_none: Ignores the keys that have a ``None`` value. That
        simplifies the creation of the mapping, by having keys always
        present. That should not be used if ``None`` value for a key is
        expected, as opposed to not having that key set at all.
    :type filter_none: bool

    :param fallback: If True, the source will be added as a fallback, which
        means at the end of the priority list. By default, the source will
        have the highest priority and will be used unless a key-specific
        priority override is setup.
    :type fallback: bool

    :param inplace: If ``True``, the object is modified. If ``False``, a
        mutated copy is returned and the original object is left unmodified.
    :type inplace: bool

    :returns: The object the source was added to: ``self`` if ``inplace``
        is ``True``, the copy otherwise.

    This method provides a way to update the configuration, by importing a
    mapping as a new source.
    """
    # The copy is made with copy.copy(), so it follows the semantics of
    # __copy__.
    target = self if inplace else copy.copy(self)
    return target._add_src(
        src, conf,
        filter_none=filter_none, fallback=fallback
    )
def _add_src(self, src, conf, filter_none=False, fallback=False):
    """
    Add the source ``src`` to this level and, recursively, to the sublevels
    ``conf`` has values for.

    :param src: Name of the source to add.
    :param conf: Mapping of key/values for this level, ``None`` (treated as
        an empty mapping), or a non-mapping value when the level defines a
        ``value_path`` to assign it to.
    :param filter_none: If ``True``, keys with a ``None`` value are ignored.
    :param fallback: If ``True``, a source that is not known yet is appended
        to the priority list instead of being inserted at the front.

    :returns: ``self``

    :raises ValueError: When trying to set a derived key, or when ``conf``
        is not a mapping and the level has no ``value_path``.
    """
    conf = {} if conf is None else conf

    if isinstance(conf, Mapping):
        # Filter-out None values, so they won't override actual data from
        # another source
        if filter_none:
            conf = {
                k: v for k, v in conf.items()
                if v is not None
            }

        # only validate at that level, since sublevel will take care of
        # filtering then validating their own level
        validated_conf = {
            k: v for k, v in conf.items()
            if not isinstance(self._structure[k], LevelKeyDesc)
        }
        self._structure.validate_val(validated_conf)

        for key, val in conf.items():
            key_desc = self._structure[key]
            # Dispatch the nested mapping to the right sublevel
            if isinstance(key_desc, LevelKeyDesc):
                # sublevels have already been initialized when the root object
                # was created.
                self._sublevel_map[key]._add_src(src, val, filter_none=filter_none, fallback=fallback)
            # Derived keys cannot be set, since they are purely derived from
            # other keys
            elif isinstance(key_desc, DerivedKeyDesc):
                raise ValueError(f'Cannot set a value for a derived key "{key_desc.qualname}"', key_desc.qualname)
            # Otherwise that is a leaf value that we store at that level
            else:
                self._key_map.setdefault(key, {})[src] = val
    else:
        # Non-mapping value are allowed if the level defines a subkey
        # to assign to. We then craft a conf that sets that specific
        # value.
        key_desc = self._structure
        value_path = key_desc.value_path
        if value_path is None:
            raise ValueError(f'Cannot set a value for the key level "{key_desc.qualname}"', key_desc.qualname)
        else:
            conf = set_nested_key({}, list(value_path), conf)
            # The recursive call goes through the mapping branch above
            self._add_src(src, conf, filter_none=filter_none, fallback=fallback)

    # Register the source at this level only once: a source that is already
    # known keeps its current position in the priority list.
    if src not in self._src_prio:
        if fallback:
            self._src_prio.append(src)
        else:
            self._src_prio.insert(0, src)

    return self
[docs]
def set_default_src(self, src_prio):
    """
    Set the default source priority list, for this level and all its
    sublevels.

    :param src_prio: list of source names, first is the highest priority
    :type src_prio: collections.abc.Sequence(str)

    Adding sources using :meth:`add_src` in the right order is preferable,
    but the default priority order can be specified using that method.
    """
    # pylint: disable=attribute-defined-outside-init

    # Make a copy of the list to make sure it is not modified behind our back.
    # Materializing it once also means that a one-shot iterable is not
    # already exhausted by the time it is handed over to the sublevels.
    src_prio = list(src_prio)
    self._src_prio = src_prio

    # Each sublevel makes its own copy, so the list is never shared between
    # levels.
    for sublevel in self._sublevel_map.values():
        sublevel.set_default_src(src_prio)
[docs]
def force_src_nested(self, key_src_map):
    """
    Force the source priority list for all the keys defined in the nested
    mapping ``key_src_map``.

    :param key_src_map: nested mapping of keys to priority list of sources
    :type key_src_map: collections.abc.Mapping
    """
    for key, src_or_map in key_src_map.items():
        if isinstance(self._structure[key], LevelKeyDesc):
            # Level keys hold a nested mapping, handled by the sublevel
            self._sublevel_map[key].force_src_nested(src_or_map)
        else:
            self.force_src(key, src_or_map)
[docs]
def force_src(self, key, src_prio):
    """
    Force the source priority list for a given key.

    :param key: name of the key. Only leaf keys are allowed here, since
        level keys have no source on their own.
    :type key: str

    :param src_prio: List of sources in priority order (first is highest
        priority). Special value ``None`` can be used to remove the
        key-specific priority override, so the default priority list will
        be used instead.
    :type src_prio: collections.abc.Sequence(str) or None

    :raises ValueError: If ``key`` is a sub-level or a derived key.
    """
    key_desc = self._structure[key]
    qual_key = key_desc.qualname

    if isinstance(key_desc, LevelKeyDesc):
        raise ValueError(f'Cannot force source of the sub-level "{qual_key}"', qual_key)

    if isinstance(key_desc, DerivedKeyDesc):
        raise ValueError(f'Cannot force source of a derived key "{qual_key}"', qual_key)

    # None means removing the src override for that key
    if src_prio is None:
        self._src_override.pop(key, None)
        return

    self._src_override[key] = src_prio
def _get_nested_src_override(self):
# Make a copy to avoid modifying it
override = copy.copy(self._src_override)
for key, sublevel in self._sublevel_map.items():
sublevel_override = sublevel._get_nested_src_override()
# Skip sublevel keys if they don't have any override to specify
if sublevel_override:
override[key] = sublevel_override
return override
def _get_effective_map(self, eval_deferred=True):
"""
Return the effective mapping by taking values from the highest
priority source for each key, recursively.
"""
mapping = {}
for key in self._key_map.keys():
try:
val = self.get_key(key, eval_deferred=eval_deferred)
# If the source of that key does not exist, we just ignore it
except KeyError:
pass
else:
mapping[key] = val
mapping.update(
(key, sublevel._get_effective_map(eval_deferred=eval_deferred))
for key, sublevel in self._sublevel_map.items()
)
return mapping
def _resolve_prio(self, key):
    """
    Return the list of sources able to serve ``key``, in priority order.

    The list holds the single source given by the key descriptor for
    derived keys, and is empty for keys no value was set for.
    """
    key_desc = self._structure[key]
    if isinstance(key_desc, DerivedKeyDesc):
        return [key_desc.get_src(self)]

    try:
        src_map = self._key_map[key]
    except KeyError:
        return []

    # Get the priority list from the prio override list, or just the
    # default prio list
    prio = self._src_override.get(key, self._src_prio)

    # Only include a source if it holds an actual value for that key
    return [src for src in prio if src in src_map]
[docs]
def resolve_src(self, key):
    """
    Get the source name that will be used to serve the value of ``key``.

    :raises ValueError: If ``key`` is a nested configuration level.
    :raises ConfigKeyError: If no source is available for ``key``.
    """
    key_desc = self._structure[key]

    if isinstance(key_desc, LevelKeyDesc):
        key = key_desc.qualname
        raise ValueError(f'Key "{key}" is a nested configuration level, it does not have a source on its own.', key)

    # Sources able to serve the key, the first one having the highest
    # priority
    candidates = self._resolve_prio(key)
    if not candidates:
        key = key_desc.qualname
        raise ConfigKeyError(
            f'Could not find any source for key "{key}"',
            key=key,
        )

    return candidates[0]
def _eval_deferred_val(self, src, key, error='raise'):
    """
    Return the value stored for ``key`` under the source ``src``, computing
    it first if it is a :class:`DeferredValue`.

    The result is validated with the key descriptor and stored back, so the
    deferred value is replaced by what it computed.

    :param src: Name of the source to take the value from.
    :param key: Name of the key, at this level.
    :param error: If ``'raise'`` or ``None``, a failure of the computation
        is raised. If ``'log'``, it is logged at error level and a
        ``DeferredExcep`` is stored and returned instead of a value.

    :raises _DeferredInexistentError: Re-raised after removing the entry of
        ``src`` for that key.
    """
    error = error or 'raise'
    key_desc = self._structure[key]
    val = self._key_map[key][src]
    validate = True
    if isinstance(val, DeferredValue):
        try:
            val = val(key_desc=key_desc)
        except _DeferredInexistentError:
            self.logger.debug(f'Lazy key "{key_desc.qualname}" was computed but no value was produced')
            # Forget about that source for that key before propagating
            del self._key_map[key][src]
            raise
        # Wrap into a ConfigKeyError so that the user code can easily
        # handle missing keys, and the original exception is still
        # available as excep.__cause__ since it was chained with "from"
        except Exception as e: # pylint: disable=broad-except
            key_qualname = key_desc.qualname
            msg = f'Could not compute "{key_qualname}" from source "{src}": {e}'
            # Propagate ConfigKeyError as-is
            if isinstance(e, ConfigKeyError):
                excep = e
            else:
                excep = DeferredValueComputationError(
                    msg,
                    e,
                    key_qualname,
                    src,
                )

            # Chain explicitly like "raise X from Y" in case it needs
            # to be wrapped to be raised later
            # NOTE(review): when ``e`` is a ConfigKeyError, ``excep`` is
            # ``e`` itself, so this makes the exception its own cause --
            # confirm this is harmless for the consumers of __cause__.
            excep.__cause__ = e

            if error == 'raise':
                raise excep from e
            elif error == 'log':
                self.logger.error(msg)
                # Setup a bomb that will explode during a later access
                val = DeferredExcep(excep)
                validate = False
            # NOTE(review): any other value of ``error`` falls through with
            # ``val`` still being the DeferredValue, which then goes
            # through validation below -- confirm whether that is wanted.

    if validate:
        key_desc.validate_val(val)

    # Cache the result, so the computation is not attempted again
    self._key_map[key][src] = val
    return val
[docs]
def eval_deferred(self, cls=DeferredValue, src=None, resolve_src=True, error='raise'):
    """
    Evaluate instances of :class:`DeferredValue` that can be used for
    values that are expensive to compute.

    :param cls: Only evaluate values of instances of that class. This can
        be used to have different categories of :class:`DeferredValue` by
        subclassing.
    :type cls: subclass of :class:`DeferredValue`

    :param src: If not ``None``, only evaluate values that were added under
        that source name. Keys that hold no value for that source are
        skipped.
    :type src: str or None

    :param resolve_src: If ``True``, resolve the source of each key and
        only compute deferred values for this source. It is only taken
        into account when ``src`` is ``None``.
    :type resolve_src: bool

    :param error: If ``'raise'`` or ``None``, exception are raised as
        usual. If ``log``, the exception is logged at error level.
    :type error: str or None
    """
    for key, src_map in self._key_map.items():
        if src is not None:
            # Not every key has a value coming from every source
            if src not in src_map:
                continue
            candidates = [(src, src_map[src])]
        elif resolve_src:
            try:
                src_ = self.resolve_src(key)
            except ConfigKeyError:
                continue
            candidates = [(src_, src_map[src_])]
        else:
            # Take a snapshot, since evaluating a deferred value can remove
            # its entry from src_map, which must not happen while iterating
            # over src_map itself.
            candidates = list(src_map.items())

        for src_, val in candidates:
            if isinstance(val, cls):
                # A deferred value that turns out to not exist is not an
                # error: its entry has simply been removed.
                with contextlib.suppress(_DeferredInexistentError):
                    self._eval_deferred_val(src_, key, error=error)

    for sublevel in self._sublevel_map.values():
        sublevel.eval_deferred(cls, src=src, resolve_src=resolve_src, error=error)
[docs]
def __getstate__(self):
    """
    Filter instances of :class:`DeferredValue` that are not computed
    already since their runtime parameters will probably not be available
    after deserialization.

    If needed, call :meth:`eval_deferred` before serializing.
    """
    key_map = {}
    for key, src_map in self._key_map.items():
        # Filter-out DeferredValue key-value pairs before serialization
        kept = {
            src: v
            for src, v in src_map.items()
            if not isinstance(v, DeferredValue)
        }
        # keys without any source are just removed
        if kept:
            key_map[key] = kept

    state = copy.copy(super().__getstate__())
    state['_key_map'] = key_map
    return state
[docs]
def get_key(self, key, src=None, eval_deferred=True, quiet=False):
    """
    Get the value of the given key. It returns a deepcopy of the value when
    the key descriptor asks for it (``deepcopy_val``).

    The special key ``..`` can be used to refer to the parent in the
    hierarchy.

    :param key: name of the key to lookup
    :type key: str

    :param src: If not None, look up the value of the key in that source
    :type src: str or None

    :param eval_deferred: If True, evaluate instances of
        :class:`DeferredValue` if needed
    :type eval_deferred: bool

    :param quiet: Avoid logging the access. This implementation does not
        make use of it.
    :type quiet: bool

    :raises ValueError: If ``src`` is specified for a derived key.
    :raises ConfigKeyError: If the key is not available from the source.

    .. note:: Using the indexing operator ``self[key]`` is preferable in
        most cases , but this method provides more parameters.
    """
    if key == '..':
        return self._parent

    key_desc = self._structure[key]

    if isinstance(key_desc, LevelKeyDesc):
        return self._sublevel_map[key]
    elif isinstance(key_desc, DerivedKeyDesc):
        # Specifying a source is an error for a derived key
        if src is not None:
            key = key_desc.qualname
            raise ValueError(f'Cannot specify the source when getting "{key}" since it is a derived key', key)

        val = key_desc.compute_val(self, eval_deferred=eval_deferred)
        # The result is not used, the call is only kept so that a failure
        # to resolve the source is still reported.
        self.resolve_src(key)
    else:
        while True:
            # Compute the source to use for that key. When the caller did
            # not impose one, it is resolved again at each iteration so
            # that the next source in priority order is picked up after a
            # deferred value turned out to not exist.
            lookup_src = self.resolve_src(key) if src is None else src

            try:
                val = self._key_map[key][lookup_src]
            except KeyError:
                # pylint: disable=raise-missing-from
                qualname = key_desc.qualname
                raise ConfigKeyError(
                    f'Key "{qualname}" is not available from source "{lookup_src}"',
                    key=qualname,
                    src=lookup_src,
                )

            if eval_deferred:
                try:
                    val = self._eval_deferred_val(lookup_src, key, error='raise')
                except _DeferredInexistentError:
                    # The entry of that source has been removed by
                    # _eval_deferred_val(), so the loop makes progress and
                    # ends with a ConfigKeyError once no source is left.
                    continue

            break

    # Deferred values that were not evaluated are handed over as they are
    if isinstance(val, DeferredValue):
        return val
    else:
        if key_desc.deepcopy_val:
            val = copy.deepcopy(val)
        return val
[docs]
def get_nested_key(self, key, *args, **kwargs):
    """
    Same as :meth:`get_key` but works on a list of keys to access nested
    mappings. Extra parameters are forwarded to each :meth:`get_key` call.

    :param key: List of nested keys.
    :type key: list(str)
    """
    def descend(level, name):
        return level.get_key(name, *args, **kwargs)

    # Start from this level and go one level deeper for each item
    return functools.reduce(descend, key, self)
[docs]
def get_src_map(self, key):
    """
    Get a mapping of all sources for the given ``key``, in priority order
    (first item is the highest priority source).

    :raises ValueError: If ``key`` is a nested configuration level.
    """
    key_desc = self._structure[key]
    if isinstance(key_desc, LevelKeyDesc):
        key = key_desc.qualname
        raise ValueError(f'Key "{key}" is a nested configuration level, it does not have a source on its own.', key)

    src_map = OrderedDict()
    for src in self._resolve_prio(key):
        try:
            val = self._eval_deferred_val(src, key, error='raise')
        except _DeferredInexistentError:
            # Sources whose deferred value does not exist are left out
            continue
        src_map[src] = val

    return src_map
def __str__(self):
return self.pretty_format()
def __getitem__(self, key):
return self.get_key(key)
def _get_key_names(self, eval_deferred=True):
if eval_deferred:
# Ensure we have an accurate list of keys, not including deferred value
# that turn out to not exist.
self.eval_deferred(cls=FilteredDeferredValue)
def has_key(key):
try:
self.get_key(key, src=None, eval_deferred=False, quiet=True)
except ConfigKeyError:
return False
else:
return True
return sorted(
[
key
for key in self._key_map.keys()
if has_key(key)
] + list(
self._sublevel_map.keys()
)
)
def _get_derived_key_names(self):
return sorted(
key
for key, key_desc in self._structure.items()
if (
isinstance(key_desc, DerivedKeyDesc)
and key_desc.can_be_computed(self)
)
)
def __iter__(self):
return iter(self._get_key_names())
def __len__(self):
return len(self._get_key_names())
[docs]
def items(self, eval_deferred=True):
    """
    Override the default definition of
    ``collections.abc.Mapping.items()`` to allow not evaluating deferred
    values if necessary.

    :param eval_deferred: Forwarded to the listing of the keys and to each
        :meth:`get_key` call.

    :returns: A generator of ``(key, value)`` tuples.
    """
    # The keys are listed right away, only the values are looked up lazily
    names = self._get_key_names(eval_deferred=eval_deferred)

    def lookup(name):
        return self.get_key(name, eval_deferred=eval_deferred, quiet=True)

    return ((name, lookup(name)) for name in names)
def _ipython_key_completions_(self):
"Allow Jupyter keys completion in interactive notebooks"
regular_keys = set(self._get_key_names(eval_deferred=False))
# For autocompletion purposes, we show the derived keys
derived_keys = set(self._get_derived_key_names())
return sorted(regular_keys | derived_keys)
def _repr_pretty_(self, p, cycle):
"Pretty print instances in Jupyter notebooks"
# pylint: disable=unused-argument
p.text(self.pretty_format())
[docs]
class SimpleMultiSrcConf(MultiSrcConf):
"""
Like :class:`MultiSrcConf`, with a simpler config file.
``conf`` and ``source`` are not available, and the behaviour is as all keys
were located under a ``conf`` key. We do not allow overriding source for
this kind of configuration to keep the YAML interface simple and dict-like
"""
[docs]
@classmethod
def from_map(cls, *args, **kwargs):
    """
    Create a new configuration instance by forwarding all the parameters to
    the class constructor.
    """
    instance = cls(*args, **kwargs)
    return instance
[docs]
def to_map(self):
    """
    Export the configuration as a dictionary built from the effective map,
    without any ``conf`` or ``source`` top-level key.
    """
    effective = self._get_effective_map()
    return dict(effective)
[docs]
def to_yaml_map(self, path, add_placeholder=False, placeholder='<no default>'):
    """
    Write a configuration file, with the key descriptions in comments.

    :param path: Path to the file to write to.
    :type path: str

    :param add_placeholder: If ``True``, a placeholder value will be used
        for keys that don't have values. This allows creating template
        configuration files that list all keys.
    :type add_placeholder: bool

    :param placeholder: Placeholder to use for missing values when
        ``add_placeholder`` is used.
    :type placeholder: object

    :returns: What ``_to_path()`` returns, or ``None`` when there is nothing
        to write.
    """
    def format_comment(key_desc):
        # Help of the key, stripped and with its first letter capitalized
        comment = key_desc.get_help(style='yaml')
        if comment:
            return (comment[0].upper() + comment[1:]).strip()
        else:
            return comment

    def add_help(key_desc, data, indent=0):
        # Recursively attach the help of the children of a level as YAML
        # comments. Nothing is done for keys that are not a level.
        name = key_desc.name
        if isinstance(key_desc, LevelKeyDesc):
            # Path of the level inside ``data``
            if isinstance(key_desc, TopLevelKeyDesc):
                levels = key_desc.levels
            else:
                levels = [key_desc.name]

            try:
                level_data = get_nested_key(data, levels)
            except KeyError:
                level_data = {}
            # A CommentedMap is needed in order to attach comments
            level_data = CommentedMap(level_data)

            for subkey_desc in key_desc.children:
                if subkey_desc.name not in level_data:
                    if add_placeholder:
                        # Derived keys are never listed in the template
                        if isinstance(subkey_desc, DerivedKeyDesc):
                            continue
                        if not isinstance(subkey_desc, LevelKeyDesc):
                            level_data[subkey_desc.name] = placeholder
                    else:
                        # Keys without a value are left out
                        continue

                idt = indent + len(levels)
                level_data.yaml_set_comment_before_after_key(
                    subkey_desc.name,
                    indent=idt * 4,
                    before='\n' + format_comment(subkey_desc),
                )
                add_help(subkey_desc, level_data, indent=idt)

            if level_data:
                set_nested_key(data, levels, level_data)
            else:
                # Do not leave an empty level behind
                with contextlib.suppress(KeyError):
                    del data[levels[0]]

    data = CommentedMap(self.as_yaml_map)
    data.yaml_set_start_comment(format_comment(self.STRUCTURE))
    add_help(self.STRUCTURE, data)

    # Only write the file if there is something to write
    if data:
        return self._to_path(data, path, fmt='yaml-roundtrip')
    else:
        return None
[docs]
class Configurable(abc.ABC):
"""
Pair a regular class with a configuration class.
The pairing is achieved by inheriting from :class:`Configurable` and
setting ``CONF_CLASS`` attribute. The benefits are:
* The docstring of the class is processed as a string template and
``{configurable_params}`` is replaced with a Sphinx-compliant list of
parameters. The help and type of each parameter is extracted from the
configuration class.
* The ``DEFAULT_SRC`` attribute of the configuration class is updated
with non-``None`` default values of the class ``__init__`` parameters.
* The :meth:`~Configurable.conf_to_init_kwargs` method allows turning a
configuration object into a dictionary suitable for passing to
``__init__`` as ``**kwargs``.
* The :meth:`~Configurable.check_init_param` method allows checking
types of ``__init__`` parameters according to what is specified in the
configuration class.
Most of the time, the configuration keys and ``__init__`` parameters have
the same name (modulo underscore/dashes which are handled automatically).
In that case, the mapping between config keys and ``__init__`` parameters
is done without user intervention. When that is not the case, the
``INIT_KWARGS_KEY_MAP`` class attribute can be used. Its a dictionary with
keys being ``__init__`` parameter names, and values being path to
configuration key. That path is a list of strings to take into account
sublevels like ``['level-key', 'sublevel', 'foo']``.
.. note:: A given configuration class must be paired to only one class.
Otherwise, the ``DEFAULT_SRC`` conf class attribute will be updated
multiple times, leading to unexpected results.
.. note:: Some services offered by :class:`Configurable` are not extended
to subclasses of a class using it. For example, it would not make sense
to update ``DEFAULT_SRC`` using a subclass ``__init__`` parameters.
"""
INIT_KWARGS_KEY_MAP = {}
[docs]
@classmethod
def __init_subclass__(cls, **kwargs):
    """
    Pair the subclass with the configuration class set in its own
    ``CONF_CLASS`` attribute.

    Nothing is done for subclasses that do not define ``CONF_CLASS``
    themselves. Otherwise:

        * ``INIT_KWARGS_KEY_MAP`` is set on the subclass,
        * ``DEFAULT_SRC`` of the configuration class is replaced,
        * the docstring of the subclass is formatted with the
          ``configurable_params`` placeholder.
    """
    super().__init_subclass__(**kwargs)
    dct = cls.__dict__

    try:
        # inherited CONF_CLASS will not be taken into account if we look at
        # the dictionary directly
        conf_cls = dct['CONF_CLASS']
    except KeyError:
        return

    # Link the configuration to the signature of __init__
    sig = inspect.signature(cls.__init__)
    init_kwargs_key_map = cls._get_kwargs_key_map(sig, conf_cls)
    # What was already there has priority over auto-detected bindings
    init_kwargs_key_map.update(dct.get('INIT_KWARGS_KEY_MAP', {}))
    cls.INIT_KWARGS_KEY_MAP = init_kwargs_key_map

    # Create an instance with default configuration, to merge it with
    # defaults taken from __init__
    default_conf = conf_cls()
    default_conf.add_src(
        src='__init__-default',
        conf=cls._get_default_conf(sig, init_kwargs_key_map),
        # Default configuration set in the conf class still has priority
        fallback=True,
        # When an __init__ parameter has a None default value, we don't
        # add any default value. That avoids failing the type check for
        # keys that really need to be of a certain type when specified.
        filter_none=True,
    )
    # Convert to a dict so that the Sphinx documentation is able to show
    # the content of the source
    conf_cls.DEFAULT_SRC = dict(default_conf._get_effective_map())

    # Update the docstring by using the configuration help
    docstring = inspect.getdoc(cls)
    if docstring:
        cls.__doc__ = docstring.format(
            configurable_params=cls._get_rst_param_doc()
        )
@staticmethod
def _get_kwargs_key_map(sig, conf_cls):
"""
Map implicitely keys in the conf class that matches param names.
"""
def iter_param_key(sig):
return (
(param, param.replace('_', '-'))
for param in sig.parameters.keys()
)
return {
param: [key]
for param, key in iter_param_key(sig)
if key in conf_cls.STRUCTURE
}
@staticmethod
def _get_default_conf(sig, kwargs_key_map):
"""
Get a default configuration source based on the the default parameter
values.
"""
default_conf = {}
for param, param_desc in sig.parameters.items():
try:
conf_path = kwargs_key_map[param]
except KeyError:
continue
else:
default = param_desc.default
if default is not param_desc.empty:
set_nested_key(default_conf, conf_path, default)
return default_conf
@classmethod
def _get_param_key_desc_map(cls):
return {
param: get_nested_key(cls.CONF_CLASS.STRUCTURE, conf_path)
for param, conf_path in cls.INIT_KWARGS_KEY_MAP.items()
}
@classmethod
def _get_rst_param_doc(cls):
# pylint: disable=no-value-for-parameter
return '\n'.join(
':param {param}: {help}\n:type {param}: {type}\n'.format(
param=param,
help=key_desc.help,
type=(
'collections.abc.Mapping'
if isinstance(key_desc, LevelKeyDesc) else
' or '.join(get_cls_name(t) for t in key_desc.classinfo)
),
)
for param, key_desc
in cls._get_param_key_desc_map().items()
)
[docs]
@classmethod
def conf_to_init_kwargs(cls, conf):
"""
Turn a configuration object into a dictionary suitable for passing to
``__init__`` as ``**kwargs``.
"""
kwargs = {}
for param, conf_path in cls.INIT_KWARGS_KEY_MAP.items():
try:
val = get_nested_key(conf, conf_path)
except KeyError:
continue
kwargs[param] = val
return kwargs
[docs]
@classmethod
def check_init_param(cls, **kwargs):
"""
Take the same parameters as ``__init__``, and check their types
according to what is specified in the configuration class.
:raises TypeError: When the wrong type is detected for a parameter.
"""
parameters = OrderedDict(inspect.signature(cls.__init__).parameters)
# pop `self` param
parameters.popitem(last=False)
mandatory_args = [
name
for name, param in parameters.items()
if (
param.default is inspect.Parameter.empty
and param.kind not in (
inspect.Parameter.VAR_POSITIONAL,
inspect.Parameter.VAR_KEYWORD,
)
)
]
param_key_desc_map = cls._get_param_key_desc_map()
missing_param = set(mandatory_args) - set(kwargs.keys())
if missing_param:
missing_key_paths = sorted(
key_desc.qualname
for param, key_desc in param_key_desc_map.items()
if param in missing_param
)
raise ConfigKeyError(f"Missing mandatory keys: {', '.join(missing_key_paths)}")
for param, key_desc in param_key_desc_map.items():
if param in kwargs:
key_desc.validate_val(kwargs[param])
# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab