Source code for aiida.manage.configuration.config

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""Module that defines the configuration file of an AiiDA instance and functions to create and load it."""

import os
import shutil

from aiida.common import json

from .options import get_option, parse_option, NO_DEFAULT
from .profile import Profile

__all__ = ('Config',)


[docs]class Config: # pylint: disable=too-many-public-methods """Object that represents the configuration file of an AiiDA instance.""" KEY_VERSION = 'CONFIG_VERSION' KEY_VERSION_CURRENT = 'CURRENT' KEY_VERSION_OLDEST_COMPATIBLE = 'OLDEST_COMPATIBLE' KEY_DEFAULT_PROFILE = 'default_profile' KEY_PROFILES = 'profiles' KEY_OPTIONS = 'options'
[docs] @classmethod def from_file(cls, filepath): """Instantiate a configuration object from the contents of a given file. .. note:: if the filepath does not exist an empty file will be created with the current default configuration and will be written to disk. If the filepath does already exist but contains a configuration with an outdated schema, the content will be migrated and then written to disk. :param filepath: the absolute path to the configuration file :return: `Config` instance """ from aiida.cmdline.utils import echo from .migrations import check_and_migrate_config, config_needs_migrating try: with open(filepath, 'r', encoding='utf8') as handle: config = json.load(handle) except (IOError, OSError): config = Config(filepath, check_and_migrate_config({})) config.store() else: # If the configuration file needs to be migrated first create a specific backup so it can easily be reverted if config_needs_migrating(config): echo.echo_warning('current configuration file `{}` is outdated and will be migrated'.format(filepath)) filepath_backup = cls._backup(filepath) echo.echo_warning('original backed up to `{}`'.format(filepath_backup)) config = Config(filepath, check_and_migrate_config(config)) config.store() return config
[docs] @classmethod def _backup(cls, filepath): """Create a backup of the configuration file with the given filepath. :param filepath: absolute path to the configuration file to backup :return: the absolute path of the created backup """ from aiida.common import timezone filepath_backup = None # Keep generating a new backup filename based on the current time until it does not exist while not filepath_backup or os.path.isfile(filepath_backup): filepath_backup = '{}.{}'.format(filepath, timezone.now().strftime('%Y%m%d-%H%M%S.%f')) shutil.copy(filepath, filepath_backup) return filepath_backup
[docs] def __init__(self, filepath, config): """Instantiate a configuration object from a configuration dictionary and its filepath. If an empty dictionary is passed, the constructor will create the skeleton configuration dictionary. :param filepath: the absolute filepath of the configuration file :param config: the content of the configuration file in dictionary form """ from .migrations import CURRENT_CONFIG_VERSION, OLDEST_COMPATIBLE_CONFIG_VERSION version = config.get(self.KEY_VERSION, {}) current_version = version.get(self.KEY_VERSION_CURRENT, CURRENT_CONFIG_VERSION) compatible_version = version.get(self.KEY_VERSION_OLDEST_COMPATIBLE, OLDEST_COMPATIBLE_CONFIG_VERSION) self._filepath = filepath self._current_version = current_version self._oldest_compatible_version = compatible_version self._profiles = {} known_keys = [self.KEY_VERSION, self.KEY_PROFILES, self.KEY_OPTIONS, self.KEY_DEFAULT_PROFILE] unknown_keys = set(config.keys()) - set(known_keys) if unknown_keys: keys = ', '.join(unknown_keys) self.handle_invalid('encountered unknown keys [{}] in `{}` which have been removed'.format(keys, filepath)) try: self._options = config[self.KEY_OPTIONS] except KeyError: self._options = {} try: self._default_profile = config[self.KEY_DEFAULT_PROFILE] except KeyError: self._default_profile = None for name, config_profile in config.get(self.KEY_PROFILES, {}).items(): if Profile.contains_unknown_keys(config_profile): self.handle_invalid('encountered unknown keys in profile `{}` which have been removed'.format(name)) self._profiles[name] = Profile(name, config_profile, from_config=True)
[docs] def __eq__(self, other): """Two configurations are considered equal, when their dictionaries are equal.""" return self.dictionary == other.dictionary
[docs] def __ne__(self, other): """Two configurations are considered unequal, when their dictionaries are unequal.""" return self.dictionary != other.dictionary
[docs] def handle_invalid(self, message): """Handle an incoming invalid configuration dictionary. The current content of the configuration file will be written to a backup file. :param message: a string message to echo with describing the infraction """ from aiida.cmdline.utils import echo filepath_backup = self._backup(self.filepath) echo.echo_warning(message) echo.echo_warning('backup of the original config file written to: `{}`'.format(filepath_backup))
@property def dictionary(self): """Return the dictionary representation of the config as it would be written to file. :return: dictionary representation of config as it should be written to file """ config = { self.KEY_VERSION: self.version_settings, self.KEY_PROFILES: {name: profile.dictionary for name, profile in self._profiles.items()} } if self._default_profile: config[self.KEY_DEFAULT_PROFILE] = self._default_profile if self._options: config[self.KEY_OPTIONS] = self._options return config @property def version(self): return self._current_version @version.setter def version(self, version): self._current_version = version @property def version_oldest_compatible(self): return self._oldest_compatible_version @version_oldest_compatible.setter def version_oldest_compatible(self, version_oldest_compatible): self._oldest_compatible_version = version_oldest_compatible @property def version_settings(self): return { self.KEY_VERSION_CURRENT: self.version, self.KEY_VERSION_OLDEST_COMPATIBLE: self.version_oldest_compatible } @property def filepath(self): return self._filepath @property def dirpath(self): return os.path.dirname(self.filepath) @property def default_profile_name(self): """Return the default profile name. :return: the default profile name or None if not defined """ return self._default_profile @property def current_profile(self): """Return the currently loaded profile. :return: the current profile or None if not defined """ from . import get_profile return get_profile() @property def profile_names(self): """Return the list of profile names. :return: list of profile names """ return list(self._profiles.keys()) @property def profiles(self): """Return the list of profiles. :return: the profiles :rtype: list of `Profile` instances """ return list(self._profiles.values())
[docs] def validate_profile(self, name): """Validate that a profile exists. :param name: name of the profile: :raises aiida.common.ProfileConfigurationError: if the name is not found in the configuration file """ from aiida.common import exceptions if name not in self.profile_names: raise exceptions.ProfileConfigurationError('profile `{}` does not exist'.format(name))
[docs] def get_profile(self, name=None): """Return the profile for the given name or the default one if not specified. :return: the profile instance or None if it does not exist :raises aiida.common.ProfileConfigurationError: if the name is not found in the configuration file """ from aiida.common import exceptions if not name and not self.default_profile_name: raise exceptions.ProfileConfigurationError( 'no default profile defined: {}\n{}'.format(self._default_profile, self.dictionary) ) if not name: name = self.default_profile_name self.validate_profile(name) return self._profiles[name]
[docs] def add_profile(self, profile): """Add a profile to the configuration. :param profile: the profile configuration dictionary :return: self """ self._profiles[profile.name] = profile return self
[docs] def update_profile(self, profile): """Update a profile in the configuration. :param profile: the profile instance to update :return: self """ self._profiles[profile.name] = profile return self
[docs] def remove_profile(self, name): """Remove a profile from the configuration. :param name: the name of the profile to remove :raises aiida.common.ProfileConfigurationError: if the given profile does not exist :return: self """ self.validate_profile(name) self._profiles.pop(name) return self
[docs] def set_default_profile(self, name, overwrite=False): """Set the given profile as the new default. :param name: name of the profile to set as new default :param overwrite: when True, set the profile as the new default even if a default profile is already defined :raises aiida.common.ProfileConfigurationError: if the given profile does not exist :return: self """ if self.default_profile_name and not overwrite: return self self.validate_profile(name) self._default_profile = name return self
@property def options(self): return self._options @options.setter def options(self, value): self._options = value
[docs] def set_option(self, option_name, option_value, scope=None, override=True): """Set a configuration option for a certain scope. :param option_name: the name of the configuration option :param option_value: the option value :param scope: set the option for this profile or globally if not specified :param override: boolean, if False, will not override the option if it already exists """ option, parsed_value = parse_option(option_name, option_value) if parsed_value is not None: value = parsed_value elif option.default is not NO_DEFAULT: value = option.default else: return if not option.global_only and scope is not None: self.get_profile(scope).set_option(option.key, value, override=override) else: if option.key not in self.options or override: self.options[option.key] = value
[docs] def unset_option(self, option_name, scope=None): """Unset a configuration option for a certain scope. :param option_name: the name of the configuration option :param scope: unset the option for this profile or globally if not specified """ option = get_option(option_name) if scope is not None: self.get_profile(scope).unset_option(option.key) else: self.options.pop(option.key, None)
[docs] def get_option(self, option_name, scope=None, default=True): """Get a configuration option for a certain scope. :param option_name: the name of the configuration option :param scope: get the option for this profile or globally if not specified :param default: boolean, If True will return the option default, even if not defined within the given scope :return: the option value or None if not set for the given scope """ option = get_option(option_name) # Default value is `None` unless `default=True` and the `option.default` is not `NO_DEFAULT` default_value = option.default if default and option.default is not NO_DEFAULT else None if scope is not None: value = self.get_profile(scope).get_option(option.key, default_value) else: value = self.options.get(option.key, default_value) return value
[docs] def store(self): """Write the current config to file. .. note:: if the configuration file already exists on disk and its contents differ from those in memory, a backup of the original file on disk will be created before overwriting it. :return: self """ import tempfile from aiida.common.files import md5_from_filelike, md5_file # If the filepath of this configuration does not yet exist, simply write it. if not os.path.isfile(self.filepath): with open(self.filepath, 'wb') as handle: self._write(handle) return self # Otherwise, we write the content to a temporary file and compare its md5 checksum with the current config on # disk. When the checksums differ, we first create a backup and only then overwrite the existing file. with tempfile.NamedTemporaryFile() as handle: self._write(handle) handle.seek(0) if md5_from_filelike(handle) != md5_file(self.filepath): self._backup(self.filepath) shutil.copy(handle.name, self.filepath) return self
[docs] def _write(self, filelike): """Write the contents of `self.dictionary` to the given file handle. :param filelike: the filelike object to write the current configuration to """ from .settings import DEFAULT_UMASK, DEFAULT_CONFIG_INDENT_SIZE umask = os.umask(DEFAULT_UMASK) try: json.dump(self.dictionary, filelike, indent=DEFAULT_CONFIG_INDENT_SIZE) finally: os.umask(umask)