aiida.engine package

Module with all the internals that make up the engine of aiida-core.

Subpackages

Submodules

Exceptions that can be thrown by parts of the workflow engine.

exception aiida.engine.exceptions.PastException

Bases: AiidaException

Raised when an attempt is made to continue a Process that has already excepted.

__annotations__ = {}
__module__ = 'aiida.engine.exceptions'

Top level functions that can be used to launch a Process.

aiida.engine.launch.await_processes(nodes: Sequence[ProcessNode], wait_interval: int = 1) -> None

Run a loop until all processes are terminated.

Parameters:
  • nodes – Sequence of nodes that represent the processes to await.

  • wait_interval – The interval, in seconds, between each iteration of checking the status of all processes.
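
As a rough illustration of the polling pattern described here, the following standard-library sketch sleeps between checks until every check reports termination. The name wait_until_terminated and the use of plain callables in place of ProcessNode objects are assumptions made for the example; it is not the aiida-core implementation.

```python
import time
from typing import Callable, Sequence


def wait_until_terminated(
    checks: Sequence[Callable[[], bool]], wait_interval: float = 1.0
) -> int:
    """Poll until every check returns True; return how many times the loop slept.

    Each callable stands in (hypothetically) for asking one process node
    whether it has terminated.
    """
    sleeps = 0
    while not all(check() for check in checks):
        # Not everything has terminated yet: wait before checking again.
        time.sleep(wait_interval)
        sleeps += 1
    return sleeps
```

A larger wait_interval reduces the number of status checks at the cost of noticing termination later.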

aiida.engine.launch.run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) -> dict[str, Any]

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

the outputs of the process

aiida.engine.launch.run_get_node(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) -> tuple[dict[str, Any], ProcessNode]

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class, instance, builder or function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the process node

aiida.engine.launch.run_get_pk(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) -> ResultAndPk

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class, instance, builder or function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the pk of the process node

aiida.engine.launch.submit(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, *, wait: bool = False, wait_interval: int = 5, **kwargs: Any) -> ProcessNode

Submit the process with the supplied inputs to the daemon, immediately returning control to the interpreter.

Parameters:
  • process – the process class, instance or builder to submit

  • inputs – the input dictionary to be passed to the process

  • wait – when set to True, the submission blocks until the process completes, at which point the function returns the calculation node.

  • wait_interval – the number of seconds to wait between checks of the process state when wait=True.

  • kwargs – inputs to be passed to the process. This is an alternative to the positional inputs argument.

Returns:

the calculation node of the process

Definition of AiiDA’s process persister and the necessary object loaders.

class aiida.engine.persistence.AiiDAPersister

Bases: Persister

Persister that takes saved process instance states and persists them to the database.

__abstractmethods__ = frozenset({})
__module__ = 'aiida.engine.persistence'
_abc_impl = <_abc._abc_data object>
delete_checkpoint(pid: Hashable, tag: str | None = None) -> None

Delete a persisted process checkpoint. No error is raised if the checkpoint does not exist.

Parameters:
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint

delete_process_checkpoints(pid: Hashable)

Delete all persisted checkpoints related to the given process id.

Parameters:

pid – the process id of the aiida.engine.processes.process.Process

get_checkpoints()

Return a list of all the current persisted process checkpoints.

Returns:

list of PersistedCheckpoint tuples, each containing the process id and an optional checkpoint tag.

get_process_checkpoints(pid: Hashable)

Return a list of all the current persisted process checkpoints for the specified process.

Parameters:

pid – the process pid

Returns:

list of PersistedCheckpoint tuples, each containing the process id and an optional checkpoint tag.

load_checkpoint(pid: Hashable, tag: str | None = None) -> Bundle

Load a process from a persisted checkpoint by its process id.

Parameters:
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint

Returns:

a bundle with the process state

Return type:

plumpy.Bundle

Raises:

PersistenceError – Raised if there was a problem loading the checkpoint

save_checkpoint(process: Process, tag: str | None = None)

Persist a Process instance.

Parameters:
  • process – the aiida.engine.Process instance to persist

  • tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process

Raises:

PersistenceError – Raised if there was a problem saving the checkpoint

class aiida.engine.persistence.ObjectLoader

Bases: DefaultObjectLoader

Custom object loader for aiida-core.

__abstractmethods__ = frozenset({})
__module__ = 'aiida.engine.persistence'
_abc_impl = <_abc._abc_data object>
load_object(identifier: str) -> Any

Attempt to load the object identified by the given identifier.

Note

We override the plumpy.DefaultObjectLoader so that an ImportError is raised instead of a ValueError, which is less apt in the context of aiida-core since we are loading classes.

Parameters:

identifier – concatenation of module and resource name

Returns:

loaded object

Raises:

ImportError – if the object cannot be loaded

aiida.engine.persistence.get_object_loader() -> ObjectLoader

Return the global AiiDA object loader.

Returns:

The global object loader

Runners that can run and submit processes.

class aiida.engine.runners.ResultAndNode(result, node)

Bases: NamedTuple

__annotations__ = {'node': ForwardRef('ProcessNode'), 'result': ForwardRef('Dict[str, Any]')}
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__match_args__ = ('result', 'node')
__module__ = 'aiida.engine.runners'
static __new__(_cls, result: Dict[str, Any], node: ProcessNode)

Create new instance of ResultAndNode(result, node)

__orig_bases__ = (<function NamedTuple>,)
__repr__()

Return a nicely formatted representation string

__slots__ = ()
_asdict()

Return a new dict which maps field names to their values.

_field_defaults = {}
_fields = ('result', 'node')
classmethod _make(iterable)

Make a new ResultAndNode object from a sequence or iterable

_replace(**kwds)

Return a new ResultAndNode object replacing specified fields with new values

node: ProcessNode

Alias for field number 1

result: Dict[str, Any]

Alias for field number 0

class aiida.engine.runners.ResultAndPk(result, pk)

Bases: NamedTuple

__annotations__ = {'pk': ForwardRef('int | None'), 'result': ForwardRef('Dict[str, Any]')}
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__match_args__ = ('result', 'pk')
__module__ = 'aiida.engine.runners'
static __new__(_cls, result: Dict[str, Any], pk: int | None)

Create new instance of ResultAndPk(result, pk)

__orig_bases__ = (<function NamedTuple>,)
__repr__()

Return a nicely formatted representation string

__slots__ = ()
_asdict()

Return a new dict which maps field names to their values.

_field_defaults = {}
_fields = ('result', 'pk')
classmethod _make(iterable)

Make a new ResultAndPk object from a sequence or iterable

_replace(**kwds)

Return a new ResultAndPk object replacing specified fields with new values

pk: int | None

Alias for field number 1

result: Dict[str, Any]

Alias for field number 0

class aiida.engine.runners.Runner(poll_interval: int | float = 0, loop: AbstractEventLoop | None = None, communicator: Communicator | None = None, broker_submit: bool = False, persister: Persister | None = None)

Bases: object

Class that can launch processes by running in the current interpreter or by submitting them to the daemon.

__annotations__ = {'_closed': 'bool', '_communicator': 'Optional[kiwipy.Communicator]', '_controller': 'Optional[RemoteProcessThreadController]', '_persister': 'Optional[Persister]'}
__enter__() -> Runner
__exit__(exc_type, exc_val, exc_tb)
__init__(poll_interval: int | float = 0, loop: AbstractEventLoop | None = None, communicator: Communicator | None = None, broker_submit: bool = False, persister: Persister | None = None)

Construct a new runner.

Parameters:
  • poll_interval – interval in seconds between polling for status of active sub processes

  • loop – an asyncio event loop; if none is supplied, a new one will be created

  • communicator – the communicator to use

  • broker_submit – if True, processes will be submitted to the broker, otherwise they will be scheduled here

  • persister – the persister to use to persist processes

__module__ = 'aiida.engine.runners'
__weakref__

list of weak references to the object (if defined)

_closed: bool = False
_communicator: Communicator | None = None
_controller: RemoteProcessThreadController | None = None
_persister: Persister | None = None
_poll_process(node, callback)

Check whether the process state of the node is terminated and either call the callback or reschedule the check.

Parameters:
  • node – the process node

  • callback – callback to be called when the process is terminated

_run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) -> Tuple[Dict[str, Any], ProcessNode]

Run the process with the supplied inputs in this runner that will block until the process is completed.

The return value will be the results of the completed process.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the calculation node

call_on_process_finish(pk: int, callback: Callable[[], Any]) -> None

Schedule a callback when the process of the given pk is terminated.

This method adds a broadcast subscriber that listens for state changes of the target process until it is terminated. As a fail-safe, a polling mechanism also checks the state of the process in case the subscriber misses the broadcast message, to prevent the caller from waiting indefinitely.

Parameters:
  • pk – pk of the process

  • callback – function to be called upon process termination

close() -> None

Close the runner by stopping the loop.

property communicator: Communicator | None

Get the communicator used by this runner.

property controller: RemoteProcessThreadController | None

Get the controller used by this runner.

get_process_future(pk: int) -> ProcessFuture

Return a future for a process.

The future will have the process node as the result when finished.

Returns:

A future representing the completion of the process node

instantiate_process(process: Process | Type[Process] | ProcessBuilder, **inputs)
is_closed() -> bool
property is_daemon_runner: bool

Return whether the runner is a daemon runner, which means it submits processes over a broker.

Returns:

True if the runner is a daemon runner

property job_manager: JobManager
property loop: AbstractEventLoop

Get the event loop of this runner.

property persister: Persister | None

Get the persister used by this runner.

property plugin_version_provider: PluginVersionProvider
run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) -> Dict[str, Any]

Run the process with the supplied inputs in this runner that will block until the process is completed.

The return value will be the results of the completed process.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

the outputs of the process

run_get_node(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) -> ResultAndNode

Run the process with the supplied inputs in this runner that will block until the process is completed.

The return value will be the results of the completed process.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the calculation node

run_get_pk(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) -> ResultAndPk

Run the process with the supplied inputs in this runner that will block until the process is completed.

The return value will be the results of the completed process.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the pk of the process node

run_until_complete(future: Future) -> Any

Run the loop until the future has finished and return the result.

schedule(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) -> ProcessNode

Schedule a process to be executed by this runner.

Parameters:
  • process – the process class to submit

  • inputs – the inputs to be passed to the process

Returns:

the calculation node of the process

start() -> None

Start the internal event loop.

stop() -> None

Stop the internal event loop.

submit(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any)

Submit the process with the supplied inputs to this runner, immediately returning control to the interpreter.

The return value will be the calculation node of the submitted process.

Parameters:
  • process – the process class to submit

  • inputs – the inputs to be passed to the process

Returns:

the calculation node of the process

property transport: TransportQueue

A transport queue to batch process multiple tasks that require a Transport.

class aiida.engine.transports.TransportQueue(loop: AbstractEventLoop | None = None)

Bases: object

A queue to get transport objects from authinfo. This class allows clients to register their interest in a transport object, which will be provided at some point in the future.

Internally the class waits for a specific interval, at the end of which it opens the transport and gives it to all the clients that asked for it up to that point. This way the opening of transports (a costly operation) can be minimised.

__init__(loop: AbstractEventLoop | None = None)

Parameters:

loop – An asyncio event loop; asyncio.get_event_loop() is used if not supplied

__module__ = 'aiida.engine.transports'
__weakref__

list of weak references to the object (if defined)

property loop: AbstractEventLoop

Get the loop being used by this transport queue.

request_transport(authinfo: AuthInfo) -> Iterator[Awaitable[Transport]]

Request a transport from an authinfo. Because the client is not allowed to request a transport immediately, they will instead be given back a future that can be awaited to get the transport:

async def transport_task(transport_queue, authinfo):
    with transport_queue.request_transport(authinfo) as request:
        transport = await request
        # Do some work with the transport

Parameters:

authinfo – The authinfo to be used to get the transport

Returns:

A future that can be awaited to give the transport
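
The batching idea behind the transport queue (wait for an interval, open the costly resource once, hand it to everyone who asked in the meantime) can be sketched with plain asyncio. The class BatchedOpener, its parameters and the open_resource callable are hypothetical names chosen for this example; the sketch does not reproduce how TransportQueue is implemented.

```python
import asyncio
from typing import Any, Callable, Optional


class BatchedOpener:
    """Open a costly resource once per batch of requests (illustrative only)."""

    def __init__(self, open_resource: Callable[[], Any], delay: float) -> None:
        self._open_resource = open_resource
        self._delay = delay
        self._pending: Optional[asyncio.Future] = None

    def request(self) -> asyncio.Future:
        """Return a future shared by every caller that asks before the resource is opened."""
        if self._pending is None:
            loop = asyncio.get_running_loop()
            self._pending = loop.create_future()
            # Delay the opening so that later requests can join the same batch.
            loop.call_later(self._delay, self._open)
        return self._pending

    def _open(self) -> None:
        # Reset first so that requests arriving from now on start a new batch.
        pending, self._pending = self._pending, None
        pending.set_result(self._open_resource())


async def client(opener: BatchedOpener) -> Any:
    """A client that waits for the shared resource."""
    return await opener.request()
```

Three clients that call request() within the delay window all receive the result of a single open_resource() call.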

class aiida.engine.transports.TransportRequest

Bases: object

Information kept about a request for a transport object.

__init__()
__module__ = 'aiida.engine.transports'
__weakref__

list of weak references to the object (if defined)

Utilities for the workflow engine.

class aiida.engine.utils.InterruptableFuture(*, loop=None)

Bases: Future

A future that can be interrupted by calling interrupt.

__module__ = 'aiida.engine.utils'
interrupt(reason: Exception) -> None

This method should be called to interrupt the coroutine represented by this InterruptableFuture.

async with_interrupt(coro: Awaitable[Any]) -> Any

Return the result of a coroutine, which will be interrupted if this future is interrupted:

import asyncio
from aiida.engine.utils import InterruptableFuture

loop = asyncio.get_event_loop()

interruptable = InterruptableFuture()
loop.call_soon(interruptable.interrupt, RuntimeError("STOP"))
loop.run_until_complete(interruptable.with_interrupt(asyncio.sleep(2.)))
# Raises RuntimeError: STOP

Parameters:

coro – The coroutine that can be interrupted

Returns:

The result of the coroutine

aiida.engine.utils.ensure_coroutine(fct: Callable[[...], Any]) -> Callable[[...], Awaitable[Any]]

Ensure that the given function fct is a coroutine.

If the passed function is not already a coroutine, it will be wrapped in one.

Parameters:

fct – the function

Returns:

the coroutine
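
One way such a wrapper could look, using only the standard library, is sketched below. The name as_coroutine is an assumption for this example, and the sketch is not necessarily how ensure_coroutine is written in aiida-core.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


def as_coroutine(fct: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Return ``fct`` unchanged if it is a coroutine function, else wrap it in one."""
    if inspect.iscoroutinefunction(fct):
        return fct

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Call the plain function synchronously inside the coroutine.
        return fct(*args, **kwargs)

    return wrapper
```

Callers can then always write `await as_coroutine(f)(...)` without checking whether f is synchronous.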

async aiida.engine.utils.exponential_backoff_retry(fct: Callable[[...], Any], initial_interval: int | float = 10.0, max_attempts: int = 5, logger: Logger | None = None, ignore_exceptions: None | Type[Exception] | Tuple[Type[Exception], ...] = None) -> Any

Coroutine to call a function, recalling it with an exponential backoff in the case of an exception.

This coroutine will loop max_attempts times, calling the fct function and breaking immediately when a call finishes without raising an exception, at which point the result is returned. If an exception is caught, the function will await an asyncio.sleep with a time interval equal to initial_interval multiplied by 2 ** (N - 1), where N is the number of excepted calls.

Parameters:
  • fct – the function to call, which will be turned into a coroutine first if it is not already

  • initial_interval – the time to wait after the first caught exception before calling the coroutine again

  • max_attempts – the maximum number of times to call the coroutine before re-raising the exception

  • ignore_exceptions – exceptions to ignore, i.e. when caught do nothing and simply re-raise

Returns:

the result, if the coroutine call completes within max_attempts retries without raising
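
The retry schedule described above can be sketched in a few lines of standard-library asyncio. The function name retry_with_backoff is hypothetical, and the logger and ignore_exceptions parameters are deliberately left out, so this is a simplified illustration rather than the aiida-core implementation.

```python
import asyncio
import inspect
from typing import Any, Callable


async def retry_with_backoff(
    fct: Callable[[], Any],
    initial_interval: float = 10.0,
    max_attempts: int = 5,
) -> Any:
    """Call ``fct`` up to ``max_attempts`` times, doubling the wait after each failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            result = fct()
            # Support both plain functions and coroutine functions.
            if inspect.isawaitable(result):
                result = await result
            return result
        except Exception:
            if attempt == max_attempts:
                raise  # No attempts left: propagate the last exception.
            # After the N-th failed call, sleep initial_interval * 2 ** (N - 1).
            await asyncio.sleep(initial_interval * 2 ** (attempt - 1))
```

With the default values, the waits between the five attempts are 10, 20, 40 and 80 seconds.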

aiida.engine.utils.get_process_state_change_timestamp(process_type: str | None = None) -> datetime | None

Get the global setting that reflects the last time a process of the given process type changed its state. The returned value will be the corresponding timestamp or None if the setting does not exist.

Parameters:

process_type – optional process type for which to get the latest state change timestamp. Valid process types are either ‘calculation’ or ‘work’. If not specified, the last timestamp for all known process types will be returned.

Returns:

a timestamp or None

aiida.engine.utils.instantiate_process(runner: Runner, process: 'Process' | Type['Process'] | 'ProcessBuilder', **inputs) -> Process

Return an instance of the process with the given inputs. The function can deal with various types of the process:

  • Process instance: will simply return the instance

  • ProcessBuilder instance: will instantiate the Process from the class and inputs defined within it

  • Process class: will instantiate with the specified inputs

If anything else is passed, a ValueError will be raised.

Parameters:
  • process – Process instance or class, CalcJobNode class or ProcessBuilder instance

  • inputs – the inputs for the process to be instantiated with

aiida.engine.utils.interruptable_task(coro: Callable[[InterruptableFuture], Awaitable[Any]], loop: AbstractEventLoop | None = None) -> InterruptableFuture

Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it.

Parameters:
  • coro – the coroutine that should be made interruptable, taking an InterruptableFuture object as its last parameter

  • loop – the event loop in which to run the coroutine, by default uses asyncio.get_event_loop()

Returns:

an InterruptableFuture

aiida.engine.utils.is_process_function(function: Any) -> bool

Return whether the given function is a process function.

Parameters:

function – a function

Returns:

True if the function is a wrapped process function, False otherwise

aiida.engine.utils.is_process_scoped() -> bool

Return whether the current scope is within a process.

Returns:

True if the current scope is within a nested process, False otherwise

aiida.engine.utils.loop_scope(loop) -> Iterator[None]

Make an event loop current for the scope of the context.

Parameters:

loop – The event loop to make current for the duration of the scope

aiida.engine.utils.prepare_inputs(inputs: dict[str, Any] | None = None, **kwargs: Any) -> dict[str, Any]

Prepare inputs for launch of a process.

This is a utility function to pre-process inputs for the process, which can be specified either through keyword arguments or through the explicit inputs argument. When both are specified, a ValueError is raised.

Parameters:
  • inputs – Inputs dictionary.

  • kwargs – Inputs defined as keyword arguments.

Raises:

ValueError – If both kwargs and inputs are defined.

Returns:

The dictionary of inputs for the process.
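
The contract described above (accept either a dictionary or keyword arguments, reject both at once) can be illustrated with a small standalone function. The name merge_inputs and the error message are assumptions for this sketch; it mirrors the documented behaviour without claiming to be the aiida-core code.

```python
from typing import Any, Dict, Optional


def merge_inputs(inputs: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
    """Return the process inputs given either as a dictionary or as keyword arguments."""
    if inputs is not None and kwargs:
        # Mixing both styles is ambiguous, so it is rejected outright.
        raise ValueError("specify inputs either as a dictionary or as keyword arguments, not both")
    if kwargs:
        return dict(kwargs)
    # Fall back to the dictionary, or to an empty one if nothing was given.
    return dict(inputs) if inputs else {}
```

For example, `merge_inputs({'x': 1})` and `merge_inputs(x=1)` produce the same dictionary, while `merge_inputs({'x': 1}, y=2)` raises a ValueError.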

aiida.engine.utils.set_process_state_change_timestamp(process: Process) -> None

Set the global setting that reflects the last time a process changed state, for the process type of the given process, to the current timestamp. The process type is determined from the class of the calculation node that the process has as its database container.

Parameters:

process – the Process instance that changed its state