# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""The table models are dynamically generated from the sqlalchemy backend models."""
from __future__ import annotations
from contextlib import contextmanager
from functools import singledispatch
from pathlib import Path
import tempfile
from typing import BinaryIO, Iterable, Iterator, Optional, Sequence, Tuple, Type, cast
from zipfile import ZipFile, is_zipfile
from archive_path import extract_file_in_zip
from sqlalchemy.orm import Session
from aiida.common.exceptions import AiidaException, ClosedStorage, CorruptStorage
from aiida.manage import Profile
from aiida.orm.entities import EntityTypes
from aiida.orm.implementation import StorageBackend
from aiida.repository.backend.abstract import AbstractRepositoryBackend
from aiida.storage.psql_dos.orm import authinfos, comments, computers, entities, groups, logs, nodes, users
from aiida.storage.psql_dos.orm.querybuilder import SqlaQueryBuilder
from aiida.storage.psql_dos.orm.utils import ModelWrapper
from . import models
from .migrator import get_schema_version_head, validate_storage
from .utils import DB_FILENAME, REPO_FOLDER, create_sqla_engine, extract_metadata, read_version
class SqliteZipBackend(StorageBackend):  # pylint: disable=too-many-public-methods
    """A read-only backend for a sqlite/zip format.

    The storage format uses an SQLite database and repository files, within a folder or zipfile.

    The content of the folder/zipfile should be::

        |- metadata.json
        |- db.sqlite3
        |- repo/
            |- hashkey1
            |- hashkey2
            ...

    The database session and the repository are only opened on first use (``get_session`` and
    ``get_repository``). Operations that would modify the storage raise ``ReadOnlyError``.
    """

    @classmethod
    def version_head(cls) -> str:
        """Return the head schema version, as given by ``get_schema_version_head``."""
        return get_schema_version_head()

    @staticmethod
    def create_profile(path: str | Path) -> Profile:
        """Create a new profile instance for this backend, from the path to the zip file.

        :param path: the path to the archive; its final component is used as the profile name
        :return: a profile using the ``sqlite_zip`` storage backend and the ``null`` process control backend
        """
        profile_name = Path(path).name
        return Profile(
            profile_name, {
                'storage': {
                    'backend': 'sqlite_zip',
                    'config': {
                        'path': str(path)
                    }
                },
                'process_control': {
                    'backend': 'null',
                    'config': {}
                }
            }
        )

    @classmethod
    def version_profile(cls, profile: Profile) -> Optional[str]:
        """Return the version that ``read_version`` reports for the storage path of the profile."""
        return read_version(profile.storage_config['path'], search_limit=None)

    @classmethod
    def migrate(cls, profile: Profile):
        """Not supported through the backend class.

        :raises NotImplementedError: always
        """
        raise NotImplementedError('use the migrate function directly.')

    def __init__(self, profile: Profile):
        """Validate the storage at the profile's configured path and set up the lazily opened resources.

        :param profile: the profile, whose ``storage_config['path']`` points to the folder or zip file
        """
        super().__init__(profile)
        self._path = Path(profile.storage_config['path'])
        validate_storage(self._path)
        # lazy open the archive zipfile and extract the database file
        self._db_file: Optional[Path] = None
        self._session: Optional[Session] = None
        self._repo: Optional[_RoBackendRepository] = None
        self._closed = False

    def __str__(self) -> str:
        """Return a description with the open/closed state and the storage path."""
        state = 'closed' if self.is_closed else 'open'
        return f'SqliteZip storage (read-only) [{state}] @ {self._path}'

    @property
    def is_closed(self) -> bool:
        """Return whether ``close`` has been called on this backend."""
        return self._closed

    def close(self):
        """Close the backend.

        Closes the session and the repository if they were opened, and deletes the temporary
        database file if one was extracted from a zip file.
        """
        if self._session:
            self._session.close()
        if self._db_file and self._db_file.exists():
            self._db_file.unlink()
        if self._repo:
            self._repo.close()
        self._session = None
        self._db_file = None
        self._repo = None
        self._closed = True

    def get_session(self) -> Session:
        """Return an SQLAlchemy session.

        The session is created on the first call and then reused. When the storage is a zip file,
        the database is first extracted to a temporary file, which ``close`` removes again.

        :raises ClosedStorage: if the backend has been closed
        :raises CorruptStorage: if the database file cannot be extracted or does not exist
        """
        if self._closed:
            raise ClosedStorage(str(self))
        if self._session is None:
            if is_zipfile(self._path):
                descriptor, tmp_name = tempfile.mkstemp()
                db_file = Path(tmp_name)
                try:
                    # ``mkstemp`` returns an already open OS-level descriptor: wrap it in a file object
                    # so that it gets closed, rather than discarding (and leaking) it
                    with open(descriptor, 'wb') as handle:
                        try:
                            extract_file_in_zip(self._path, DB_FILENAME, handle, search_limit=4)
                        except Exception as exc:
                            raise CorruptStorage(f'database could not be read: {exc}') from exc
                except BaseException:
                    # do not leave an orphaned temporary file behind when the extraction failed
                    if db_file.exists():
                        db_file.unlink()
                    raise
                # only record the file once it is complete, so that ``close`` knows to delete it
                self._db_file = db_file
            else:
                db_file = self._path / DB_FILENAME
                if not db_file.exists():
                    raise CorruptStorage(f'database could not be read: non-existent {db_file}')
            self._session = Session(create_sqla_engine(db_file))
        return self._session

    def get_repository(self) -> '_RoBackendRepository':
        """Return the read-only file repository, creating it on the first call.

        :raises ClosedStorage: if the backend has been closed
        :raises CorruptStorage: if the storage is a folder without a repository sub-folder
        """
        if self._closed:
            raise ClosedStorage(str(self))
        if self._repo is None:
            if is_zipfile(self._path):
                self._repo = ZipfileBackendRepository(self._path)
            elif (self._path / REPO_FOLDER).exists():
                self._repo = FolderBackendRepository(self._path / REPO_FOLDER)
            else:
                raise CorruptStorage(f'repository could not be read: non-existent {self._path / REPO_FOLDER}')
        return self._repo

    def query(self) -> 'SqliteBackendQueryBuilder':
        """Return a new query builder bound to this backend."""
        return SqliteBackendQueryBuilder(self)

    def get_backend_entity(self, res):  # pylint: disable=no-self-use
        """Return the backend entity that corresponds to the given Model instance."""
        # this resolves to the module-level dispatcher, not to this method
        klass = get_backend_entity(res)
        return klass(self, res)

    @property
    def authinfos(self):
        """Return the authinfo collection, with a read-only entity class."""
        return create_backend_collection(
            authinfos.SqlaAuthInfoCollection, self, authinfos.SqlaAuthInfo, models.DbAuthInfo
        )

    @property
    def comments(self):
        """Return the comment collection, with a read-only entity class."""
        return create_backend_collection(comments.SqlaCommentCollection, self, comments.SqlaComment, models.DbComment)

    @property
    def computers(self):
        """Return the computer collection, with a read-only entity class."""
        return create_backend_collection(
            computers.SqlaComputerCollection, self, computers.SqlaComputer, models.DbComputer
        )

    @property
    def groups(self):
        """Return the group collection, with a read-only entity class."""
        return create_backend_collection(groups.SqlaGroupCollection, self, groups.SqlaGroup, models.DbGroup)

    @property
    def logs(self):
        """Return the log collection, with a read-only entity class."""
        return create_backend_collection(logs.SqlaLogCollection, self, logs.SqlaLog, models.DbLog)

    @property
    def nodes(self):
        """Return the node collection, with a read-only entity class."""
        return create_backend_collection(nodes.SqlaNodeCollection, self, nodes.SqlaNode, models.DbNode)

    @property
    def users(self):
        """Return the user collection, with a read-only entity class."""
        return create_backend_collection(users.SqlaUserCollection, self, users.SqlaUser, models.DbUser)

    def _clear(self, recreate_user: bool = True) -> None:
        """Not available, the storage is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def transaction(self):
        """Not available, the storage is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    @property
    def in_transaction(self) -> bool:
        """Return ``False``: a transaction can never be opened on this backend."""
        return False

    def bulk_insert(self, entity_type: EntityTypes, rows: list[dict], allow_defaults: bool = False) -> list[int]:
        """Not available, the storage is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def bulk_update(self, entity_type: EntityTypes, rows: list[dict]) -> None:
        """Not available, the storage is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def delete_nodes_and_connections(self, pks_to_delete: Sequence[int]):
        """Not available, the storage is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def get_global_variable(self, key: str):
        """Not implemented for this backend.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def set_global_variable(self, key: str, value, description: Optional[str] = None, overwrite=True) -> None:
        """Not available, the storage is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None:
        """Not implemented for this backend.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def get_info(self, detailed: bool = False) -> dict:
        """Return information on the storage.

        :param detailed: if true, also include the information of the parent implementation and the repository
        :return: a dictionary that always contains the ``metadata`` key
        """
        # since extracting the database file is expensive, we only do it if detailed is True
        results = {'metadata': extract_metadata(self._path)}
        if detailed:
            results.update(super().get_info(detailed=detailed))
            results['repository'] = self.get_repository().get_info(detailed)
        return results
class ReadOnlyError(AiidaException):
    """Exception signalling that a modifying operation was attempted on the read-only archive."""

    def __init__(self, msg='sqlite_zip storage is read-only'):  # pylint: disable=useless-super-delegation
        """Create the exception.

        :param msg: the error message, defaulting to a generic read-only notice
        """
        super().__init__(msg)
class _RoBackendRepository(AbstractRepositoryBackend):  # pylint: disable=abstract-method
    """Shared base of the read-only repository backends, for a folder or a zip file.

    Subclasses provide ``has_object``, ``list_objects`` and ``open``; every modifying
    operation raises ``ReadOnlyError``.
    """

    def __init__(self, path: str | Path):
        """Initialise the repository backend.

        :param path: the path to the zip file
        """
        self._closed = False
        self._path = Path(path)

    def close(self) -> None:
        """Flag the repository as closed."""
        self._closed = True

    @property
    def uuid(self) -> Optional[str]:
        """Return ``None``: this repository has no UUID."""
        return None

    @property
    def key_format(self) -> Optional[str]:
        """Return the format of the object keys, which is always ``sha256``."""
        return 'sha256'

    def initialise(self, **kwargs) -> None:
        """Do nothing: there is nothing to initialise."""

    @property
    def is_initialised(self) -> bool:
        """Return ``True`` unconditionally."""
        return True

    def erase(self) -> None:
        """Not available, the repository is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def _put_object_from_filelike(self, handle: BinaryIO) -> str:
        """Not available, the repository is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def has_objects(self, keys: list[str]) -> list[bool]:
        """Return, for each key and in the same order, whether the object exists."""
        return list(map(self.has_object, keys))

    def iter_object_streams(self, keys: list[str]) -> Iterator[Tuple[str, BinaryIO]]:
        """Yield ``(key, stream)`` pairs; each stream is only open while its pair is being consumed."""
        for key in keys:
            with self.open(key) as stream:  # pylint: disable=not-context-manager
                yield key, stream

    def delete_objects(self, keys: list[str]) -> None:
        """Not available, the repository is read-only.

        :raises ReadOnlyError: always
        """
        raise ReadOnlyError()

    def get_object_hash(self, key: str) -> str:
        """Return the key itself as the hash of the object."""
        return key

    def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None:
        """Do nothing: no maintenance is performed."""

    def get_info(self, detailed: bool = False, **kwargs) -> dict:
        """Return a dictionary holding the number of objects in the repository."""
        object_count = sum(1 for _ in self.list_objects())
        return {'objects': {'count': object_count}}
class ZipfileBackendRepository(_RoBackendRepository):
    """A read-only backend for a zip file.

    The zip file should contain repository files with the key format: ``repo/<sha256 hash>``,
    i.e. files named by the sha256 hash of the file contents, inside a ``repo`` directory.

    The zip file is only opened when it is first needed.
    """

    def __init__(self, path: str | Path):
        """Initialise the repository backend, without opening the zip file yet.

        :param path: the path to the zip file
        """
        super().__init__(path)
        self._folder = REPO_FOLDER
        self.__zipfile: None | ZipFile = None

    def close(self) -> None:
        """Close the zip file, if it was opened, and mark the repository as closed.

        Calling this more than once is allowed.
        """
        # use the private attribute instead of the ``_zipfile`` property: the property would open the
        # archive just to close it again, and would raise ``ClosedStorage`` on a repeated ``close``
        if self.__zipfile is not None:
            self.__zipfile.close()
            self.__zipfile = None
        super().close()

    @property
    def _zipfile(self) -> ZipFile:
        """Return the open zip file.

        :raises ClosedStorage: if the repository has been closed
        :raises CorruptStorage: if the zip file cannot be opened
        """
        if self._closed:
            raise ClosedStorage(f'repository is closed: {self._path}')
        if self.__zipfile is None:
            try:
                self.__zipfile = ZipFile(self._path, mode='r')  # pylint: disable=consider-using-with
            except Exception as exc:
                raise CorruptStorage(f'repository could not be read {self._path}: {exc}') from exc
        return self.__zipfile

    def has_object(self, key: str) -> bool:
        """Return whether the zip file has a member for the given key inside the repository folder."""
        try:
            self._zipfile.getinfo(f'{self._folder}/{key}')
        except KeyError:
            return False
        return True

    def list_objects(self) -> Iterable[str]:
        """Yield the keys of all members inside the repository folder of the zip file."""
        prefix = f'{self._folder}/'
        prefix_len = len(prefix)
        for name in self._zipfile.namelist():
            # the second condition skips the entry of the repository folder itself
            if name.startswith(prefix) and name[prefix_len:]:
                yield name[prefix_len:]

    @contextmanager
    def open(self, key: str) -> Iterator[BinaryIO]:
        """Open the object with the given key for reading, as a context manager.

        :param key: the key of the object
        :raises FileNotFoundError: if the zip file has no member for the key
        """
        # only the lookup is guarded: a failed lookup leaves no handle to close, and a ``KeyError``
        # raised by the caller inside the ``with`` body must not be reported as a missing object
        try:
            handle = self._zipfile.open(f'{self._folder}/{key}')
        except KeyError as exc:
            raise FileNotFoundError(f'object with key `{key}` does not exist.') from exc
        try:
            yield cast(BinaryIO, handle)
        finally:
            handle.close()
class FolderBackendRepository(_RoBackendRepository):
    """A read-only backend for a folder.

    The folder should contain repository files, named by the sha256 hash of the file contents.
    """

    def has_object(self, key: str) -> bool:
        """Return whether the folder contains a regular file named after the key."""
        candidate = self._path.joinpath(key)
        return candidate.is_file()

    def list_objects(self) -> Iterable[str]:
        """Yield the name of every regular file that is directly inside the folder."""
        yield from (entry.name for entry in self._path.iterdir() if entry.is_file())

    @contextmanager
    def open(self, key: str) -> Iterator[BinaryIO]:
        """Open the object with the given key for binary reading, as a context manager.

        :param key: the key of the object
        :raises FileNotFoundError: if the folder has no regular file named after the key
        """
        target = self._path.joinpath(key)
        if not target.is_file():
            raise FileNotFoundError(f'object with key `{key}` does not exist.')
        with target.open('rb') as stream:
            yield stream
class SqliteBackendQueryBuilder(SqlaQueryBuilder):
    """Query builder for the archive, which maps each entity onto the archive's own model classes."""

    @property
    def Node(self):
        """Return the node model class."""
        return models.DbNode

    @property
    def Link(self):
        """Return the link model class."""
        return models.DbLink

    @property
    def Computer(self):
        """Return the computer model class."""
        return models.DbComputer

    @property
    def User(self):
        """Return the user model class."""
        return models.DbUser

    @property
    def Group(self):
        """Return the group model class."""
        return models.DbGroup

    @property
    def AuthInfo(self):
        """Return the authinfo model class."""
        return models.DbAuthInfo

    @property
    def Comment(self):
        """Return the comment model class."""
        return models.DbComment

    @property
    def Log(self):
        """Return the log model class."""
        return models.DbLog

    @property
    def table_groups_nodes(self):
        """Return the table behind the group-nodes model class."""
        group_nodes_model = models.DbGroupNodes
        return group_nodes_model.__table__  # type: ignore[attr-defined] # pylint: disable=no-member
def create_backend_cls(base_class, model_cls):
    """Build a read-only entity class that derives from ``base_class`` and is bound to ``model_cls``.

    :param base_class: the backend entity class to derive from
    :param model_cls: the model class, set as ``MODEL_CLASS`` on the returned class
    :return: a new ``ReadOnlyEntityBackend`` class; every call creates a distinct class
    """

    class ReadOnlyEntityBackend(base_class):  # type: ignore
        """Backend class for the read-only archive."""

        MODEL_CLASS = model_cls

        def __init__(self, _backend, model):
            """Bind the entity to the backend and wrap the given model instance."""
            self._backend = _backend
            self._model = ModelWrapper(model, _backend)

        @classmethod
        def from_dbmodel(cls, model, _backend):
            """Create the entity from a model instance (same arguments as the constructor, swapped)."""
            return cls(_backend, model)

        @property
        def is_stored(self):
            """Return ``True``: entities of the archive are always considered stored."""
            return True

        @property
        def model(self) -> ModelWrapper:
            """Return the ``ModelWrapper`` that was created around the model instance."""
            return self._model

        @property
        def bare_model(self):
            """Return the underlying SQLAlchemy ORM model for this entity."""
            wrapper = self.model
            return wrapper._model  # pylint: disable=protected-access

        def store(self):  # pylint: disable=no-self-use
            """Not available, the archive is read-only.

            :raises ReadOnlyError: always
            """
            raise ReadOnlyError()

    return ReadOnlyEntityBackend
def create_backend_collection(cls, _backend, entity_cls, model):
    """Instantiate a collection whose entity class is a read-only variant of ``entity_cls``.

    :param cls: the collection class, instantiated with ``_backend``
    :param _backend: the backend that the collection is bound to
    :param entity_cls: the entity class from which the read-only class is derived
    :param model: the model class that the read-only entity class is bound to
    :return: the collection instance, with ``ENTITY_CLASS`` set on it
    """
    instance = cls(_backend)
    instance.ENTITY_CLASS = create_backend_cls(entity_cls, model)
    return instance
@singledispatch
def get_backend_entity(dbmodel) -> Type[entities.SqlaModelEntity]:  # pylint: disable=unused-argument
    """Return the read-only entity class for a model instance, dispatching on the type of ``dbmodel``.

    This is the fallback for types without a registered implementation.

    :raises TypeError: always, since the type of ``dbmodel`` is not registered
    """
    message = f'Cannot get backend entity for {dbmodel}'
    raise TypeError(message)
@get_backend_entity.register(models.DbAuthInfo)  # type: ignore[call-overload]
def _(dbmodel):
    """Return a read-only authinfo entity class, bound to the class of ``dbmodel``."""
    model_cls = dbmodel.__class__
    return create_backend_cls(authinfos.SqlaAuthInfo, model_cls)
@get_backend_entity.register(models.DbComment)  # type: ignore[call-overload]
def _(dbmodel):
    """Return a read-only comment entity class, bound to the class of ``dbmodel``."""
    model_cls = dbmodel.__class__
    return create_backend_cls(comments.SqlaComment, model_cls)
@get_backend_entity.register(models.DbComputer)  # type: ignore[call-overload]
def _(dbmodel):
    """Return a read-only computer entity class, bound to the class of ``dbmodel``."""
    model_cls = dbmodel.__class__
    return create_backend_cls(computers.SqlaComputer, model_cls)
@get_backend_entity.register(models.DbGroup)  # type: ignore[call-overload]
def _(dbmodel):
    """Return a read-only group entity class, bound to the class of ``dbmodel``."""
    model_cls = dbmodel.__class__
    return create_backend_cls(groups.SqlaGroup, model_cls)
@get_backend_entity.register(models.DbLog)  # type: ignore[call-overload]
def _(dbmodel):
    """Return a read-only log entity class, bound to the class of ``dbmodel``."""
    model_cls = dbmodel.__class__
    return create_backend_cls(logs.SqlaLog, model_cls)
@get_backend_entity.register(models.DbNode)  # type: ignore[call-overload]
def _(dbmodel):
    """Return a read-only node entity class, bound to the class of ``dbmodel``."""
    model_cls = dbmodel.__class__
    return create_backend_cls(nodes.SqlaNode, model_cls)
@get_backend_entity.register(models.DbUser)  # type: ignore[call-overload]
def _(dbmodel):
    """Return a read-only user entity class, bound to the class of ``dbmodel``."""
    model_cls = dbmodel.__class__
    return create_backend_cls(users.SqlaUser, model_cls)