aiida.engine.processes.workchains package#

Module for the WorkChain process and related utilities.


Enums and function for the awaitables of Processes.

class aiida.engine.processes.workchains.awaitable.Awaitable[source]#

Bases: plumpy.utils.AttributesDict

An attribute dictionary that represents an action that a Process could be waiting for to finish.

__module__ = 'aiida.engine.processes.workchains.awaitable'#
class aiida.engine.processes.workchains.awaitable.AwaitableAction(value)[source]#

Bases: enum.Enum

Enum that describes the action to be taken for a given awaitable.

APPEND = 'append'#
ASSIGN = 'assign'#
__module__ = 'aiida.engine.processes.workchains.awaitable'#
class aiida.engine.processes.workchains.awaitable.AwaitableTarget(value)[source]#

Bases: enum.Enum

Enum that describes the class of the target of a given awaitable.

PROCESS = 'process'#
__module__ = 'aiida.engine.processes.workchains.awaitable'#
aiida.engine.processes.workchains.awaitable.construct_awaitable(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.nodes.process.process.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable[source]#

Construct an instance of the Awaitable class that will contain the information related to the action to be taken with respect to the context once the awaitable object is completed.

The awaitable is a simple dictionary with the following keys

  • pk: the pk of the node that is being waited on

  • action: the context action to be performed upon completion

  • outputs: a boolean that toggles whether the node itself

Currently the only awaitable classes are ProcessNode and Workflow. The only awaitable actions are the Assign and Append operators.

Convenience functions to add awaitables to the Context of a WorkChain.

aiida.engine.processes.workchains.context.append_(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.nodes.process.process.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable[source]#

Convenience function that will construct an Awaitable for a given class instance with the context action set to APPEND. When the awaitable target is completed, it will be appended to a list in the context under a key that is to be defined later.


target – an instance of a Process or Awaitable


Returns: the awaitable

aiida.engine.processes.workchains.context.assign_(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.nodes.process.process.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable[source]#

Convenience function that will construct an Awaitable for a given class instance with the context action set to ASSIGN. When the awaitable target is completed, it will be assigned to the context under a key that is to be defined later.


target – an instance of a Process or Awaitable


Returns: the awaitable
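
The difference between the two context actions can be illustrated in plain Python. This is a minimal sketch, not AiiDA's implementation: the `apply_action` helper and the plain `dict` standing in for the work chain context are hypothetical, and only the enum members mirror `AwaitableAction` as documented above.

```python
from enum import Enum


class AwaitableAction(Enum):
    """Stand-in mirroring the two documented members."""
    ASSIGN = 'assign'
    APPEND = 'append'


def apply_action(ctx: dict, key: str, action: AwaitableAction, value) -> None:
    """Hypothetical helper: store a completed target in the context."""
    if action is AwaitableAction.ASSIGN:
        # ASSIGN: the value is stored directly under the key.
        ctx[key] = value
    elif action is AwaitableAction.APPEND:
        # APPEND: the value is added to a list kept under the key.
        ctx.setdefault(key, []).append(value)
    else:
        raise ValueError(f'unsupported action: {action}')


ctx = {}
apply_action(ctx, 'relax', AwaitableAction.ASSIGN, 'node-1')
apply_action(ctx, 'scans', AwaitableAction.APPEND, 'node-2')
apply_action(ctx, 'scans', AwaitableAction.APPEND, 'node-3')
```

After these calls, `ctx['relax']` holds a single value while `ctx['scans']` holds a list of two values, in the order in which they were appended.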

Base implementation of WorkChain class that implements a simple automated restart mechanism for sub processes.

class aiida.engine.processes.workchains.restart.BaseRestartWorkChain(*args: Any, **kwargs: Any)[source]#

Bases: aiida.engine.processes.workchains.workchain.WorkChain

Base restart work chain.

This work chain serves as the starting point for more complex work chains that will be designed to run a sub process that might need multiple restarts to come to a successful end. These restarts may be necessary because a single process run is not sufficient to achieve a fully converged result, or because certain errors may be encountered that are recoverable.

This work chain implements the most basic functionality to achieve this goal. It will launch the sub process, restarting until it is completed successfully or the maximum number of iterations is reached. After completion, the sub process is inspected and a list of process handlers is called successively. These process handlers are defined as methods of the work chain class that are decorated with process_handler().

The idea is to sub class this work chain and leverage the generic error handling that is implemented in the few outline methods. The minimally required outline would look something like the following:

cls.setup
while_(cls.should_run_process)(
    cls.run_process,
    cls.inspect_process,
)


Each of these methods can of course be overridden, but they should be general enough to fit most process cycles. The run_process method will take the inputs for the process from the context under the key inputs. The user should therefore make sure that the inputs to be used are stored under self.ctx.inputs before the run_process method is called. One can update the inputs based on the results from a prior process by calling an outline method just before the run_process step, for example:

cls.setup
while_(cls.should_run_process)(
    cls.prepare_calculation,
    cls.run_process,
    cls.inspect_process,
)


Here, the prepare_calculation method updates the inputs dictionary at self.ctx.inputs before the next process is run with those inputs.
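
The restart cycle described above can be pictured in plain Python. This is only a sketch of the control flow under stated assumptions: `RestartLoopSketch`, its constructor arguments and the context keys `iteration` and `is_finished` are hypothetical stand-ins, no AiiDA code is involved, and the "process" is simulated by a counter that succeeds on a chosen attempt.

```python
class RestartLoopSketch:
    """Plain-Python stand-in for the restart cycle; not an AiiDA class."""

    def __init__(self, max_iterations, attempts_needed):
        self.max_iterations = max_iterations
        # Hypothetical: the simulated process succeeds on this attempt.
        self.attempts_needed = attempts_needed
        self.ctx = {}

    def setup(self):
        self.ctx.update(iteration=0, is_finished=False, inputs={'step': 1.0})

    def should_run_process(self):
        return not self.ctx['is_finished'] and self.ctx['iteration'] < self.max_iterations

    def prepare_calculation(self):
        # Update the inputs before each retry, here by halving a step size.
        if self.ctx['iteration'] > 0:
            self.ctx['inputs']['step'] /= 2

    def run_process(self):
        self.ctx['iteration'] += 1

    def inspect_process(self):
        self.ctx['is_finished'] = self.ctx['iteration'] >= self.attempts_needed

    def run(self):
        self.setup()
        while self.should_run_process():
            self.prepare_calculation()
            self.run_process()
            self.inspect_process()
        return self.ctx


ctx = RestartLoopSketch(max_iterations=5, attempts_needed=3).run()
capped = RestartLoopSketch(max_iterations=2, attempts_needed=10).run()
```

In the first run the loop stops because the simulated process finished; in the second it stops because the iteration limit was reached while `is_finished` is still false.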

The _process_class attribute should be set to the Process class that should be run in the loop. Finally, to define handlers that will be called during inspect_process, define a method with the signature (self, node) and decorate it with the process_handler decorator, for example:

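from aiida.engine import ProcessHandlerReport, process_handler

@process_handler
def handle_problem(self, node):
    # `some_problem` and `improved_inputs` are placeholders for your own logic.
    if some_problem:
        self.ctx.inputs = improved_inputs
        return ProcessHandlerReport()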

The process_handler and ProcessHandlerReport support various arguments to control the flow of the logic of the inspect_process. Refer to their respective documentation for details.

__abstractmethods__ = frozenset({})#
__annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_awaitables': 'List[Awaitable]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_process_class': typing.Union[typing.Type[ForwardRef('Process')], NoneType], '_state': 'Optional[State]', '_status': 'Optional[str]', '_stepper': 'Optional[Stepper]', '_uuid': 'Optional[uuid.UUID]'}#
__init__(*args, **kwargs) None[source]#

Construct the instance.

__module__ = 'aiida.engine.processes.workchains.restart'#
_abc_impl = <_abc_data object>#
_considered_handlers_extra = 'considered_handlers'#
_process_class: Optional[Type[Process]] = None#
_wrap_bare_dict_inputs(port_namespace: PortNamespace, inputs: Dict[str, Any]) aiida.common.extendeddicts.AttributeDict[source]#

Wrap bare dictionaries in inputs in a Dict node if dictated by the corresponding inputs port namespace.

  • port_namespace – a PortNamespace

  • inputs – a dictionary of inputs intended for submission of the process


Returns: an attribute dictionary with all bare dictionaries wrapped in Dict if dictated by the port namespace

classmethod define(spec: ProcessSpec) None[source]#

Define the process specification.

get_outputs(node) Mapping[str, aiida.orm.nodes.node.Node][source]#

Return a mapping of the outputs that should be attached as outputs to the work chain.

By default this method returns the outputs of the last completed calculation job. This method can be overridden if the implementation wants to update those outputs before attaching them. Make sure that if the content of an output node is modified that this is done through a calcfunction in order to not lose the provenance.

classmethod get_process_handlers() List[function][source]#
get_process_handlers_by_priority() List[Tuple[int, function]][source]#

Return list of process handlers where overrides from inputs.handler_overrides are taken into account.

inspect_process() Optional[ExitCode][source]#

Analyse the results of the previous process and call the handlers when necessary.

If the process is excepted or killed, the work chain will abort. Otherwise any attached handlers will be called in order of their specified priority. If the process failed and no handler returns a report indicating that the error was handled, it is considered an unhandled process failure and the process is relaunched. If this happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report, the following matrix determines the logic that is followed:

Process result   Handler report?   Handler exit code   Action
--------------   ---------------   -----------------   -------
Success          yes               == 0                Restart
Success          yes               != 0                Abort
Failed           yes               == 0                Restart
Failed           yes               != 0                Abort

If no handler returned a report and the process finished successfully, the work chain’s work is considered done and it will move on to the next step that directly follows the while conditional, if there is one defined in the outline.
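
The decision logic described for inspect_process can be condensed into a small function. This is a hedged sketch rather than the actual implementation: the `decide` helper, its arguments and its string return values are hypothetical, and the excepted or killed case, which always aborts, is left out.

```python
from typing import Optional


def decide(process_ok: bool, report_exit_status: Optional[int], unhandled_failures: int = 0) -> str:
    """Hypothetical helper returning 'restart', 'abort' or 'done'.

    ``report_exit_status`` is None when no handler returned a report.
    ``unhandled_failures`` counts the consecutive unhandled failures so far.
    """
    if report_exit_status is not None:
        # At least one handler reported: the matrix applies.
        return 'restart' if report_exit_status == 0 else 'abort'
    if process_ok:
        # No report and a successful process: the loop is finished.
        return 'done'
    # Failed process without a report: relaunch once, abort the second time in a row.
    return 'abort' if unhandled_failures >= 1 else 'restart'
```

Note that in the matrix the action depends only on the exit code of the handler report, not on whether the process itself succeeded.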

classmethod is_process_handler(process_handler_name: Union[str, function]) bool[source]#

Return whether the given method name corresponds to a process handler of this class.


process_handler_name – string name of the instance method


Returns: boolean, True if the name corresponds to a process handler, False otherwise


Clean the working directories of all child calculation jobs if clean_workdir=True in the inputs.

property process_class: Type[Process]#

Return the process class to run in the loop.

results() Optional[ExitCode][source]#

Attach the outputs specified in the output specification from the last completed process.

run_process() dict[source]#

Run the next process, taking the input dictionary from the context at self.ctx.inputs.

setup() None[source]#

Initialize context variables that are used during the logical flow of the BaseRestartWorkChain.

should_run_process() bool[source]#

Return whether a new process should be run.

This is the case as long as the last process has not finished successfully and the maximum number of restarts has not yet been exceeded.

aiida.engine.processes.workchains.restart.validate_handler_overrides(process_class: BaseRestartWorkChain, handler_overrides: Optional[], ctx: PortNamespace) Optional[str][source]#

Validator for the handler_overrides input port of the BaseRestartWorkChain.

The handler_overrides should be a dictionary where keys are strings that are the name of a process handler, i.e. an instance method of the process_class that has been decorated with the process_handler decorator. The values should be a dictionary that can specify the keys enabled and priority.


Note: the normal signature of a port validator is (value, ctx), but since the validation here needs a reference to the process class, it is added as the first argument and the class is bound to the method in the port declaration in the define method.

  • process_class – the BaseRestartWorkChain (sub) class

  • handler_overrides – the input Dict node

  • ctx – the PortNamespace in which the port is embedded
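
To make the expected shape of handler_overrides concrete, here is a minimal sketch of such a check in plain Python. It is an assumption-laden illustration, not the library's validator: `check_handler_overrides` is a hypothetical name, it takes a list of handler names instead of a process class, and it works on a plain dictionary rather than a Dict node. Like a port validator, it returns None when the value is acceptable and an error message otherwise.

```python
from typing import Iterable, Optional


def check_handler_overrides(handler_names: Iterable[str], overrides: Optional[dict]) -> Optional[str]:
    """Hypothetical check: return an error message, or None if valid."""
    if not overrides:
        return None
    known = set(handler_names)
    for name, settings in overrides.items():
        if name not in known:
            return f'`{name}` is not a known process handler'
        if not isinstance(settings, dict):
            return f'the value for `{name}` should be a dictionary'
        # Only the two documented keys are accepted in this sketch.
        unknown_keys = set(settings) - {'enabled', 'priority'}
        if unknown_keys:
            return f'unsupported keys for `{name}`: {sorted(unknown_keys)}'
    return None


# The handler name and priority value are made up for illustration.
overrides = {'handle_problem': {'enabled': False, 'priority': 400}}
result = check_handler_overrides(['handle_problem'], overrides)
```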

Utilities for WorkChain implementations.

class aiida.engine.processes.workchains.utils.ProcessHandlerReport(do_break: bool = False, exit_code: aiida.engine.processes.exit_code.ExitCode = ExitCode(status=0, message=None, invalidates_cache=False))[source]#

Bases: tuple

A namedtuple to define a process handler report for an aiida.engine.BaseRestartWorkChain.

This namedtuple should be returned by a process handler of a work chain instance if the condition of the handler was met by the completed process. If no further handling should be performed after this method, the do_break field should be set to True. If the handler encountered a fatal error and the work chain needs to be terminated, an ExitCode with non-zero exit status can be set. This exit code is what will be set on the work chain itself. This works because the value of the exit_code field returned by the handler will in turn be returned by the inspect_process step, and returning a non-zero exit code from any work chain step instructs the engine to abort the work chain.

  • do_break – boolean, set to True if no further process handlers should be called, default is False

  • exit_code – an instance of the ExitCode tuple. If not explicitly set, the default ExitCode will be instantiated, which has status 0 meaning that the work chain step will be considered successful and the work chain will continue to the next step.
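
The fields and defaults listed above can be reproduced with standard-library named tuples. The sketch below uses stand-in classes defined locally, not the AiiDA ones; the status value 400 and the message are made up for illustration.

```python
from typing import NamedTuple, Optional


class ExitCode(NamedTuple):
    """Stand-in with the three fields visible in the default shown above."""
    status: int = 0
    message: Optional[str] = None
    invalidates_cache: bool = False


class ProcessHandlerReport(NamedTuple):
    """Stand-in with the documented fields and defaults."""
    do_break: bool = False
    exit_code: ExitCode = ExitCode()


# Problem handled, keep calling the remaining handlers.
handled = ProcessHandlerReport()
# Problem handled, skip the remaining handlers.
stop_here = ProcessHandlerReport(do_break=True)
# Fatal problem: a non-zero status asks for the work chain to be aborted.
fatal = ProcessHandlerReport(True, ExitCode(400, 'unrecoverable error'))
```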

__annotations__ = {'do_break': <class 'bool'>, 'exit_code': <class 'aiida.engine.processes.exit_code.ExitCode'>}#

__getnewargs__()#

Return self as a plain tuple. Used by copy and pickle.

__module__ = 'aiida.engine.processes.workchains.utils'#
static __new__(_cls, do_break: bool = False, exit_code: aiida.engine.processes.exit_code.ExitCode = ExitCode(status=0, message=None, invalidates_cache=False))#

Create new instance of ProcessHandlerReport(do_break, exit_code)


__repr__()#

Return a nicely formatted representation string

__slots__ = ()#

_asdict()#

Return a new dict which maps field names to their values.

_field_defaults = {'do_break': False, 'exit_code': ExitCode(status=0, message=None, invalidates_cache=False)}#
_field_types = {'do_break': <class 'bool'>, 'exit_code': <class 'aiida.engine.processes.exit_code.ExitCode'>}#
_fields = ('do_break', 'exit_code')#
_fields_defaults = {}#
classmethod _make(iterable)#

Make a new ProcessHandlerReport object from a sequence or iterable


_replace(**kwds)#

Return a new ProcessHandlerReport object replacing specified fields with new values

do_break: bool#

Alias for field number 0

exit_code: aiida.engine.processes.exit_code.ExitCode#

Alias for field number 1

aiida.engine.processes.workchains.utils.process_handler(wrapped: Optional[function] = None, *, priority: int = 0, exit_codes: Union[None, aiida.engine.processes.exit_code.ExitCode, List[aiida.engine.processes.exit_code.ExitCode]] = None, enabled: bool = True) function[source]#

Decorator to register a BaseRestartWorkChain instance method as a process handler.

The decorator will validate the priority and exit_codes optional keyword arguments and then add itself as an attribute to the wrapped instance method. This is used in the inspect_process to return all instance methods of the class that have been decorated by this function and therefore are considered to be process handlers.

Requirements on the function signature of process handling functions. The function to which the decorator is applied needs to take two arguments:

  • self: This is the instance of the work chain itself

  • node: This is the process node that finished and is to be investigated

The function body typically consists of a conditional that will check for a particular problem that might have occurred for the sub process. If a particular problem is handled, the process handler should return an instance of the aiida.engine.ProcessHandlerReport tuple. If no other process handlers should be considered, the do_break attribute should be set to True. If the work chain is to be aborted entirely, the exit_code of the report can be set to an ExitCode instance with a non-zero status.

  • wrapped – the work chain method to register the process handler with

  • priority – optional integer that defines the order in which registered handlers will be called during the handling of a finished process. Higher priorities will be handled first. Default value is 0. Multiple handlers with the same priority are allowed, but the order in which they are called is not well defined.

  • exit_codes – single or list of ExitCode instances. If defined, the handler will return None if the exit code set on the node does not appear in the exit_codes. This is useful to have a handler called only when the process failed with a specific exit code.

  • enabled – boolean, by default True, which will cause the handler to be called during inspect_process. When set to False, the handler will be skipped. This static value can be overridden on a per work chain instance basis through the input handler_overrides.
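
How a decorator can tag methods with a priority, and how tagged methods can then be collected in calling order, is sketched below. This is a simplified stand-in under stated assumptions, not AiiDA's decorator: it omits the exit_codes argument, and the `Demo` class and the `handlers_by_priority` helper are hypothetical.

```python
import functools


def process_handler(wrapped=None, *, priority=0, enabled=True):
    """Simplified stand-in: tag a method as a handler with a priority."""
    if wrapped is None:
        # Called with keyword arguments: return the actual decorator.
        return functools.partial(process_handler, priority=priority, enabled=enabled)
    if not isinstance(priority, int):
        raise TypeError('the `priority` keyword should be an integer')
    wrapped.priority = priority
    wrapped.enabled = enabled
    return wrapped


class Demo:
    @process_handler(priority=100)
    def handle_low(self, node):
        return 'low'

    @process_handler(priority=500)
    def handle_high(self, node):
        return 'high'

    @process_handler(enabled=False)
    def handle_disabled(self, node):
        return 'disabled'

    def not_a_handler(self, node):
        return None


def handlers_by_priority(cls):
    """Collect enabled handlers, highest priority first."""
    found = [m for m in vars(cls).values() if callable(m) and hasattr(m, 'priority')]
    return sorted((m for m in found if m.enabled), key=lambda m: m.priority, reverse=True)


order = [m.__name__ for m in handlers_by_priority(Demo)]
```

Here `order` lists `handle_high` before `handle_low`, and both the disabled handler and the undecorated method are left out.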

Components for the WorkChain concept of the workflow engine.

class aiida.engine.processes.workchains.workchain.WorkChain(*args: Any, **kwargs: Any)[source]#

Bases: aiida.engine.processes.process.Process

The WorkChain class is the principal component to implement workflows in AiiDA.

_STEPPER_STATE = 'stepper_state'#
__abstractmethods__ = frozenset({})#
__called: bool#
__init__(inputs: Optional[dict] = None, logger: Optional[logging.Logger] = None, runner: Optional[Runner] = None, enable_persistence: bool = True) None[source]#

Construct a WorkChain instance.

Construct the instance only if it is a sub class of WorkChain, otherwise raise InvalidOperation.

  • inputs – work chain inputs

  • logger – aiida logger

  • runner – work chain runner

  • enable_persistence – whether to persist this work chain

__module__ = 'aiida.engine.processes.workchains.workchain'#
_abc_impl = <_abc_data object>#
_auto_persist: Optional[Set[str]] = {'_awaitables', '_creation_time', '_enable_persistence', '_future', '_parent_pid', '_paused', '_pid', '_pre_paused_status', '_status'}#
_creation_time: Optional[float]#
_do_step() Any[source]#

Execute the next step in the outline and return the result.

If the stepper returns a non-finished status and the return value is of type ToContext, the contents of the ToContext container will be turned into awaitables if necessary. If any awaitables were created, the process will enter the Wait state, otherwise it will go to Continue. When the stepper returns that it is done, the stepper result will be converted to None and returned, unless it is an integer or an instance of ExitCode.

_event_callbacks: Dict[Hashable, List[EVENT_CALLBACK_TYPE]]#
_node: Optional[orm.ProcessNode]#

alias of aiida.orm.nodes.process.workflow.workchain.WorkChainNode

_outputs: Dict[str, Any]#
_parsed_inputs: Optional[utils.AttributesFrozendict]#
_pre_paused_status: Optional[str]#
_resolve_nested_context(key: str) Tuple[aiida.common.extendeddicts.AttributeDict, str][source]#

Returns a reference to a sub-dictionary of the context and the last key, after resolving a potentially segmented key where required sub-dictionaries are created as needed.


key – A key into the context, where words before a dot are interpreted as a key for a sub-dictionary
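
Resolving a dotted key into a sub-dictionary and a final key can be sketched as follows. This is an illustration on plain `dict` objects, not the method itself: `resolve_nested` is a hypothetical name, and error handling for intermediate values that are not dictionaries is left out.

```python
def resolve_nested(ctx: dict, key: str):
    """Return (sub_dictionary, last_key), creating sub-dictionaries as needed."""
    *parents, last = key.split('.')
    current = ctx
    for name in parents:
        # Words before a dot name a sub-dictionary; create it if missing.
        current = current.setdefault(name, {})
    return current, last


ctx = {}
sub, last = resolve_nested(ctx, 'workchains.relax.first')
sub[last] = 42
```

Because the returned sub-dictionary is a reference into the context, assigning to `sub[last]` updates `ctx` in place. A key without dots resolves to the context itself.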


alias of aiida.engine.processes.workchains.workchain.WorkChainSpec

_state: Optional[State]#
_status: Optional[str]#
_store_nodes(data: Any) None[source]#

Recurse through a data structure and store any unstored nodes that are found along the way.


data – a data structure potentially containing unstored nodes

_update_process_status() None[source]#

Set the process status to a message listing the sub processes that the work chain is currently waiting for.

_uuid: Optional[uuid.UUID]#
action_awaitables() None[source]#

Handle the awaitables that are currently registered with the work chain.

Depending on the class type of the awaitable’s target, a different callback function will be bound to the awaitable and the runner will be asked to call it when the target is completed.

property ctx: aiida.common.extendeddicts.AttributeDict#

Get the context.

insert_awaitable(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable) None[source]#

Insert an awaitable that should be terminated before continuing to the next step.


awaitable – the thing to await

load_instance_state(saved_state, load_context)[source]#

Load instance state.

  • saved_state – saved instance state

  • load_context

on_exiting() None[source]#

Ensure that any unstored nodes in the context are stored before the state is exited.

After the state is exited the next state will be entered and if persistence is enabled, a checkpoint will be saved. If the context contains unstored nodes, the serialization necessary for checkpointing will fail.

on_process_finished(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable) None[source]#

Callback function called by the runner when the process instance identified by pk is completed.

The awaitable will be effectuated on the context of the work chain and removed from the internal list. If all awaitables have been dealt with, the work chain process is resumed.


awaitable – an Awaitable instance

on_wait(awaitables: Sequence[aiida.engine.processes.workchains.awaitable.Awaitable])[source]#

Entering the WAITING state.

resolve_awaitable(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable, value: Any) None[source]#

Resolve an awaitable.

Precondition: must be an awaitable that was previously inserted.


awaitable – the awaitable to resolve

run() Any[source]#

This function will be run when the process is triggered. It should be overridden by a subclass.

save_instance_state(out_state, save_context)[source]#

Save instance state.

  • out_state – state to save in

  • save_context (plumpy.persistence.LoadSaveContext) –

classmethod spec() aiida.engine.processes.workchains.workchain.WorkChainSpec[source]#
to_context(**kwargs: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.nodes.process.process.ProcessNode]) None[source]#

Add a dictionary of awaitables to the context.

This is a convenience method that provides syntactic sugar for adding multiple intersteps, each of which will assign a certain value to the corresponding key in the context of the work chain.

class aiida.engine.processes.workchains.workchain.WorkChainSpec[source]#

Bases: aiida.engine.processes.process_spec.ProcessSpec, plumpy.workchains.WorkChainSpec

__module__ = 'aiida.engine.processes.workchains.workchain'#
_exposed_inputs: EXPOSED_TYPE#
_exposed_outputs: EXPOSED_TYPE#
_ports: PortNamespace#
_sealed: bool#