Source code for aiida.orm.implementation.storage_backend

# -*- coding: utf-8 -*-
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit               #
"""Generic backend related objects"""
from __future__ import annotations

import abc
from typing import TYPE_CHECKING, Any, ContextManager, List, Optional, Sequence, TypeVar, Union

    from aiida.manage.configuration.profile import Profile
    from aiida.orm.autogroup import AutogroupManager
    from aiida.orm.entities import EntityTypes
    from aiida.orm.implementation import (
    from aiida.orm.users import User
    from aiida.repository.backend.abstract import AbstractRepositoryBackend

__all__ = ('StorageBackend',)

TransactionType = TypeVar('TransactionType')  # pylint: disable=invalid-name

[docs] class StorageBackend(abc.ABC): # pylint: disable=too-many-public-methods """Abstraction for a backend to read/write persistent data for a profile's provenance graph. AiiDA splits data storage into two sources: - Searchable data, which is stored in the database and can be queried using the QueryBuilder - Non-searchable (binary) data, which is stored in the repository and can be loaded using the RepositoryBackend The two sources are inter-linked by the ``Node.base.repository.metadata``. Once stored, the leaf values of this dictionary must be valid pointers to object keys in the repository. For a completely new storage, the ``initialise`` method should be called first. This will automatically initialise the repository and the database with the current schema. The class methods,`version_profile` and `migrate` should be able to be called for existing storage, at any supported schema version. But an instance of this class should be created only for the latest schema version. """
[docs] @classmethod @abc.abstractmethod def version_head(cls) -> str: """Return the head schema version of this storage backend type."""
[docs] @classmethod @abc.abstractmethod def version_profile(cls, profile: 'Profile') -> Optional[str]: """Return the schema version of the given profile's storage, or None for empty/uninitialised storage. :raises: `~aiida.common.exceptions.UnreachableStorage` if the storage cannot be accessed """
[docs] @classmethod @abc.abstractmethod def initialise(cls, profile: 'Profile', reset: bool = False) -> bool: """Initialise the storage backend. This is typically used once when a new storage backed is created. If this method returns without exceptions the storage backend is ready for use. If the backend already seems initialised, this method is a no-op. :param reset: If ``true``, destroy the backend if it already exists including all of its data before recreating and initialising it. This is useful for example for test profiles that need to be reset before or after tests having run. :returns: ``True`` if the storage was initialised by the function call, ``False`` if it was already initialised. """
[docs] @classmethod @abc.abstractmethod def migrate(cls, profile: 'Profile') -> None: """Migrate the storage of a profile to the latest schema version. If the schema version is already the latest version, this method does nothing. If the storage is uninitialised, this method will raise an exception. :raises: :class`~aiida.common.exceptions.UnreachableStorage` if the storage cannot be accessed. :raises: :class:`~aiida.common.exceptions.StorageMigrationError` if the storage is not initialised. """
[docs] @abc.abstractmethod def __init__(self, profile: 'Profile') -> None: """Initialize the backend, for this profile. :raises: `~aiida.common.exceptions.UnreachableStorage` if the storage cannot be accessed :raises: `~aiida.common.exceptions.IncompatibleStorageSchema` if the profile's storage schema is not at the latest version (and thus should be migrated) :raises: :raises: :class:`aiida.common.exceptions.CorruptStorage` if the storage is internally inconsistent """ from aiida.orm.autogroup import AutogroupManager self._profile = profile self._default_user: Optional['User'] = None self._autogroup = AutogroupManager(self)
[docs] @abc.abstractmethod def __str__(self) -> str: """Return a string showing connection details for this instance."""
@property def profile(self) -> 'Profile': """Return the profile for this backend.""" return self._profile @property def autogroup(self) -> 'AutogroupManager': """Return the autogroup manager for this backend.""" return self._autogroup
[docs] def version(self) -> str: """Return the schema version of the profile's storage.""" version = self.version_profile(self.profile) assert version is not None return version
[docs] @abc.abstractmethod def close(self): """Close the storage access."""
@property @abc.abstractmethod def is_closed(self) -> bool: """Return whether the storage is closed."""
[docs] @abc.abstractmethod def _clear(self) -> None: """Clear the storage, removing all data. .. warning:: This is a destructive operation, and should only be used for testing purposes. """ from aiida.orm.autogroup import AutogroupManager self.reset_default_user() self._autogroup = AutogroupManager(self)
[docs] def reset_default_user(self) -> None: """Reset the default user. This should be done when the default user of the storage backend is changed on the corresponding profile because the old default user is cached on this instance. """ self._default_user = None
@property @abc.abstractmethod def authinfos(self) -> 'BackendAuthInfoCollection': """Return the collection of authorisation information objects""" @property @abc.abstractmethod def comments(self) -> 'BackendCommentCollection': """Return the collection of comments""" @property @abc.abstractmethod def computers(self) -> 'BackendComputerCollection': """Return the collection of computers""" @property @abc.abstractmethod def groups(self) -> 'BackendGroupCollection': """Return the collection of groups""" @property @abc.abstractmethod def logs(self) -> 'BackendLogCollection': """Return the collection of logs""" @property @abc.abstractmethod def nodes(self) -> 'BackendNodeCollection': """Return the collection of nodes""" @property @abc.abstractmethod def users(self) -> 'BackendUserCollection': """Return the collection of users""" @property def default_user(self) -> Optional['User']: """Return the default user for the profile, if it has been created. This is cached, since it is a frequently used operation, for creating other entities. """ from aiida.orm import QueryBuilder, User if self._default_user is None and self.profile.default_user_email: query = QueryBuilder(self).append(User, filters={'email': self.profile.default_user_email}) self._default_user = query.first(flat=True) return self._default_user
[docs] @abc.abstractmethod def query(self) -> 'BackendQueryBuilder': """Return an instance of a query builder implementation for this backend"""
[docs] @abc.abstractmethod def transaction(self) -> ContextManager[Any]: """ Get a context manager that can be used as a transaction context for a series of backend operations. If there is an exception within the context then the changes will be rolled back and the state will be as before entering. Transactions can be nested. :return: a context manager to group database operations """
@property @abc.abstractmethod def in_transaction(self) -> bool: """Return whether a transaction is currently active."""
[docs] @abc.abstractmethod def bulk_insert(self, entity_type: 'EntityTypes', rows: List[dict], allow_defaults: bool = False) -> List[int]: """Insert a list of entities into the database, directly into a backend transaction. :param entity_type: The type of the entity :param data: A list of dictionaries, containing all fields of the backend model, except the `id` field (a.k.a primary key), which will be generated dynamically :param allow_defaults: If ``False``, assert that each row contains all fields (except primary key(s)), otherwise, allow default values for missing fields. :raises: ``IntegrityError`` if the keys in a row are not a subset of the columns in the table :returns: The list of generated primary keys for the entities """
[docs] @abc.abstractmethod def bulk_update(self, entity_type: 'EntityTypes', rows: List[dict]) -> None: """Update a list of entities in the database, directly with a backend transaction. :param entity_type: The type of the entity :param data: A list of dictionaries, containing fields of the backend model to update, and the `id` field (a.k.a primary key) :raises: ``IntegrityError`` if the keys in a row are not a subset of the columns in the table """
[docs] @abc.abstractmethod def delete_nodes_and_connections(self, pks_to_delete: Sequence[int]): """Delete all nodes corresponding to pks in the input and any links to/from them. This method is intended to be used within a transaction context. :param pks_to_delete: a sequence of node pks to delete :raises: ``AssertionError`` if a transaction is not active """
[docs] @abc.abstractmethod def get_repository(self) -> 'AbstractRepositoryBackend': """Return the object repository configured for this backend."""
[docs] @abc.abstractmethod def set_global_variable( self, key: str, value: Union[None, str, int, float], description: Optional[str] = None, overwrite=True ) -> None: """Set a global variable in the storage. :param key: the key of the setting :param value: the value of the setting :param description: the description of the setting (optional) :param overwrite: if True, overwrite the setting if it already exists :raises: `ValueError` if the key already exists and `overwrite` is False """
[docs] @abc.abstractmethod def get_global_variable(self, key: str) -> Union[None, str, int, float]: """Return a global variable from the storage. :param key: the key of the setting :raises: `KeyError` if the setting does not exist """
[docs] @abc.abstractmethod def maintain(self, full: bool = False, dry_run: bool = False, **kwargs) -> None: """Perform maintenance tasks on the storage. If `full == True`, then this method may attempt to block the profile associated with the storage to guarantee the safety of its procedures. This will not only prevent any other subsequent process from accessing that profile, but will also first check if there is already any process using it and raise if that is the case. The user will have to manually stop any processes that is currently accessing the profile themselves or wait for it to finish on its own. :param full: flag to perform operations that require to stop using the profile to be maintained. :param dry_run: flag to only print the actions that would be taken without actually executing them. """
[docs] def get_info(self, detailed: bool = False) -> dict: """Return general information on the storage. :param detailed: flag to request more detailed information about the content of the storage. :returns: a nested dict with the relevant information. """ return {'entities': self.get_orm_entities(detailed=detailed)}
[docs] def get_orm_entities(self, detailed: bool = False) -> dict: """Return a mapping with an overview of the storage contents regarding ORM entities. :param detailed: flag to request more detailed information about the content of the storage. :returns: a nested dict with the relevant information. """ from aiida.orm import Comment, Computer, Group, Log, Node, QueryBuilder, User data = {} query_user = QueryBuilder(self).append(User, project=['email']) data['Users'] = {'count': query_user.count()} if detailed: data['Users']['emails'] = sorted({email for email, in query_user.iterall() if email is not None}) query_comp = QueryBuilder(self).append(Computer, project=['label']) data['Computers'] = {'count': query_comp.count()} if detailed: data['Computers']['labels'] = sorted({comp for comp, in query_comp.iterall() if comp is not None}) count = QueryBuilder(self).append(Node).count() data['Nodes'] = {'count': count} if detailed: node_types = sorted({ typ for typ, in QueryBuilder(self).append(Node, project=['node_type']).iterall() if typ is not None }) data['Nodes']['node_types'] = node_types process_types = sorted({ typ for typ, in QueryBuilder(self).append(Node, project=['process_type']).iterall() if typ is not None }) data['Nodes']['process_types'] = [p for p in process_types if p] query_group = QueryBuilder(self).append(Group, project=['type_string']) data['Groups'] = {'count': query_group.count()} if detailed: data['Groups']['type_strings'] = sorted({typ for typ, in query_group.iterall() if typ is not None}) count = QueryBuilder(self).append(Comment).count() data['Comments'] = {'count': count} count = QueryBuilder(self).append(Log).count() data['Logs'] = {'count': count} count = QueryBuilder(self).append(entity_type='link').count() data['Links'] = {'count': count} return data