Source code for aiida.tools.pytest_fixtures.daemon
"""Fixtures to interact with the daemon."""
from __future__ import annotations
import pathlib
import typing as t
import pytest
if t.TYPE_CHECKING:
from aiida.engine import Process, ProcessBuilder
from aiida.orm import ProcessNode
[docs]
@pytest.fixture(scope='session')
def daemon_client(aiida_profile):
"""Return a daemon client for the configured test profile for the test session.
The daemon will be automatically stopped at the end of the test session.
Usage::
def test(daemon_client):
from aiida.engine.daemon.client import DaemonClient
assert isinstance(daemon_client, DaemonClient)
"""
from aiida.engine.daemon import get_daemon_client
from aiida.engine.daemon.client import DaemonNotRunningException, DaemonTimeoutException
daemon_client = get_daemon_client(aiida_profile.name)
try:
yield daemon_client
finally:
try:
daemon_client.stop_daemon(wait=True)
except DaemonNotRunningException:
pass
# Give an additional grace period by manually waiting for the daemon to be stopped. In certain unit test
# scenarios, the built in wait time in ``daemon_client.stop_daemon`` is not sufficient and even though the
# daemon is stopped, ``daemon_client.is_daemon_running`` will return false for a little bit longer.
daemon_client._await_condition(
lambda: not daemon_client.is_daemon_running,
DaemonTimeoutException('The daemon failed to stop.'),
)
[docs]
@pytest.fixture
def started_daemon_client(daemon_client):
"""Ensure that the daemon is running for the test profile and return the associated client.
Usage::
def test(started_daemon_client):
assert started_daemon_client.is_daemon_running
"""
if not daemon_client.is_daemon_running:
daemon_client.start_daemon()
assert daemon_client.is_daemon_running
yield daemon_client
[docs]
@pytest.fixture
def stopped_daemon_client(daemon_client):
"""Ensure that the daemon is not running for the test profile and return the associated client.
Usage::
def test(stopped_daemon_client):
assert not stopped_daemon_client.is_daemon_running
"""
from aiida.engine.daemon.client import DaemonTimeoutException
if daemon_client.is_daemon_running:
daemon_client.stop_daemon(wait=True)
# Give an additional grace period by manually waiting for the daemon to be stopped. In certain unit test
# scenarios, the built in wait time in ``daemon_client.stop_daemon`` is not sufficient and even though the
# daemon is stopped, ``daemon_client.is_daemon_running`` will return false for a little bit longer.
daemon_client._await_condition(
lambda: not daemon_client.is_daemon_running,
DaemonTimeoutException('The daemon failed to stop.'),
)
yield daemon_client
[docs]
@pytest.fixture
def submit_and_await(started_daemon_client):
"""Return a factory to submit a process and wait for it to achieve the given state.
This fixture automatically loads the ``started_daemon_client`` fixture ensuring the daemon is already running,
therefore it is not necessary to manually start the daemon.
Usage::
def test(submit_and_await):
inputs = {
...
}
node = submit_and_await(SomeProcess, **inputs)
The factory has the following signature:
:param submittable: A process, a process builder or a process node. If it is a process or builder, it is submitted
first before awaiting the desired state.
:param state: The process state to wait for, by default it waits for the submittable to be ``FINISHED``.
:param timeout: The time to wait for the process to achieve the state.
:param kwargs: If the ``submittable`` is a process class, it is instantiated with the ``kwargs`` as inputs.
:raises RuntimeError: If the process fails to achieve the specified state before the timeout expires.
:returns `~aiida.orm.nodes.process.process.ProcessNode`: The process node.
"""
from aiida.engine import ProcessState
def factory(
submittable: 'Process' | 'ProcessBuilder' | 'ProcessNode',
state: ProcessState = ProcessState.FINISHED,
timeout: int = 20,
**kwargs,
):
import inspect
import time
from aiida.engine import Process, ProcessBuilder, submit
from aiida.orm import ProcessNode
if inspect.isclass(submittable) and issubclass(submittable, Process):
node = submit(submittable, **kwargs)
elif isinstance(submittable, ProcessBuilder):
node = submit(submittable)
elif isinstance(submittable, ProcessNode):
node = submittable
else:
raise ValueError(f'type of submittable `{type(submittable)}` is not supported.')
start_time = time.time()
while node.process_state is not state:
if node.is_excepted:
raise RuntimeError(f'The process excepted: {node.exception}')
if time.time() - start_time >= timeout:
daemon_log_file = pathlib.Path(started_daemon_client.daemon_log_file).read_text(encoding='utf-8')
daemon_status = 'running' if started_daemon_client.is_daemon_running else 'stopped'
raise RuntimeError(
f'Timed out waiting for process with state `{node.process_state}` to enter state `{state}`.\n'
f'Daemon <{started_daemon_client.profile.name}|{daemon_status}> log file content: \n'
f'{daemon_log_file}'
)
return node
return factory