aiida.engine.processes package#

Module for processes and related utilities.

Subpackages#

Submodules#

Convenience classes to help build the input dictionaries for Processes.

class aiida.engine.processes.builder.PrettyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#

Bases: JSONEncoder

JSON encoder for returning a pretty representation of an AiiDA ProcessBuilder.

__module__ = 'aiida.engine.processes.builder'#
default(o)[source]#

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class aiida.engine.processes.builder.ProcessBuilder(process_class: Type[Process])[source]#

Bases: ProcessBuilderNamespace

A process builder that helps setting up the inputs for creating a new process.

__abstractmethods__ = frozenset({})#
__annotations__ = {}#
__init__(process_class: Type[Process])[source]#

Construct a ProcessBuilder instance for the given Process class.

Parameters:

process_class – the Process subclass

__module__ = 'aiida.engine.processes.builder'#
_abc_impl = <_abc._abc_data object>#
_repr_pretty_(p, _) str[source]#

Pretty representation for the IPython console and notebooks.

property process_class: Type[Process]#

Return the process class for which this builder is constructed.

class aiida.engine.processes.builder.ProcessBuilderNamespace(port_namespace: PortNamespace)[source]#

Bases: MutableMapping

Input namespace for the ProcessBuilder.

Dynamically generates the getters and setters for the input ports of a given PortNamespace

__abstractmethods__ = frozenset({})#
__annotations__ = {}#
__delattr__(item)[source]#

Implement delattr(self, name).

__delitem__(item)[source]#
__dict__ = mappingproxy({'__module__': 'aiida.engine.processes.builder', '__doc__': 'Input namespace for the `ProcessBuilder`.\n\n    Dynamically generates the getters and setters for the input ports of a given PortNamespace\n    ', '__init__': <function ProcessBuilderNamespace.__init__>, '__setattr__': <function ProcessBuilderNamespace.__setattr__>, '__repr__': <function ProcessBuilderNamespace.__repr__>, '__dir__': <function ProcessBuilderNamespace.__dir__>, '__iter__': <function ProcessBuilderNamespace.__iter__>, '__len__': <function ProcessBuilderNamespace.__len__>, '__getitem__': <function ProcessBuilderNamespace.__getitem__>, '__setitem__': <function ProcessBuilderNamespace.__setitem__>, '__delitem__': <function ProcessBuilderNamespace.__delitem__>, '__delattr__': <function ProcessBuilderNamespace.__delattr__>, '_recursive_merge': <function ProcessBuilderNamespace._recursive_merge>, '_merge': <function ProcessBuilderNamespace._merge>, '_update': <function ProcessBuilderNamespace._update>, '_inputs': <function ProcessBuilderNamespace._inputs>, '__dict__': <attribute '__dict__' of 'ProcessBuilderNamespace' objects>, '__weakref__': <attribute '__weakref__' of 'ProcessBuilderNamespace' objects>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc._abc_data object>, '__annotations__': {}})#
__dir__()[source]#

Default dir() implementation.

__getitem__(item)[source]#
__init__(port_namespace: PortNamespace) None[source]#

Dynamically construct the get and set properties for the ports of the given port namespace.

For each port in the given port namespace a get and set property will be constructed dynamically and added to the ProcessBuilderNamespace. The docstring for these properties will be defined by calling str() on the Port, which should return the description of the Port.

Parameters:

port_namespace – the inputs PortNamespace for which to construct the builder

__iter__()[source]#
__len__()[source]#
__module__ = 'aiida.engine.processes.builder'#
__repr__()[source]#

Return repr(self).

__setattr__(attr: str, value: Any) None[source]#

Assign the given value to the port with key attr.

Note

Any attribute set without a leading underscore corresponds to an input and should therefore be validated against the corresponding input port from the process spec
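The convention above can be sketched in plain Python. The class and port set below are a hypothetical stand-in for illustration, not AiiDA's implementation:

```python
class BuilderNamespace:
    """Illustrative sketch: underscore attributes are internal, all others are inputs."""

    def __init__(self, ports):
        # bypass our own __setattr__ for the internal bookkeeping attributes
        object.__setattr__(self, '_ports', set(ports))
        object.__setattr__(self, '_data', {})

    def __setattr__(self, attr, value):
        if attr.startswith('_'):
            # internal attribute: set directly, no validation
            object.__setattr__(self, attr, value)
        elif attr not in self._ports:
            raise AttributeError(f'unknown input port: {attr}')
        else:
            # input attribute: the real class would validate against the port here
            self._data[attr] = value


ns = BuilderNamespace({'x', 'y'})
ns.x = 1  # accepted: `x` is a known port
```

Setting `ns.z` would raise an `AttributeError`, mirroring how the real namespace rejects unknown (non-dynamic) ports.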

__setitem__(item, value)[source]#
__weakref__#

list of weak references to the object (if defined)

_abc_impl = <_abc._abc_data object>#
_inputs(prune: bool = False) dict[source]#

Return the entire mapping of inputs specified for this builder.

Parameters:

prune – boolean, when True, will prune nested namespaces that contain no actual values whatsoever

Returns:

mapping of inputs ports and their input values.

_merge(*args, **kwds)[source]#

Merge the content of a dictionary or keyword arguments into the namespace.

Note

This method differs in behavior from _update in that _merge will recursively update the existing dictionary with the one that is specified in the arguments. The _update method will merge only the keys on the top level, but any lower lying nested namespace will be replaced entirely.

The method is prefixed with an underscore in order to not reserve the name for a potential port.

Parameters:
  • args – a single mapping that should be mapped on the namespace.

  • kwds – keyword value pairs that should be mapped onto the ports.

_recursive_merge(dictionary, key, value)[source]#

Recursively merge the contents of dictionary setting its key to value.

_update(*args, **kwds)[source]#

Update the values of the builder namespace passing a mapping as argument or individual keyword value pairs.

The method functions just as collections.abc.MutableMapping.update and is merely prefixed with an underscore in order to not reserve the name for a potential port.

Parameters:
  • args – a single mapping that should be mapped on the namespace.

  • kwds – keyword value pairs that should be mapped onto the ports.
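The behavioral difference between _update (top-level keys only) and _merge (recursive) can be illustrated with a plain-Python sketch; these helpers are illustrative stand-ins, not AiiDA's implementation:

```python
def update(namespace, other):
    # _update-style: only top-level keys are merged; a nested namespace
    # present in `other` replaces the existing one wholesale
    for key, value in other.items():
        namespace[key] = value
    return namespace


def merge(namespace, other):
    # _merge-style: nested dictionaries are merged recursively, key by key
    for key, value in other.items():
        if isinstance(value, dict) and isinstance(namespace.get(key), dict):
            merge(namespace[key], value)
        else:
            namespace[key] = value
    return namespace


update({'metadata': {'label': 'a', 'description': 'b'}}, {'metadata': {'label': 'c'}})
# -> {'metadata': {'label': 'c'}}: `description` is lost

merge({'metadata': {'label': 'a', 'description': 'b'}}, {'metadata': {'label': 'c'}})
# -> {'metadata': {'label': 'c', 'description': 'b'}}: `description` survives
```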

Functions to control and interact with running processes.

exception aiida.engine.processes.control.ProcessTimeoutException[source]#

Bases: AiidaException

Raised when action to communicate with a process times out.

__annotations__ = {}#
__module__ = 'aiida.engine.processes.control'#
aiida.engine.processes.control._perform_actions(processes: list[ProcessNode], action: Callable, infinitive: str, present: str, timeout: float | None = None, wait: bool = False, **kwargs: Any) None[source]#

Perform an action on a list of processes.

Parameters:
  • processes – The list of processes to perform the action on.

  • action – The action to perform.

  • infinitive – The infinitive of the verb that represents the action.

  • present – The present tense of the verb that represents the action.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

  • kwargs – Keyword arguments that will be passed to the method action.

Raises:

ProcessTimeoutException – If the processes do not respond within the timeout.

aiida.engine.processes.control._resolve_futures(futures: dict[Future, ProcessNode], infinitive: str, present: str, wait: bool = False, timeout: float | None = None) None[source]#

Process a mapping of futures representing an action on an active process.

This function will echo the correct information strings based on the outcomes of the futures and the given verb conjugations. You can optionally wait for any pending actions to be completed before the function returns and use a timeout to put a maximum wait time on the actions.

Parameters:
  • futures – The map of action futures and the corresponding processes.

  • infinitive – The infinitive form of the action verb.

  • present – The present tense form of the action verb.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

aiida.engine.processes.control.get_active_processes(paused: bool = False, project: str | list[str] = '*') list[ProcessNode] | list[Any][source]#

Return all active processes, i.e., those with a process state of created, waiting or running.

Parameters:
  • paused – Boolean, if True, filter for processes that are paused.

  • project – Single or list of properties to project. By default projects the entire node.

Returns:

A list of process nodes of active processes.

aiida.engine.processes.control.get_process_tasks(broker: Broker) list[int][source]#

Return the list of process pks that have a process task in the RabbitMQ process queue.

Returns:

A list of process pks that have a corresponding process task with RabbitMQ.

aiida.engine.processes.control.iterate_process_tasks(broker: Broker) collections.abc.Iterator[kiwipy.rmq.RmqIncomingTask][source]#

Iterate over the process tasks in the RabbitMQ process queue.

Returns:

An iterator over the incoming tasks for processes that have a corresponding process task with RabbitMQ.

aiida.engine.processes.control.kill_processes(processes: list[ProcessNode] | None = None, *, message: str = 'Killed through `aiida.engine.processes.control.kill_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None[source]#

Kill running processes.

Note

Requires the daemon to be running, or processes will be unresponsive.

Parameters:
  • processes – List of processes to kill.

  • all_entries – Kill all active processes.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

Raises:

ProcessTimeoutException – If the processes do not respond within the timeout.

aiida.engine.processes.control.pause_processes(processes: list[ProcessNode] | None = None, *, message: str = 'Paused through `aiida.engine.processes.control.pause_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None[source]#

Pause running processes.

Note

Requires the daemon to be running, or processes will be unresponsive.

Parameters:
  • processes – List of processes to pause.

  • all_entries – Pause all playing processes.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

Raises:

ProcessTimeoutException – If the processes do not respond within the timeout.

aiida.engine.processes.control.play_processes(processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None[source]#

Play (unpause) paused processes.

Note

Requires the daemon to be running, or processes will be unresponsive.

Parameters:
  • processes – List of processes to play.

  • all_entries – Play all paused processes.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

Raises:

ProcessTimeoutException – If the processes do not respond within the timeout.

aiida.engine.processes.control.revive_processes(processes: list[ProcessNode], *, wait: bool = False) None[source]#

Revive processes that seem stuck and are no longer reachable.

Warning: Use only as a last resort after you’ve gone through the checklist below.

  1. Does verdi status indicate that both daemon and RabbitMQ are running properly? If not, restart the daemon with verdi daemon restart and restart RabbitMQ.

  2. Try to play the process through play_processes. If a ProcessTimeoutException is raised use this method to attempt to revive it.

Details: When RabbitMQ loses the process task before the process has completed, the process is never picked up by the daemon and will remain “stuck”. This method recreates the task, which can lead to multiple instances of the task being executed and should thus be used with caution.

Note

Requires the daemon to be running.

Parameters:
  • processes – List of processes to revive.

  • wait – Set to True to wait for a response, for False the action is fire-and-forget.

A namedtuple and namespace for ExitCodes that can be used to exit from Processes.

class aiida.engine.processes.exit_code.ExitCode(status: int = 0, message: str | None = None, invalidates_cache: bool = False)[source]#

Bases: NamedTuple

A simple data class to define an exit code for a Process.

When an instance of this class is returned from a Process._run() call, the Process will be terminated, and the exit status and message of the namedtuple will be set on the corresponding attributes of the node.

Parameters:
  • status – positive integer exit status, where a non-zero value indicates the process failed, default is 0

  • message – optional message with more details about the failure mode

  • invalidates_cache – optional flag, indicating that a process should not be used in caching

__annotations__ = {'invalidates_cache': <class 'bool'>, 'message': typing.Optional[str], 'status': <class 'int'>}#
__getnewargs__()#

Return self as a plain tuple. Used by copy and pickle.

__match_args__ = ('status', 'message', 'invalidates_cache')#
__module__ = 'aiida.engine.processes.exit_code'#
static __new__(_cls, status: int = 0, message: str | None = None, invalidates_cache: bool = False)#

Create new instance of ExitCode(status, message, invalidates_cache)

__orig_bases__ = (<function NamedTuple>,)#
__repr__()#

Return a nicely formatted representation string

__slots__ = ()#
_asdict()#

Return a new dict which maps field names to their values.

_field_defaults = {'invalidates_cache': False, 'message': None, 'status': 0}#
_fields = ('status', 'message', 'invalidates_cache')#
classmethod _make(iterable)#

Make a new ExitCode object from a sequence or iterable

_replace(**kwds)#

Return a new ExitCode object replacing specified fields with new values

format(**kwargs: str) ExitCode[source]#

Create a clone of this exit code where the template message is replaced by the keyword arguments.

Parameters:

kwargs – replacement parameters for the template message
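The format behavior and the NamedTuple fields can be sketched with a minimal pure-Python stand-in, based only on the documented interface (this is not AiiDA's source):

```python
from typing import NamedTuple, Optional


class ExitCode(NamedTuple):
    # the three documented fields with their documented defaults
    status: int = 0
    message: Optional[str] = None
    invalidates_cache: bool = False

    def format(self, **kwargs):
        # return a clone whose template message has been interpolated
        return self._replace(message=self.message.format(**kwargs))


code = ExitCode(410, 'output file `{filename}` was not found')
resolved = code.format(filename='aiida.out')
# resolved.message == 'output file `aiida.out` was not found'; status is unchanged
```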

invalidates_cache: bool#

Alias for field number 2

message: str | None#

Alias for field number 1

status: int#

Alias for field number 0

class aiida.engine.processes.exit_code.ExitCodesNamespace(dictionary=None)[source]#

Bases: AttributeDict

A namespace of ExitCode instances that can be accessed through getattr as well as getitem.

Additionally, the collection can be called with an identifier, that can either reference the integer status of the ExitCode that needs to be retrieved or the key in the collection.

__annotations__ = {}#
__call__(identifier: int | str) ExitCode[source]#

Return a specific exit code identified by either its exit status or label.

Parameters:

identifier – the identifier of the exit code. If the type is integer, it will be interpreted as the exit code status, otherwise it will be interpreted as the exit code label

Returns:

an ExitCode instance

Raises:

ValueError – if no exit code with the given label is defined for this process
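The documented lookup semantics (integer identifiers match on status, string identifiers on label) can be sketched with a plain dict standing in for the AttributeDict base; this is an illustration, not AiiDA's code:

```python
class ExitCodesNamespace(dict):
    """Simplified stand-in: the real class derives from AttributeDict."""

    def __call__(self, identifier):
        if isinstance(identifier, int):
            # integer identifier: match on the exit status (first tuple field)
            for exit_code in self.values():
                if exit_code[0] == identifier:
                    return exit_code
            raise ValueError(f'no exit code with status {identifier}')
        try:
            # string identifier: treat it as the label, i.e. the key
            return self[identifier]
        except KeyError:
            raise ValueError(f'no exit code with label {identifier}') from None


codes = ExitCodesNamespace(ERROR_NO_OUTPUT=(404, 'no output'))
```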

__module__ = 'aiida.engine.processes.exit_code'#

Class and decorators to generate processes out of simple python functions.

class aiida.engine.processes.functions.FunctionProcess(*args: Any, **kwargs: Any)[source]#

Bases: Process

Function process class used for turning functions into a Process

__abstractmethods__ = frozenset({})#
__annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_func_args': 't.Sequence[str]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_uuid': 'Optional[uuid.UUID]', '_var_keyword': 'str | None', '_var_positional': 'str | None'}#
__init__(*args, **kwargs) None[source]#

Process constructor.

Parameters:
  • inputs – process inputs

  • logger – aiida logger

  • runner – process runner

  • parent_pid – id of parent process

  • enable_persistence – whether to persist this process

__module__ = 'aiida.engine.processes.functions'#
_abc_impl = <_abc._abc_data object>#
static _func(*_args, **_kwargs) dict[source]#

This is used internally to store the actual function that is being wrapped and will be replaced by the build method.

_func_args: t.Sequence[str] = ()#
_setup_db_record() None[source]#

Set up the database record for the process.

_var_keyword: str | None = None#
_var_positional: str | None = None#
static build(func: FunctionType, node_class: Type[ProcessNode]) Type[FunctionProcess][source]#

Build a Process from the given function.

All function arguments will be assigned as process inputs. If keyword arguments are specified then these will also become inputs.

Parameters:
  • func – The function to build a process from

  • node_class – Provide a custom node class to be used, has to be constructable with no arguments. It has to be a sub class of ProcessNode and the mixin FunctionCalculationMixin.

Returns:

A Process class that represents the function

classmethod create_inputs(*args: Any, **kwargs: Any) dict[str, Any][source]#

Create the input dictionary for the FunctionProcess.

execute() dict[str, Any] | None[source]#

Execute the process.

classmethod get_or_create_db_record() ProcessNode[source]#

Create a process node that represents what happened in this process.

Returns:

A process node

property process_class: Callable[[...], Any]#

Return the class that represents this Process, for the FunctionProcess this is the function itself.

For a standard Process or sub class of Process, this is the class itself. However, for legacy reasons, the Process class is a wrapper around another class. This function returns that original class, i.e. the class that really represents what was being executed.

Returns:

A Process class that represents the function

run() 'ExitCode' | None[source]#

Run the process.

classmethod validate_inputs(*args: Any, **kwargs: Any) None[source]#

Validate the positional and keyword arguments passed in the function call.

Raises:

TypeError – if more positional arguments are passed than the function defines

class aiida.engine.processes.functions.ProcessFunctionType(*args, **kwargs)[source]#

Bases: Protocol, Generic[P, R_co, N]

Protocol for a decorated process function.

__abstractmethods__ = frozenset({})#
__annotations__ = {'is_process_function': 'bool', 'node_class': 't.Type[N]', 'process_class': 't.Type[Process]', 'recreate_from': 't.Callable[[N], Process]', 'spec': 't.Callable[[], ProcessSpec]'}#
__call__(*args: P.args, **kwargs: P.kwargs) R_co[source]#

Call self as a function.

__dict__ = mappingproxy({'__module__': 'aiida.engine.processes.functions', '__annotations__': {'is_process_function': 'bool', 'node_class': 't.Type[N]', 'process_class': 't.Type[Process]', 'recreate_from': 't.Callable[[N], Process]', 'spec': 't.Callable[[], ProcessSpec]'}, '__doc__': 'Protocol for a decorated process function.', '__call__': <function ProcessFunctionType.__call__>, 'run': <function ProcessFunctionType.run>, 'run_get_pk': <function ProcessFunctionType.run_get_pk>, 'run_get_node': <function ProcessFunctionType.run_get_node>, '__orig_bases__': (<class 'typing.Protocol'>, typing.Generic[~P, +R_co, ~N]), '__dict__': <attribute '__dict__' of 'ProcessFunctionType' objects>, '__weakref__': <attribute '__weakref__' of 'ProcessFunctionType' objects>, '__parameters__': (~P, +R_co, ~N), '_is_protocol': True, '__subclasshook__': <function Protocol.__init_subclass__.<locals>._proto_hook>, '__init__': <function _no_init_or_replace_init>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc._abc_data object>})#
__init__(*args, **kwargs)#
__module__ = 'aiida.engine.processes.functions'#
__orig_bases__ = (<class 'typing.Protocol'>, typing.Generic[~P, +R_co, ~N])#
__parameters__ = (~P, +R_co, ~N)#
__subclasshook__()#

Abstract classes can override this to customize issubclass().

This is invoked early on by abc.ABCMeta.__subclasscheck__(). It should return True, False or NotImplemented. If it returns NotImplemented, the normal algorithm is used. Otherwise, it overrides the normal algorithm (and the outcome is cached).

__weakref__#

list of weak references to the object (if defined)

_abc_impl = <_abc._abc_data object>#
_is_protocol = True#
is_process_function: bool#
node_class: Type[N]#
process_class: Type[Process]#
recreate_from: Callable[[N], Process]#
run(*args: P.args, **kwargs: P.kwargs) R_co[source]#
run_get_node(*args: P.args, **kwargs: P.kwargs) tuple[dict[str, Any] | None, N][source]#
run_get_pk(*args: P.args, **kwargs: P.kwargs) tuple[dict[str, Any] | None, int][source]#
spec: Callable[[], ProcessSpec]#
aiida.engine.processes.functions.calcfunction(function: Callable[[P], R_co]) ProcessFunctionType[P, R_co, CalcFunctionNode][source]#

A decorator to turn a standard python function into a calcfunction. Example usage:

>>> from aiida.orm import Int
>>>
>>> # Define the calcfunction
>>> @calcfunction
... def sum(a, b):
...     return a + b
>>> # Run it with some input
>>> r = sum(Int(4), Int(5))
>>> print(r)
9
>>> r.base.links.get_incoming().all() 
[Neighbor(link_type='', link_label='result',
node=<CalcFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)]
>>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes()
[4, 5]
Parameters:

function – The function to decorate.

Returns:

The decorated function.

aiida.engine.processes.functions.get_stack_size(size: int = 2) int[source]#

Return the stack size for the caller’s frame.

This solution is taken from https://stackoverflow.com/questions/34115298/ as a more performant alternative to the naive len(inspect.stack()) solution. This implementation is about three orders of magnitude faster and scales especially well for larger stacks, which will usually be the case for aiida-core. However, it does use the internal _getframe of the sys standard library. If this ever were to stop working, simply switch to using len(inspect.stack()).

Parameters:

size – Hint for the expected stack size.

Returns:

The stack size for caller’s frame.
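The frame-walking idea can be sketched with sys._getframe as follows; the exact loop below is an assumption for illustration, and AiiDA's actual implementation probes the depth differently for performance:

```python
import sys


def get_stack_size() -> int:
    # walk outward from the caller's frame via f_back instead of
    # materializing every frame the way len(inspect.stack()) does
    frame = sys._getframe(1)
    depth = 0
    while frame is not None:
        depth += 1
        frame = frame.f_back
    return depth
```

Called from the same frame, this agrees with len(inspect.stack()) while avoiding the cost of building FrameInfo objects for the entire stack.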

aiida.engine.processes.functions.infer_valid_type_from_type_annotation(annotation: Any) tuple[Any, ...][source]#

Infer the value for the valid_type of an input port from the given function argument annotation.

Parameters:

annotation – The annotation of a function argument as returned by inspect.get_annotation.

Returns:

A tuple of valid types. If no valid types were defined or they could not be successfully parsed, an empty tuple is returned.
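The contract can be sketched with the typing introspection helpers; the function below is an assumption based on the documented behavior (unpack Union/Optional, fall back to an empty tuple), not AiiDA's code:

```python
import typing


def infer_valid_type(annotation) -> tuple:
    # no annotation, or one we cannot interpret -> empty tuple
    if annotation is None or annotation is typing.Any:
        return ()
    if typing.get_origin(annotation) is typing.Union:
        # unpack Union / Optional, dropping NoneType
        return tuple(arg for arg in typing.get_args(annotation) if arg is not type(None))
    if isinstance(annotation, type):
        return (annotation,)
    return ()
```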

aiida.engine.processes.functions.process_function(node_class: Type[ProcessNode]) Callable[[FunctionType], FunctionType][source]#

The base function decorator to create a FunctionProcess out of a normal python function.

Parameters:

node_class – the ORM class to be used as the Node record for the FunctionProcess

aiida.engine.processes.functions.workfunction(function: Callable[[P], R_co]) ProcessFunctionType[P, R_co, WorkFunctionNode][source]#

A decorator to turn a standard python function into a workfunction. Example usage:

>>> from aiida.orm import Int
>>>
>>> # Define the workfunction
>>> @workfunction
... def select(a, b):
...     return a
>>> # Run it with some input
>>> r = select(Int(4), Int(5))
>>> print(r)
4
>>> r.base.links.get_incoming().all() 
[Neighbor(link_type='', link_label='result',
node=<WorkFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)]
>>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes()
[4, 5]
Parameters:

function – The function to decorate.

Returns:

The decorated function.

Futures that can poll or receive broadcasted messages while waiting for a task to be completed.

class aiida.engine.processes.futures.ProcessFuture(pk: int, loop: AbstractEventLoop | None = None, poll_interval: None | int | float = None, communicator: Communicator | None = None)[source]#

Bases: Future

Future that waits for a process to complete using both polling and listening for broadcast events if possible.

__init__(pk: int, loop: AbstractEventLoop | None = None, poll_interval: None | int | float = None, communicator: Communicator | None = None)[source]#

Construct a future for a process node being finished.

If a None poll_interval is supplied polling will not be used. If a communicator is supplied it will be used to listen for broadcast messages.

Parameters:
  • pk – process pk

  • loop – An event loop

  • poll_interval – optional polling interval, if None, polling is not activated.

  • communicator – optional communicator, if None, will not subscribe to broadcasts.

__module__ = 'aiida.engine.processes.futures'#
_filtered = None#
async _poll_process(node: Node, poll_interval: int | float) None[source]#

Poll whether the process node has reached a terminal state.
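The polling half of the future can be sketched with asyncio; the coroutine, FakeNode, and the is_terminated attribute below are illustrative stand-ins, not AiiDA's API:

```python
import asyncio


async def poll_until_terminal(node, future, poll_interval):
    # resolve `future` with the node once it reports a terminal state;
    # `node` is any object exposing an `is_terminated` attribute
    while not node.is_terminated:
        await asyncio.sleep(poll_interval)
    future.set_result(node)


class FakeNode:
    is_terminated = False


async def demo():
    node = FakeNode()
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    poller = asyncio.create_task(poll_until_terminal(node, future, 0.01))
    await asyncio.sleep(0.05)
    node.is_terminated = True  # simulate the process finishing
    result = await future
    await poller
    return result is node
```

In the real class, a broadcast subscriber can resolve the same future earlier, making the poll loop a fallback rather than the only completion path.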

cleanup() None[source]#

Clean up the future by removing broadcast subscribers from the communicator if it still exists.

A sub class of plumpy.ProcessLauncher to launch a Process.

class aiida.engine.processes.launcher.ProcessLauncher(loop: AbstractEventLoop | None = None, persister: Persister | None = None, load_context: LoadSaveContext | None = None, loader: ObjectLoader | None = None)[source]#

Bases: ProcessLauncher

A sub class of plumpy.ProcessLauncher to launch a Process.

It overrides the _continue method to make sure the node corresponding to the task can be loaded and that if it is already marked as terminated, it is not continued but the future is reconstructed and returned

__module__ = 'aiida.engine.processes.launcher'#
async _continue(communicator, pid, nowait, tag=None)[source]#

Continue the task.

Note that the task may already have been completed, as indicated from the corresponding the node, in which case it is not continued, but the corresponding future is reconstructed and returned. This scenario may occur when the Process was already completed by another worker that however failed to send the acknowledgment.

Parameters:
  • communicator – the communicator that called this method

  • pid – the pid of the process to continue

  • nowait – if True don’t wait for the process to finish, just return the pid, otherwise wait and return the results

  • tag – the tag of the checkpoint to continue from

static handle_continue_exception(node, exception, message)[source]#

Handle exception raised in _continue call.

If the process state of the node has not yet been put to excepted, the exception was raised before the process instance could be reconstructed, for example when the process class could not be loaded, thereby circumventing the exception handling of the state machine. Raising this exception will then acknowledge the process task with RabbitMQ, leaving an uncleaned node in the CREATED state forever. Therefore we have to perform the node cleanup manually.

Parameters:
  • exception – the exception object

  • message – string message to use for the log message

AiiDA specific implementation of plumpy Ports and PortNamespaces for the ProcessSpec.

class aiida.engine.processes.ports.CalcJobOutputPort(*args, **kwargs)[source]#

Bases: OutputPort

Sub class of plumpy.OutputPort which adds the _pass_to_parser attribute.

__init__(*args, **kwargs) None[source]#
__module__ = 'aiida.engine.processes.ports'#
property pass_to_parser: bool#
class aiida.engine.processes.ports.InputPort(*args, **kwargs)[source]#

Bases: WithMetadata, WithSerialize, WithNonDb, InputPort

Subclass of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic serialization of values to database-storable types, as well as input types that are not database storable.

The mixins have to go before the main port class in the superclass order to make sure they have the chance to process their specific keywords.

__annotations__ = {}#
__init__(*args, **kwargs) None[source]#

Override the constructor to check the type of the default if set and warn if not immutable.

__module__ = 'aiida.engine.processes.ports'#
get_description() Dict[str, str][source]#

Return a description of the InputPort, which will be a dictionary of its attributes

Returns:

a dictionary of the stringified InputPort attributes

class aiida.engine.processes.ports.PortNamespace(*args, **kwargs)[source]#

Bases: WithMetadata, WithNonDb, PortNamespace

Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive serialization of a given mapping onto the ports of the PortNamespace.

__abstractmethods__ = frozenset({})#
__annotations__ = {'_explicitly_set': 'bool', '_is_metadata': 'bool', '_non_db': 'bool', '_non_db_explicitly_set': 'bool', '_ports': "Dict[str, Union[Port, 'PortNamespace']]"}#
__module__ = 'aiida.engine.processes.ports'#
__setitem__(key: str, port: Port) None[source]#

Ensure that a Port being added inherits the non_db attribute if not explicitly defined at construction.

The reasoning is that if a PortNamespace has non_db=True, which is different from the default value, very often all leaves should also be non_db=True. To prevent a user from having to specify it manually every time, we override the value here, unless it was specifically set during construction.

Note that the non_db attribute is not present for all Port sub classes so we have to check for it first.

_abc_impl = <_abc._abc_data object>#
serialize(mapping: Dict[str, Any] | None, breadcrumbs: Sequence[str] = ()) Dict[str, Any] | None[source]#

Serialize the given mapping onto this PortNamespace.

It will recursively call this function on any nested PortNamespace or the serialize function on any Ports.

Parameters:
  • mapping – a mapping of values to be serialized

  • breadcrumbs – a tuple with the namespaces of parent namespaces

Returns:

the serialized mapping
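The recursive descent described above can be sketched in plain Python, with dicts standing in for nested namespaces and callables standing in for port serializers; this is an illustration, not AiiDA's code:

```python
def serialize(ports, mapping):
    # `ports` maps each port name either to a serializer callable (a leaf
    # port) or to another dict of ports (a nested namespace)
    if mapping is None:
        return None
    serialized = {}
    for key, value in mapping.items():
        port = ports.get(key)
        if isinstance(port, dict):
            serialized[key] = serialize(port, value)  # recurse into namespace
        elif callable(port):
            serialized[key] = port(value)  # leaf port: apply its serializer
        else:
            serialized[key] = value  # unknown/dynamic key: pass through
    return serialized


ports = {'x': int, 'nested': {'y': str}}
serialize(ports, {'x': '5', 'nested': {'y': 7}})
# -> {'x': 5, 'nested': {'y': '7'}}
```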

static validate_port_name(port_name: str) None[source]#

Validate the given port name.

Valid port names adhere to the following restrictions:

  • Is a valid link label (see below)

  • Does not contain two or more consecutive underscores

Valid link labels adhere to the following restrictions:

  • Has to be a valid python identifier

  • Can only contain alphanumeric characters and underscores

  • Can not start or end with an underscore

Parameters:

port_name – the proposed name of the port to be added

Raises:
  • TypeError – if the port name is not a string type

  • ValueError – if the port name is invalid
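The rules above can be sketched as a standalone checker; this is a hypothetical implementation of the listed restrictions (using str.isidentifier as an approximation of the identifier rule), not AiiDA's code:

```python
def validate_port_name(port_name):
    # hypothetical checker for the restrictions listed above
    if not isinstance(port_name, str):
        raise TypeError('port name must be a string')
    if not port_name.isidentifier():
        raise ValueError(f'{port_name!r} is not a valid python identifier')
    if port_name.startswith('_') or port_name.endswith('_'):
        raise ValueError(f'{port_name!r} may not start or end with an underscore')
    if '__' in port_name:
        raise ValueError(f'{port_name!r} may not contain consecutive underscores')
```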

class aiida.engine.processes.ports.WithMetadata(*args, **kwargs)[source]#

Bases: object

A mixin that allows an input port to be marked as metadata through the keyword is_metadata.

A metadata input differs from a normal input in that it is not linked to the ProcessNode as a Data node but rather is stored on the ProcessNode itself (as an attribute, for example).

__annotations__ = {}#
__dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': 'A mixin that allows an input port to be marked as metadata through the keyword ``is_metadata``.\n\n    A metadata input differs from a normal input as in that it is not linked to the ``ProcessNode`` as a ``Data`` node\n    but rather is stored on the ``ProcessNode`` itself (as an attribute, for example).\n    ', '__init__': <function WithMetadata.__init__>, 'is_metadata_explicitly_set': <property object>, 'is_metadata': <property object>, '__dict__': <attribute '__dict__' of 'WithMetadata' objects>, '__weakref__': <attribute '__weakref__' of 'WithMetadata' objects>, '__annotations__': {'_explicitly_set': 'bool', '_is_metadata': 'bool'}})#
__init__(*args, **kwargs) None[source]#
__module__ = 'aiida.engine.processes.ports'#
__weakref__#

list of weak references to the object (if defined)

property is_metadata: bool#

Return whether this InputPort is a metadata port, meaning its value is stored on the ProcessNode itself rather than as a separate Data node.

Returns:

True if the port is a metadata port, False otherwise

property is_metadata_explicitly_set: bool#

Return whether the is_metadata keyword was explicitly passed in the construction of the InputPort.

Returns:

True if is_metadata was explicitly defined during construction, False otherwise

class aiida.engine.processes.ports.WithNonDb(*args, **kwargs)[source]#

Bases: object

A mixin that adds support for flagging a port, through the non_db flag, as not to be stored in the database.

__annotations__ = {}#
__dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': 'A mixin that adds support to a port to flag it should not be stored in the database using the ``non_db`` flag.', '__init__': <function WithNonDb.__init__>, 'non_db_explicitly_set': <property object>, 'non_db': <property object>, '__dict__': <attribute '__dict__' of 'WithNonDb' objects>, '__weakref__': <attribute '__weakref__' of 'WithNonDb' objects>, '__annotations__': {'_non_db_explicitly_set': 'bool', '_non_db': 'bool'}})#
__init__(*args, **kwargs) None[source]#
__module__ = 'aiida.engine.processes.ports'#
__weakref__#

list of weak references to the object (if defined)

property non_db: bool#

Return whether the value of this InputPort should not be stored in the database.

Returns:

True if the value should not be stored as a node, False otherwise

property non_db_explicitly_set: bool#

Return whether the non_db keyword was explicitly passed in the construction of the InputPort.

Returns:

True if non_db was explicitly defined during construction, False otherwise

class aiida.engine.processes.ports.WithSerialize(*args, **kwargs)[source]#

Bases: object

A mixin that adds support for a serialization function which is automatically applied on inputs that are not AiiDA data types.

__annotations__ = {}#
__dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': 'A mixin that adds support for a serialization function which is automatically applied on inputs\n    that are not AiiDA data types.\n    ', '__init__': <function WithSerialize.__init__>, 'serialize': <function WithSerialize.serialize>, '__dict__': <attribute '__dict__' of 'WithSerialize' objects>, '__weakref__': <attribute '__weakref__' of 'WithSerialize' objects>, '__annotations__': {'_serializer': "Callable[[Any], 'Data']"}})#
__init__(*args, **kwargs) None[source]#
__module__ = 'aiida.engine.processes.ports'#
__weakref__#

list of weak references to the object (if defined)

serialize(value: Any) Data[source]#

Serialize the given value, unless it is None, already a Data type, or no serializer function is defined.

Parameters:

value – the value to be serialized

Returns:

a serialized version of the value or the unchanged value
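The guard conditions in serialize can be sketched as follows; Data here is a stand-in for the AiiDA base data class, and SerializablePort is a hypothetical minimal host for the mixin's behavior:

```python
class Data:
    """Stand-in for aiida.orm.Data in this sketch."""

class SerializablePort:
    """Hypothetical minimal port illustrating the WithSerialize behavior."""

    def __init__(self, serializer=None):
        self._serializer = serializer

    def serialize(self, value):
        # Leave the value untouched if there is nothing to do: no serializer
        # is configured, there is nothing to serialize, or it is already Data.
        if self._serializer is None or value is None or isinstance(value, Data):
            return value
        return self._serializer(value)
```

A port constructed with, say, a serializer that wraps plain Python values in a Data subclass will convert raw inputs on assignment, while existing Data instances pass through unchanged.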

The AiiDA process class

class aiida.engine.processes.process.Process(*args: Any, **kwargs: Any)[source]#

Bases: Process

This class represents an AiiDA process which can be executed and will have full provenance saved in the database.

SINGLE_OUTPUT_LINKNAME: str = 'result'#
class SaveKeys(value)[source]#

Bases: Enum

Keys used to identify things in the saved instance state bundle.

CALC_ID: str = 'calc_id'#
__module__ = 'aiida.engine.processes.process'#
__abstractmethods__ = frozenset({})#
__annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_uuid': 'Optional[uuid.UUID]'}#
__init__(inputs: Dict[str, Any] | None = None, logger: logging.Logger | None = None, runner: 'Runner' | None = None, parent_pid: int | None = None, enable_persistence: bool = True) None[source]#

Process constructor.

Parameters:
  • inputs – process inputs

  • logger – aiida logger

  • runner – process runner

  • parent_pid – id of parent process

  • enable_persistence – whether to persist this process

__module__ = 'aiida.engine.processes.process'#
_abc_impl = <_abc._abc_data object>#
_auto_persist: Set[str] | None = {'_creation_time', '_enable_persistence', '_event_helper', '_future', '_parent_pid', '_paused', '_pid', '_pre_paused_status', '_status'}#
_build_process_label() str[source]#

Construct the process label that should be set on ProcessNode instances for this process class.

Note

By default this returns the name of the process class itself. It can be overridden by Process subclasses to provide a more specific label.

Returns:

The process label to use for ProcessNode instances.

_create_and_setup_db_record() int | UUID[source]#

Create and set up the database record for this process.

Returns:

the uuid or pk of the process

_filter_serializable_metadata(port: None | InputPort | PortNamespace, port_value: Any) Any | None[source]#

Return the inputs that correspond to ports with is_metadata=True and that are JSON serializable.

The function is called recursively for any port namespaces.

Parameters:

port – An InputPort or PortNamespace. If an InputPort that specifies is_metadata=True the port_value is returned. For a PortNamespace this method is called recursively for the keys within the namespace and the resulting dictionary is returned, omitting None values. If either port or port_value is None, None is returned.

Returns:

The port_value where all inputs that do not correspond to a metadata port, or that are not JSON serializable, have been filtered out.

_flat_inputs() Dict[str, Any][source]#

Return a flattened version of the parsed inputs dictionary.

The eventual keys will be a concatenation of the nested keys. Note that the metadata dictionary, if present, is not passed, as those are dealt with separately in _setup_metadata.

Returns:

flat dictionary of parsed inputs

_flat_outputs() Dict[str, Any][source]#

Return a flattened version of the registered outputs dictionary.

The eventual keys will be a concatenation of the nested keys.

Returns:

flat dictionary of parsed outputs

_flatten_inputs(port: None | InputPort | PortNamespace, port_value: Any, parent_name: str = '', separator: str = '__') List[Tuple[str, Any]][source]#

Function that will recursively flatten the inputs dictionary, omitting inputs for ports that are marked as not being stored in the database.

Parameters:
  • port – port against which to map the port value, can be InputPort or PortNamespace

  • port_value – value for the current port, can be a Mapping

  • parent_name – the parent key with which to prefix the keys

  • separator – character to use for the concatenation of keys

Returns:

flat list of inputs

_flatten_outputs(port: None | OutputPort | PortNamespace, port_value: Any, parent_name: str = '', separator: str = '__') List[Tuple[str, Any]][source]#

Function that will recursively flatten the outputs dictionary.

Parameters:
  • port – port against which to map the port value, can be OutputPort or PortNamespace

  • port_value – value for the current port, can be a Mapping

  • parent_name – the parent key with which to prefix the keys

  • separator – character to use for the concatenation of keys

Returns:

flat list of outputs
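Both flattening helpers concatenate nested keys with the separator (default `'__'`). Setting aside the port-based filtering, the core recursion can be sketched as:

```python
def flatten(mapping, parent_name='', separator='__'):
    """Recursively flatten nested port values, joining keys with the separator."""
    items = []
    for key, value in mapping.items():
        name = f'{parent_name}{separator}{key}' if parent_name else key
        if isinstance(value, dict):
            # Recurse into nested namespaces, prefixing with the current path.
            items.extend(flatten(value, parent_name=name, separator=separator))
        else:
            items.append((name, value))
    return items
```

For example, `{'a': 1, 'b': {'c': 2}}` flattens to `[('a', 1), ('b__c', 2)]`.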

static _get_namespace_list(namespace: str | None = None, agglomerate: bool = True) List[str | None][source]#

Get the list of namespaces in a given namespace.

Parameters:
  • namespace – name space

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched.

Returns:

namespace list
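The agglomeration can be sketched as follows, assuming dot-separated namespaces: for `'a.b'` the search list covers the top level (`None`) plus every parent namespace up to and including `'a.b'`. This is an illustrative reimplementation, not the AiiDA method itself:

```python
def get_namespace_list(namespace=None, agglomerate=True):
    """Sketch: expand 'a.b' into [None, 'a', 'a.b'] when agglomerating."""
    if not agglomerate:
        return [namespace]
    # Always include the top-level (None) namespace, then each parent in turn.
    namespaces = [None]
    if namespace is not None:
        parts = namespace.split('.')
        namespaces.extend('.'.join(parts[:i]) for i in range(1, len(parts) + 1))
    return namespaces
```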

_node_class#

alias of ProcessNode

_save_checkpoint() None[source]#

Save the current state in a checkpoint if persistence is enabled and the process state is not terminal.

If the persistence call excepts with a PersistenceError, it will be caught and a warning will be logged.

_setup_db_record() None[source]#

Create the database record for this process and the links to its inputs.

This function will set various attributes on the node that serve as a proxy for attributes of the Process. This is essential as otherwise this information could only be introspected through the Process itself, which is only available to the interpreter that has it in memory. To make this data introspectable from any interpreter, for example for the command line interface, certain Process attributes are proxied through the calculation node.

In addition, the parent calculation will be set up with a CALL link if applicable and all inputs will be linked up as well.

_setup_inputs() None[source]#

Create the links between the input nodes and the ProcessNode that represents this process.

_setup_metadata(metadata: dict) None[source]#

Store the metadata on the ProcessNode.

_setup_version_info() None[source]#

Store relevant plugin version information.

_spec_class#

alias of ProcessSpec

classmethod build_process_type() str[source]#

The process type.

Returns:

string of the process type

Note: This could be made into a property ‘process_type’ but in order to have it be a property of the class it would need to be defined in the metaclass, see https://bugs.python.org/issue20659

decode_input_args(encoded: str) Dict[str, Any][source]#

Decode saved input arguments as they came from the saved instance state Bundle

Parameters:

encoded – encoded (serialized) inputs

Returns:

The decoded input args

classmethod define(spec: ProcessSpec) None[source]#

Define the specification of the process, including its inputs, outputs and known exit codes.

A metadata input namespace is defined, with optional ports that are not stored in the database.

encode_input_args(inputs: Dict[str, Any]) str[source]#

Encode input arguments such that they may be saved in a Bundle

Parameters:

inputs – A mapping of the inputs as passed to the process

Returns:

The encoded (serialized) inputs

exit_codes = {'ERROR_INVALID_OUTPUT': (10, 'The process returned an invalid output.', True), 'ERROR_LEGACY_FAILURE': (2, 'The process failed with legacy failure mode.', True), 'ERROR_MISSING_OUTPUT': (11, 'The process did not register a required output.', True), 'ERROR_UNSPECIFIED': (1, 'The process has failed with an unspecified error.', True)}#
exposed_inputs(process_class: Type[Process], namespace: str | None = None, agglomerate: bool = True) AttributeDict[source]#

Gather a dictionary of the inputs that were exposed for a given Process class under an optional namespace.

Parameters:
  • process_class – Process class whose inputs to try and retrieve

  • namespace – PortNamespace in which to look for the inputs

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched for inputs. Inputs in lower-lying namespaces take precedence.

Returns:

exposed inputs

exposed_outputs(node: ProcessNode, process_class: Type[Process], namespace: str | None = None, agglomerate: bool = True) AttributeDict[source]#

Return the outputs which were exposed from the process_class and emitted by the specific node

Parameters:
  • node – process node whose outputs to try and retrieve

  • namespace – Namespace in which to search for exposed outputs.

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched for outputs. Outputs in lower-lying namespaces take precedence.

Returns:

exposed outputs

classmethod get_builder() ProcessBuilder[source]#
classmethod get_exit_statuses(exit_code_labels: Iterable[str]) List[int][source]#

Return the exit status (integers) for the given exit code labels.

Parameters:

exit_code_labels – a list of strings that reference exit code labels of this process class

Returns:

list of exit status integers that correspond to the given exit code labels

Raises:

AttributeError – if at least one of the labels does not correspond to an existing exit code
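With the default exit codes listed above (stored as `(status, message, invalidates_cache)` tuples), the lookup can be sketched as a plain dictionary access that converts a missing label into the documented AttributeError:

```python
def get_exit_statuses(exit_codes, exit_code_labels):
    """Map exit code labels to their integer statuses, raising on unknown labels."""
    try:
        return [exit_codes[label][0] for label in exit_code_labels]
    except KeyError as exc:
        raise AttributeError(f'{exc} is not a valid exit code label') from exc

# The default exit codes of Process, as (status, message, invalidates_cache):
exit_codes = {
    'ERROR_UNSPECIFIED': (1, 'The process has failed with an unspecified error.', True),
    'ERROR_LEGACY_FAILURE': (2, 'The process failed with legacy failure mode.', True),
    'ERROR_INVALID_OUTPUT': (10, 'The process returned an invalid output.', True),
    'ERROR_MISSING_OUTPUT': (11, 'The process did not register a required output.', True),
}
```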

classmethod get_or_create_db_record() ProcessNode[source]#

Create a process node that represents what happened in this process.

Returns:

A process node

get_parent_calc() ProcessNode | None[source]#

Get the parent process node

Returns:

the parent process node if there is one

get_provenance_inputs_iterator() Iterator[Tuple[str, InputPort | PortNamespace]][source]#

Get provenance input iterator.

Return type:

filter

init() None[source]#
property inputs: AttributesFrozendict#

Return the inputs attribute dictionary or an empty one.

This overrides the property of the base class, which can also return None. The override guarantees that callers always receive an instance of AttributesFrozendict.

classmethod is_valid_cache(node: ProcessNode) bool[source]#

Check whether the given node can be used as a cache source.

Overriding this method allows Process sub-classes to modify when corresponding process nodes are considered as a cache.

Warning

When overriding this method, make sure to return False at least in all cases when super()._node.base.caching.is_valid_cache(node) returns False. Otherwise, the invalidates_cache keyword on exit codes may have no effect.

kill(msg: str | None = None) bool | Future[source]#

Kill the process and all the children calculations it called

Parameters:

msg – message

load_instance_state(saved_state: MutableMapping[str, Any], load_context: LoadSaveContext) None[source]#

Load instance state.

Parameters:
  • saved_state – saved instance state

  • load_context

property metadata: AttributeDict#

Return the metadata that were specified when this process instance was launched.

Returns:

metadata dictionary

property node: ProcessNode#

Return the ProcessNode used by this process to represent itself in the database.

Returns:

instance of sub class of ProcessNode

on_create() None[source]#

Called when a Process is created.

on_entered(from_state: State | None) None[source]#

After entering a new state, save a checkpoint and update the latest process state change timestamp.

on_except(exc_info: Tuple[Any, Exception, TracebackType]) None[source]#

Log the exception by calling the report method with formatted stack trace from exception info object and store the exception string as a node attribute

Parameters:

exc_info – the sys.exc_info() object (type, value, traceback)

on_finish(result: int | ExitCode | None, successful: bool) None[source]#

Set the finish status on the process node.

Parameters:
  • result – result of the process

  • successful – whether execution was successful

on_output_emitting(output_port: str, value: Any) None[source]#

The process has emitted a value on the given output port.

Parameters:
  • output_port – The output port name the value was emitted on

  • value – The value emitted

on_paused(msg: str | None = None) None[source]#

The Process was paused, so set the paused attribute on the process node.

Parameters:

msg – message

on_playing() None[source]#

The Process was unpaused, so remove the paused attribute on the process node.

on_terminated() None[source]#

Called when a Process enters a terminal state.

out(output_port: str, value: Any | None = None) None[source]#

Attach output to output port.

The name of the port will be used as the link label.

Parameters:
  • output_port – name of output port

  • value – value to put inside output port

out_many(out_dict: Dict[str, Any]) None[source]#

Attach outputs to multiple output ports.

Keys of the dictionary will be used as output port names, values as outputs.

Parameters:

out_dict (dict) – output dictionary

report(msg: str, *args, **kwargs) None[source]#

Log a message to the logger, which should get saved to the database through the attached DbLogHandler.

The pk, class name and function name of the caller are prepended to the given message

Parameters:
  • msg – message to log

  • args – args to pass to the log call

  • kwargs – kwargs to pass to the log call

property runner: Runner#

Get process runner.

save_instance_state(out_state: MutableMapping[str, Any], save_context: LoadSaveContext | None) None[source]#

Save instance state.

See documentation of plumpy.processes.Process.save_instance_state().

set_status(status: str | None) None[source]#

The status of the Process is about to be changed, so we reflect this in the node's attribute proxy.

Parameters:

status – the status message

classmethod spec() ProcessSpec[source]#
spec_metadata = <aiida.engine.processes.ports.PortNamespace object>#
submit(process: Type[Process], inputs: dict[str, Any] | None = None, **kwargs) ProcessNode[source]#

Submit process for execution.

Parameters:
  • process – The process class.

  • inputs – The dictionary of process inputs.

Returns:

The process node.

update_outputs() None[source]#

Attach new outputs to the node since the last call.

Does nothing, if self.metadata.store_provenance is False.

property uuid: str#

Return the UUID of the process which corresponds to the UUID of its associated ProcessNode.

Returns:

the UUID associated to this process instance

aiida.engine.processes.process.get_query_string_from_process_type_string(process_type_string: str) str[source]#

Take the process type string of a Node and create the queryable type string.

Parameters:

process_type_string (str) – the process type string

Returns:

string that can be used to query for subclasses of the process type using ‘LIKE <string>’

Return type:

str

AiiDA specific implementation of plumpy’s ProcessSpec.

class aiida.engine.processes.process_spec.CalcJobProcessSpec[source]#

Bases: ProcessSpec

Process spec intended for the CalcJob process class.

OUTPUT_PORT_TYPE#

alias of CalcJobOutputPort

__annotations__ = {}#
__init__() None[source]#
__module__ = 'aiida.engine.processes.process_spec'#
property default_output_node: str | None#
class aiida.engine.processes.process_spec.ProcessSpec[source]#

Bases: ProcessSpec

Default process spec for process classes defined in aiida-core.

This sub class defines custom classes for input ports and port namespaces. It also adds support for the definition of exit codes and retrieving them subsequently.

INPUT_PORT_TYPE#

alias of InputPort

METADATA_KEY: str = 'metadata'#
METADATA_OPTIONS_KEY: str = 'options'#
PORT_NAMESPACE_TYPE#

alias of PortNamespace

__annotations__ = {'METADATA_KEY': <class 'str'>, 'METADATA_OPTIONS_KEY': <class 'str'>, 'NAME_INPUTS_PORT_NAMESPACE': 'str', 'NAME_OUTPUTS_PORT_NAMESPACE': 'str', '_exposed_inputs': 'EXPOSED_TYPE', '_exposed_outputs': 'EXPOSED_TYPE', '_ports': 'PortNamespace', '_sealed': 'bool'}#
__init__() None[source]#
__module__ = 'aiida.engine.processes.process_spec'#
exit_code(status: int, label: str, message: str, invalidates_cache: bool = False) None[source]#

Add an exit code to the ProcessSpec

Parameters:
  • status – the exit status integer

  • label – a label by which the exit code can be addressed

  • message – a more detailed description of the exit code

  • invalidates_cache – when set to True, a process exiting with this exit code will not be considered for caching

property exit_codes: ExitCodesNamespace#

Return the namespace of exit codes defined for this ProcessSpec

Returns:

ExitCodesNamespace of ExitCode named tuples
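How exit_code registration and the exit_codes namespace fit together can be sketched with a minimal stand-in (SpecSketch and this ExitCodesNamespace are illustrative, not the AiiDA classes, and the 418 code is a made-up example):

```python
from collections import namedtuple

ExitCode = namedtuple('ExitCode', 'status message invalidates_cache')

class ExitCodesNamespace(dict):
    """Dict of label -> ExitCode that also supports attribute access."""

    def __getattr__(self, label):
        try:
            return self[label]
        except KeyError as exc:
            raise AttributeError(label) from exc

class SpecSketch:
    """Hypothetical minimal spec holding only the exit code machinery."""

    def __init__(self):
        self._exit_codes = ExitCodesNamespace()

    def exit_code(self, status, label, message, invalidates_cache=False):
        # Register the named tuple under its label for later lookup.
        self._exit_codes[label] = ExitCode(status, message, invalidates_cache)

    @property
    def exit_codes(self):
        return self._exit_codes
```

After `spec.exit_code(418, 'ERROR_EXAMPLE', '...')`, the code is reachable both as `spec.exit_codes['ERROR_EXAMPLE']` and as `spec.exit_codes.ERROR_EXAMPLE`.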

property inputs: PortNamespace#

Get the input port namespace of the process specification

Returns:

the input PortNamespace

property metadata_key: str#
property options_key: str#
property outputs: PortNamespace#

Get the output port namespace of the process specification

Returns:

the outputs PortNamespace

property ports: PortNamespace#

Module with utilities.

aiida.engine.processes.utils.prune_mapping(value)[source]#

Prune a nested mapping from all mappings that are completely empty.

Note

A nested mapping that is completely empty means it contains at most other empty mappings. Other null values, such as None or empty lists, should not be pruned.

Parameters:

value – A nested mapping of port values.

Returns:

The same mapping but without any nested namespace that is completely empty.
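The pruning rule, including the note that None and empty lists survive, can be sketched as:

```python
def prune_mapping(value):
    """Remove nested mappings that contain (recursively) only empty mappings."""
    if isinstance(value, dict):
        result = {}
        for key, item in value.items():
            if isinstance(item, dict):
                pruned = prune_mapping(item)
                if pruned:  # drop namespaces that came back completely empty
                    result[key] = pruned
            else:
                # None, empty lists and other null-ish leaves are kept on purpose.
                result[key] = item
        return result
    return value
```

For example, `{'a': {}, 'b': {'c': {}}, 'd': None}` prunes to `{'d': None}`: both nested namespaces are completely empty, while the None leaf is preserved.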