aiida.engine.processes package#

Module for processes and related utilities.

Submodules#

Convenience classes to help building the input dictionaries for Processes.

class aiida.engine.processes.builder.PrettyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#

Bases: json.encoder.JSONEncoder

JSON encoder for returning a pretty representation of an AiiDA ProcessBuilder.

__module__ = 'aiida.engine.processes.builder'#
default(o)[source]#

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class aiida.engine.processes.builder.ProcessBuilder(process_class: Type[Process])[source]#

Bases: aiida.engine.processes.builder.ProcessBuilderNamespace

A process builder that helps setting up the inputs for creating a new process.

__abstractmethods__ = frozenset({})#
__init__(process_class: Type[Process])[source]#

Construct a ProcessBuilder instance for the given Process class.

Parameters

process_class – the Process subclass

__module__ = 'aiida.engine.processes.builder'#
_abc_impl = <_abc_data object>#
_repr_pretty_(p, _) str[source]#

Pretty representation for in the IPython console and notebooks.

property process_class: Type[Process]#

Return the process class for which this builder is constructed.

class aiida.engine.processes.builder.ProcessBuilderNamespace(port_namespace: aiida.engine.processes.ports.PortNamespace)[source]#

Bases: collections.abc.MutableMapping

Input namespace for the ProcessBuilder.

Dynamically generates the getters and setters for the input ports of a given PortNamespace

__abstractmethods__ = frozenset({})#
__delattr__(item)[source]#

Implement delattr(self, name).

__delitem__(item)[source]#
__dict__ = mappingproxy({'__module__': 'aiida.engine.processes.builder', '__doc__': 'Input namespace for the `ProcessBuilder`.\n\n    Dynamically generates the getters and setters for the input ports of a given PortNamespace\n    ', '__init__': <function ProcessBuilderNamespace.__init__>, '__setattr__': <function ProcessBuilderNamespace.__setattr__>, '__repr__': <function ProcessBuilderNamespace.__repr__>, '__dir__': <function ProcessBuilderNamespace.__dir__>, '__iter__': <function ProcessBuilderNamespace.__iter__>, '__len__': <function ProcessBuilderNamespace.__len__>, '__getitem__': <function ProcessBuilderNamespace.__getitem__>, '__setitem__': <function ProcessBuilderNamespace.__setitem__>, '__delitem__': <function ProcessBuilderNamespace.__delitem__>, '__delattr__': <function ProcessBuilderNamespace.__delattr__>, '_recursive_merge': <function ProcessBuilderNamespace._recursive_merge>, '_merge': <function ProcessBuilderNamespace._merge>, '_update': <function ProcessBuilderNamespace._update>, '_inputs': <function ProcessBuilderNamespace._inputs>, '_prune': <function ProcessBuilderNamespace._prune>, '__dict__': <attribute '__dict__' of 'ProcessBuilderNamespace' objects>, '__weakref__': <attribute '__weakref__' of 'ProcessBuilderNamespace' objects>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc_data object>, '__annotations__': {}})#
__dir__()[source]#

Default dir() implementation.

__getitem__(item)[source]#
__init__(port_namespace: aiida.engine.processes.ports.PortNamespace) None[source]#

Dynamically construct the get and set properties for the ports of the given port namespace.

For each port in the given port namespace a get and set property will be constructed dynamically and added to the ProcessBuilderNamespace. The docstring for these properties will be defined by calling str() on the Port, which should return the description of the Port.

Parameters

port_namespace – the inputs PortNamespace for which to construct the builder

__iter__()[source]#
__len__()[source]#
__module__ = 'aiida.engine.processes.builder'#
__repr__()[source]#

Return repr(self).

__setattr__(attr: str, value: Any) None[source]#

Assign the given value to the port with key attr.

Note

Any attributes without a leading underscore being set correspond to inputs and should hence be validated with respect to the corresponding input port from the process spec

__setitem__(item, value)[source]#
__weakref__#

list of weak references to the object (if defined)

_abc_impl = <_abc_data object>#
_inputs(prune: bool = False) dict[source]#

Return the entire mapping of inputs specified for this builder.

Parameters

prune – boolean, when True, will prune nested namespaces that contain no actual values whatsoever

Returns

mapping of inputs ports and their input values.

_merge(*args, **kwds)[source]#

Merge the content of a dictionary or keyword arguments in .

Note

This method differs in behavior from _update in that _merge will recursively update the existing dictionary with the one that is specified in the arguments. The _update method will merge only the keys on the top level, but any lower lying nested namespace will be replaced entirely.

The method is prefixed with an underscore in order to not reserve the name for a potential port.

Parameters
  • args – a single mapping that should be mapped on the namespace.

  • kwds – keyword value pairs that should be mapped onto the ports.

_prune(value)[source]#

Prune a nested mapping from all mappings that are completely empty.

Note

a nested mapping that is completely empty means it contains at most other empty mappings. Other null values, such as None or empty lists, should not be pruned.

Parameters

value – a nested mapping of port values

Returns

the same mapping but without any nested namespace that is completely empty.

_recursive_merge(dictionary, key, value)[source]#

Recursively merge the contents of dictionary setting its key to value.

_update(*args, **kwds)[source]#

Update the values of the builder namespace passing a mapping as argument or individual keyword value pairs.

The method functions just as collections.abc.MutableMapping.update and is merely prefixed with an underscore in order to not reserve the name for a potential port.

Parameters
  • args – a single mapping that should be mapped on the namespace.

  • kwds – keyword value pairs that should be mapped onto the ports.

Functions to control and interact with running processes.

exception aiida.engine.processes.control.ProcessTimeoutException[source]#

Bases: aiida.common.exceptions.AiidaException

Raised when action to communicate with a process times out.

__module__ = 'aiida.engine.processes.control'#
aiida.engine.processes.control._perform_actions(processes: list[ProcessNode], action: t.Callable, infinitive: str, present: str, past: str, timeout: float = None, wait: bool = False) None[source]#

Perform an action on a list of processes.

Parameters
  • processes – The list of processes to perform the action on.

  • action – The action to perform.

  • infinitive – The infinitive of the verb that represents the action.

  • present – The present tense of the verb that represents the action.

  • past – The past tense of the verb that represents the action.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

Raises

ProcessTimeoutException – If the processes do not respond within the timeout.

aiida.engine.processes.control._resolve_futures(futures: dict[concurrent.futures.Future, ProcessNode], infinitive: str, present: str, past: str, wait: bool = False, timeout: float = None) None[source]#

Process a mapping of futures representing an action on an active process.

This function will echo the correct information strings based on the outcomes of the futures and the given verb conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a timeout to put a maximum wait time on the actions.

Parameters
  • futures – The map of action futures and the corresponding processes.

  • infinitive – The infinitive form of the action verb.

  • present – The present tense form of the action verb.

  • past – The past tense form of the action verb.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

aiida.engine.processes.control.get_active_processes(paused: bool = False) list[ProcessNode][source]#

Return all active processes, i.e., those with a process state of created, waiting or running.

Parameters

paused – Boolean, if True, filter for processes that are paused.

Returns

A list of process nodes of active processes.

aiida.engine.processes.control.kill_processes(processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None[source]#

Kill running processes.

Note

Requires the daemon to be running, or processes will be unresponsive.

Parameters
  • processes – List of processes to play.

  • all_entries – Kill all active processes.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

Raises

ProcessTimeoutException – If the processes do not respond within the timeout.

aiida.engine.processes.control.pause_processes(processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None[source]#

Pause running processes.

Note

Requires the daemon to be running, or processes will be unresponsive.

Parameters
  • processes – List of processes to play.

  • all_entries – Pause all playing processes.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

Raises

ProcessTimeoutException – If the processes do not respond within the timeout.

aiida.engine.processes.control.play_processes(processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None[source]#

Play (unpause) paused processes.

Note

Requires the daemon to be running, or processes will be unresponsive.

Parameters
  • processes – List of processes to play.

  • all_entries – Play all paused processes.

  • timeout – Raise a ProcessTimeoutException if the process does not respond within this amount of seconds.

  • wait – Set to True to wait for process response, for False the action is fire-and-forget.

Raises

ProcessTimeoutException – If the processes do not respond within the timeout.

aiida.engine.processes.control.revive_processes(processes: list[ProcessNode], *, wait: bool = False) None[source]#

Revive processes that seem stuck and are no longer reachable.

Warning: Use only as a last resort after you’ve gone through the checklist below.

  1. Does verdi status indicate that both daemon and RabbitMQ are running properly? If not, restart the daemon with verdi daemon restart --reset and restart RabbitMQ.

  2. Try to play the process through play_processes. If a ProcessTimeoutException is raised use this method to attempt to revive it.

Details: When RabbitMQ loses the process task before the process has completed, the process is never picked up by the daemon and will remain “stuck”. This method recreates the task, which can lead to multiple instances of the task being executed and should thus be used with caution.

Note

Requires the daemon to be running.

Parameters
  • processes – List of processes to revive.

  • wait – Set to True to wait for a response, for False the action is fire-and-forget.

A namedtuple and namespace for ExitCodes that can be used to exit from Processes.

class aiida.engine.processes.exit_code.ExitCode(status: int = 0, message: Optional[str] = None, invalidates_cache: bool = False)[source]#

Bases: tuple

A simple data class to define an exit code for a Process.

When an instance of this class is returned from a Process._run() call, it will be interpreted that the Process should be terminated and that the exit status and message of the namedtuple should be set to the corresponding attributes of the node.

Parameters
  • status – positive integer exit status, where a non-zero value indicated the process failed, default is 0

  • message – optional message with more details about the failure mode

  • invalidates_cache – optional flag, indicating that a process should not be used in caching

__annotations__ = {'invalidates_cache': <class 'bool'>, 'message': typing.Union[str, NoneType], 'status': <class 'int'>}#
__getnewargs__()#

Return self as a plain tuple. Used by copy and pickle.

__module__ = 'aiida.engine.processes.exit_code'#
static __new__(_cls, status: int = 0, message: Optional[str] = None, invalidates_cache: bool = False)#

Create new instance of ExitCode(status, message, invalidates_cache)

__repr__()#

Return a nicely formatted representation string

__slots__ = ()#
_asdict()#

Return a new dict which maps field names to their values.

_field_defaults = {'invalidates_cache': False, 'message': None, 'status': 0}#
_field_types = {'invalidates_cache': <class 'bool'>, 'message': typing.Union[str, NoneType], 'status': <class 'int'>}#
_fields = ('status', 'message', 'invalidates_cache')#
_fields_defaults = {}#
classmethod _make(iterable)#

Make a new ExitCode object from a sequence or iterable

_replace(**kwds)#

Return a new ExitCode object replacing specified fields with new values

format(**kwargs: str) aiida.engine.processes.exit_code.ExitCode[source]#

Create a clone of this exit code where the template message is replaced by the keyword arguments.

Parameters

kwargs – replacement parameters for the template message

invalidates_cache: bool#

Alias for field number 2

message: Optional[str]#

Alias for field number 1

status: int#

Alias for field number 0

class aiida.engine.processes.exit_code.ExitCodesNamespace(dictionary=None)[source]#

Bases: aiida.common.extendeddicts.AttributeDict

A namespace of ExitCode instances that can be accessed through getattr as well as getitem.

Additionally, the collection can be called with an identifier, that can either reference the integer status of the ExitCode that needs to be retrieved or the key in the collection.

__call__(identifier: str) aiida.engine.processes.exit_code.ExitCode[source]#

Return a specific exit code identified by either its exit status or label.

Parameters

identifier – the identifier of the exit code. If the type is integer, it will be interpreted as the exit code status, otherwise it be interpreted as the exit code label

Returns

an ExitCode instance

Raises

ValueError – if no exit code with the given label is defined for this process

__module__ = 'aiida.engine.processes.exit_code'#

Class and decorators to generate processes out of simple python functions.

class aiida.engine.processes.functions.FunctionProcess(*args: Any, **kwargs: Any)[source]#

Bases: aiida.engine.processes.process.Process

Function process class used for turning functions into a Process

__abstractmethods__ = frozenset({})#
__annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_func_args': typing.Sequence[str], '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_uuid': 'Optional[uuid.UUID]'}#
__init__(*args, **kwargs) None[source]#

Process constructor.

Parameters
  • inputs – process inputs

  • logger – aiida logger

  • runner – process runner

  • parent_pid – id of parent process

  • enable_persistence – whether to persist this process

__module__ = 'aiida.engine.processes.functions'#
_abc_impl = <_abc_data object>#
static _func(*_args, **_kwargs) dict[source]#

This is used internally to store the actual function that is being wrapped and will be replaced by the build method.

_func_args: Sequence[str] = ()#
_setup_db_record() None[source]#

Set up the database record for the process.

classmethod args_to_dict(*args: Any) Dict[str, Any][source]#

Create an input dictionary (of form label -> value) from supplied args.

Parameters

args – The values to use for the dictionary

Returns

A label -> value dictionary

static build(func: Callable[[...], Any], node_class: Type[aiida.orm.nodes.process.process.ProcessNode]) Type[aiida.engine.processes.functions.FunctionProcess][source]#

Build a Process from the given function.

All function arguments will be assigned as process inputs. If keyword arguments are specified then these will also become inputs.

Parameters
  • func – The function to build a process from

  • node_class – Provide a custom node class to be used, has to be constructable with no arguments. It has to be a sub class of ProcessNode and the mixin FunctionCalculationMixin.

Returns

A Process class that represents the function

classmethod create_inputs(*args: Any, **kwargs: Any) Dict[str, Any][source]#

Create the input args for the FunctionProcess.

execute() Optional[Dict[str, Any]][source]#

Execute the process.

classmethod get_or_create_db_record() aiida.orm.nodes.process.process.ProcessNode[source]#

Create a process node that represents what happened in this process.

Returns

A process node

property process_class: Callable[[...], Any]#

Return the class that represents this Process, for the FunctionProcess this is the function itself.

For a standard Process or sub class of Process, this is the class itself. However, for legacy reasons, the Process class is a wrapper around another class. This function returns that original class, i.e. the class that really represents what was being executed.

Returns

A Process class that represents the function

run() Optional[ExitCode][source]#

Run the process.

classmethod validate_inputs(*args: Any, **kwargs: Any) None[source]#

Validate the positional and keyword arguments passed in the function call.

Raises

TypeError – if more positional arguments are passed than the function defines

aiida.engine.processes.functions.calcfunction(function: Callable[[...], Any]) Callable[[...], Any][source]#

A decorator to turn a standard python function into a calcfunction. Example usage:

>>> from aiida.orm import Int
>>>
>>> # Define the calcfunction
>>> @calcfunction
>>> def sum(a, b):
>>>    return a + b
>>> # Run it with some input
>>> r = sum(Int(4), Int(5))
>>> print(r)
9
>>> r.base.links.get_incoming().all() 
[Neighbor(link_type='', link_label='result',
node=<CalcFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)]
>>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes()
[4, 5]
Parameters

function (callable) – The function to decorate.

Returns

The decorated function.

Return type

callable

aiida.engine.processes.functions.process_function(node_class: Type[aiida.orm.nodes.process.process.ProcessNode]) Callable[[Callable[[...], Any]], Callable[[...], Any]][source]#

The base function decorator to create a FunctionProcess out of a normal python function.

Parameters

node_class (aiida.orm.ProcessNode) – the ORM class to be used as the Node record for the FunctionProcess

aiida.engine.processes.functions.workfunction(function: Callable[[...], Any]) Callable[[...], Any][source]#

A decorator to turn a standard python function into a workfunction. Example usage:

>>> from aiida.orm import Int
>>>
>>> # Define the workfunction
>>> @workfunction
>>> def select(a, b):
>>>    return a
>>> # Run it with some input
>>> r = select(Int(4), Int(5))
>>> print(r)
4
>>> r.base.links.get_incoming().all() 
[Neighbor(link_type='', link_label='result',
node=<WorkFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)]
>>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes()
[4, 5]
Parameters

function (callable) – The function to decorate.

Returns

The decorated function.

Futures that can poll or receive broadcasted messages while waiting for a task to be completed.

class aiida.engine.processes.futures.ProcessFuture(pk: int, loop: Optional[asyncio.events.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, communicator: Optional[kiwipy.communications.Communicator] = None)[source]#

Bases: _asyncio.Future

Future that waits for a process to complete using both polling and listening for broadcast events if possible.

__init__(pk: int, loop: Optional[asyncio.events.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, communicator: Optional[kiwipy.communications.Communicator] = None)[source]#

Construct a future for a process node being finished.

If a None poll_interval is supplied polling will not be used. If a communicator is supplied it will be used to listen for broadcast messages.

Parameters
  • pk – process pk

  • loop – An event loop

  • poll_interval – optional polling interval, if None, polling is not activated.

  • communicator – optional communicator, if None, will not subscribe to broadcasts.

__module__ = 'aiida.engine.processes.futures'#
_filtered = None#
async _poll_process(node: aiida.orm.nodes.node.Node, poll_interval: Union[int, float]) None[source]#

Poll whether the process node has reached a terminal state.

cleanup() None[source]#

Clean up the future by removing broadcast subscribers from the communicator if it still exists.

AiiDA specific implementation of plumpy Ports and PortNamespaces for the ProcessSpec.

class aiida.engine.processes.ports.CalcJobOutputPort(*args, **kwargs)[source]#

Bases: plumpy.ports.OutputPort

Sub class of plumpy.OutputPort which adds the _pass_to_parser attribute.

__init__(*args, **kwargs) None[source]#
__module__ = 'aiida.engine.processes.ports'#
property pass_to_parser: bool#
class aiida.engine.processes.ports.InputPort(*args, **kwargs)[source]#

Bases: aiida.engine.processes.ports.WithSerialize, aiida.engine.processes.ports.WithNonDb, plumpy.ports.InputPort

Sub class of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic value serialization to database storable types and support non database storable input types as well.

__init__(*args, **kwargs) None[source]#

Override the constructor to check the type of the default if set and warn if not immutable.

__module__ = 'aiida.engine.processes.ports'#
get_description() Dict[str, str][source]#

Return a description of the InputPort, which will be a dictionary of its attributes

Returns

a dictionary of the stringified InputPort attributes

class aiida.engine.processes.ports.PortNamespace(*args, **kwargs)[source]#

Bases: aiida.engine.processes.ports.WithNonDb, plumpy.ports.PortNamespace

Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive serialization of a given mapping onto the ports of the PortNamespace.

__abstractmethods__ = frozenset({})#
__module__ = 'aiida.engine.processes.ports'#
__setitem__(key: str, port: plumpy.ports.Port) None[source]#

Ensure that a Port being added inherits the non_db attribute if not explicitly defined at construction.

The reasoning is that if a PortNamespace has non_db=True, which is different from the default value, very often all leaves should be also non_db=True. To prevent a user from having to specify it manually everytime we overload the value here, unless it was specifically set during construction.

Note that the non_db attribute is not present for all Port sub classes so we have to check for it first.

_abc_impl = <_abc_data object>#
serialize(mapping: Optional[Dict[str, Any]], breadcrumbs: Sequence[str] = ()) Optional[Dict[str, Any]][source]#

Serialize the given mapping onto this Portnamespace.

It will recursively call this function on any nested PortNamespace or the serialize function on any Ports.

Parameters
  • mapping – a mapping of values to be serialized

  • breadcrumbs – a tuple with the namespaces of parent namespaces

Returns

the serialized mapping

static validate_port_name(port_name: str) None[source]#

Validate the given port name.

Valid port names adhere to the following restrictions:

  • Is a valid link label (see below)

  • Does not contain two or more consecutive underscores

Valid link labels adhere to the following restrictions:

  • Has to be a valid python identifier

  • Can only contain alphanumeric characters and underscores

  • Can not start or end with an underscore

Parameters

port_name – the proposed name of the port to be added

Raises
  • TypeError – if the port name is not a string type

  • ValueError – if the port name is invalid

class aiida.engine.processes.ports.WithNonDb(*args, **kwargs)[source]#

Bases: object

A mixin that adds support to a port to flag that it should not be stored in the database using the non_db=True flag.

The mixins have to go before the main port class in the superclass order to make sure the mixin has the chance to strip out the non_db keyword.

__dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': '\n    A mixin that adds support to a port to flag that it should not be stored\n    in the database using the non_db=True flag.\n\n    The mixins have to go before the main port class in the superclass order\n    to make sure the mixin has the chance to strip out the non_db keyword.\n    ', '__init__': <function WithNonDb.__init__>, 'non_db_explicitly_set': <property object>, 'non_db': <property object>, '__dict__': <attribute '__dict__' of 'WithNonDb' objects>, '__weakref__': <attribute '__weakref__' of 'WithNonDb' objects>, '__annotations__': {'_non_db_explicitly_set': 'bool', '_non_db': 'bool'}})#
__init__(*args, **kwargs) None[source]#
__module__ = 'aiida.engine.processes.ports'#
__weakref__#

list of weak references to the object (if defined)

property non_db: bool#

Return whether the value of this Port should be stored as a Node in the database.

Returns

boolean, True if it should be storable as a Node, False otherwise

property non_db_explicitly_set: bool#

Return whether the a value for non_db was explicitly passed in the construction of the Port.

Returns

boolean, True if non_db was explicitly defined during construction, False otherwise

class aiida.engine.processes.ports.WithSerialize(*args, **kwargs)[source]#

Bases: object

A mixin that adds support for a serialization function which is automatically applied on inputs that are not AiiDA data types.

__dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': '\n    A mixin that adds support for a serialization function which is automatically applied on inputs\n    that are not AiiDA data types.\n    ', '__init__': <function WithSerialize.__init__>, 'serialize': <function WithSerialize.serialize>, '__dict__': <attribute '__dict__' of 'WithSerialize' objects>, '__weakref__': <attribute '__weakref__' of 'WithSerialize' objects>, '__annotations__': {'_serializer': "Callable[[Any], 'Data']"}})#
__init__(*args, **kwargs) None[source]#
__module__ = 'aiida.engine.processes.ports'#
__weakref__#

list of weak references to the object (if defined)

serialize(value: Any) aiida.orm.nodes.data.data.Data[source]#

Serialize the given value if it is not already a Data type and a serializer function is defined

Parameters

value – the value to be serialized

Returns

a serialized version of the value or the unchanged value

The AiiDA process class

class aiida.engine.processes.process.Process(*args: Any, **kwargs: Any)[source]#

Bases: plumpy.processes.Process

This class represents an AiiDA process which can be executed and will have full provenance saved in the database.

SINGLE_OUTPUT_LINKNAME: str = 'result'#
class SaveKeys(value)[source]#

Bases: enum.Enum

Keys used to identify things in the saved instance state bundle.

CALC_ID: str = 'calc_id'#
__annotations__ = {'CALC_ID': <class 'str'>}#
__module__ = 'aiida.engine.processes.process'#
__abstractmethods__ = frozenset({})#
__annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': <class 'str'>, 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_uuid': 'Optional[uuid.UUID]'}#
__init__(inputs: Optional[Dict[str, Any]] = None, logger: Optional[logging.Logger] = None, runner: Optional[Runner] = None, parent_pid: Optional[int] = None, enable_persistence: bool = True) None[source]#

Process constructor.

Parameters
  • inputs – process inputs

  • logger – aiida logger

  • runner – process runner

  • parent_pid – id of parent process

  • enable_persistence – whether to persist this process

__module__ = 'aiida.engine.processes.process'#
_abc_impl = <_abc_data object>#
_auto_persist: Optional[Set[str]] = {'_creation_time', '_enable_persistence', '_future', '_parent_pid', '_paused', '_pid', '_pre_paused_status', '_status'}#
_create_and_setup_db_record() Union[int, uuid.UUID][source]#

Create and setup the database record for this process

Returns

the uuid or pk of the process

_flat_inputs() Dict[str, Any][source]#

Return a flattened version of the parsed inputs dictionary.

The eventual keys will be a concatenation of the nested keys. Note that the metadata dictionary, if present, is not passed, as those are dealt with separately in _setup_metadata.

Returns

flat dictionary of parsed inputs

_flat_outputs() Dict[str, Any][source]#

Return a flattened version of the registered outputs dictionary.

The eventual keys will be a concatenation of the nested keys.

Returns

flat dictionary of parsed outputs

_flatten_inputs(port: Union[None, aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any, parent_name: str = '', separator: str = '__') List[Tuple[str, Any]][source]#

Function that will recursively flatten the inputs dictionary, omitting inputs for ports that are marked as being non database storable

Parameters
  • port – port against which to map the port value, can be InputPort or PortNamespace

  • port_value – value for the current port, can be a Mapping

  • parent_name – the parent key with which to prefix the keys

  • separator – character to use for the concatenation of keys

Returns

flat list of inputs

_flatten_outputs(port: Union[None, plumpy.ports.OutputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any, parent_name: str = '', separator: str = '__') List[Tuple[str, Any]][source]#

Function that will recursively flatten the outputs dictionary.

Parameters
  • port – port against which to map the port value, can be OutputPort or PortNamespace

  • port_value – value for the current port, can be a Mapping

  • parent_name – the parent key with which to prefix the keys

  • separator – character to use for the concatenation of keys

Returns

flat list of outputs

static _get_namespace_list(namespace: Optional[str] = None, agglomerate: bool = True) List[Optional[str]][source]#

Get the list of namespaces in a given namespace.

Parameters
  • namespace – name space

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched.

Returns

namespace list

_node_class#

alias of aiida.orm.nodes.process.process.ProcessNode

_save_checkpoint() None[source]#

Save the current state in a chechpoint if persistence is enabled and the process state is not terminal

If the persistence call excepts with a PersistenceError, it will be caught and a warning will be logged.

_setup_db_record() None[source]#

Create the database record for this process and the links with respect to its inputs

This function will set various attributes on the node that serve as a proxy for attributes of the Process. This is essential as otherwise this information could only be introspected through the Process itself, which is only available to the interpreter that has it in memory. To make this data introspectable from any interpreter, for example for the command line interface, certain Process attributes are proxied through the calculation node.

In addition, the parent calculation will be setup with a CALL link if applicable and all inputs will be linked up as well.

_setup_inputs() None[source]#

Create the links between the input nodes and the ProcessNode that represents this process.

_setup_metadata(metadata: dict) None[source]#

Store the metadata on the ProcessNode.

_spec_class#

alias of aiida.engine.processes.process_spec.ProcessSpec

classmethod build_process_type() str[source]#

The process type.

Returns

string of the process type

Note: This could be made into a property ‘process_type’ but in order to have it be a property of the class it would need to be defined in the metaclass, see https://bugs.python.org/issue20659

decode_input_args(encoded: str) Dict[str, Any][source]#

Decode saved input arguments as they came from the saved instance state Bundle

Parameters

encoded – encoded (serialized) inputs

Returns

The decoded input args

classmethod define(spec: aiida.engine.processes.process_spec.ProcessSpec) None[source]#

Define the specification of the process, including its inputs, outputs and known exit codes.

A metadata input namespace is defined, with optional ports that are not stored in the database.

encode_input_args(inputs: Dict[str, Any]) str[source]#

Encode input arguments such that they may be saved in a Bundle

Parameters

inputs – A mapping of the inputs as passed to the process

Returns

The encoded (serialized) inputs

exit_codes = {'ERROR_INVALID_OUTPUT': ExitCode(status=10, message='The process returned an invalid output.', invalidates_cache=False), 'ERROR_LEGACY_FAILURE': ExitCode(status=2, message='The process failed with legacy failure mode.', invalidates_cache=False), 'ERROR_MISSING_OUTPUT': ExitCode(status=11, message='The process did not register a required output.', invalidates_cache=False), 'ERROR_UNSPECIFIED': ExitCode(status=1, message='The process has failed with an unspecified error.', invalidates_cache=False)}#
exposed_inputs(process_class: Type[aiida.engine.processes.process.Process], namespace: Optional[str] = None, agglomerate: bool = True) aiida.common.extendeddicts.AttributeDict[source]#

Gather a dictionary of the inputs that were exposed for a given Process class under an optional namespace.

Parameters
  • process_class – Process class whose inputs to try and retrieve

  • namespace – PortNamespace in which to look for the inputs

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched for inputs. Inputs in lower-lying namespaces take precedence.

Returns

exposed inputs

exposed_outputs(node: aiida.orm.nodes.process.process.ProcessNode, process_class: Type[aiida.engine.processes.process.Process], namespace: Optional[str] = None, agglomerate: bool = True) aiida.common.extendeddicts.AttributeDict[source]#

Return the outputs which were exposed from the process_class and emitted by the specific node

Parameters
  • node – process node whose outputs to try and retrieve

  • namespace – Namespace in which to search for exposed outputs.

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched for outputs. Outputs in lower-lying namespaces take precedence.

Returns

exposed outputs

classmethod get_builder() aiida.engine.processes.builder.ProcessBuilder[source]#
classmethod get_exit_statuses(exit_code_labels: Iterable[str]) List[int][source]#

Return the exit status (integers) for the given exit code labels.

Parameters

exit_code_labels – a list of strings that reference exit code labels of this process class

Returns

list of exit status integers that correspond to the given exit code labels

Raises

AttributeError – if at least one of the labels does not correspond to an existing exit code

classmethod get_or_create_db_record() aiida.orm.nodes.process.process.ProcessNode[source]#

Create a process node that represents what happened in this process.

Returns

A process node

get_parent_calc() Optional[aiida.orm.nodes.process.process.ProcessNode][source]#

Get the parent process node

Returns

the parent process node if there is one

get_provenance_inputs_iterator() Iterator[Tuple[str, Union[aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace]]][source]#

Get provenance input iterator.

Return type

filter

init() None[source]#
classmethod is_valid_cache(node: aiida.orm.nodes.process.process.ProcessNode) bool[source]#

Check if the given node can be cached from.

Overriding this method allows Process sub-classes to modify when corresponding process nodes are considered as a cache.

Warning

When overriding this method, make sure to return False at least in all cases when super()._node.base.caching.is_valid_cache(node) returns False. Otherwise, the invalidates_cache keyword on exit codes may have no effect.

kill(msg: Optional[str] = None) Union[bool, _asyncio.Future][source]#

Kill the process and all the children calculations it called

Parameters

msg – message

load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) None[source]#

Load instance state.

Parameters
  • saved_state – saved instance state

  • load_context

property metadata: aiida.common.extendeddicts.AttributeDict#

Return the metadata that were specified when this process instance was launched.

Returns

metadata dictionary

property node: aiida.orm.nodes.process.process.ProcessNode#

Return the ProcessNode used by this process to represent itself in the database.

Returns

instance of sub class of ProcessNode

on_create() None[source]#

Called when a Process is created.

on_entered(from_state: Optional[plumpy.process_states.State]) None[source]#

After entering a new state, save a checkpoint and update the latest process state change timestamp.

on_entering(state: plumpy.process_states.State) None[source]#
on_except(exc_info: Tuple[Any, Exception, types.TracebackType]) None[source]#

Log the exception by calling the report method with formatted stack trace from exception info object and store the exception string as a node attribute

Parameters

exc_info – the sys.exc_info() object (type, value, traceback)

on_finish(result: Union[int, aiida.engine.processes.exit_code.ExitCode], successful: bool) None[source]#

Set the finish status on the process node.

Parameters
  • result – result of the process

  • successful – whether execution was successful

on_output_emitting(output_port: str, value: Any) None[source]#

The process has emitted a value on the given output port.

Parameters
  • output_port – The output port name the value was emitted on

  • value – The value emitted

on_paused(msg: Optional[str] = None) None[source]#

The Process was paused so set the paused attribute on the process node

Parameters

msg – message

on_playing() None[source]#

The Process was unpaused so remove the paused attribute on the process node

on_terminated() None[source]#

Called when a Process enters a terminal state.

out(output_port: str, value: Optional[Any] = None) None[source]#

Attach output to output port.

The name of the port will be used as the link label.

Parameters
  • output_port – name of output port

  • value – value to put inside output port

out_many(out_dict: Dict[str, Any]) None[source]#

Attach outputs to multiple output ports.

Keys of the dictionary will be used as output port names, values as outputs.

Parameters

out_dict (dict) – output dictionary

report(msg: str, *args, **kwargs) None[source]#

Log a message to the logger, which should get saved to the database through the attached DbLogHandler.

The pk, class name and function name of the caller are prepended to the given message

Parameters
  • msg – message to log

  • args – args to pass to the log call

  • kwargs – kwargs to pass to the log call

property runner: Runner#

Get process runner.

save_instance_state(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) None[source]#

Save instance state.

See documentation of plumpy.processes.Process.save_instance_state().

set_status(status: Optional[str]) None[source]#

The status of the Process is about to be changed, so we reflect this is in node’s attribute proxy.

Parameters

status – the status message

classmethod spec() aiida.engine.processes.process_spec.ProcessSpec[source]#
spec_metadata = <aiida.engine.processes.ports.PortNamespace object>#
submit(process: Type[aiida.engine.processes.process.Process], *args, **kwargs) aiida.orm.nodes.process.process.ProcessNode[source]#

Submit process for execution.

Parameters

process – process

Returns

the calculation node of the process

update_node_state(state: plumpy.process_states.State) None[source]#
update_outputs() None[source]#

Attach new outputs to the node since the last call.

Does nothing, if self.metadata.store_provenance is False.

property uuid: str#

Return the UUID of the process which corresponds to the UUID of its associated ProcessNode.

Returns

the UUID associated to this process instance

aiida.engine.processes.process.get_query_string_from_process_type_string(process_type_string: str) str[source]#

Take the process type string of a Node and create the queryable type string.

Parameters

process_type_string (str) – the process type string

Returns

string that can be used to query for subclasses of the process type using ‘LIKE <string>’

Return type

str

AiiDA specific implementation of plumpy’s ProcessSpec.

class aiida.engine.processes.process_spec.CalcJobProcessSpec[source]#

Bases: aiida.engine.processes.process_spec.ProcessSpec

Process spec intended for the CalcJob process class.

OUTPUT_PORT_TYPE#

alias of aiida.engine.processes.ports.CalcJobOutputPort

__init__() None[source]#
__module__ = 'aiida.engine.processes.process_spec'#
property default_output_node: Optional[str]#
class aiida.engine.processes.process_spec.ProcessSpec[source]#

Bases: plumpy.process_spec.ProcessSpec

Default process spec for process classes defined in aiida-core.

This sub class defines custom classes for input ports and port namespaces. It also adds support for the definition of exit codes and retrieving them subsequently.

INPUT_PORT_TYPE#

alias of aiida.engine.processes.ports.InputPort

METADATA_KEY: str = 'metadata'#
METADATA_OPTIONS_KEY: str = 'options'#
PORT_NAMESPACE_TYPE#

alias of aiida.engine.processes.ports.PortNamespace

__annotations__ = {'METADATA_KEY': <class 'str'>, 'METADATA_OPTIONS_KEY': <class 'str'>, 'NAME_INPUTS_PORT_NAMESPACE': 'str', 'NAME_OUTPUTS_PORT_NAMESPACE': 'str', '_exposed_inputs': 'EXPOSED_TYPE', '_exposed_outputs': 'EXPOSED_TYPE', '_ports': 'PortNamespace', '_sealed': 'bool'}#
__init__() None[source]#
__module__ = 'aiida.engine.processes.process_spec'#
exit_code(status: int, label: str, message: str, invalidates_cache: bool = False) None[source]#

Add an exit code to the ProcessSpec

Parameters
  • status – the exit status integer

  • label – a label by which the exit code can be addressed

  • message – a more detailed description of the exit code

  • invalidates_cache – when set to True, a process exiting with this exit code will not be considered for caching

property exit_codes: aiida.engine.processes.exit_code.ExitCodesNamespace#

Return the namespace of exit codes defined for this ProcessSpec

Returns

ExitCodesNamespace of ExitCode named tuples

property inputs: aiida.engine.processes.ports.PortNamespace#

Get the input port namespace of the process specification

Returns

the input PortNamespace

property metadata_key: str#
property options_key: str#
property outputs: aiida.engine.processes.ports.PortNamespace#

Get the output port namespace of the process specification

Returns

the outputs PortNamespace

property ports: aiida.engine.processes.ports.PortNamespace#