###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Top level functions that can be used to launch a Process."""
from __future__ import annotations
import time
import typing as t
from aiida.common import InvalidOperation
from aiida.common.lang import type_check
from aiida.common.log import AIIDA_LOGGER
from aiida.manage import manager
from aiida.orm import ProcessNode
from .processes.builder import ProcessBuilder
from .processes.functions import FunctionProcess
from .processes.process import Process
from .runners import ResultAndPk
from .utils import instantiate_process, is_process_scoped, prepare_inputs
__all__ = ('run', 'run_get_pk', 'run_get_node', 'submit', 'await_processes')
TYPE_RUN_PROCESS = t.Union[Process, t.Type[Process], ProcessBuilder]
# run can also be process function, but it is not clear what type this should be
TYPE_SUBMIT_PROCESS = t.Union[Process, t.Type[Process], ProcessBuilder]
LOGGER = AIIDA_LOGGER.getChild('engine.launch')
[文档]
def run(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any) -> dict[str, t.Any]:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class or process function to run
:param inputs: the inputs to be passed to the process
:return: the outputs of the process
"""
if isinstance(process, Process):
runner = process.runner
else:
runner = manager.get_manager().get_runner()
return runner.run(process, inputs, **kwargs)
[文档]
def run_get_node(
process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any
) -> tuple[dict[str, t.Any], ProcessNode]:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class, instance, builder or function to run
:param inputs: the inputs to be passed to the process
:return: tuple of the outputs of the process and the process node
"""
if isinstance(process, Process):
runner = process.runner
else:
runner = manager.get_manager().get_runner()
return runner.run_get_node(process, inputs, **kwargs)
[文档]
def run_get_pk(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any) -> ResultAndPk:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class, instance, builder or function to run
:param inputs: the inputs to be passed to the process
:return: tuple of the outputs of the process and process node pk
"""
if isinstance(process, Process):
runner = process.runner
else:
runner = manager.get_manager().get_runner()
return runner.run_get_pk(process, inputs, **kwargs)
[文档]
def submit(
process: TYPE_SUBMIT_PROCESS,
inputs: dict[str, t.Any] | None = None,
*,
wait: bool = False,
wait_interval: int = 5,
**kwargs: t.Any,
) -> ProcessNode:
"""Submit the process with the supplied inputs to the daemon immediately returning control to the interpreter.
.. warning: this should not be used within another process. Instead, there one should use the ``submit`` method of
the wrapping process itself, i.e. use ``self.submit``.
.. warning: submission of processes requires ``store_provenance=True``.
:param process: the process class, instance or builder to submit
:param inputs: the input dictionary to be passed to the process
:param wait: when set to ``True``, the submission will be blocking and wait for the process to complete at which
point the function returns the calculation node.
:param wait_interval: the number of seconds to wait between checking the state of the process when ``wait=True``.
:param kwargs: inputs to be passed to the process. This is an alternative to the positional ``inputs`` argument.
:return: the calculation node of the process
"""
inputs = prepare_inputs(inputs, **kwargs)
# Submitting from within another process requires ``self.submit``` unless it is a work function, in which case the
# current process in the scope should be an instance of ``FunctionProcess``.
if is_process_scoped() and not isinstance(Process.current(), FunctionProcess):
raise InvalidOperation('Cannot use top-level `submit` from within another process, use `self.submit` instead')
runner = manager.get_manager().get_runner()
assert runner.persister is not None, 'runner does not have a persister'
assert runner.controller is not None, 'runner does not have a controller'
process_inited = instantiate_process(runner, process, **inputs)
# If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this
# instead of raising, because in this way the user does not have to change the launcher when testing. The same goes
# for if `remote_folder` is present in the inputs, which means we are importing an already completed calculation.
if process_inited.metadata.get('dry_run', False) or 'remote_folder' in inputs:
_, node = run_get_node(process_inited)
return node
if not process_inited.metadata.store_provenance:
raise InvalidOperation('cannot submit a process with `store_provenance=False`')
runner.persister.save_checkpoint(process_inited)
process_inited.close()
# Do not wait for the future's result, because in the case of a single worker this would cock-block itself
runner.controller.continue_process(process_inited.pid, nowait=False, no_reply=True)
node = process_inited.node
if not wait:
return node
while not node.is_terminated:
LOGGER.report(
f'Process<{node.pk}> has not yet terminated, current state is `{node.process_state}`. '
f'Waiting for {wait_interval} seconds.'
)
time.sleep(wait_interval)
return node
[文档]
def await_processes(nodes: t.Sequence[ProcessNode], wait_interval: int = 1) -> None:
"""Run a loop until all processes are terminated.
:param nodes: Sequence of nodes that represent the processes to await.
:param wait_interval: The interval between each iteration of checking the status of all processes.
"""
type_check(nodes, (list, tuple))
if any(not isinstance(node, ProcessNode) for node in nodes):
raise TypeError(f'`nodes` should be a list of `ProcessNode`s but got: {nodes}')
start_time = time.time()
terminated = False
while not terminated:
running = [not node.is_terminated for node in nodes]
terminated = not any(running)
seconds_passed = time.time() - start_time
LOGGER.report(f'{running.count(False)} out of {len(nodes)} processes terminated. [{round(seconds_passed)} s]')
time.sleep(wait_interval)
# Allow one to also use run.get_node and run.get_pk as a shortcut, without having to import the functions themselves
run.get_node = run_get_node # type: ignore[attr-defined]
run.get_pk = run_get_pk # type: ignore[attr-defined]