aiida.engine package#
Module with all the internals that make up the engine of aiida-core.
Exceptions that can be thrown by parts of the workflow engine.
- exception aiida.engine.exceptions.PastException[source]#
Bases:
aiida.common.exceptions.AiidaException
Raised when an attempt is made to continue a Process that has already excepted before.
- __annotations__ = {}#
- __module__ = 'aiida.engine.exceptions'#
Top level functions that can be used to launch a Process.
- aiida.engine.launch.run(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], *args: Any, **inputs: Any) Dict[str, Any] [source]#
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns
the outputs of the process
- aiida.engine.launch.run_get_node(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], *args: Any, **inputs: Any) Tuple[Dict[str, Any], aiida.orm.nodes.process.process.ProcessNode] [source]#
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters
process – the process class, instance, builder or function to run
inputs – the inputs to be passed to the process
- Returns
tuple of the outputs of the process and the process node
- aiida.engine.launch.run_get_pk(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], *args: Any, **inputs: Any) aiida.engine.runners.ResultAndPk [source]#
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters
process – the process class, instance, builder or function to run
inputs – the inputs to be passed to the process
- Returns
tuple of the outputs of the process and process node pk
- aiida.engine.launch.submit(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], **inputs: Any) aiida.orm.nodes.process.process.ProcessNode [source]#
Submit the process with the supplied inputs to the daemon, immediately returning control to the interpreter.
- Parameters
process – the process class, instance or builder to submit
inputs – the inputs to be passed to the process
- Returns
the calculation node of the process
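A usage sketch of these launch functions, assuming a loaded profile; the add calcfunction and its inputs are illustrative placeholders, not part of this module:

from aiida import orm
from aiida.engine import calcfunction, run, run_get_node, run_get_pk

@calcfunction
def add(x, y):
    # illustrative placeholder process; any Process class, instance or builder works as well
    return x + y

results = run(add, x=orm.Int(1), y=orm.Int(2))                 # {'result': Int(3)}
results, node = run_get_node(add, x=orm.Int(1), y=orm.Int(2))  # outputs and the ProcessNode
results, pk = run_get_pk(add, x=orm.Int(1), y=orm.Int(2))      # outputs and the node pk

# submit() hands a process class, instance or builder to the daemon and returns its
# ProcessNode immediately; process functions like `add` cannot be submitted.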
Definition of AiiDA’s process persister and the necessary object loaders.
- class aiida.engine.persistence.AiiDAPersister[source]#
Bases:
plumpy.persistence.Persister
Persister that takes saved process instance states and persists them to the database.
- __abstractmethods__ = frozenset({})#
- __module__ = 'aiida.engine.persistence'#
- _abc_impl = <_abc._abc_data object>#
- delete_checkpoint(pid: Hashable, tag: Optional[str] = None) None [source]#
Delete a persisted process checkpoint, where no error will be raised if the checkpoint does not exist.
- Parameters
pid – the process id of the plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint
- delete_process_checkpoints(pid: Hashable)[source]#
Delete all persisted checkpoints related to the given process id.
- Parameters
pid – the process id of the aiida.engine.processes.process.Process
- get_checkpoints()[source]#
Return a list of all the current persisted process checkpoints.
- Returns
list of PersistedCheckpoint tuples, each containing the process id and optional checkpoint tag.
- get_process_checkpoints(pid: Hashable)[source]#
Return a list of all the current persisted process checkpoints for the specified process.
- Parameters
pid – the process pid
- Returns
list of PersistedCheckpoint tuples, each containing the process id and optional checkpoint tag.
- load_checkpoint(pid: Hashable, tag: Optional[str] = None) plumpy.persistence.Bundle [source]#
Load a process from a persisted checkpoint by its process id.
- Parameters
pid – the process id of the plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint
- Returns
a bundle with the process state
- Return type
plumpy.persistence.Bundle
- Raises
PersistenceError – Raised if there was a problem loading the checkpoint
- save_checkpoint(process: Process, tag: Optional[str] = None)[source]#
Persist a Process instance.
- Parameters
process – the aiida.engine.Process instance to persist
tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process
- Raises
PersistenceError – Raised if there was a problem saving the checkpoint
- class aiida.engine.persistence.ObjectLoader[source]#
Bases:
plumpy.loaders.DefaultObjectLoader
Custom object loader for aiida-core.
- __abstractmethods__ = frozenset({})#
- __module__ = 'aiida.engine.persistence'#
- _abc_impl = <_abc._abc_data object>#
- load_object(identifier: str) Any [source]#
Attempt to load the object identified by the given identifier.
Note
We override the plumpy.DefaultObjectLoader to be able to throw an ImportError instead of a ValueError which in the context of aiida-core is not as apt, since we are loading classes.
- Parameters
identifier – concatenation of module and resource name
- Returns
loaded object
- Raises
ImportError – if the object cannot be loaded
- aiida.engine.persistence.get_object_loader() aiida.engine.persistence.ObjectLoader [source]#
Return the global AiiDA object loader.
- Returns
The global object loader
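A small sketch of the loader in use; the 'module:name' identifier format is assumed from the plumpy default loader that this class extends:

from aiida.engine.persistence import get_object_loader

loader = get_object_loader()                # the global ObjectLoader instance
identifier = loader.identify_object(dict)   # e.g. 'builtins:dict' (assumed format)
cls = loader.load_object(identifier)        # <class 'dict'>
# loader.load_object('not.a.module:Missing') would raise ImportError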
Runners that can run and submit processes.
- class aiida.engine.runners.ResultAndNode(result, node)[source]#
Bases:
NamedTuple
- __annotations__ = {'node': ForwardRef('ProcessNode'), 'result': ForwardRef('Dict[str, Any]')}#
- __getnewargs__()#
Return self as a plain tuple. Used by copy and pickle.
- __match_args__ = ('result', 'node')#
- __module__ = 'aiida.engine.runners'#
- static __new__(_cls, result: Dict[str, Any], node: ProcessNode)#
Create new instance of ResultAndNode(result, node)
- __orig_bases__ = (<function NamedTuple>,)#
- __repr__()#
Return a nicely formatted representation string
- __slots__ = ()#
- _asdict()#
Return a new dict which maps field names to their values.
- _field_defaults = {}#
- _fields = ('result', 'node')#
- classmethod _make(iterable)#
Make a new ResultAndNode object from a sequence or iterable
- _replace(**kwds)#
Return a new ResultAndNode object replacing specified fields with new values
- node: aiida.orm.nodes.process.process.ProcessNode#
Alias for field number 1
- class aiida.engine.runners.ResultAndPk(result, pk)[source]#
Bases:
NamedTuple
- __annotations__ = {'pk': ForwardRef('int | None'), 'result': ForwardRef('Dict[str, Any]')}#
- __getnewargs__()#
Return self as a plain tuple. Used by copy and pickle.
- __match_args__ = ('result', 'pk')#
- __module__ = 'aiida.engine.runners'#
- static __new__(_cls, result: Dict[str, Any], pk: int | None)#
Create new instance of ResultAndPk(result, pk)
- __orig_bases__ = (<function NamedTuple>,)#
- __repr__()#
Return a nicely formatted representation string
- __slots__ = ()#
- _asdict()#
Return a new dict which maps field names to their values.
- _field_defaults = {}#
- _fields = ('result', 'pk')#
- classmethod _make(iterable)#
Make a new ResultAndPk object from a sequence or iterable
- _replace(**kwds)#
Return a new ResultAndPk object replacing specified fields with new values
- class aiida.engine.runners.Runner(poll_interval: Union[int, float] = 0, loop: Optional[asyncio.events.AbstractEventLoop] = None, communicator: Optional[kiwipy.communications.Communicator] = None, rmq_submit: bool = False, persister: Optional[plumpy.persistence.Persister] = None)[source]#
Bases:
object
Class that can launch processes by running in the current interpreter or by submitting them to the daemon.
- __annotations__ = {'_closed': 'bool', '_communicator': 'Optional[kiwipy.Communicator]', '_controller': 'Optional[RemoteProcessThreadController]', '_persister': 'Optional[Persister]'}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.runners', '__annotations__': {'_persister': 'Optional[Persister]', '_communicator': 'Optional[kiwipy.Communicator]', '_controller': 'Optional[RemoteProcessThreadController]', '_closed': 'bool'}, '__doc__': 'Class that can launch processes by running in the current interpreter or by submitting them to the daemon.', '_persister': None, '_communicator': None, '_controller': None, '_closed': False, '__init__': <function Runner.__init__>, '__enter__': <function Runner.__enter__>, '__exit__': <function Runner.__exit__>, 'loop': <property object>, 'transport': <property object>, 'persister': <property object>, 'communicator': <property object>, 'plugin_version_provider': <property object>, 'job_manager': <property object>, 'controller': <property object>, 'is_daemon_runner': <property object>, 'is_closed': <function Runner.is_closed>, 'start': <function Runner.start>, 'stop': <function Runner.stop>, 'run_until_complete': <function Runner.run_until_complete>, 'close': <function Runner.close>, 'instantiate_process': <function Runner.instantiate_process>, 'submit': <function Runner.submit>, 'schedule': <function Runner.schedule>, '_run': <function Runner._run>, 'run': <function Runner.run>, 'run_get_node': <function Runner.run_get_node>, 'run_get_pk': <function Runner.run_get_pk>, 'call_on_process_finish': <function Runner.call_on_process_finish>, 'get_process_future': <function Runner.get_process_future>, '_poll_process': <function Runner._poll_process>, '__dict__': <attribute '__dict__' of 'Runner' objects>, '__weakref__': <attribute '__weakref__' of 'Runner' objects>})#
- __enter__() aiida.engine.runners.Runner [source]#
- __init__(poll_interval: Union[int, float] = 0, loop: Optional[asyncio.events.AbstractEventLoop] = None, communicator: Optional[kiwipy.communications.Communicator] = None, rmq_submit: bool = False, persister: Optional[plumpy.persistence.Persister] = None)[source]#
Construct a new runner.
- Parameters
poll_interval – interval in seconds between polling for status of active sub processes
loop – an asyncio event loop; if none is supplied, a new one will be created
communicator – the communicator to use
rmq_submit – if True, processes will be submitted to RabbitMQ, otherwise they will be scheduled here
persister – the persister to use to persist processes
- __module__ = 'aiida.engine.runners'#
- __weakref__#
list of weak references to the object (if defined)
- _communicator: Optional[kiwipy.communications.Communicator] = None#
- _controller: Optional[plumpy.process_comms.RemoteProcessThreadController] = None#
- _persister: Optional[plumpy.persistence.Persister] = None#
- _poll_process(node, callback)[source]#
Check whether the process state of the node is terminated and call the callback or reschedule it.
- Parameters
node – the process node
callback – callback to be called when process is terminated
- _run(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], *args: Any, **inputs: Any) Tuple[Dict[str, Any], aiida.orm.nodes.process.process.ProcessNode] [source]#
Run the process with the supplied inputs in this runner, blocking until the process is completed. The return value will be the results of the completed process.
- Parameters
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns
tuple of the outputs of the process and the calculation node
- call_on_process_finish(pk: int, callback: Callable[[], Any]) None [source]#
Schedule a callback when the process of the given pk is terminated.
This method will add a broadcast subscriber that will listen for the target process reaching a terminated state. As a fail-safe, a polling mechanism is used to check the state of the process, should the broadcast message be missed by the subscriber, in order to prevent the caller from waiting indefinitely.
- Parameters
pk – pk of the process
callback – function to be called upon process termination
- property communicator: Optional[kiwipy.communications.Communicator]#
Get the communicator used by this runner.
- property controller: Optional[plumpy.process_comms.RemoteProcessThreadController]#
Get the controller used by this runner.
- get_process_future(pk: int) aiida.engine.processes.futures.ProcessFuture [source]#
Return a future for a process.
The future will have the process node as the result when finished.
- Returns
A future representing the completion of the process node
- instantiate_process(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], **inputs)[source]#
- property is_daemon_runner: bool#
Return whether the runner is a daemon runner, which means it submits processes over RabbitMQ.
- Returns
True if the runner is a daemon runner
- property job_manager: aiida.engine.processes.calcjobs.manager.JobManager#
- property loop: asyncio.events.AbstractEventLoop#
Get the event loop of this runner.
- property persister: Optional[plumpy.persistence.Persister]#
Get the persister used by this runner.
- property plugin_version_provider: aiida.plugins.utils.PluginVersionProvider#
- run(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], *args: Any, **inputs: Any) Dict[str, Any] [source]#
Run the process with the supplied inputs in this runner, blocking until the process is completed. The return value will be the results of the completed process.
- Parameters
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns
the outputs of the process
- run_get_node(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], *args: Any, **inputs: Any) aiida.engine.runners.ResultAndNode [source]#
Run the process with the supplied inputs in this runner, blocking until the process is completed. The return value will be the results of the completed process.
- Parameters
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns
tuple of the outputs of the process and the calculation node
- run_get_pk(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], *args: Any, **inputs: Any) aiida.engine.runners.ResultAndPk [source]#
Run the process with the supplied inputs in this runner, blocking until the process is completed. The return value will be the results of the completed process.
- Parameters
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns
tuple of the outputs of the process and process node pk
- run_until_complete(future: _asyncio.Future) Any [source]#
Run the loop until the future has finished and return the result.
- schedule(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], *args: Any, **inputs: Any) aiida.orm.nodes.process.process.ProcessNode [source]#
Schedule a process to be executed by this runner.
- Parameters
process – the process class to submit
inputs – the inputs to be passed to the process
- Returns
the calculation node of the process
- submit(process: Union[aiida.engine.processes.process.Process, Type[aiida.engine.processes.process.Process], aiida.engine.processes.builder.ProcessBuilder], **inputs: Any)[source]#
Submit the process with the supplied inputs to this runner, immediately returning control to the interpreter. The return value will be the calculation node of the submitted process.
- Parameters
process – the process class to submit
inputs – the inputs to be passed to the process
- Returns
the calculation node of the process
- property transport: aiida.engine.transports.TransportQueue#
A transport queue to batch process multiple tasks that require a Transport.
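A sketch of driving a runner directly, based on the constructor and context-manager methods documented above. A loaded profile is required, the double calcfunction is an illustrative placeholder, and in practice a fully configured runner is usually obtained from the manager rather than constructed by hand:

from aiida import orm
from aiida.engine import calcfunction
from aiida.engine.runners import Runner

@calcfunction
def double(x):
    # illustrative placeholder process
    return x * 2

with Runner(poll_interval=0.5) as runner:        # the runner is closed automatically on exit
    results, node = runner.run_get_node(double, x=orm.Int(21))
    # results == {'result': Int(42)}; node is the ProcessNode recorded in the database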
- class aiida.engine.transports.TransportQueue(loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]#
Bases:
object
A queue to get transport objects from authinfo. This class allows clients to register their interest in a transport object which will be provided at some point in the future.
Internally the class will wait for a specific interval at the end of which it will open the transport and give it to all the clients that asked for it up to that point. This way opening of transports (a costly operation) can be minimised.
- __dict__ = mappingproxy({'__module__': 'aiida.engine.transports', '__doc__': '\n A queue to get transport objects from authinfo. This class allows clients\n to register their interest in a transport object which will be provided at\n some point in the future.\n\n Internally the class will wait for a specific interval at the end of which\n it will open the transport and give it to all the clients that asked for it\n up to that point. This way opening of transports (a costly operation) can\n be minimised.\n ', '__init__': <function TransportQueue.__init__>, 'loop': <property object>, 'request_transport': <function TransportQueue.request_transport>, '__dict__': <attribute '__dict__' of 'TransportQueue' objects>, '__weakref__': <attribute '__weakref__' of 'TransportQueue' objects>, '__annotations__': {'_transport_requests': 'Dict[Hashable, TransportRequest]'}})#
- __init__(loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]#
- Parameters
loop – An asyncio event loop; asyncio.get_event_loop() will be used if not supplied
- __module__ = 'aiida.engine.transports'#
- __weakref__#
list of weak references to the object (if defined)
- property loop: asyncio.events.AbstractEventLoop#
Get the loop being used by this transport queue.
- request_transport(authinfo: aiida.orm.authinfos.AuthInfo) Iterator[Awaitable[Transport]] [source]#
Request a transport from an authinfo. Because the client is not allowed to request a transport immediately, they will instead be given back a future that can be awaited to get the transport:
async def transport_task(transport_queue, authinfo):
    with transport_queue.request_transport(authinfo) as request:
        transport = await request
        # Do some work with the transport
- Parameters
authinfo – The authinfo to be used to get transport
- Returns
A future that can be yielded to give the transport
- class aiida.engine.transports.TransportRequest[source]#
Bases:
object
Information kept about a request for a transport object.
- __dict__ = mappingproxy({'__module__': 'aiida.engine.transports', '__doc__': ' Information kept about request for a transport object ', '__init__': <function TransportRequest.__init__>, '__dict__': <attribute '__dict__' of 'TransportRequest' objects>, '__weakref__': <attribute '__weakref__' of 'TransportRequest' objects>, '__annotations__': {'future': 'asyncio.Future'}})#
- __module__ = 'aiida.engine.transports'#
- __weakref__#
list of weak references to the object (if defined)
Utilities for the workflow engine.
- class aiida.engine.utils.InterruptableFuture(*, loop=None)[source]#
Bases:
_asyncio.Future
A future that can be interrupted by calling interrupt.
- __module__ = 'aiida.engine.utils'#
- interrupt(reason: Exception) None [source]#
This method should be called to interrupt the coroutine represented by this InterruptableFuture.
- async with_interrupt(coro: Awaitable[Any]) Any [source]#
Return the result of a coroutine, which will be interrupted if this future is interrupted.
import asyncio
loop = asyncio.get_event_loop()
interruptable = InterruptableFuture()
loop.call_soon(interruptable.interrupt, RuntimeError("STOP"))
loop.run_until_complete(interruptable.with_interrupt(asyncio.sleep(2.)))
>>> RuntimeError: STOP
- Parameters
coro – The coroutine that can be interrupted
- Returns
The result of the coroutine
- aiida.engine.utils.ensure_coroutine(fct: Callable[[...], Any]) Callable[[...], Awaitable[Any]] [source]#
Ensure that the given function fct is a coroutine.
If the passed function is not already a coroutine, it will be made to be a coroutine.
- Parameters
fct – the function
- Returns
the coroutine
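A minimal sketch of what this enables; the add function is a placeholder:

import asyncio
from aiida.engine.utils import ensure_coroutine

def add(x, y):
    # a plain synchronous function
    return x + y

coro_fct = ensure_coroutine(add)       # functions that are already coroutines are returned as-is
print(asyncio.run(coro_fct(1, 2)))     # 3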
- async aiida.engine.utils.exponential_backoff_retry(fct: Callable[[...], Any], initial_interval: Union[int, float] = 10.0, max_attempts: int = 5, logger: Optional[logging.Logger] = None, ignore_exceptions: Union[None, Type[Exception], Tuple[Type[Exception], ...]] = None) Any [source]#
Coroutine to call a function, recalling it with an exponential backoff in the case of an exception.
This coroutine will loop max_attempts times, calling the fct function, breaking immediately when the call finishes without raising an exception, at which point the result will be returned. If an exception is caught, the function will await an asyncio.sleep with a time interval equal to the initial_interval multiplied by 2 ** (N - 1), where N is the number of excepted calls.
- Parameters
fct – the function to call, which will be turned into a coroutine first if it is not already
initial_interval – the time to wait after the first caught exception before calling the coroutine again
max_attempts – the maximum number of times to call the coroutine before re-raising the exception
ignore_exceptions – exceptions to ignore, i.e. when caught do nothing and simply re-raise
- Returns
result if the coro call completes within max_attempts retries without raising
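An illustrative sketch of the retry behaviour; the flaky function is a placeholder standing in for, e.g., a transport operation that fails intermittently:

import asyncio
from aiida.engine.utils import exponential_backoff_retry

calls = {'count': 0}

def flaky():
    # placeholder task that fails twice before succeeding
    calls['count'] += 1
    if calls['count'] < 3:
        raise IOError('transient failure')
    return 'ok'

async def main():
    # sleeps 1 s after the first exception and 2 s after the second, then returns 'ok'
    return await exponential_backoff_retry(flaky, initial_interval=1.0, max_attempts=5)

print(asyncio.run(main()))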
- aiida.engine.utils.get_process_state_change_timestamp(process_type: Optional[str] = None) Optional[datetime.datetime] [source]#
Get the global setting that reflects the last time a process of the given process type changed its state. The returned value will be the corresponding timestamp or None if the setting does not exist.
- Parameters
process_type – optional process type for which to get the latest state change timestamp. Valid process types are either ‘calculation’ or ‘work’. If not specified, last timestamp for all known process types will be returned.
- Returns
a timestamp or None
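For example, assuming a loaded profile:

from aiida.engine.utils import get_process_state_change_timestamp

# most recent state change of any 'calculation' process, or None if never recorded
last_calc_change = get_process_state_change_timestamp('calculation')

# most recent state change over all known process types
last_any_change = get_process_state_change_timestamp()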
- aiida.engine.utils.instantiate_process(runner: Runner, process: Union[Process, Type[Process], ProcessBuilder], **inputs) Process [source]#
Return an instance of the process with the given inputs. The function can deal with various types of the process:
Process instance: will simply return the instance
ProcessBuilder instance: will instantiate the Process from the class and inputs defined within it
Process class: will instantiate with the specified inputs
If anything else is passed, a ValueError will be raised
- Parameters
process – Process instance or class, CalcJobNode class or ProcessBuilder instance
inputs – the inputs for the process to be instantiated with
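A sketch, assuming a loaded profile; EchoWorkChain is an illustrative placeholder process class:

from aiida import orm
from aiida.engine import WorkChain
from aiida.engine.runners import Runner
from aiida.engine.utils import instantiate_process

class EchoWorkChain(WorkChain):
    """Illustrative placeholder work chain that returns its input unchanged."""

    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.input('x', valid_type=orm.Int)
        spec.outline(cls.echo)
        spec.output('x', valid_type=orm.Int)

    def echo(self):
        self.out('x', self.inputs.x)

runner = Runner()
process = instantiate_process(runner, EchoWorkChain, x=orm.Int(5))
# `process` is a constructed EchoWorkChain instance that has not been executed yet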
- aiida.engine.utils.interruptable_task(coro: Callable[[aiida.engine.utils.InterruptableFuture], Awaitable[Any]], loop: Optional[asyncio.events.AbstractEventLoop] = None) aiida.engine.utils.InterruptableFuture [source]#
Turn the given coroutine into an interruptable task by wrapping it in an InterruptableFuture and returning that future.
- Parameters
coro – the coroutine that should be made interruptable, which takes an InterruptableFuture object as its last parameter
loop – the event loop in which to run the coroutine, by default uses asyncio.get_event_loop()
- Returns
an InterruptableFuture
- aiida.engine.utils.is_process_function(function: Any) bool [source]#
Return whether the given function is a process function.
- Parameters
function – a function
- Returns
True if the function is a wrapped process function, False otherwise
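For example:

from aiida.engine import calcfunction
from aiida.engine.utils import is_process_function

@calcfunction
def double(x):
    return x * 2

def plain(x):
    return x * 2

assert is_process_function(double) is True
assert is_process_function(plain) is False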
- aiida.engine.utils.is_process_scoped() bool [source]#
Return whether the current scope is within a process.
- Returns
True if the current scope is within a nested process, False otherwise
- aiida.engine.utils.loop_scope(loop) Iterator[None] [source]#
Make an event loop current for the scope of the context.
- Parameters
loop – The event loop to make current for the duration of the scope
- aiida.engine.utils.set_process_state_change_timestamp(process: Process) None [source]#
Set the global setting that reflects the last time a process changed state, for the process type of the given process, to the current timestamp. The process type will be determined based on the class of the calculation node it has as its database container.
- Parameters
process – the Process instance that changed its state