Source code for aiida.repository.backend.sandbox

"""Implementation of the ``AbstractRepositoryBackend`` using a sandbox folder on disk as the backend."""

from __future__ import annotations

import contextlib
import os
import pathlib
import shutil
import typing as t
import uuid

from aiida.common.folders import SandboxFolder

from .abstract import AbstractRepositoryBackend

__all__ = ('SandboxRepositoryBackend',)


[docs] class SandboxRepositoryBackend(AbstractRepositoryBackend): """Implementation of the ``AbstractRepositoryBackend`` using a sandbox folder on disk as the backend."""
[docs] def __init__(self, filepath: str | None = None): """Construct a new instance. :param filepath: The path to the directory in which the sandbox folder should be created. """ self._sandbox: SandboxFolder | None = None self._filepath: str | None = filepath
[docs] def __str__(self) -> str: """Return the string representation of this repository.""" if self.is_initialised: return f'SandboxRepository: {self._sandbox.abspath if self._sandbox else "null"}' return 'SandboxRepository: <uninitialised>'
[docs] def __del__(self): """Delete the entire sandbox folder if it was instantiated and still exists.""" self.erase()
@property def uuid(self) -> str | None: """Return the unique identifier of the repository. .. note:: A sandbox folder does not have the concept of a unique identifier and so always returns ``None``. """ return None @property def key_format(self) -> str | None: return 'uuid4'
[docs] def initialise(self, **kwargs) -> None: """Initialise the repository if it hasn't already been initialised. :param kwargs: parameters for the initialisation. """ # Merely calling the property will cause the sandbox folder to be initialised. self.sandbox
@property def is_initialised(self) -> bool: """Return whether the repository has been initialised.""" return isinstance(self._sandbox, SandboxFolder) @property def sandbox(self): """Return the sandbox instance of this repository.""" if self._sandbox is None: self._sandbox = SandboxFolder(filepath=pathlib.Path(self._filepath) if self._filepath is not None else None) return self._sandbox
[docs] def erase(self): """Delete the repository itself and all its contents.""" if getattr(self, '_sandbox', None) is not None: try: shutil.rmtree(self.sandbox.abspath) except FileNotFoundError: pass finally: self._sandbox = None
[docs] def _put_object_from_filelike(self, handle: t.BinaryIO) -> str: """Store the byte contents of a file in the repository. :param handle: filelike object with the byte content to be stored. :return: the generated fully qualified identifier for the object within the repository. :raises TypeError: if the handle is not a byte stream. """ key = str(uuid.uuid4()) filepath = os.path.join(self.sandbox.abspath, key) with open(filepath, 'wb') as target: shutil.copyfileobj(handle, target) return key
[docs] def has_objects(self, keys: list[str]) -> list[bool]: result = [] dirlist = os.listdir(self.sandbox.abspath) for key in keys: result.append(key in dirlist) return result
[docs] @contextlib.contextmanager def open(self, key: str) -> t.Iterator[t.BinaryIO]: super().open(key) with self.sandbox.open(key, mode='rb') as handle: yield handle
[docs] def iter_object_streams(self, keys: list[str]) -> t.Iterator[tuple[str, t.BinaryIO]]: for key in keys: with self.open(key) as handle: yield key, handle
[docs] def delete_objects(self, keys: list[str]) -> None: super().delete_objects(keys) for key in keys: os.remove(os.path.join(self.sandbox.abspath, key))
[docs] def list_objects(self) -> t.Iterable[str]: return self.sandbox.get_content_list()
[docs] def maintain(self, dry_run: bool = False, live: bool = True, **kwargs) -> None: raise NotImplementedError
[docs] def get_info(self, detailed: bool = False, **kwargs) -> dict: raise NotImplementedError