# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""AiiDA profile related code"""
import collections
from copy import deepcopy
import os
import pathlib
from typing import TYPE_CHECKING, Any, Dict, Mapping, Optional, Type
from aiida.common import exceptions
from .options import parse_option
from .settings import DAEMON_DIR, DAEMON_LOG_DIR
if TYPE_CHECKING:
from aiida.orm.implementation import StorageBackend
__all__ = ('Profile',)
CIRCUS_PID_FILE_TEMPLATE = os.path.join(DAEMON_DIR, 'circus-{}.pid')
DAEMON_PID_FILE_TEMPLATE = os.path.join(DAEMON_DIR, 'aiida-{}.pid')
CIRCUS_LOG_FILE_TEMPLATE = os.path.join(DAEMON_LOG_DIR, 'circus-{}.log')
DAEMON_LOG_FILE_TEMPLATE = os.path.join(DAEMON_LOG_DIR, 'aiida-{}.log')
CIRCUS_PORT_FILE_TEMPLATE = os.path.join(DAEMON_DIR, 'circus-{}.port')
CIRCUS_SOCKET_FILE_TEMPATE = os.path.join(DAEMON_DIR, 'circus-{}.sockets')
CIRCUS_CONTROLLER_SOCKET_TEMPLATE = 'circus.c.sock'
CIRCUS_PUBSUB_SOCKET_TEMPLATE = 'circus.p.sock'
CIRCUS_STATS_SOCKET_TEMPLATE = 'circus.s.sock'
[docs]class Profile: # pylint: disable=too-many-public-methods
"""Class that models a profile as it is stored in the configuration file of an AiiDA instance."""
KEY_UUID = 'PROFILE_UUID'
KEY_DEFAULT_USER_EMAIL = 'default_user_email'
KEY_STORAGE = 'storage'
KEY_PROCESS = 'process_control'
KEY_STORAGE_BACKEND = 'backend'
KEY_STORAGE_CONFIG = 'config'
KEY_PROCESS_BACKEND = 'backend'
KEY_PROCESS_CONFIG = 'config'
KEY_OPTIONS = 'options'
KEY_TEST_PROFILE = 'test_profile'
# keys that are expected to be in the parsed configuration
REQUIRED_KEYS = (
KEY_STORAGE,
KEY_PROCESS,
)
[docs] def __init__(self, name: str, config: Mapping[str, Any], validate=True):
"""Load a profile with the profile configuration."""
if not isinstance(config, collections.abc.Mapping):
raise TypeError(f'config should be a mapping but is {type(config)}')
if validate and not set(config.keys()).issuperset(self.REQUIRED_KEYS):
raise exceptions.ConfigurationError(
f'profile {name!r} configuration does not contain all required keys: {self.REQUIRED_KEYS}'
)
self._name = name
self._attributes: Dict[str, Any] = deepcopy(config)
# Create a default UUID if not specified
if self._attributes.get(self.KEY_UUID, None) is None:
from uuid import uuid4
self._attributes[self.KEY_UUID] = uuid4().hex
[docs] def __str__(self) -> str:
return f'Profile<{self.uuid!r} ({self.name!r})>'
[docs] def copy(self):
"""Return a copy of the profile."""
return self.__class__(self.name, self._attributes)
@property
def uuid(self) -> str:
"""Return the profile uuid.
:return: string UUID
"""
return self._attributes[self.KEY_UUID]
@uuid.setter
def uuid(self, value: str) -> None:
self._attributes[self.KEY_UUID] = value
@property
def default_user_email(self) -> Optional[str]:
"""Return the default user email."""
return self._attributes.get(self.KEY_DEFAULT_USER_EMAIL, None)
@default_user_email.setter
def default_user_email(self, value: Optional[str]) -> None:
"""Set the default user email."""
self._attributes[self.KEY_DEFAULT_USER_EMAIL] = value
@property
def storage_backend(self) -> str:
"""Return the type of the storage backend."""
return self._attributes[self.KEY_STORAGE][self.KEY_STORAGE_BACKEND]
@property
def storage_config(self) -> Dict[str, Any]:
"""Return the configuration required by the storage backend."""
return self._attributes[self.KEY_STORAGE][self.KEY_STORAGE_CONFIG]
[docs] def set_storage(self, name: str, config: Dict[str, Any]) -> None:
"""Set the storage backend and its configuration.
:param name: the name of the storage backend
:param config: the configuration of the storage backend
"""
self._attributes.setdefault(self.KEY_STORAGE, {})
self._attributes[self.KEY_STORAGE][self.KEY_STORAGE_BACKEND] = name
self._attributes[self.KEY_STORAGE][self.KEY_STORAGE_CONFIG] = config
@property
def storage_cls(self) -> Type['StorageBackend']:
"""Return the storage backend class for this profile."""
if self.storage_backend == 'psql_dos':
from aiida.storage.psql_dos.backend import PsqlDosBackend
return PsqlDosBackend
if self.storage_backend == 'sqlite_zip':
from aiida.storage.sqlite_zip.backend import SqliteZipBackend
return SqliteZipBackend
raise ValueError(f'unknown storage backend type: {self.storage_backend}')
@property
def process_control_backend(self) -> str:
"""Return the type of the process control backend."""
return self._attributes[self.KEY_PROCESS][self.KEY_PROCESS_BACKEND]
@property
def process_control_config(self) -> Dict[str, Any]:
"""Return the configuration required by the process control backend."""
return self._attributes[self.KEY_PROCESS][self.KEY_PROCESS_CONFIG]
[docs] def set_process_controller(self, name: str, config: Dict[str, Any]) -> None:
"""Set the process control backend and its configuration.
:param name: the name of the process backend
:param config: the configuration of the process backend
"""
self._attributes.setdefault(self.KEY_PROCESS, {})
self._attributes[self.KEY_PROCESS][self.KEY_PROCESS_BACKEND] = name
self._attributes[self.KEY_PROCESS][self.KEY_PROCESS_CONFIG] = config
@property
def options(self):
self._attributes.setdefault(self.KEY_OPTIONS, {})
return self._attributes[self.KEY_OPTIONS]
@options.setter
def options(self, value):
self._attributes[self.KEY_OPTIONS] = value
[docs] def get_option(self, option_key, default=None):
return self.options.get(option_key, default)
[docs] def set_option(self, option_key, value, override=True):
"""Set a configuration option for a certain scope.
:param option_key: the key of the configuration option
:param option_value: the option value
:param override: boolean, if False, will not override the option if it already exists
"""
_, parsed_value = parse_option(option_key, value) # ensure the value is validated
if option_key not in self.options or override:
self.options[option_key] = parsed_value
[docs] def unset_option(self, option_key):
self.options.pop(option_key, None)
@property
def name(self):
"""Return the profile name.
:return: the profile name
"""
return self._name
@property
def dictionary(self) -> Dict[str, Any]:
"""Return the profile attributes as a dictionary with keys as it is stored in the config
:return: the profile configuration dictionary
"""
return self._attributes
@property
def is_test_profile(self) -> bool:
"""Return whether the profile is a test profile
:return: boolean, True if test profile, False otherwise
"""
# Check explicitly for ``True`` for safety. If an invalid value is defined, we default to treating it as not
# a test profile as that can unintentionally clear the database.
return self._attributes.get(self.KEY_TEST_PROFILE, False) is True
@is_test_profile.setter
def is_test_profile(self, value: bool) -> None:
"""Set whether the profile is a test profile.
:param value: boolean indicating whether this profile is a test profile.
"""
self._attributes[self.KEY_TEST_PROFILE] = value
@property
def repository_path(self) -> pathlib.Path:
"""Return the absolute path of the repository configured for this profile.
The URI should be in the format `protocol://address`
:note: At the moment, only the file protocol is supported.
:return: absolute filepath of the profile's file repository
"""
from urllib.parse import urlparse
parts = urlparse(self.storage_config['repository_uri'])
if parts.scheme != 'file':
raise exceptions.ConfigurationError('invalid repository protocol, only the local `file://` is supported')
if not os.path.isabs(parts.path):
raise exceptions.ConfigurationError('invalid repository URI: the path has to be absolute')
return pathlib.Path(os.path.expanduser(parts.path))
@property
def rmq_prefix(self) -> str:
"""Return the prefix that should be used for RMQ resources
:return: the rmq prefix string
"""
return f'aiida-{self.uuid}'
[docs] def get_rmq_url(self) -> str:
"""Return the RMQ url for this profile."""
from aiida.manage.external.rmq import get_rmq_url
if self.process_control_backend != 'rabbitmq':
raise exceptions.ConfigurationError(
f"invalid process control backend, only 'rabbitmq' is supported: {self.process_control_backend}"
)
kwargs = {key[7:]: val for key, val in self.process_control_config.items() if key.startswith('broker_')}
additional_kwargs = kwargs.pop('parameters', {})
return get_rmq_url(**kwargs, **additional_kwargs)
@property
def filepaths(self):
"""Return the filepaths used by this profile.
:return: a dictionary of filepaths
"""
return {
'circus': {
'log': CIRCUS_LOG_FILE_TEMPLATE.format(self.name),
'pid': CIRCUS_PID_FILE_TEMPLATE.format(self.name),
'port': CIRCUS_PORT_FILE_TEMPLATE.format(self.name),
'socket': {
'file': CIRCUS_SOCKET_FILE_TEMPATE.format(self.name),
'controller': CIRCUS_CONTROLLER_SOCKET_TEMPLATE,
'pubsub': CIRCUS_PUBSUB_SOCKET_TEMPLATE,
'stats': CIRCUS_STATS_SOCKET_TEMPLATE,
}
},
'daemon': {
'log': DAEMON_LOG_FILE_TEMPLATE.format(self.name),
'pid': DAEMON_PID_FILE_TEMPLATE.format(self.name),
}
}