###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Module that defines the configuration file of an AiiDA instance and functions to create and load it.
Despite the import of the annotations backport below which enables postponed type annotation evaluation as implemented
with PEP 563 (https://peps.python.org/pep-0563/), this is not compatible with ``pydantic`` for Python 3.9 and older (
See https://github.com/pydantic/pydantic/issues/2678 for details).
"""
from __future__ import annotations
import codecs
import contextlib
import io
import json
import os
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type
from pydantic import (
BaseModel,
ConfigDict,
Field,
ValidationError,
field_serializer,
field_validator,
)
from aiida.common.exceptions import ConfigurationError, EntryPointError, StorageMigrationError
from aiida.common.log import AIIDA_LOGGER, LogLevels
from .options import Option, get_option, get_option_names, parse_option
from .profile import Profile
if TYPE_CHECKING:
from aiida.orm.implementation.storage_backend import StorageBackend
LOGGER = AIIDA_LOGGER.getChild(__file__)
[docs]
class ConfigVersionSchema(BaseModel, defer_build=True):
    """Schema for the version configuration of an AiiDA instance."""

    # Version of the configuration schema the file is currently written with.
    CURRENT: int
    # Oldest schema version that this version of the code can still read without migration.
    OLDEST_COMPATIBLE: int
[docs]
class ProfileOptionsSchema(BaseModel, defer_build=True):
    """Schema for the options of an AiiDA profile."""

    # ``use_enum_values`` stores the plain values of enum-typed options (e.g. ``LogLevels``) instead of enum members.
    model_config = ConfigDict(use_enum_values=True)

    # Process runner and daemon behaviour.
    runner__poll__interval: int = Field(60, description='Polling interval in seconds to be used by process runners.')
    daemon__default_workers: int = Field(
        1, description='Default number of workers to be launched by `verdi daemon start`.'
    )
    daemon__timeout: int = Field(
        2,
        description='Used to set default timeout in the `DaemonClient` for calls to the daemon.',
    )
    daemon__worker_process_slots: int = Field(
        200, description='Maximum number of concurrent process tasks that each daemon worker can handle.'
    )
    daemon__recursion_limit: int = Field(3000, description='Maximum recursion depth for the daemon workers.')

    # Database behaviour.
    db__batch_size: int = Field(
        100000,
        description='Batch size for bulk CREATE operations in the database. Avoids hitting MaxAllocSize of PostgreSQL '
        '(1GB) when creating large numbers of database records in one go.',
    )

    # `verdi` shell behaviour.
    verdi__shell__auto_import: str = Field(
        ':',
        description='Additional modules/functions/classes to be automatically loaded in `verdi shell`, split by `:`.',
    )

    # Per-logger minimum log levels.
    logging__aiida_loglevel: LogLevels = Field(
        'REPORT', description='Minimum level to log to daemon log and the `DbLog` table for the `aiida` logger.'
    )
    logging__verdi_loglevel: LogLevels = Field(
        'REPORT', description='Minimum level to log to console when running a `verdi` command.'
    )
    logging__db_loglevel: LogLevels = Field('REPORT', description='Minimum level to log to the DbLog table.')
    logging__plumpy_loglevel: LogLevels = Field(
        'WARNING', description='Minimum level to log to daemon log and the `DbLog` table for the `plumpy` logger.'
    )
    logging__kiwipy_loglevel: LogLevels = Field(
        'WARNING', description='Minimum level to log to daemon log and the `DbLog` table for the `kiwipy` logger'
    )
    logging__paramiko_loglevel: LogLevels = Field(
        'WARNING', description='Minimum level to log to daemon log and the `DbLog` table for the `paramiko` logger'
    )
    logging__alembic_loglevel: LogLevels = Field(
        'WARNING', description='Minimum level to log to daemon log and the `DbLog` table for the `alembic` logger'
    )
    logging__sqlalchemy_loglevel: LogLevels = Field(
        'WARNING', description='Minimum level to log to daemon log and the `DbLog` table for the `sqlalchemy` logger'
    )
    logging__circus_loglevel: LogLevels = Field(
        'INFO', description='Minimum level to log to daemon log and the `DbLog` table for the `circus` logger'
    )
    logging__aiopika_loglevel: LogLevels = Field(
        'WARNING', description='Minimum level to log to daemon log and the `DbLog` table for the `aiopika` logger'
    )

    # Warning toggles.
    warnings__showdeprecations: bool = Field(True, description='Whether to print AiiDA deprecation warnings.')
    warnings__rabbitmq_version: bool = Field(
        True, description='Whether to print a warning when an incompatible version of RabbitMQ is configured.'
    )

    # Transport task retry behaviour.
    transport__task_retry_initial_interval: int = Field(
        20, description='Initial time interval for the exponential backoff mechanism.'
    )
    transport__task_maximum_attempts: int = Field(
        5, description='Maximum number of transport task attempts before a Process is Paused.'
    )
    rmq__task_timeout: int = Field(10, description='Timeout in seconds for communications with RabbitMQ.')

    # Storage and caching.
    storage__sandbox: Optional[str] = Field(
        None, description='Absolute path to the directory to store sandbox folders.'
    )
    caching__default_enabled: bool = Field(False, description='Enable calculation caching by default.')
    caching__enabled_for: List[str] = Field([], description='Calculation entry points to enable caching on.')
    caching__disabled_for: List[str] = Field([], description='Calculation entry points to disable caching on.')

    @field_validator('caching__enabled_for', 'caching__disabled_for')
    @classmethod
    def validate_caching_identifier_pattern(cls, value: List[str]) -> List[str]:
        """Validate the caching identifier patterns.

        Each identifier is checked with ``_validate_identifier_pattern`` in strict mode; the list is returned
        unchanged when all identifiers pass.
        """
        from aiida.manage.caching import _validate_identifier_pattern

        for identifier in value:
            _validate_identifier_pattern(identifier=identifier, strict=True)

        return value
[docs]
class GlobalOptionsSchema(ProfileOptionsSchema, defer_build=True):
    """Schema for the global options of an AiiDA instance.

    Extends the per-profile options with settings that only make sense at the instance level.
    """

    # Defaults used to pre-fill prompts when creating new profiles.
    autofill__user__email: Optional[str] = Field(
        None, description='Default user email to use when creating new profiles.'
    )
    autofill__user__first_name: Optional[str] = Field(
        None, description='Default user first name to use when creating new profiles.'
    )
    autofill__user__last_name: Optional[str] = Field(
        None, description='Default user last name to use when creating new profiles.'
    )
    autofill__user__institution: Optional[str] = Field(
        None, description='Default user institution to use when creating new profiles.'
    )
    rest_api__profile_switching: bool = Field(
        False, description='Toggle whether the profile can be specified in requests submitted to the REST API.'
    )
    warnings__development_version: bool = Field(
        True,
        description='Whether to print a warning when a profile is loaded while a development version is installed.',
    )
[docs]
class ProfileStorageConfig(BaseModel, defer_build=True):
    """Schema for the storage backend configuration of an AiiDA profile."""

    # Entry point name of the storage backend (cf. ``storage_entry_point.name`` in ``Config.create_profile``).
    backend: str
    # Backend-specific configuration dictionary; its schema is defined by the storage plugin.
    config: Dict[str, Any]
[docs]
class ProcessControlConfig(BaseModel, defer_build=True):
    """Schema for the process control configuration of an AiiDA profile."""

    broker_protocol: str = Field('amqp', description='Protocol for connecting to the message broker.')
    broker_username: str = Field('guest', description='Username for message broker authentication.')
    broker_password: str = Field('guest', description='Password for message broker.')
    broker_host: str = Field('127.0.0.1', description='Hostname of the message broker.')
    # Default to 5672, the standard AMQP/RabbitMQ port, matching the broker defaults written by
    # ``Config.create_profile``. The previous default of 5432 is the PostgreSQL port and was inconsistent.
    broker_port: int = Field(5672, description='Port of the message broker.')
    broker_virtual_host: str = Field('', description='Virtual host to use for the message broker.')
    broker_parameters: dict[str, Any] = Field(
        default_factory=dict, description='Arguments to be encoded as query parameters.'
    )
[docs]
class ProfileSchema(BaseModel, defer_build=True):
    """Schema for the configuration of an AiiDA profile."""

    # NOTE(review): ``default_factory=uuid.uuid4`` produces a ``uuid.UUID`` instance for a ``str``-typed field; the
    # field serializer below converts it to ``str`` on serialization.
    uuid: str = Field(description='A UUID that uniquely identifies the profile.', default_factory=uuid.uuid4)
    storage: ProfileStorageConfig
    process_control: ProcessControlConfig
    default_user_email: Optional[str] = None
    test_profile: bool = False
    options: Optional[ProfileOptionsSchema] = None

    @field_serializer('uuid')
    def serialize_dt(self, value: uuid.UUID, _info):
        # Serialize the UUID as its string representation. NOTE(review): despite the name ``serialize_dt``, this
        # serializes the ``uuid`` field, not a datetime.
        return str(value)
[docs]
class ConfigSchema(BaseModel, defer_build=True):
    """Schema for the configuration of an AiiDA instance."""

    # Version information of the configuration file; ``None`` when the key is absent.
    CONFIG_VERSION: Optional[ConfigVersionSchema] = None
    # Mapping of profile name to profile configuration.
    profiles: Optional[dict[str, ProfileSchema]] = None
    # Globally scoped options; per-profile options take precedence (see ``Config.get_options``).
    options: Optional[GlobalOptionsSchema] = None
    # Name of the profile to use when none is explicitly specified.
    default_profile: Optional[str] = None
[docs]
class Config:
    """Object that represents the configuration file of an AiiDA instance."""

    # Top-level keys of the configuration dictionary as it is stored on disk.
    KEY_VERSION = 'CONFIG_VERSION'
    KEY_VERSION_CURRENT = 'CURRENT'
    KEY_VERSION_OLDEST_COMPATIBLE = 'OLDEST_COMPATIBLE'
    KEY_DEFAULT_PROFILE = 'default_profile'
    KEY_PROFILES = 'profiles'
    KEY_OPTIONS = 'options'
    KEY_SCHEMA = '$schema'
[docs]
    @classmethod
    def from_file(cls, filepath):
        """Instantiate a configuration object from the contents of a given file.

        .. note:: if the filepath does not exist an empty file will be created with the current default configuration
            and will be written to disk. If the filepath does already exist but contains a configuration with an
            outdated schema, the content will be migrated and then written to disk.

        :param filepath: the absolute path to the configuration file
        :return: `Config` instance
        """
        from aiida.cmdline.utils import echo

        from .migrations import check_and_migrate_config, config_needs_migrating

        try:
            with open(filepath, 'rb') as handle:
                config = json.load(handle)
        except FileNotFoundError:
            # No configuration file exists yet: migrating an empty dictionary yields the current default
            # configuration, which is then persisted to disk.
            config = Config(filepath, check_and_migrate_config({}))
            config.store()
        else:
            migrated = False

            # If the configuration file needs to be migrated first create a specific backup so it can easily be reverted
            if config_needs_migrating(config, filepath):
                migrated = True
                echo.echo_warning(f'current configuration file `{filepath}` is outdated and will be migrated')
                filepath_backup = cls._backup(filepath)
                echo.echo_warning(f'original backed up to `{filepath_backup}`')

            config = Config(filepath, check_and_migrate_config(config))

            # Only write back to disk when a migration actually took place.
            if migrated:
                config.store()

        return config
[docs]
@classmethod
def _backup(cls, filepath):
"""Create a backup of the configuration file with the given filepath.
:param filepath: absolute path to the configuration file to backup
:return: the absolute path of the created backup
"""
import shutil
from aiida.common import timezone
filepath_backup = None
# Keep generating a new backup filename based on the current time until it does not exist
while not filepath_backup or os.path.isfile(filepath_backup):
filepath_backup = f"{filepath}.{timezone.now().strftime('%Y%m%d-%H%M%S.%f')}"
shutil.copy(filepath, filepath_backup)
return filepath_backup
[docs]
@staticmethod
def validate(config: dict, filepath: Optional[str] = None):
"""Validate a configuration dictionary."""
try:
ConfigSchema(**config)
except ValidationError as exception:
raise ConfigurationError(f'invalid config schema: {filepath}: {exception!s}')
[docs]
    def __init__(self, filepath: str, config: dict, validate: bool = True):
        """Instantiate a configuration object from a configuration dictionary and its filepath.

        If an empty dictionary is passed, the constructor will create the skeleton configuration dictionary.

        :param filepath: the absolute filepath of the configuration file
        :param config: the content of the configuration file in dictionary form
        :param validate: validate the dictionary against the schema
        """
        from .migrations import CURRENT_CONFIG_VERSION, OLDEST_COMPATIBLE_CONFIG_VERSION

        if validate:
            self.validate(config, filepath)

        self._filepath = filepath
        self._schema = config.get(self.KEY_SCHEMA, None)

        # Fall back to the versions of the current code when the file carries no version information.
        version = config.get(self.KEY_VERSION, {})
        self._current_version = version.get(self.KEY_VERSION_CURRENT, CURRENT_CONFIG_VERSION)
        self._oldest_compatible_version = version.get(
            self.KEY_VERSION_OLDEST_COMPATIBLE, OLDEST_COMPATIBLE_CONFIG_VERSION
        )

        self._profiles = {}

        # Keys outside the known set are not kept in memory; ``handle_invalid`` backs up the original file and warns
        # the user so the removed content can still be recovered from the backup.
        known_keys = [self.KEY_SCHEMA, self.KEY_VERSION, self.KEY_PROFILES, self.KEY_OPTIONS, self.KEY_DEFAULT_PROFILE]
        unknown_keys = set(config.keys()) - set(known_keys)

        if unknown_keys:
            keys = ', '.join(unknown_keys)
            self.handle_invalid(f'encountered unknown keys [{keys}] in `{filepath}` which have been removed')

        try:
            self._options = config[self.KEY_OPTIONS]
        except KeyError:
            self._options = {}

        try:
            self._default_profile = config[self.KEY_DEFAULT_PROFILE]
        except KeyError:
            self._default_profile = None

        for name, config_profile in config.get(self.KEY_PROFILES, {}).items():
            self._profiles[name] = Profile(name, config_profile)
[docs]
def __eq__(self, other):
"""Two configurations are considered equal, when their dictionaries are equal."""
return self.dictionary == other.dictionary
[docs]
def __ne__(self, other):
"""Two configurations are considered unequal, when their dictionaries are unequal."""
return self.dictionary != other.dictionary
[docs]
def handle_invalid(self, message):
"""Handle an incoming invalid configuration dictionary.
The current content of the configuration file will be written to a backup file.
:param message: a string message to echo with describing the infraction
"""
from aiida.cmdline.utils import echo
filepath_backup = self._backup(self.filepath)
echo.echo_warning(message)
echo.echo_warning(f'backup of the original config file written to: `{filepath_backup}`')
@property
def dictionary(self) -> dict:
"""Return the dictionary representation of the config as it would be written to file.
:return: dictionary representation of config as it should be written to file
"""
config = {}
if self._schema:
config[self.KEY_SCHEMA] = self._schema
config[self.KEY_VERSION] = self.version_settings
config[self.KEY_PROFILES] = {name: profile.dictionary for name, profile in self._profiles.items()}
if self._default_profile:
config[self.KEY_DEFAULT_PROFILE] = self._default_profile
if self._options:
config[self.KEY_OPTIONS] = self._options
return config
    @property
    def version(self):
        """Return the configuration schema version this configuration currently conforms to."""
        return self._current_version

    @version.setter
    def version(self, version):
        """Set the current configuration schema version."""
        self._current_version = version

    @property
    def version_oldest_compatible(self):
        """Return the oldest configuration schema version still compatible with this configuration."""
        return self._oldest_compatible_version

    @version_oldest_compatible.setter
    def version_oldest_compatible(self, version_oldest_compatible):
        """Set the oldest compatible configuration schema version."""
        self._oldest_compatible_version = version_oldest_compatible

    @property
    def version_settings(self):
        """Return the version information as a dictionary, as stored under ``CONFIG_VERSION`` on disk."""
        return {
            self.KEY_VERSION_CURRENT: self.version,
            self.KEY_VERSION_OLDEST_COMPATIBLE: self.version_oldest_compatible,
        }
    @property
    def filepath(self):
        """Return the absolute filepath of the configuration file."""
        return self._filepath

    @property
    def dirpath(self):
        """Return the path of the directory containing the configuration file."""
        return os.path.dirname(self.filepath)

    @property
    def default_profile_name(self):
        """Return the default profile name.

        :return: the default profile name or None if not defined
        """
        return self._default_profile

    @property
    def profile_names(self):
        """Return the list of profile names.

        :return: list of profile names
        """
        return list(self._profiles.keys())

    @property
    def profiles(self):
        """Return the list of profiles.

        :return: the profiles
        :rtype: list of `Profile` instances
        """
        return list(self._profiles.values())
[docs]
def validate_profile(self, name):
"""Validate that a profile exists.
:param name: name of the profile:
:raises aiida.common.ProfileConfigurationError: if the name is not found in the configuration file
"""
from aiida.common import exceptions
if name not in self.profile_names:
raise exceptions.ProfileConfigurationError(f'profile `{name}` does not exist')
[docs]
    def get_profile(self, name: Optional[str] = None) -> Profile:
        """Return the profile for the given name or the default one if not specified.

        :param name: the name of the profile to return; if not specified, the default profile is used.
        :return: the profile instance
        :raises aiida.common.ProfileConfigurationError: if ``name`` is not specified while no default profile is
            defined, or if the name is not found in the configuration file.
        """
        from aiida.common import exceptions

        if not name and not self.default_profile_name:
            # The error includes the full configuration dictionary to aid debugging.
            raise exceptions.ProfileConfigurationError(
                f'no default profile defined: {self._default_profile}\n{self.dictionary}'
            )

        if not name:
            name = self.default_profile_name

        self.validate_profile(name)

        return self._profiles[name]
[docs]
def create_profile(self, name: str, storage_cls: Type['StorageBackend'], storage_config: dict[str, str]) -> Profile:
"""Create a new profile and initialise its storage.
:param name: The profile name.
:param storage_cls: The :class:`aiida.orm.implementation.storage_backend.StorageBackend` implementation to use.
:param storage_config: The configuration necessary to initialise and connect to the storage backend.
:returns: The created profile.
:raises ValueError: If the profile already exists.
:raises TypeError: If the ``storage_cls`` is not a subclass of
:class:`aiida.orm.implementation.storage_backend.StorageBackend`.
:raises EntryPointError: If the ``storage_cls`` does not have an associated entry point.
:raises StorageMigrationError: If the storage cannot be initialised.
"""
from aiida.orm.implementation.storage_backend import StorageBackend
from aiida.plugins.entry_point import get_entry_point_from_class
if name in self.profile_names:
raise ValueError(f'The profile `{name}` already exists.')
if not issubclass(storage_cls, StorageBackend):
raise TypeError(
f'The `storage_cls={storage_cls}` is not subclass of `aiida.orm.implementationStorageBackend`.'
)
_, storage_entry_point = get_entry_point_from_class(storage_cls.__module__, storage_cls.__name__)
if storage_entry_point is None:
raise EntryPointError(f'`{storage_cls}` does not have a registered entry point.')
profile = Profile(
name,
{
'storage': {
'backend': storage_entry_point.name,
'config': storage_config,
},
'process_control': {
'backend': 'rabbitmq',
'config': {
'broker_protocol': 'amqp',
'broker_username': 'guest',
'broker_password': 'guest',
'broker_host': '127.0.0.1',
'broker_port': 5672,
'broker_virtual_host': '',
},
},
},
)
LOGGER.report('Initialising the storage backend.')
try:
with contextlib.redirect_stdout(io.StringIO()):
profile.storage_cls.initialise(profile)
except Exception as exception:
raise StorageMigrationError(
f'Storage backend initialisation failed, probably because the configuration is incorrect:\n{exception}'
)
LOGGER.report('Storage initialisation completed.')
self.add_profile(profile)
self.store()
return profile
[docs]
    def add_profile(self, profile):
        """Add a profile to the configuration.

        :param profile: the `Profile` instance to add; it is stored under its ``profile.name`` key.
        :return: self
        """
        self._profiles[profile.name] = profile
        return self
[docs]
    def update_profile(self, profile):
        """Update a profile in the configuration.

        Profiles are keyed by name, so any existing profile with the same name is overwritten.

        :param profile: the profile instance to update
        :return: self
        """
        self._profiles[profile.name] = profile
        return self
[docs]
def remove_profile(self, name):
"""Remove a profile from the configuration.
:param name: the name of the profile to remove
:raises aiida.common.ProfileConfigurationError: if the given profile does not exist
:return: self
"""
self.validate_profile(name)
self._profiles.pop(name)
return self
[docs]
def delete_profile(self, name: str, delete_storage: bool = True) -> None:
"""Delete a profile including its storage.
:param delete_storage: Whether to delete the storage with all its data or not.
"""
from aiida.plugins import StorageFactory
profile = self.get_profile(name)
is_default_profile: bool = profile.name == self.default_profile_name
if delete_storage:
storage_cls = StorageFactory(profile.storage_backend)
storage = storage_cls(profile)
storage.delete()
LOGGER.report(f'Data storage deleted, configuration was: {profile.storage_config}')
else:
LOGGER.report(f'Data storage not deleted, configuration is: {profile.storage_config}')
self.remove_profile(name)
if is_default_profile and not self.profile_names:
LOGGER.warning(f'`{name}` was the default profile, no profiles remain to set as default.')
self.store()
return
if is_default_profile:
LOGGER.warning(f'`{name}` was the default profile, setting `{self.profile_names[0]}` as the new default.')
self.set_default_profile(self.profile_names[0], overwrite=True)
self.store()
[docs]
def set_default_profile(self, name, overwrite=False):
"""Set the given profile as the new default.
:param name: name of the profile to set as new default
:param overwrite: when True, set the profile as the new default even if a default profile is already defined
:raises aiida.common.ProfileConfigurationError: if the given profile does not exist
:return: self
"""
if self.default_profile_name and not overwrite:
return self
self.validate_profile(name)
self._default_profile = name
return self
[docs]
    def set_default_user_email(self, profile: Profile, user_email: str) -> None:
        """Set the default user for the given profile.

        .. warning::

            This does not update the cached default user on the storage backend associated with the profile. To do so,
            use :meth:`aiida.manage.manager.Manager.set_default_user_email` instead.

        :param profile: The profile to update.
        :param user_email: The email of the user to set as the default user.
        """
        profile.default_user_email = user_email
        self.update_profile(profile)
        # Persist immediately so the change survives this process.
        self.store()
    @property
    def options(self):
        """Return the dictionary of globally scoped options."""
        return self._options

    @options.setter
    def options(self, value):
        """Replace the dictionary of globally scoped options."""
        self._options = value
[docs]
def set_option(self, option_name, option_value, scope=None, override=True):
"""Set a configuration option for a certain scope.
:param option_name: the name of the configuration option
:param option_value: the option value
:param scope: set the option for this profile or globally if not specified
:param override: boolean, if False, will not override the option if it already exists
:returns: the parsed value (potentially cast to a valid type)
"""
option, parsed_value = parse_option(option_name, option_value)
if parsed_value is not None:
value = parsed_value
elif option.default is not None:
value = option.default
else:
return
if not option.global_only and scope is not None:
self.get_profile(scope).set_option(option.name, value, override=override)
elif option.name not in self.options or override:
self.options[option.name] = value
return value
[docs]
def unset_option(self, option_name: str, scope=None):
"""Unset a configuration option for a certain scope.
:param option_name: the name of the configuration option
:param scope: unset the option for this profile or globally if not specified
"""
option = get_option(option_name)
if scope is not None:
self.get_profile(scope).unset_option(option.name)
else:
self.options.pop(option.name, None)
[docs]
def get_option(self, option_name, scope=None, default=True):
"""Get a configuration option for a certain scope.
:param option_name: the name of the configuration option
:param scope: get the option for this profile or globally if not specified
:param default: boolean, If True will return the option default, even if not defined within the given scope
:return: the option value or None if not set for the given scope
"""
option = get_option(option_name)
default_value = option.default if default else None
if scope is not None:
value = self.get_profile(scope).get_option(option.name, default_value)
else:
value = self.options.get(option.name, default_value)
return value
[docs]
def get_options(self, scope: Optional[str] = None) -> Dict[str, Tuple[Option, str, Any]]:
"""Return a dictionary of all option values and their source ('profile', 'global', or 'default').
:param scope: the profile name or globally if not specified
:returns: (option, source, value)
"""
profile = self.get_profile(scope) if scope else None
output = {}
for name in get_option_names():
option = get_option(name)
if profile and name in profile.options:
value = profile.options.get(name)
source = 'profile'
elif name in self.options:
value = self.options.get(name)
source = 'global'
elif option.default is not None:
value = option.default
source = 'default'
else:
continue
output[name] = (option, source, value)
return output
[docs]
    def store(self):
        """Write the current config to file.

        .. note:: if the configuration file already exists on disk and its contents differ from those in memory, a
            backup of the original file on disk will be created before overwriting it.

        :return: self
        """
        import tempfile

        from aiida.common.files import md5_file, md5_from_filelike

        from .settings import DEFAULT_CONFIG_INDENT_SIZE

        # If the filepath of this configuration does not yet exist, simply write it.
        if not os.path.isfile(self.filepath):
            self._atomic_write()
            return self

        # Otherwise, we write the content to a temporary file and compare its md5 checksum with the current config on
        # disk. When the checksums differ, we first create a backup and only then overwrite the existing file.
        with tempfile.NamedTemporaryFile() as handle:
            # The temporary file is opened in binary mode, so wrap it in a UTF-8 writer for ``json.dump``.
            json.dump(self.dictionary, codecs.getwriter('utf-8')(handle), indent=DEFAULT_CONFIG_INDENT_SIZE)
            handle.seek(0)

            if md5_from_filelike(handle) != md5_file(self.filepath):
                self._backup(self.filepath)

        self._atomic_write()

        return self
[docs]
def _atomic_write(self, filepath=None):
"""Write the config as it is in memory, i.e. the contents of ``self.dictionary``, to disk.
.. note:: this command will write the config from memory to a temporary file in the same directory as the
target file ``filepath``. It will then use ``os.rename`` to move the temporary file to ``filepath`` which
will be overwritten if it already exists. The ``os.rename`` is the operation that gives the best guarantee
of being atomic within the limitations of the application.
:param filepath: optional filepath to write the contents to, if not specified, the default filename is used.
"""
import tempfile
from .settings import DEFAULT_CONFIG_INDENT_SIZE, DEFAULT_UMASK
umask = os.umask(DEFAULT_UMASK)
if filepath is None:
filepath = self.filepath
# Create a temporary file in the same directory as the target filepath, which guarantees that the temporary
# file is on the same filesystem, which is necessary to be able to use ``os.rename``. Since we are moving the
# temporary file, we should also tell the tempfile to not be automatically deleted as that will raise.
with tempfile.NamedTemporaryFile(dir=os.path.dirname(filepath), delete=False, mode='w') as handle:
try:
json.dump(self.dictionary, handle, indent=DEFAULT_CONFIG_INDENT_SIZE)
finally:
os.umask(umask)
handle.flush()
os.rename(handle.name, self.filepath)