###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Read-only storage backend for the sqlite/zip archive format, together with its read-only repository backends."""
from __future__ import annotations
import json
import shutil
import tempfile
from contextlib import contextmanager
from datetime import datetime
from functools import cached_property
from pathlib import Path
from typing import BinaryIO, Iterable, Iterator, Optional, Sequence, Tuple, cast
from zipfile import ZipFile, is_zipfile
from archive_path import ZipPath, extract_file_in_zip
from pydantic import BaseModel, Field, field_validator
from sqlalchemy.orm import Session
from aiida import __version__
from aiida.common.exceptions import ClosedStorage, CorruptStorage
from aiida.common.log import AIIDA_LOGGER
from aiida.manage import Profile
from aiida.orm.entities import EntityTypes
from aiida.orm.implementation import StorageBackend
from aiida.repository.backend.abstract import AbstractRepositoryBackend
from . import orm
from .migrator import get_schema_version_head, migrate, validate_storage
from .utils import (
DB_FILENAME,
META_FILENAME,
REPO_FOLDER,
ReadOnlyError,
create_sqla_engine,
extract_metadata,
read_version,
)
__all__ = ('SqliteZipBackend',)
LOGGER = AIIDA_LOGGER.getChild(__file__)
[docs]
class SqliteZipBackend(StorageBackend):
"""A read-only backend for a sqlite/zip format.

The storage format uses an SQLite database and repository files, within a folder or zipfile.

The content of the folder/zipfile should be::

    |- metadata.json
    |- db.sqlite3
    |- repo/
        |- hashkey1
        |- hashkey2
        ...
"""
read_only = True
"""This plugin is read only and data cannot be created or mutated."""
[docs]
class Model(BaseModel):
    """Model describing required information to configure an instance of the storage."""

    filepath: str = Field(title='Filepath of the archive', description='Filepath of the archive.')

    @field_validator('filepath')
    @classmethod
    def filepath_exists_and_is_absolute(cls, value: str) -> str:
        """Validate the filepath exists and return the resolved and absolute filepath.

        :param value: the filepath of the archive as provided in the configuration.
        :returns: the resolved, absolute filepath as a string.
        :raises ValueError: if ``value`` does not point to an existing file.
        """
        filepath = Path(value)
        # Raise explicitly instead of using ``assert``: assertions are stripped when Python runs with ``-O``,
        # which would silently disable this validation.
        if not filepath.is_file():
            raise ValueError(f'The archive `{value}` does not exist.')
        return str(filepath.resolve().absolute())
[docs]
@classmethod
def version_head(cls) -> str:
    """Return the schema version head, as reported by ``get_schema_version_head``."""
    return get_schema_version_head()
[docs]
@staticmethod
def create_profile(filepath: str | Path, options: dict | None = None) -> Profile:
    """Create a new profile instance for this backend, from the path to the zip file.

    :param filepath: path to the archive; its final path component is used as the profile name.
    :param options: optional dictionary stored under the ``options`` key of the profile configuration. An empty
        dictionary is used when nothing (or an empty value) is passed.
    :returns: a ``Profile`` configured with the ``core.sqlite_zip`` storage backend and no process control backend.
    """
    storage = {'backend': 'core.sqlite_zip', 'config': {'filepath': str(filepath)}}
    process_control = {'backend': None, 'config': {}}
    configuration = {
        'storage': storage,
        'process_control': process_control,
        'options': options or {},
    }
    return Profile(Path(filepath).name, configuration)
[docs]
@classmethod
def version_profile(cls, profile: Profile) -> Optional[str]:
    """Return the version read from the storage that the profile points to.

    :param profile: the profile whose ``storage_config['filepath']`` locates the storage.
    :returns: the result of ``read_version`` for that filepath, called with ``search_limit=None``.
    """
    return read_version(profile.storage_config['filepath'], search_limit=None)
[docs]
@classmethod
def initialise(cls, profile: 'Profile', reset: bool = False) -> bool:
    """Initialise an instance of the ``SqliteZipBackend`` storage backend.

    An existing archive is migrated to the head schema version unless ``reset`` is requested; otherwise an empty
    archive, containing only the metadata file and an empty database, is written to the configured filepath.

    :param profile: the profile whose ``storage_config['filepath']`` is the location of the archive.
    :param reset: If ``true``, destroy the backend if it already exists including all of its data before recreating
        and initialising it. This is useful for example for test profiles that need to be reset before or after
        tests having run.
    :returns: ``True`` if the storage was initialised by the function call, ``False`` if it was already initialised.
    """
    archive = Path(profile.storage_config['filepath'])

    if archive.exists() and not reset:
        # Keep the existing archive: migrate it into a scratch directory and only replace the original once the
        # migration has completed.
        with tempfile.TemporaryDirectory() as scratch:
            migrated = Path(scratch) / 'migrated.zip'
            LOGGER.report(f'Migrating existing {cls.__name__}')
            migrate(archive, migrated, cls.version_head())
            shutil.move(migrated, archive)  # type: ignore[arg-type]
        return False

    # Either there is no archive yet or a reset was requested: build an empty base archive in a scratch directory
    # and move it to the location configured in the profile.
    with tempfile.TemporaryDirectory() as scratch:
        from .models import SqliteBase

        message = (
            f'Resetting existing {cls.__name__} at {archive}'
            if reset
            else f'Initialising a new {cls.__name__} at {archive}'
        )
        LOGGER.report(message)

        scratch_database = Path(scratch) / DB_FILENAME
        scratch_zip = Path(scratch) / 'profile.zip'
        metadata = {
            'export_version': cls.version_head(),
            'aiida_version': __version__,
            'key_format': 'sha256',
            'compression': 6,
            'ctime': datetime.now().isoformat(),
        }

        # Create the database schema
        engine = create_sqla_engine(scratch_database)
        SqliteBase.metadata.create_all(engine)

        with ZipPath(
            scratch_zip,
            mode='w',
            compresslevel=metadata['compression'],
            info_order=(META_FILENAME, DB_FILENAME),
        ) as zip_root:
            (zip_root / META_FILENAME).write_text(json.dumps(metadata))
            (zip_root / DB_FILENAME).putfile(scratch_database)

        # The zip file has to be closed before it is moved into place.
        shutil.move(scratch_zip, archive)  # type: ignore[arg-type]

    return True
[docs]
@classmethod
def migrate(cls, profile: Profile):
    """Not implemented for this backend.

    :raises NotImplementedError: always; the message points to
        :func:`aiida.storage.sqlite_zip.migrator.migrate` as the function to call instead.
    """
    raise NotImplementedError('use the :func:`aiida.storage.sqlite_zip.migrator.migrate` function directly.')
[docs]
def __init__(self, profile: Profile):
    """Construct the backend for the storage referenced by the profile.

    Nothing is opened here: the session and the repository are created on demand.

    :param profile: the profile whose ``storage_config['filepath']`` points to the storage.
    """
    super().__init__(profile)
    self._path = Path(profile.storage_config['filepath'])
    # NOTE(review): presumably raises if the storage is invalid or at an incompatible version; confirm against
    # ``validate_storage`` in the migrator module.
    validate_storage(self._path)
    # lazy open the archive zipfile and extract the database file
    self._db_file: Optional[Path] = None
    self._session: Optional[Session] = None
    self._repo: Optional[_RoBackendRepository] = None
    self._closed = False
[docs]
def __str__(self) -> str:
    """Return a one-line description containing the open/closed state and the path of the storage."""
    if self.is_closed:
        state = 'closed'
    else:
        state = 'open'
    return f'SqliteZip storage (read-only) [{state}] @ {self._path}'
@property
def is_closed(self) -> bool:
    """Return whether the backend has been closed, i.e. the value of the ``_closed`` flag."""
    return self._closed
[docs]
def close(self):
    """Close the backend.

    Closes the session if one was created, removes the database file recorded in ``_db_file`` if it still exists,
    closes the repository if one was created, and finally resets these references and marks the backend as closed.
    """
    session, db_file, repository = self._session, self._db_file, self._repo

    if session:
        session.close()
    if db_file and db_file.exists():
        db_file.unlink()
    if repository:
        repository.close()

    self._session = self._db_file = self._repo = None
    self._closed = True
[docs]
def get_session(self) -> Session:
    """Return an SQLAlchemy session.

    The session is created on the first call and cached on the instance. If the storage is a zip file, the
    database is first extracted to a temporary file whose path is stored in ``_db_file``; otherwise the database
    file inside the storage folder is used directly.

    :returns: the session bound to an engine created for the database file.
    :raises ClosedStorage: if the backend has been closed.
    :raises CorruptStorage: if the database cannot be extracted from the zip file, or does not exist in the folder.
    """
    import os

    if self._closed:
        raise ClosedStorage(str(self))
    if self._session is None:
        if is_zipfile(self._path):
            descriptor, path = tempfile.mkstemp()
            db_file = self._db_file = Path(path)
            # ``mkstemp`` returns an already open OS-level file descriptor that the caller has to close. Wrap it
            # so it is closed together with the handle, instead of discarding it and leaking the descriptor.
            with os.fdopen(descriptor, 'wb') as handle:
                try:
                    extract_file_in_zip(self._path, DB_FILENAME, handle, search_limit=4)
                except Exception as exc:
                    raise CorruptStorage(f'database could not be read: {exc}') from exc
        else:
            db_file = self._path / DB_FILENAME
            if not db_file.exists():
                raise CorruptStorage(f'database could not be read: non-existent {db_file}')
        self._session = Session(create_sqla_engine(db_file), future=True)
    return self._session
[docs]
def get_repository(self) -> '_RoBackendRepository':
    """Return the repository of the storage, creating and caching it on first use.

    A ``ZipfileBackendRepository`` is used when the storage is a zip file, and a ``FolderBackendRepository`` when
    the storage contains a repository folder.

    :raises ClosedStorage: if the backend has been closed.
    :raises CorruptStorage: if the storage is not a zip file and has no repository folder.
    """
    if self._closed:
        raise ClosedStorage(str(self))

    if self._repo is not None:
        return self._repo

    if is_zipfile(self._path):
        self._repo = ZipfileBackendRepository(self._path)
        return self._repo

    folder = self._path / REPO_FOLDER
    if not folder.exists():
        raise CorruptStorage(f'repository could not be read: non-existent {self._path / REPO_FOLDER}')

    self._repo = FolderBackendRepository(folder)
    return self._repo
[docs]
def query(self) -> orm.SqliteQueryBuilder:
    """Return a new ``orm.SqliteQueryBuilder`` constructed for this backend."""
    return orm.SqliteQueryBuilder(self)
[docs]
def get_backend_entity(self, model):
    """Return the backend entity that corresponds to the given Model instance.

    :param model: the model instance, passed on to ``orm.get_backend_entity`` together with this backend.
    """
    return orm.get_backend_entity(model, self)
@cached_property
def authinfos(self):
    """Return an ``orm.SqliteAuthInfoCollection`` for this backend; the instance is cached after first access."""
    return orm.SqliteAuthInfoCollection(self)
@cached_property
def comments(self):
    """Return an ``orm.SqliteCommentCollection`` for this backend; the instance is cached after first access."""
    return orm.SqliteCommentCollection(self)
@cached_property
def computers(self):
    """Return an ``orm.SqliteComputerCollection`` for this backend; the instance is cached after first access."""
    return orm.SqliteComputerCollection(self)
@cached_property
def groups(self):
    """Return an ``orm.SqliteGroupCollection`` for this backend; the instance is cached after first access."""
    return orm.SqliteGroupCollection(self)
@cached_property
def logs(self):
    """Return an ``orm.SqliteLogCollection`` for this backend; the instance is cached after first access."""
    return orm.SqliteLogCollection(self)
@cached_property
def nodes(self):
    """Return an ``orm.SqliteNodeCollection`` for this backend; the instance is cached after first access."""
    return orm.SqliteNodeCollection(self)
@cached_property
def users(self):
    """Return an ``orm.SqliteUserCollection`` for this backend; the instance is cached after first access."""
    return orm.SqliteUserCollection(self)
[docs]
def _clear(self) -> None:
    """Not supported, since this backend is read-only.

    :raises ReadOnlyError: always.
    """
    raise ReadOnlyError()
[docs]
@contextmanager
def transaction(self):
    """Context manager that yields the session, with the wrapped block running inside a savepoint.

    If the session is already in a transaction, a nested transaction (savepoint) is opened around the wrapped
    block and the session is committed once the block has completed. Otherwise a new transaction is begun on the
    session and the savepoint is opened inside of it.

    NOTE(review): this backend is declared read-only; confirm which callers rely on this method.
    """
    session = self.get_session()
    if session.in_transaction():
        # A transaction is already open: only add a savepoint around the wrapped block.
        with session.begin_nested() as savepoint:
            yield session
            savepoint.commit()
        session.commit()
    else:
        # No transaction is open yet: begin one and add the savepoint inside of it.
        with session.begin():
            with session.begin_nested() as savepoint:
                yield session
                savepoint.commit()
@property
def in_transaction(self) -> bool:
    """Return ``False`` unconditionally; the state of the session is not consulted."""
    return False
[docs]
def bulk_insert(self, entity_type: EntityTypes, rows: list[dict], allow_defaults: bool = False) -> list[int]:
    """Not supported, since this backend is read-only.

    :raises ReadOnlyError: always; none of the arguments are inspected.
    """
    raise ReadOnlyError()
[docs]
def bulk_update(self, entity_type: EntityTypes, rows: list[dict]) -> None:
    """Not supported, since this backend is read-only.

    :raises ReadOnlyError: always; none of the arguments are inspected.
    """
    raise ReadOnlyError()
[docs]
def delete(self) -> None:
    """Delete the storage and all the data.

    The archive file that the profile's storage configuration points to is removed and the deletion is reported
    through the module logger. Nothing happens if the file does not exist.
    """
    archive = Path(self.profile.storage_config['filepath'])
    if not archive.exists():
        return
    archive.unlink()
    LOGGER.report(f'Deleted archive at `{archive}`.')
[docs]
def delete_nodes_and_connections(self, pks_to_delete: Sequence[int]):
    """Not supported, since this backend is read-only.

    :raises ReadOnlyError: always; ``pks_to_delete`` is not inspected.
    """
    raise ReadOnlyError()
[docs]
def get_global_variable(self, key: str):
    """Not implemented for this backend.

    :raises NotImplementedError: always; ``key`` is not inspected.
    """
    raise NotImplementedError
[docs]
def set_global_variable(self, key: str, value, description: Optional[str] = None, overwrite=True) -> None:
    """Not supported, since this backend is read-only.

    :raises ReadOnlyError: always; none of the arguments are inspected.
    """
    raise ReadOnlyError()
[docs]
def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None:
    """Not implemented for this backend.

    :raises NotImplementedError: always; none of the arguments are inspected.
    """
    raise NotImplementedError
[docs]
def get_info(self, detailed: bool = False) -> dict:
    """Return information on the storage.

    :param detailed: if ``True``, the information of the parent class and of the repository is added to the
        metadata of the storage.
    :returns: a dictionary with the key ``metadata`` and, when ``detailed`` is set, whatever the parent class
        reports plus the key ``repository``.
    """
    info = {'metadata': extract_metadata(self._path)}

    # since extracting the database file is expensive, we only do it if detailed is True
    if not detailed:
        return info

    info.update(super().get_info(detailed=detailed))
    info['repository'] = self.get_repository().get_info(detailed)
    return info
[docs]
class _RoBackendRepository(AbstractRepositoryBackend):
    """A backend abstract for a read-only folder or zip file.

    Every operation that would modify the repository raises ``ReadOnlyError``. Subclasses are expected to provide
    ``has_object``, ``list_objects`` and ``open``, which the methods defined here build upon.
    """

    def __init__(self, path: str | Path):
        """Initialise the repository backend.

        :param path: the path to the zip file
        """
        self._path = Path(path)
        self._closed = False

    def close(self) -> None:
        """Close the repository."""
        self._closed = True

    @property
    def uuid(self) -> Optional[str]:
        """Return ``None``: no unique identifier is provided by this repository."""
        return None

    @property
    def key_format(self) -> Optional[str]:
        """Return the format of the object keys, which is always ``sha256``."""
        return 'sha256'

    def initialise(self, **kwargs) -> None:
        """Do nothing; ``is_initialised`` always reports ``True``."""

    @property
    def is_initialised(self) -> bool:
        """Return ``True`` unconditionally."""
        return True

    def erase(self) -> None:
        """Not supported, since the repository is read-only.

        :raises ReadOnlyError: always.
        """
        raise ReadOnlyError()

    def _put_object_from_filelike(self, handle: BinaryIO) -> str:
        """Not supported, since the repository is read-only.

        :raises ReadOnlyError: always.
        """
        raise ReadOnlyError()

    def has_objects(self, keys: list[str]) -> list[bool]:
        """Return, for each key and in the same order, whether ``has_object`` reports the object as present."""
        return list(map(self.has_object, keys))

    def iter_object_streams(self, keys: list[str]) -> Iterator[Tuple[str, BinaryIO]]:
        """Yield ``(key, handle)`` pairs for the given keys.

        Each handle comes from ``open`` and is only kept open while the consumer processes that pair.
        """
        for object_key in keys:
            with self.open(object_key) as stream:
                yield object_key, stream

    def delete_objects(self, keys: list[str]) -> None:
        """Not supported, since the repository is read-only.

        :raises ReadOnlyError: always.
        """
        raise ReadOnlyError()

    def get_object_hash(self, key: str) -> str:
        """Return the key itself as the hash of the object."""
        return key

    def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None:
        """Do nothing: no maintenance is performed."""

    def get_info(self, detailed: bool = False, **kwargs) -> dict:
        """Return a dictionary with the number of objects in the repository under ``objects`` / ``count``."""
        count = sum(1 for _ in self.list_objects())
        return {'objects': {'count': count}}
[docs]
class ZipfileBackendRepository(_RoBackendRepository):
"""A read-only backend for a zip file.
The zip file should contain repository files with the key format: ``repo/<sha256 hash>``,
i.e. files named by the sha256 hash of the file contents, inside a ``repo`` directory.
"""
[docs]
def __init__(self, path: str | Path):
    """Initialise the repository backend.

    :param path: the path to the zip file
    """
    super().__init__(path)
    # name of the directory inside the zip file under which the objects are looked up
    self._folder = REPO_FOLDER
    # handle to the zip file; it is only opened on first access of the ``_zipfile`` property
    self.__zipfile: None | ZipFile = None
[docs]
def close(self) -> None:
    """Close the zip file, if it was opened, and mark the repository as closed.

    Calling this method more than once is harmless.
    """
    # Inspect the private attribute instead of the ``_zipfile`` property: the property opens the zip file on
    # access, so it would open the archive just to close it again, and it raises ``ClosedStorage`` once the
    # repository is closed, which made a second call to ``close`` fail.
    if self.__zipfile is not None:
        self.__zipfile.close()
        self.__zipfile = None
    super().close()
@property
def _zipfile(self) -> ZipFile:
    """Return the open zip file.

    The zip file is opened for reading on first access and the handle is reused afterwards.

    :raises ClosedStorage: if the repository has been closed.
    :raises CorruptStorage: if the zip file cannot be opened.
    """
    if self._closed:
        raise ClosedStorage(f'repository is closed: {self._path}')

    if self.__zipfile is not None:
        return self.__zipfile

    try:
        archive = ZipFile(self._path, mode='r')
    except Exception as exc:
        raise CorruptStorage(f'repository could not be read {self._path}: {exc}') from exc

    self.__zipfile = archive
    return archive
[docs]
def has_object(self, key: str) -> bool:
    """Return whether the zip file has a member for ``key`` inside the repository folder."""
    member = f'{self._folder}/{key}'
    try:
        self._zipfile.getinfo(member)
    except KeyError:
        found = False
    else:
        found = True
    return found
[docs]
def list_objects(self) -> Iterable[str]:
    """Yield the keys of all objects, i.e. the member names inside the repository folder without its prefix.

    Members outside of the repository folder, and the entry of the folder itself, are skipped.
    """
    prefix = f'{self._folder}/'
    for member in self._zipfile.namelist():
        if not member.startswith(prefix):
            continue
        key = member[len(prefix):]
        if key:
            yield key
[docs]
@contextmanager
def open(self, key: str) -> Iterator[BinaryIO]:
    """Open the object with the given key for reading, as a context manager.

    :param key: the key of the object, i.e. the name of the member inside the repository folder.
    :returns: a binary handle to the content, which is closed when the context is left.
    :raises FileNotFoundError: if the zip file has no member for the key.
    """
    # Only the lookup of the member is guarded. Previously a failed lookup reached a ``finally`` clause that
    # closed a handle that was never assigned, so ``UnboundLocalError`` replaced the ``FileNotFoundError``. A
    # ``KeyError`` raised by the code using the handle was also mistaken for a missing object.
    try:
        handle = self._zipfile.open(f'{self._folder}/{key}')
    except KeyError as exc:
        raise FileNotFoundError(f'object with key `{key}` does not exist.') from exc

    with handle:
        yield cast(BinaryIO, handle)
[docs]
class FolderBackendRepository(_RoBackendRepository):
"""A read-only backend for a folder.
The folder should contain repository files, named by the sha256 hash of the file contents.
"""
[docs]
def has_object(self, key: str) -> bool:
    """Return whether a regular file named ``key`` exists inside the repository folder."""
    candidate = self._path / key
    return candidate.is_file()
[docs]
def list_objects(self) -> Iterable[str]:
    """Yield the names of the regular files that are direct children of the repository folder."""
    yield from (entry.name for entry in self._path.iterdir() if entry.is_file())
[docs]
@contextmanager
def open(self, key: str) -> Iterator[BinaryIO]:
    """Open the object with the given key for reading, as a context manager.

    :param key: the key of the object, i.e. the name of the file inside the repository folder.
    :returns: a binary handle to the content, which is closed when the context is left.
    :raises FileNotFoundError: if the folder has no regular file for the key.
    """
    filepath = self._path.joinpath(key)
    if not filepath.is_file():
        raise FileNotFoundError(f'object with key `{key}` does not exist.')
    # Binary mode takes no ``encoding`` argument: passing one raises ``ValueError``, so no object could be opened.
    with filepath.open('rb') as handle:
        yield handle