Source code for aiida.storage.sqlite_zip.backend

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""The table models are dynamically generated from the sqlalchemy backend models."""
from __future__ import annotations

from contextlib import contextmanager
from functools import cached_property
from pathlib import Path
import tempfile
from typing import BinaryIO, Iterable, Iterator, Optional, Sequence, Tuple, cast
from zipfile import ZipFile, is_zipfile

from archive_path import extract_file_in_zip
from sqlalchemy.orm import Session

from aiida.common.exceptions import ClosedStorage, CorruptStorage
from aiida.manage import Profile
from aiida.orm.entities import EntityTypes
from aiida.orm.implementation import StorageBackend
from aiida.repository.backend.abstract import AbstractRepositoryBackend

from . import orm
from .migrator import get_schema_version_head, validate_storage
from .utils import DB_FILENAME, REPO_FOLDER, ReadOnlyError, create_sqla_engine, extract_metadata, read_version

__all__ = ('SqliteZipBackend',)


[docs]class SqliteZipBackend(StorageBackend): # pylint: disable=too-many-public-methods """A read-only backend for a sqlite/zip format. The storage format uses an SQLite database and repository files, within a folder or zipfile. The content of the folder/zipfile should be:: |- metadata.json |- db.sqlite3 |- repo/ |- hashkey1 |- hashkey2 ... """ _read_only = True
[docs] @classmethod def version_head(cls) -> str: return get_schema_version_head()
[docs] @staticmethod def create_profile(path: str | Path, options: dict | None = None) -> Profile: """Create a new profile instance for this backend, from the path to the zip file.""" profile_name = Path(path).name return Profile( profile_name, { 'storage': { 'backend': 'sqlite_zip', 'config': { 'path': str(path) } }, 'process_control': { 'backend': 'null', 'config': {} }, 'options': options or {}, } )
[docs] @classmethod def version_profile(cls, profile: Profile) -> Optional[str]: return read_version(profile.storage_config['path'], search_limit=None)
[docs] @classmethod def migrate(cls, profile: Profile): raise NotImplementedError('use the migrate function directly.')
[docs] def __init__(self, profile: Profile): super().__init__(profile) self._path = Path(profile.storage_config['path']) validate_storage(self._path) # lazy open the archive zipfile and extract the database file self._db_file: Optional[Path] = None self._session: Optional[Session] = None self._repo: Optional[_RoBackendRepository] = None self._closed = False
[docs] def __str__(self) -> str: state = 'closed' if self.is_closed else 'open' return f'SqliteZip storage (read-only) [{state}] @ {self._path}'
@property def is_closed(self) -> bool: return self._closed
[docs] def close(self): """Close the backend""" if self._session: self._session.close() if self._db_file and self._db_file.exists(): self._db_file.unlink() if self._repo: self._repo.close() self._session = None self._db_file = None self._repo = None self._closed = True
[docs] def get_session(self) -> Session: """Return an SQLAlchemy session.""" if self._closed: raise ClosedStorage(str(self)) if self._session is None: if is_zipfile(self._path): _, path = tempfile.mkstemp() db_file = self._db_file = Path(path) with db_file.open('wb') as handle: try: extract_file_in_zip(self._path, DB_FILENAME, handle, search_limit=4) except Exception as exc: raise CorruptStorage(f'database could not be read: {exc}') from exc else: db_file = self._path / DB_FILENAME if not db_file.exists(): raise CorruptStorage(f'database could not be read: non-existent {db_file}') self._session = Session(create_sqla_engine(db_file), future=True) return self._session
[docs] def get_repository(self) -> '_RoBackendRepository': if self._closed: raise ClosedStorage(str(self)) if self._repo is None: if is_zipfile(self._path): self._repo = ZipfileBackendRepository(self._path) elif (self._path / REPO_FOLDER).exists(): self._repo = FolderBackendRepository(self._path / REPO_FOLDER) else: raise CorruptStorage(f'repository could not be read: non-existent {self._path / REPO_FOLDER}') return self._repo
[docs] def query(self) -> orm.SqliteQueryBuilder: return orm.SqliteQueryBuilder(self)
[docs] def get_backend_entity(self, model): # pylint: disable=no-self-use """Return the backend entity that corresponds to the given Model instance.""" return orm.get_backend_entity(model, self)
@cached_property def authinfos(self): return orm.SqliteAuthInfoCollection(self) @cached_property def comments(self): return orm.SqliteCommentCollection(self) @cached_property def computers(self): return orm.SqliteComputerCollection(self) @cached_property def groups(self): return orm.SqliteGroupCollection(self) @cached_property def logs(self): return orm.SqliteLogCollection(self) @cached_property def nodes(self): return orm.SqliteNodeCollection(self) @cached_property def users(self): return orm.SqliteUserCollection(self)
[docs] def _clear(self, recreate_user: bool = True) -> None: raise ReadOnlyError()
[docs] def transaction(self): raise ReadOnlyError()
@property def in_transaction(self) -> bool: return False
[docs] def bulk_insert(self, entity_type: EntityTypes, rows: list[dict], allow_defaults: bool = False) -> list[int]: raise ReadOnlyError()
[docs] def bulk_update(self, entity_type: EntityTypes, rows: list[dict]) -> None: raise ReadOnlyError()
[docs] def delete_nodes_and_connections(self, pks_to_delete: Sequence[int]): raise ReadOnlyError()
[docs] def get_global_variable(self, key: str): raise NotImplementedError
[docs] def set_global_variable(self, key: str, value, description: Optional[str] = None, overwrite=True) -> None: raise ReadOnlyError()
[docs] def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None: raise NotImplementedError
[docs] def get_info(self, detailed: bool = False) -> dict: # since extracting the database file is expensive, we only do it if detailed is True results = {'metadata': extract_metadata(self._path)} if detailed: results.update(super().get_info(detailed=detailed)) results['repository'] = self.get_repository().get_info(detailed) return results
[docs]class _RoBackendRepository(AbstractRepositoryBackend): # pylint: disable=abstract-method """A backend abstract for a read-only folder or zip file."""
[docs] def __init__(self, path: str | Path): """Initialise the repository backend. :param path: the path to the zip file """ self._path = Path(path) self._closed = False
[docs] def close(self) -> None: """Close the repository.""" self._closed = True
@property def uuid(self) -> Optional[str]: return None @property def key_format(self) -> Optional[str]: return 'sha256'
[docs] def initialise(self, **kwargs) -> None: pass
@property def is_initialised(self) -> bool: return True
[docs] def erase(self) -> None: raise ReadOnlyError()
[docs] def _put_object_from_filelike(self, handle: BinaryIO) -> str: raise ReadOnlyError()
[docs] def has_objects(self, keys: list[str]) -> list[bool]: return [self.has_object(key) for key in keys]
[docs] def iter_object_streams(self, keys: list[str]) -> Iterator[Tuple[str, BinaryIO]]: for key in keys: with self.open(key) as handle: # pylint: disable=not-context-manager yield key, handle
[docs] def delete_objects(self, keys: list[str]) -> None: raise ReadOnlyError()
[docs] def get_object_hash(self, key: str) -> str: return key
[docs] def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None: pass
[docs] def get_info(self, detailed: bool = False, **kwargs) -> dict: return {'objects': {'count': len(list(self.list_objects()))}}
[docs]class ZipfileBackendRepository(_RoBackendRepository): """A read-only backend for a zip file. The zip file should contain repository files with the key format: ``repo/<sha256 hash>``, i.e. files named by the sha256 hash of the file contents, inside a ``repo`` directory. """
[docs] def __init__(self, path: str | Path): super().__init__(path) self._folder = REPO_FOLDER self.__zipfile: None | ZipFile = None
[docs] def close(self) -> None: if self._zipfile: self._zipfile.close() super().close()
@property def _zipfile(self) -> ZipFile: """Return the open zip file.""" if self._closed: raise ClosedStorage(f'repository is closed: {self._path}') if self.__zipfile is None: try: self.__zipfile = ZipFile(self._path, mode='r') # pylint: disable=consider-using-with except Exception as exc: raise CorruptStorage(f'repository could not be read {self._path}: {exc}') from exc return self.__zipfile
[docs] def has_object(self, key: str) -> bool: try: self._zipfile.getinfo(f'{self._folder}/{key}') except KeyError: return False return True
[docs] def list_objects(self) -> Iterable[str]: prefix = f'{self._folder}/' prefix_len = len(prefix) for name in self._zipfile.namelist(): if name.startswith(prefix) and name[prefix_len:]: yield name[prefix_len:]
[docs] @contextmanager def open(self, key: str) -> Iterator[BinaryIO]: try: handle = self._zipfile.open(f'{self._folder}/{key}') yield cast(BinaryIO, handle) except KeyError: raise FileNotFoundError(f'object with key `{key}` does not exist.') finally: handle.close()
[docs]class FolderBackendRepository(_RoBackendRepository): """A read-only backend for a folder. The folder should contain repository files, named by the sha256 hash of the file contents. """
[docs] def has_object(self, key: str) -> bool: return self._path.joinpath(key).is_file()
[docs] def list_objects(self) -> Iterable[str]: for subpath in self._path.iterdir(): if subpath.is_file(): yield subpath.name
[docs] @contextmanager def open(self, key: str) -> Iterator[BinaryIO]: if not self._path.joinpath(key).is_file(): raise FileNotFoundError(f'object with key `{key}` does not exist.') with self._path.joinpath(key).open('rb') as handle: yield handle