# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Module that defines the configuration file of an AiiDA instance and functions to create and load it."""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import io
import os
import shutil
from aiida.common import json
from .migrations import CURRENT_CONFIG_VERSION, OLDEST_COMPATIBLE_CONFIG_VERSION
from .options import get_option, parse_option, NO_DEFAULT
from .profile import Profile
from .settings import DEFAULT_UMASK, DEFAULT_CONFIG_INDENT_SIZE
__all__ = ('Config',)
[docs]class Config(object): # pylint: disable=too-many-public-methods
"""Object that represents the configuration file of an AiiDA instance."""
KEY_VERSION = 'CONFIG_VERSION'
KEY_VERSION_CURRENT = 'CURRENT'
KEY_VERSION_OLDEST_COMPATIBLE = 'OLDEST_COMPATIBLE'
KEY_DEFAULT_PROFILE = 'default_profile'
KEY_PROFILES = 'profiles'
KEY_OPTIONS = 'options'
[docs] def __init__(self, filepath, config):
"""Instantiate a configuration object from a configuration dictionary and its filepath.
If an empty dictionary is passed, the constructor will create the skeleton configuration dictionary.
:param filepath: the absolute filepath of the configuration file
:param config: the content of the configuration file in dictionary form
"""
version = config.get(self.KEY_VERSION, {})
current_version = version.get(self.KEY_VERSION_CURRENT, CURRENT_CONFIG_VERSION)
compatible_version = version.get(self.KEY_VERSION_OLDEST_COMPATIBLE, OLDEST_COMPATIBLE_CONFIG_VERSION)
self._filepath = filepath
self._current_version = current_version
self._oldest_compatible_version = compatible_version
self._profiles = {}
known_keys = [self.KEY_VERSION, self.KEY_PROFILES, self.KEY_OPTIONS, self.KEY_DEFAULT_PROFILE]
unknown_keys = set(config.keys()) - set(known_keys)
if unknown_keys:
keys = ', '.join(unknown_keys)
self.handle_invalid('encountered unknown keys [{}] in `{}` which have been removed'.format(keys, filepath))
try:
self._options = config[self.KEY_OPTIONS]
except KeyError:
self._options = {}
try:
self._default_profile = config[self.KEY_DEFAULT_PROFILE]
except KeyError:
self._default_profile = None
for name, config_profile in config.get(self.KEY_PROFILES, {}).items():
if Profile.contains_unknown_keys(config_profile):
self.handle_invalid('encountered unknown keys in profile `{}` which have been removed'.format(name))
self._profiles[name] = Profile(name, config_profile, from_config=True)
[docs] def __eq__(self, other):
"""Two configurations are considered equal, when their dictionaries are equal."""
return self.dictionary == other.dictionary
[docs] def __ne__(self, other):
"""Two configurations are considered unequal, when their dictionaries are unequal."""
return self.dictionary != other.dictionary
[docs] def handle_invalid(self, message):
"""Handle an incoming invalid configuration dictionary.
The current content of the configuration file will be written to a backup file.
:param message: a string message to echo with describing the infraction
"""
from aiida.cmdline.utils import echo
filepath = self._filepath + '.bak'
self._backup(filepath)
echo.echo_warning(message)
echo.echo_warning('backup of the original config file written to: `{}`'.format(filepath))
@property
def dictionary(self):
"""Return the dictionary representation of the config as it would be written to file.
:return: dictionary representation of config as it should be written to file
"""
config = {
self.KEY_VERSION: self.version_settings,
self.KEY_PROFILES: {name: profile.dictionary for name, profile in self._profiles.items()}
}
if self._default_profile:
config[self.KEY_DEFAULT_PROFILE] = self._default_profile
if self._options:
config[self.KEY_OPTIONS] = self._options
return config
@property
def version(self):
return self._current_version
@version.setter
def version(self, version):
self._current_version = version
@property
def version_oldest_compatible(self):
return self._oldest_compatible_version
@version_oldest_compatible.setter
def version_oldest_compatible(self, version_oldest_compatible):
self._oldest_compatible_version = version_oldest_compatible
@property
def version_settings(self):
return {
self.KEY_VERSION_CURRENT: self.version,
self.KEY_VERSION_OLDEST_COMPATIBLE: self.version_oldest_compatible
}
@property
def filepath(self):
return self._filepath
@property
def dirpath(self):
return os.path.dirname(self.filepath)
@property
def default_profile_name(self):
"""Return the default profile name.
:return: the default profile name or None if not defined
"""
return self._default_profile
@property
def current_profile(self):
"""Return the currently loaded profile.
:return: the current profile or None if not defined
"""
from . import get_profile
return get_profile()
@property
def profile_names(self):
"""Return the list of profile names.
:return: list of profile names
"""
return list(self._profiles.keys())
@property
def profiles(self):
"""Return the list of profiles.
:return: the profiles
:rtype: list of `Profile` instances
"""
return list(self._profiles.values())
[docs] def validate_profile(self, name):
"""Validate that a profile exists.
:param name: name of the profile:
:raises aiida.common.ProfileConfigurationError: if the name is not found in the configuration file
"""
from aiida.common import exceptions
if name not in self.profile_names:
raise exceptions.ProfileConfigurationError('profile `{}` does not exist'.format(name))
[docs] def get_profile(self, name=None):
"""Return the profile for the given name or the default one if not specified.
:return: the profile instance or None if it does not exist
:raises aiida.common.ProfileConfigurationError: if the name is not found in the configuration file
"""
from aiida.common import exceptions
if not name and not self.default_profile_name:
raise exceptions.ProfileConfigurationError(
'no default profile defined: {}\n{}'.format(self._default_profile, self.dictionary)
)
if not name:
name = self.default_profile_name
self.validate_profile(name)
return self._profiles[name]
[docs] def add_profile(self, profile):
"""Add a profile to the configuration.
:param profile: the profile configuration dictionary
:return: self
"""
self._profiles[profile.name] = profile
return self
[docs] def update_profile(self, profile):
"""Update a profile in the configuration.
:param profile: the profile instance to update
:return: self
"""
self._profiles[profile.name] = profile
return self
[docs] def remove_profile(self, name):
"""Remove a profile from the configuration.
:param name: the name of the profile to remove
:raises aiida.common.ProfileConfigurationError: if the given profile does not exist
:return: self
"""
self.validate_profile(name)
self._profiles.pop(name)
return self
[docs] def set_default_profile(self, name, overwrite=False):
"""Set the given profile as the new default.
:param name: name of the profile to set as new default
:param overwrite: when True, set the profile as the new default even if a default profile is already defined
:raises aiida.common.ProfileConfigurationError: if the given profile does not exist
:return: self
"""
if self.default_profile_name and not overwrite:
return self
self.validate_profile(name)
self._default_profile = name
return self
@property
def options(self):
return self._options
@options.setter
def options(self, value):
self._options = value
[docs] def set_option(self, option_name, option_value, scope=None, override=True):
"""Set a configuration option for a certain scope.
:param option_name: the name of the configuration option
:param option_value: the option value
:param scope: set the option for this profile or globally if not specified
:param override: boolean, if False, will not override the option if it already exists
"""
option, parsed_value = parse_option(option_name, option_value)
if parsed_value is not None:
value = parsed_value
elif option.default is not NO_DEFAULT:
value = option.default
else:
return
if not option.global_only and scope is not None:
self.get_profile(scope).set_option(option.key, value, override=override)
else:
if option.key not in self.options or override:
self.options[option.key] = value
[docs] def unset_option(self, option_name, scope=None):
"""Unset a configuration option for a certain scope.
:param option_name: the name of the configuration option
:param scope: unset the option for this profile or globally if not specified
"""
option = get_option(option_name)
if scope is not None:
self.get_profile(scope).unset_option(option.key)
else:
self.options.pop(option.key, None)
[docs] def get_option(self, option_name, scope=None, default=True):
"""Get a configuration option for a certain scope.
:param option_name: the name of the configuration option
:param scope: get the option for this profile or globally if not specified
:param default: boolean, If True will return the option default, even if not defined within the given scope
:return: the option value or None if not set for the given scope
"""
option = get_option(option_name)
# Default value is `None` unless `default=True` and the `option.default` is not `NO_DEFAULT`
default_value = option.default if default and option.default is not NO_DEFAULT else None
if scope is not None:
value = self.get_profile(scope).get_option(option.key, default_value)
else:
value = self.options.get(option.key, default_value)
return value
[docs] def store(self):
"""Write the current config to file."""
self._backup()
umask = os.umask(DEFAULT_UMASK)
try:
with io.open(self.filepath, 'wb') as handle:
json.dump(self.dictionary, handle, indent=DEFAULT_CONFIG_INDENT_SIZE)
finally:
os.umask(umask)
return self
[docs] def _backup(self, filepath=None):
"""Create a backup of the current config as it exists on disk."""
if not os.path.isfile(self.filepath):
return
umask = os.umask(DEFAULT_UMASK)
if filepath is None:
filepath = self.filepath + '~'
try:
shutil.copy(self.filepath, filepath)
finally:
os.umask(umask)