# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""A read-only storage backend for the sqlite/zip format: an SQLite database plus repository files, in a folder or zipfile."""
from __future__ import annotations
from contextlib import contextmanager
from functools import cached_property
from pathlib import Path
import tempfile
from typing import BinaryIO, Iterable, Iterator, Optional, Sequence, Tuple, cast
from zipfile import ZipFile, is_zipfile
from archive_path import extract_file_in_zip
from sqlalchemy.orm import Session
from aiida.common.exceptions import ClosedStorage, CorruptStorage
from aiida.manage import Profile
from aiida.orm.entities import EntityTypes
from aiida.orm.implementation import StorageBackend
from aiida.repository.backend.abstract import AbstractRepositoryBackend
from . import orm
from .migrator import get_schema_version_head, validate_storage
from .utils import DB_FILENAME, REPO_FOLDER, ReadOnlyError, create_sqla_engine, extract_metadata, read_version
# Public API of this module: only the storage backend class is exported.
__all__ = ('SqliteZipBackend',)
class SqliteZipBackend(StorageBackend):  # pylint: disable=too-many-public-methods
    """A read-only backend for a sqlite/zip format.

    The storage format uses an SQLite database and repository files, within a folder or zipfile.

    The content of the folder/zipfile should be::

        |- metadata.json
        |- db.sqlite3
        |- repo/
            |- hashkey1
            |- hashkey2
            ...

    The database session and the repository are created lazily, on the first call to
    :meth:`get_session` and :meth:`get_repository` respectively.
    All mutating operations raise ``ReadOnlyError``.
    """
    _read_only = True

    @classmethod
    def version_head(cls) -> str:
        """Return the schema version reported by ``get_schema_version_head``."""
        return get_schema_version_head()

    @staticmethod
    def create_profile(path: str | Path, options: dict | None = None) -> Profile:
        """Create a new profile instance for this backend, from the path to the zip file.

        :param path: the path to the storage; its final component is used as the profile name
        :param options: profile options, an empty dictionary is used if not given
        :return: a profile using the ``sqlite_zip`` storage and ``null`` process control backends
        """
        profile_name = Path(path).name
        return Profile(
            profile_name, {
                'storage': {
                    'backend': 'sqlite_zip',
                    'config': {
                        'path': str(path)
                    }
                },
                'process_control': {
                    'backend': 'null',
                    'config': {}
                },
                'options': options or {},
            }
        )

    @classmethod
    def version_profile(cls, profile: Profile) -> Optional[str]:
        """Return the version read from the storage at the path configured in the profile."""
        return read_version(profile.storage_config['path'], search_limit=None)

    @classmethod
    def migrate(cls, profile: Profile):
        """Not supported through the backend class.

        :raises NotImplementedError: always
        """
        raise NotImplementedError('use the migrate function directly.')

    def __init__(self, profile: Profile):
        """Initialise the backend for the storage path configured in the profile.

        The path is passed to ``validate_storage`` immediately; the database and repository
        are only opened on first use.

        :param profile: the profile, whose storage configuration must contain the ``path`` key
        """
        super().__init__(profile)
        self._path = Path(profile.storage_config['path'])
        validate_storage(self._path)
        # lazy open the archive zipfile and extract the database file
        # path of the temporary database file extracted from a zipfile (stays ``None`` for a folder)
        self._db_file: Optional[Path] = None
        self._session: Optional[Session] = None
        self._repo: Optional[_RoBackendRepository] = None
        self._closed = False

    def __str__(self) -> str:
        """Return a description including the open/closed state and the storage path."""
        state = 'closed' if self.is_closed else 'open'
        return f'SqliteZip storage (read-only) [{state}] @ {self._path}'

    @property
    def is_closed(self) -> bool:
        """Return whether :meth:`close` has been called."""
        return self._closed

    def close(self):
        """Close the backend

        Closes the session and the repository (if they were opened), deletes the temporary
        database file (if one was extracted) and marks the backend as closed.
        """
        if self._session:
            self._session.close()
        if self._db_file and self._db_file.exists():
            self._db_file.unlink()
        if self._repo:
            self._repo.close()
        self._session = None
        self._db_file = None
        self._repo = None
        self._closed = True

    def get_session(self) -> Session:
        """Return an SQLAlchemy session.

        On first call the session is created: for a zipfile the database is first extracted to a
        temporary file, for a folder the database file inside it is used directly.

        :raises ClosedStorage: if the backend has been closed
        :raises CorruptStorage: if the database cannot be extracted or does not exist
        """
        if self._closed:
            raise ClosedStorage(str(self))
        if self._session is None:
            if is_zipfile(self._path):
                # ``mkstemp`` returns an OS-level descriptor that is already open: write through
                # it (the builtin ``open`` closes it on exit) instead of discarding it and
                # re-opening the path, which would leak one descriptor per extraction.
                descriptor, tmp_name = tempfile.mkstemp()
                # record the path before extracting, so that ``close`` removes the file even on failure
                db_file = self._db_file = Path(tmp_name)
                with open(descriptor, 'wb') as handle:
                    try:
                        extract_file_in_zip(self._path, DB_FILENAME, handle, search_limit=4)
                    except Exception as exc:
                        raise CorruptStorage(f'database could not be read: {exc}') from exc
            else:
                db_file = self._path / DB_FILENAME
                if not db_file.exists():
                    raise CorruptStorage(f'database could not be read: non-existent {db_file}')
            self._session = Session(create_sqla_engine(db_file), future=True)
        return self._session

    def get_repository(self) -> '_RoBackendRepository':
        """Return the repository backend, creating it on first call.

        A zipfile storage is served by a ``ZipfileBackendRepository``, a folder storage by a
        ``FolderBackendRepository`` on its repository sub-folder.

        :raises ClosedStorage: if the backend has been closed
        :raises CorruptStorage: if the storage is a folder without a repository sub-folder
        """
        if self._closed:
            raise ClosedStorage(str(self))
        if self._repo is None:
            if is_zipfile(self._path):
                self._repo = ZipfileBackendRepository(self._path)
            elif (self._path / REPO_FOLDER).exists():
                self._repo = FolderBackendRepository(self._path / REPO_FOLDER)
            else:
                raise CorruptStorage(f'repository could not be read: non-existent {self._path / REPO_FOLDER}')
        return self._repo

    def query(self) -> orm.SqliteQueryBuilder:
        """Return a new query builder bound to this backend."""
        return orm.SqliteQueryBuilder(self)

    def get_backend_entity(self, model):  # pylint: disable=no-self-use
        """Return the backend entity that corresponds to the given Model instance."""
        return orm.get_backend_entity(model, self)

    @cached_property
    def authinfos(self):
        """Return the collection of authinfos (created once and cached)."""
        return orm.SqliteAuthInfoCollection(self)

    @cached_property
    def comments(self):
        """Return the collection of comments (created once and cached)."""
        return orm.SqliteCommentCollection(self)

    @cached_property
    def computers(self):
        """Return the collection of computers (created once and cached)."""
        return orm.SqliteComputerCollection(self)

    @cached_property
    def groups(self):
        """Return the collection of groups (created once and cached)."""
        return orm.SqliteGroupCollection(self)

    @cached_property
    def logs(self):
        """Return the collection of logs (created once and cached)."""
        return orm.SqliteLogCollection(self)

    @cached_property
    def nodes(self):
        """Return the collection of nodes (created once and cached)."""
        return orm.SqliteNodeCollection(self)

    @cached_property
    def users(self):
        """Return the collection of users (created once and cached)."""
        return orm.SqliteUserCollection(self)

    def _clear(self, recreate_user: bool = True) -> None:
        """Not available on a read-only backend.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def transaction(self):
        """Not available on a read-only backend.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    @property
    def in_transaction(self) -> bool:
        """Return ``False``: this backend never opens a transaction."""
        return False

    def bulk_insert(self, entity_type: EntityTypes, rows: list[dict], allow_defaults: bool = False) -> list[int]:
        """Not available on a read-only backend.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def bulk_update(self, entity_type: EntityTypes, rows: list[dict]) -> None:
        """Not available on a read-only backend.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def delete_nodes_and_connections(self, pks_to_delete: Sequence[int]):
        """Not available on a read-only backend.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def get_global_variable(self, key: str):
        """Not implemented for this backend.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def set_global_variable(self, key: str, value, description: Optional[str] = None, overwrite=True) -> None:
        """Not available on a read-only backend.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None:
        """Not implemented for this backend.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def get_info(self, detailed: bool = False) -> dict:
        """Return information on the storage.

        :param detailed: if true, also include the information of the parent implementation
            and of the repository
        :return: a dictionary that always contains the ``metadata`` key
        """
        # since extracting the database file is expensive, we only do it if detailed is True
        results = {'metadata': extract_metadata(self._path)}
        if detailed:
            results.update(super().get_info(detailed=detailed))
            results['repository'] = self.get_repository().get_info(detailed)
        return results
class _RoBackendRepository(AbstractRepositoryBackend):  # pylint: disable=abstract-method
    """Shared base of the read-only repository backends, for a folder or a zip file.

    Every operation that would modify the repository raises ``ReadOnlyError``.
    """

    def __init__(self, path: str | Path):
        """Initialise the repository backend.

        :param path: the path to the folder or zip file holding the repository
        """
        self._closed = False
        self._path = Path(path)

    def close(self) -> None:
        """Close the repository."""
        self._closed = True

    @property
    def uuid(self) -> Optional[str]:
        """Return ``None``: no UUID is exposed for this kind of repository."""
        return None

    @property
    def key_format(self) -> Optional[str]:
        """Return the format of the object keys, which is ``sha256``."""
        return 'sha256'

    def initialise(self, **kwargs) -> None:
        """Do nothing: :attr:`is_initialised` is always true for this backend."""

    @property
    def is_initialised(self) -> bool:
        """Return ``True`` unconditionally."""
        return True

    def erase(self) -> None:
        """Not available on a read-only repository.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def _put_object_from_filelike(self, handle: BinaryIO) -> str:
        """Not available on a read-only repository.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def has_objects(self, keys: list[str]) -> list[bool]:
        """Return, for each key and in the same order, whether the object exists."""
        return list(map(self.has_object, keys))

    def iter_object_streams(self, keys: list[str]) -> Iterator[Tuple[str, BinaryIO]]:
        """Yield ``(key, stream)`` pairs for the given keys.

        Each stream is only open while its pair is being consumed: it is closed before the
        next pair is produced.
        """
        for object_key in keys:
            with self.open(object_key) as stream:  # pylint: disable=not-context-manager
                yield object_key, stream

    def delete_objects(self, keys: list[str]) -> None:
        """Not available on a read-only repository.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def get_object_hash(self, key: str) -> str:
        """Return the key itself, which is used as the hash of the object."""
        return key

    def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None:
        """Do nothing: no maintenance is performed by this backend."""

    def get_info(self, detailed: bool = False, **kwargs) -> dict:
        """Return a dictionary with the number of objects, under ``objects`` / ``count``."""
        object_count = sum(1 for _ in self.list_objects())
        return {'objects': {'count': object_count}}
class ZipfileBackendRepository(_RoBackendRepository):
    """A read-only backend for a zip file.

    The zip file should contain repository files with the key format: ``repo/<sha256 hash>``,
    i.e. files named by the sha256 hash of the file contents, inside a ``repo`` directory.

    The zip file is only opened on first access to an object, and stays open until :meth:`close`.
    """

    def __init__(self, path: str | Path):
        """Initialise the repository backend.

        :param path: the path to the zip file
        """
        super().__init__(path)
        self._folder = REPO_FOLDER
        # the lazily opened archive, see the ``_zipfile`` property
        self.__zipfile: None | ZipFile = None

    def close(self) -> None:
        """Close the zip file, if it was opened, and mark the repository as closed.

        Calling this more than once is safe.
        """
        # Inspect the private attribute directly: going through the ``_zipfile`` property would
        # open the archive only to close it again, and would raise once the repository is closed.
        if self.__zipfile is not None:
            self.__zipfile.close()
            self.__zipfile = None
        super().close()

    @property
    def _zipfile(self) -> ZipFile:
        """Return the open zip file.

        :raises ClosedStorage: if the repository has been closed
        :raises CorruptStorage: if the zip file cannot be opened
        """
        if self._closed:
            raise ClosedStorage(f'repository is closed: {self._path}')
        if self.__zipfile is None:
            try:
                self.__zipfile = ZipFile(self._path, mode='r')  # pylint: disable=consider-using-with
            except Exception as exc:
                raise CorruptStorage(f'repository could not be read {self._path}: {exc}') from exc
        return self.__zipfile

    def has_object(self, key: str) -> bool:
        """Return whether the zip file contains an entry for the given key."""
        try:
            self._zipfile.getinfo(f'{self._folder}/{key}')
        except KeyError:
            return False
        return True

    def list_objects(self) -> Iterable[str]:
        """Yield the keys of all entries stored below the repository folder of the zip file."""
        prefix = f'{self._folder}/'
        prefix_len = len(prefix)
        for name in self._zipfile.namelist():
            # the second condition skips the entry of the repository folder itself
            if name.startswith(prefix) and name[prefix_len:]:
                yield name[prefix_len:]

    @contextmanager
    def open(self, key: str) -> Iterator[BinaryIO]:
        """Open the object with the given key for reading, as a context manager.

        :raises FileNotFoundError: if the zip file has no entry for the key
        """
        # Only the lookup is guarded: if it fails there is no handle to close, and a ``KeyError``
        # raised by the caller inside the ``with`` body must not be reported as a missing object.
        try:
            handle = self._zipfile.open(f'{self._folder}/{key}')
        except KeyError as exc:
            raise FileNotFoundError(f'object with key `{key}` does not exist.') from exc
        try:
            yield cast(BinaryIO, handle)
        finally:
            handle.close()
class FolderBackendRepository(_RoBackendRepository):
    """A read-only backend for a folder.

    The folder should contain repository files, named by the sha256 hash of the file contents.
    """

    def has_object(self, key: str) -> bool:
        """Return whether ``key`` names a file inside the repository folder."""
        return (self._path / key).is_file()

    def list_objects(self) -> Iterable[str]:
        """Yield the name of every file found directly inside the repository folder."""
        yield from (entry.name for entry in self._path.iterdir() if entry.is_file())

    @contextmanager
    def open(self, key: str) -> Iterator[BinaryIO]:
        """Open the object with the given key for binary reading, as a context manager.

        :raises FileNotFoundError: if ``key`` does not name a file inside the repository folder
        """
        target = self._path / key
        if not target.is_file():
            raise FileNotFoundError(f'object with key `{key}` does not exist.')
        with target.open('rb') as handle:
            yield handle