Source code for aiida.storage.sqlite_zip.backend

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""The table models are dynamically generated from the sqlalchemy backend models."""
from __future__ import annotations

from contextlib import contextmanager
from functools import singledispatch
from pathlib import Path
import tempfile
from typing import BinaryIO, Iterable, Iterator, Optional, Sequence, Tuple, Type, cast
from zipfile import ZipFile, is_zipfile

from archive_path import extract_file_in_zip
from sqlalchemy.orm import Session

from aiida.common.exceptions import AiidaException, ClosedStorage, CorruptStorage
from aiida.manage import Profile
from aiida.orm.entities import EntityTypes
from aiida.orm.implementation import StorageBackend
from aiida.repository.backend.abstract import AbstractRepositoryBackend
from aiida.storage.psql_dos.orm import authinfos, comments, computers, entities, groups, logs, nodes, users
from aiida.storage.psql_dos.orm.querybuilder import SqlaQueryBuilder
from aiida.storage.psql_dos.orm.utils import ModelWrapper

from . import models
from .migrator import get_schema_version_head, validate_storage
from .utils import DB_FILENAME, REPO_FOLDER, create_sqla_engine, extract_metadata, read_version


class SqliteZipBackend(StorageBackend):  # pylint: disable=too-many-public-methods
    """A read-only backend for a sqlite/zip format.

    The storage format uses an SQLite database and repository files, within a folder or zipfile.

    The content of the folder/zipfile should be::

        |- metadata.json
        |- db.sqlite3
        |- repo/
            |- hashkey1
            |- hashkey2
            ...

    """

    @classmethod
    def version_head(cls) -> str:
        """Return the head schema version, as reported by ``get_schema_version_head``."""
        return get_schema_version_head()

    @staticmethod
    def create_profile(path: str | Path) -> Profile:
        """Create a new profile instance for this backend, from the path to the zip file.

        :param path: the path to the archive; its final component is used as the profile name.
        :return: a profile using the ``sqlite_zip`` storage backend and the ``null`` process control backend.
        """
        profile_name = Path(path).name
        return Profile(
            profile_name, {
                'storage': {
                    'backend': 'sqlite_zip',
                    'config': {
                        'path': str(path)
                    }
                },
                'process_control': {
                    'backend': 'null',
                    'config': {}
                }
            }
        )

    @classmethod
    def version_profile(cls, profile: Profile) -> Optional[str]:
        """Return the schema version read from the path configured in the profile's storage config."""
        return read_version(profile.storage_config['path'], search_limit=None)

    @classmethod
    def migrate(cls, profile: Profile):
        """Not supported through the backend class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError('use the migrate function directly.')

    def __init__(self, profile: Profile):
        """Validate the storage at the profile's configured path and set up the lazily populated state.

        :param profile: the profile whose ``storage_config['path']`` points to the folder or zipfile.
        """
        super().__init__(profile)
        self._path = Path(profile.storage_config['path'])
        validate_storage(self._path)
        # lazy open the archive zipfile and extract the database file
        self._db_file: Optional[Path] = None
        self._session: Optional[Session] = None
        self._repo: Optional[_RoBackendRepository] = None
        self._closed = False

    def __str__(self) -> str:
        state = 'closed' if self.is_closed else 'open'
        return f'SqliteZip storage (read-only) [{state}] @ {self._path}'

    @property
    def is_closed(self) -> bool:
        """Return whether ``close`` has been called on this backend."""
        return self._closed

    def close(self):
        """Close the backend.

        Closes the session and the repository (if they were opened), removes the temporary database file
        (if one was extracted) and marks the backend as closed.
        """
        if self._session:
            self._session.close()
        if self._db_file and self._db_file.exists():
            self._db_file.unlink()
        if self._repo:
            self._repo.close()
        self._session = None
        self._db_file = None
        self._repo = None
        self._closed = True

    def get_session(self) -> Session:
        """Return an SQLAlchemy session.

        The session is created on first use and cached. For a zipfile, the database file is first extracted
        to a temporary file, which is removed again by ``close``. For a folder, the database file is used in place.

        :raises ClosedStorage: if the backend has been closed.
        :raises CorruptStorage: if the database file cannot be extracted or does not exist.
        """
        if self._closed:
            raise ClosedStorage(str(self))
        if self._session is None:
            if is_zipfile(self._path):
                file_descriptor, path = tempfile.mkstemp()
                # Record the path immediately, so that ``close`` removes the file even if the extraction fails.
                db_file = self._db_file = Path(path)
                # ``mkstemp`` hands back an open OS-level descriptor: wrap it (rather than re-opening the path),
                # so that it is closed when the context exits instead of being leaked.
                with open(file_descriptor, 'wb') as handle:
                    try:
                        extract_file_in_zip(self._path, DB_FILENAME, handle, search_limit=4)
                    except Exception as exc:
                        raise CorruptStorage(f'database could not be read: {exc}') from exc
            else:
                db_file = self._path / DB_FILENAME
                if not db_file.exists():
                    raise CorruptStorage(f'database could not be read: non-existent {db_file}')
            self._session = Session(create_sqla_engine(db_file))
        return self._session

    def get_repository(self) -> '_RoBackendRepository':
        """Return the read-only repository, creating and caching it on first use.

        :raises ClosedStorage: if the backend has been closed.
        :raises CorruptStorage: if the path is not a zipfile and contains no repository folder.
        """
        if self._closed:
            raise ClosedStorage(str(self))
        if self._repo is None:
            if is_zipfile(self._path):
                self._repo = ZipfileBackendRepository(self._path)
            elif (self._path / REPO_FOLDER).exists():
                self._repo = FolderBackendRepository(self._path / REPO_FOLDER)
            else:
                raise CorruptStorage(f'repository could not be read: non-existent {self._path / REPO_FOLDER}')
        return self._repo

    def query(self) -> 'SqliteBackendQueryBuilder':
        """Return a new query builder bound to this backend."""
        return SqliteBackendQueryBuilder(self)

    def get_backend_entity(self, res):  # pylint: disable=no-self-use
        """Return the backend entity that corresponds to the given Model instance."""
        # Inside the method body this name resolves to the module-level dispatch function, not to this method.
        klass = get_backend_entity(res)
        return klass(self, res)

    @property
    def authinfos(self):
        return create_backend_collection(
            authinfos.SqlaAuthInfoCollection, self, authinfos.SqlaAuthInfo, models.DbAuthInfo
        )

    @property
    def comments(self):
        return create_backend_collection(comments.SqlaCommentCollection, self, comments.SqlaComment, models.DbComment)

    @property
    def computers(self):
        return create_backend_collection(
            computers.SqlaComputerCollection, self, computers.SqlaComputer, models.DbComputer
        )

    @property
    def groups(self):
        return create_backend_collection(groups.SqlaGroupCollection, self, groups.SqlaGroup, models.DbGroup)

    @property
    def logs(self):
        return create_backend_collection(logs.SqlaLogCollection, self, logs.SqlaLog, models.DbLog)

    @property
    def nodes(self):
        return create_backend_collection(nodes.SqlaNodeCollection, self, nodes.SqlaNode, models.DbNode)

    @property
    def users(self):
        return create_backend_collection(users.SqlaUserCollection, self, users.SqlaUser, models.DbUser)

    def _clear(self, recreate_user: bool = True) -> None:
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    def transaction(self):
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    @property
    def in_transaction(self) -> bool:
        """Always ``False``: transactions cannot be opened on this backend."""
        return False

    def bulk_insert(self, entity_type: EntityTypes, rows: list[dict], allow_defaults: bool = False) -> list[int]:
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    def bulk_update(self, entity_type: EntityTypes, rows: list[dict]) -> None:
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    def delete_nodes_and_connections(self, pks_to_delete: Sequence[int]):
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    def get_global_variable(self, key: str):
        """Not implemented for this backend: always raises ``NotImplementedError``."""
        raise NotImplementedError

    def set_global_variable(self, key: str, value, description: Optional[str] = None, overwrite=True) -> None:
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None:
        """Not implemented for this backend: always raises ``NotImplementedError``."""
        raise NotImplementedError

    def get_info(self, detailed: bool = False) -> dict:
        """Return information on the storage.

        :param detailed: if True, also include the information of the parent implementation and of the repository.
        :return: a dictionary that always contains the ``metadata`` key.
        """
        # since extracting the database file is expensive, we only do it if detailed is True
        results = {'metadata': extract_metadata(self._path)}
        if detailed:
            results.update(super().get_info(detailed=detailed))
            results['repository'] = self.get_repository().get_info(detailed)
        return results
class ReadOnlyError(AiidaException):
    """Raised when a write operation is called on a read-only archive."""

    def __init__(self, msg='sqlite_zip storage is read-only'):  # pylint: disable=useless-super-delegation
        """Initialise the exception.

        :param msg: the error message; the default states that the storage is read-only.
        """
        super().__init__(msg)
class _RoBackendRepository(AbstractRepositoryBackend):  # pylint: disable=abstract-method
    """A backend abstract for a read-only folder or zip file."""

    def __init__(self, path: str | Path):
        """Initialise the repository backend.

        :param path: the path to the zip file
        """
        self._closed = False
        self._path = Path(path)

    def close(self) -> None:
        """Close the repository."""
        self._closed = True

    @property
    def uuid(self) -> Optional[str]:
        """Always ``None`` for this repository."""
        return None

    @property
    def key_format(self) -> Optional[str]:
        """The key format, which is always ``sha256``."""
        return 'sha256'

    def initialise(self, **kwargs) -> None:
        """Do nothing: see ``is_initialised``, which is always True."""

    @property
    def is_initialised(self) -> bool:
        """Always ``True``."""
        return True

    def erase(self) -> None:
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    def _put_object_from_filelike(self, handle: BinaryIO) -> str:
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    def has_objects(self, keys: list[str]) -> list[bool]:
        """Return, for each key in order, whether ``has_object`` reports it as present."""
        return list(map(self.has_object, keys))

    def iter_object_streams(self, keys: list[str]) -> Iterator[Tuple[str, BinaryIO]]:
        """Yield ``(key, handle)`` pairs; each handle is only open while the consumer holds that item."""
        for object_key in keys:
            with self.open(object_key) as stream:  # pylint: disable=not-context-manager
                yield object_key, stream

    def delete_objects(self, keys: list[str]) -> None:
        """Unsupported write operation: always raises ``ReadOnlyError``."""
        raise ReadOnlyError()

    def get_object_hash(self, key: str) -> str:
        """Return the key itself as the hash of the object."""
        return key

    def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None:
        """Do nothing: no maintenance is performed by this repository."""

    def get_info(self, detailed: bool = False, **kwargs) -> dict:
        """Return a dictionary with the number of objects yielded by ``list_objects``."""
        object_count = sum(1 for _ in self.list_objects())
        return {'objects': {'count': object_count}}
class ZipfileBackendRepository(_RoBackendRepository):
    """A read-only backend for a zip file.

    The zip file should contain repository files with the key format: ``repo/<sha256 hash>``,
    i.e. files named by the sha256 hash of the file contents, inside a ``repo`` directory.
    """

    def __init__(self, path: str | Path):
        """Initialise the repository backend; the zip file itself is only opened on first access.

        :param path: the path to the zip file
        """
        super().__init__(path)
        self._folder = REPO_FOLDER
        self.__zipfile: None | ZipFile = None

    def close(self) -> None:
        """Close the zip file, if it was opened, and mark the repository as closed.

        Calling this more than once is safe.
        """
        # Inspect the cached handle directly: going through the ``_zipfile`` property would open the archive just
        # to close it again, and would raise ``ClosedStorage`` if the repository was already closed.
        if self.__zipfile is not None:
            self.__zipfile.close()
            self.__zipfile = None
        super().close()

    @property
    def _zipfile(self) -> ZipFile:
        """Return the open zip file.

        :raises ClosedStorage: if the repository has been closed.
        :raises CorruptStorage: if the zip file cannot be opened.
        """
        if self._closed:
            raise ClosedStorage(f'repository is closed: {self._path}')
        if self.__zipfile is None:
            try:
                self.__zipfile = ZipFile(self._path, mode='r')  # pylint: disable=consider-using-with
            except Exception as exc:
                raise CorruptStorage(f'repository could not be read {self._path}: {exc}') from exc
        return self.__zipfile

    def has_object(self, key: str) -> bool:
        """Return whether the zip file has an entry for the key inside the repository folder."""
        try:
            self._zipfile.getinfo(f'{self._folder}/{key}')
        except KeyError:
            return False
        return True

    def list_objects(self) -> Iterable[str]:
        """Yield the keys of all entries inside the repository folder, skipping the folder entry itself."""
        prefix = f'{self._folder}/'
        prefix_len = len(prefix)
        for name in self._zipfile.namelist():
            if name.startswith(prefix) and name[prefix_len:]:
                yield name[prefix_len:]

    @contextmanager
    def open(self, key: str) -> Iterator[BinaryIO]:
        """Open the object with the given key for reading, closing the handle when the context exits.

        :raises FileNotFoundError: if the zip file has no entry for the key.
        """
        # Only the lookup is guarded: a ``KeyError`` raised by the caller inside the ``with`` body must not be
        # reported as a missing object, and there is no handle to close when the lookup itself fails.
        try:
            handle = self._zipfile.open(f'{self._folder}/{key}')
        except KeyError as exc:
            raise FileNotFoundError(f'object with key `{key}` does not exist.') from exc
        try:
            yield cast(BinaryIO, handle)
        finally:
            handle.close()
class FolderBackendRepository(_RoBackendRepository):
    """A read-only backend for a folder.

    The folder should contain repository files, named by the sha256 hash of the file contents.
    """

    def has_object(self, key: str) -> bool:
        """Return whether the folder contains a regular file named after the key."""
        candidate = self._path.joinpath(key)
        return candidate.is_file()

    def list_objects(self) -> Iterable[str]:
        """Yield the name of every regular file directly inside the folder."""
        yield from (entry.name for entry in self._path.iterdir() if entry.is_file())

    @contextmanager
    def open(self, key: str) -> Iterator[BinaryIO]:
        """Open the file named after the key in binary read mode.

        :raises FileNotFoundError: if the folder has no regular file with that name.
        """
        target = self._path.joinpath(key)
        if target.is_file():
            with target.open('rb') as stream:
                yield stream
        else:
            raise FileNotFoundError(f'object with key `{key}` does not exist.')
class SqliteBackendQueryBuilder(SqlaQueryBuilder):
    """Archive query builder.

    Every property maps an entity name onto the corresponding class of this package's ``models`` module.
    """

    @property
    def Node(self):
        """The ``DbNode`` model."""
        return models.DbNode

    @property
    def Link(self):
        """The ``DbLink`` model."""
        return models.DbLink

    @property
    def Computer(self):
        """The ``DbComputer`` model."""
        return models.DbComputer

    @property
    def User(self):
        """The ``DbUser`` model."""
        return models.DbUser

    @property
    def Group(self):
        """The ``DbGroup`` model."""
        return models.DbGroup

    @property
    def AuthInfo(self):
        """The ``DbAuthInfo`` model."""
        return models.DbAuthInfo

    @property
    def Comment(self):
        """The ``DbComment`` model."""
        return models.DbComment

    @property
    def Log(self):
        """The ``DbLog`` model."""
        return models.DbLog

    @property
    def table_groups_nodes(self):
        """The table underlying the ``DbGroupNodes`` model."""
        group_nodes_model = models.DbGroupNodes
        return group_nodes_model.__table__  # type: ignore[attr-defined] # pylint: disable=no-member
def create_backend_cls(base_class, model_cls):
    """Build a read-only entity class deriving from ``base_class`` for the given model class.

    :param base_class: the entity class to derive from.
    :param model_cls: the model class, stored on the new class as ``MODEL_CLASS``.
    :return: a newly created ``ReadOnlyEntityBackend`` class (a distinct class on every call).
    """

    class ReadOnlyEntityBackend(base_class):  # type: ignore
        """Backend class for the read-only archive."""

        MODEL_CLASS = model_cls

        def __init__(self, _backend, model):
            """Store the backend and wrap the model; the base class initialiser is not called."""
            self._backend = _backend
            self._model = ModelWrapper(model, _backend)

        @classmethod
        def from_dbmodel(cls, model, _backend):
            """Alternate constructor that takes the model first and the backend second."""
            return cls(_backend, model)

        @property
        def is_stored(self):
            """Always ``True``."""
            return True

        def store(self):  # pylint: disable=no-self-use
            """Unsupported write operation: always raises ``ReadOnlyError``."""
            raise ReadOnlyError()

        @property
        def model(self) -> ModelWrapper:
            """Return the wrapped model that was created at initialisation."""
            return self._model

        @property
        def bare_model(self):
            """Return the underlying SQLAlchemy ORM model for this entity."""
            wrapper = self.model
            return wrapper._model  # pylint: disable=protected-access

    return ReadOnlyEntityBackend
def create_backend_collection(cls, _backend, entity_cls, model):
    """Instantiate a collection whose entity class is the read-only variant of ``entity_cls``.

    :param cls: the collection class, instantiated with the backend as its only argument.
    :param _backend: the backend passed to the collection.
    :param entity_cls: the entity class from which the read-only entity class is derived.
    :param model: the model class for the read-only entity class.
    :return: the collection instance, with ``ENTITY_CLASS`` set on the instance.
    """
    instance = cls(_backend)
    instance.ENTITY_CLASS = create_backend_cls(entity_cls, model)
    return instance
@singledispatch
def get_backend_entity(dbmodel) -> Type[entities.SqlaModelEntity]:  # pylint: disable=unused-argument
    """Return the read-only entity class for a model instance, dispatching on the type of the instance.

    This fallback is reached for types without a registered implementation.

    :raises TypeError: always, since the type of ``dbmodel`` is not supported.
    """
    raise TypeError(f'Cannot get backend entity for {dbmodel}')


@get_backend_entity.register(models.DbAuthInfo)  # type: ignore[call-overload]
def _authinfo_entity(dbmodel):
    """Return the read-only variant of ``SqlaAuthInfo`` for the class of the given model."""
    return create_backend_cls(authinfos.SqlaAuthInfo, type(dbmodel))


@get_backend_entity.register(models.DbComment)  # type: ignore[call-overload]
def _comment_entity(dbmodel):
    """Return the read-only variant of ``SqlaComment`` for the class of the given model."""
    return create_backend_cls(comments.SqlaComment, type(dbmodel))


@get_backend_entity.register(models.DbComputer)  # type: ignore[call-overload]
def _computer_entity(dbmodel):
    """Return the read-only variant of ``SqlaComputer`` for the class of the given model."""
    return create_backend_cls(computers.SqlaComputer, type(dbmodel))


@get_backend_entity.register(models.DbGroup)  # type: ignore[call-overload]
def _group_entity(dbmodel):
    """Return the read-only variant of ``SqlaGroup`` for the class of the given model."""
    return create_backend_cls(groups.SqlaGroup, type(dbmodel))


@get_backend_entity.register(models.DbLog)  # type: ignore[call-overload]
def _log_entity(dbmodel):
    """Return the read-only variant of ``SqlaLog`` for the class of the given model."""
    return create_backend_cls(logs.SqlaLog, type(dbmodel))


@get_backend_entity.register(models.DbNode)  # type: ignore[call-overload]
def _node_entity(dbmodel):
    """Return the read-only variant of ``SqlaNode`` for the class of the given model."""
    return create_backend_cls(nodes.SqlaNode, type(dbmodel))


@get_backend_entity.register(models.DbUser)  # type: ignore[call-overload]
def _user_entity(dbmodel):
    """Return the read-only variant of ``SqlaUser`` for the class of the given model."""
    return create_backend_cls(users.SqlaUser, type(dbmodel))