aiida.engine.processes.workchains package#
Module for the WorkChain process and related utilities.
Submodules#
Enums and function for the awaitables of Processes.
- class aiida.engine.processes.workchains.awaitable.Awaitable[source]#
Bases:
plumpy.utils.AttributesDict
An attribute dictionary that represents an action that a Process could be waiting for to finish.
- __module__ = 'aiida.engine.processes.workchains.awaitable'#
- class aiida.engine.processes.workchains.awaitable.AwaitableAction(value)[source]#
Bases:
enum.Enum
Enum that describes the action to be taken for a given awaitable.
- APPEND = 'append'#
- ASSIGN = 'assign'#
- __module__ = 'aiida.engine.processes.workchains.awaitable'#
- class aiida.engine.processes.workchains.awaitable.AwaitableTarget(value)[source]#
Bases:
enum.Enum
Enum that describes the class of the target of a given awaitable.
- PROCESS = 'process'#
- __module__ = 'aiida.engine.processes.workchains.awaitable'#
- aiida.engine.processes.workchains.awaitable.construct_awaitable(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.nodes.process.process.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable [source]#
Construct an instance of the Awaitable class that will contain the information related to the action to be taken with respect to the context once the awaitable object is completed.
The awaitable is a simple dictionary with the following keys
pk: the pk of the node that is being waited on
action: the context action to be performed upon completion
outputs: a boolean that toggles whether the node itself
Currently the only awaitable classes are ProcessNode and Workflow. The only awaitable actions are the Assign and Append operators.
Convenience functions to add awaitables to the Context of a WorkChain.
- aiida.engine.processes.workchains.context.append_(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.nodes.process.process.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable [source]#
Convenience function that will construct an Awaitable for a given class instance with the context action set to APPEND. When the awaitable target is completed, it will be appended to a list in the context under a key that is to be defined later.
- Parameters
target – an instance of a Process or Awaitable
- Returns
the awaitable
- aiida.engine.processes.workchains.context.assign_(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.nodes.process.process.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable [source]#
Convenience function that will construct an Awaitable for a given class instance with the context action set to ASSIGN. When the awaitable target is completed, it will be assigned to the context under a key that is to be defined later.
- Parameters
target – an instance of a Process or Awaitable
- Returns
the awaitable
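The difference between the two context actions can be illustrated with a minimal pure-Python sketch. The `Action` enum, the `resolve` function and the string placeholders for finished nodes are stand-ins invented for this illustration; they are not the AiiDA classes, and the real engine resolves awaitables asynchronously.

```python
from enum import Enum


class Action(Enum):
    """Stand-in for AwaitableAction (hypothetical, not the AiiDA class)."""
    ASSIGN = 'assign'
    APPEND = 'append'


def resolve(ctx: dict, key: str, action: Action, value) -> None:
    """Apply the value of a finished awaitable to the context under ``key``."""
    if action is Action.ASSIGN:
        ctx[key] = value                       # replace whatever is stored under the key
    elif action is Action.APPEND:
        ctx.setdefault(key, []).append(value)  # grow a list stored under the key
    else:
        raise ValueError(f'unsupported action: {action}')


ctx = {}
resolve(ctx, 'relax', Action.ASSIGN, 'node-1')     # 'node-1' is a placeholder value
resolve(ctx, 'scf_runs', Action.APPEND, 'node-2')
resolve(ctx, 'scf_runs', Action.APPEND, 'node-3')
print(ctx)
```

In an actual work chain step, the key is supplied when the awaitable is handed to `to_context`, which accepts the awaitables as keyword arguments.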
Base implementation of WorkChain class that implements a simple automated restart mechanism for sub processes.
- class aiida.engine.processes.workchains.restart.BaseRestartWorkChain(*args: Any, **kwargs: Any)[source]#
Bases:
aiida.engine.processes.workchains.workchain.WorkChain
Base restart work chain.
This work chain serves as the starting point for more complex work chains that will be designed to run a sub process that might need multiple restarts to come to a successful end. These restarts may be necessary because a single process run is not sufficient to achieve a fully converged result, or because certain recoverable errors may be encountered.
This work chain implements the most basic functionality to achieve this goal. It will launch the sub process, restarting until it is completed successfully or the maximum number of iterations is reached. After completion of the sub process it will be inspected, and a list of process handlers is called successively. These process handlers are defined as instance methods that are decorated with process_handler().
The idea is to sub class this work chain and leverage the generic error handling that is implemented in the few outline methods. The minimally required outline would look something like the following:
    cls.setup
    while_(cls.should_run_process)(
        cls.run_process,
        cls.inspect_process,
    )
Each of these methods can of course be overridden, but they should be general enough to fit most process cycles. The run_process method takes the inputs for the process from the context under the key inputs. The user should therefore make sure that the inputs to be used are stored under self.ctx.inputs before the run_process method is called. One can update the inputs based on the results from a prior process by calling an outline method just before the run_process step, for example:
    cls.setup
    while_(cls.should_run_process)(
        cls.prepare_inputs,
        cls.run_process,
        cls.inspect_process,
    )
Here, the prepare_inputs method updates the inputs dictionary at self.ctx.inputs before the next process is run with those inputs.
The _process_class attribute should be set to the Process class that should be run in the loop. Finally, to define handlers that will be called during inspect_process, simply define an instance method with the signature (self, node) and decorate it with the process_handler decorator, for example:
    @process_handler
    def handle_problem(self, node):
        if some_problem:
            self.ctx.inputs = improved_inputs
            return ProcessHandlerReport()
The process_handler and ProcessHandlerReport support various arguments to control the flow of the logic of the inspect_process. Refer to their respective documentation for details.
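The control flow of the outline can be sketched without the engine. The following is a pure-Python stand-in, not an AiiDA work chain: the class name `ToyRestartLoop`, the `cutoff` input, the context keys other than `inputs`, and the rule that the sub process "converges" once `cutoff` reaches 30 are all assumptions made up for the illustration.

```python
class ToyRestartLoop:
    """Pure-Python stand-in for the restart outline (hypothetical, no AiiDA involved)."""

    def __init__(self, max_iterations: int = 5):
        self.max_iterations = max_iterations
        self.ctx = {}

    def setup(self):
        self.ctx['iteration'] = 0
        self.ctx['is_finished'] = False
        self.ctx['inputs'] = {'cutoff': 10}  # hypothetical input of the sub process

    def should_run_process(self) -> bool:
        return not self.ctx['is_finished'] and self.ctx['iteration'] < self.max_iterations

    def prepare_inputs(self):
        # Update the stored inputs based on the previous run before relaunching.
        if self.ctx['iteration'] > 0:
            self.ctx['inputs']['cutoff'] += 10

    def run_process(self):
        self.ctx['iteration'] += 1
        # Hypothetical sub process: it "succeeds" once the cutoff reaches 30.
        self.ctx['last_ok'] = self.ctx['inputs']['cutoff'] >= 30

    def inspect_process(self):
        self.ctx['is_finished'] = self.ctx['last_ok']

    def run(self):
        # Mirrors: setup, while_(should_run_process)(prepare_inputs, run_process, inspect_process)
        self.setup()
        while self.should_run_process():
            self.prepare_inputs()
            self.run_process()
            self.inspect_process()
        return self.ctx


result = ToyRestartLoop().run()
print(result['iteration'], result['inputs'])
```

With these made-up numbers the loop stops after the third run, because the inputs are adjusted before each relaunch until the sub process succeeds.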
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_awaitables': 'list[Awaitable]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_process_class': typing.Optional[typing.Type[ForwardRef('Process')]], '_state': 'Optional[State]', '_status': 'Optional[str]', '_stepper': 'Stepper | None', '_uuid': 'Optional[uuid.UUID]'}#
- __module__ = 'aiida.engine.processes.workchains.restart'#
- _abc_impl = <_abc._abc_data object>#
- _considered_handlers_extra = 'considered_handlers'#
- _wrap_bare_dict_inputs(port_namespace: PortNamespace, inputs: Dict[str, Any]) aiida.common.extendeddicts.AttributeDict [source]#
Wrap bare dictionaries in inputs in a Dict node if dictated by the corresponding inputs portnamespace.
- Parameters
port_namespace – a PortNamespace
inputs – a dictionary of inputs intended for submission of the process
- Returns
an attribute dictionary with all bare dictionaries wrapped in Dict if dictated by the port namespace
- classmethod define(spec: ProcessSpec) None [source]#
Define the process specification.
- get_outputs(node) Mapping[str, aiida.orm.nodes.node.Node] [source]#
Return a mapping of the outputs that should be attached as outputs to the work chain.
By default this method returns the outputs of the last completed calculation job. This method can be overridden if the implementation wants to update those outputs before attaching them. Make sure that if the content of an output node is modified that this is done through a calcfunction in order to not lose the provenance.
- get_process_handlers_by_priority() List[Tuple[int, function]] [source]#
Return list of process handlers where overrides from
inputs.handler_overrides
are taken into account.
- inspect_process() Optional[ExitCode] [source]#
Analyse the results of the previous process and call the handlers when necessary.
If the process is excepted or killed, the work chain will abort. Otherwise any attached handlers will be called in order of their specified priority. If the process failed and no handler returns a report indicating that the error was handled, it is considered an unhandled process failure and the process is relaunched. If this happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report, the following matrix determines the logic that is followed:
    Process  Handler  Handler    Action
    result   report?  exit code
    -----------------------------------------
    Success  yes      == 0       Restart
    Success  yes      != 0       Abort
    Failed   yes      == 0       Restart
    Failed   yes      != 0       Abort
If no handler returned a report and the process finished successfully, the work chain’s work is considered done and it will move on to the next step that directly follows the while conditional, if there is one defined in the outline.
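The decision logic described above can be condensed into a small function. This is a sketch based only on the description in this section: the function name `decide`, its arguments and the returned strings are hypothetical, and the excepted or killed case (which always aborts) is deliberately left out.

```python
from typing import Optional


def decide(process_ok: bool, report_status: Optional[int], unhandled_failures: int = 0) -> str:
    """Return 'restart', 'abort' or 'done' for a finished sub process.

    ``report_status`` is the exit status carried by the handler report, or None
    if no handler returned a report. ``unhandled_failures`` counts the directly
    preceding failures that no handler dealt with.
    """
    if report_status is not None:
        # A handler returned a report: the matrix depends only on its exit code.
        return 'restart' if report_status == 0 else 'abort'
    if process_ok:
        # Success without any report: the work is done.
        return 'done'
    # Unhandled failure: relaunch once, abort if it happens twice in a row.
    return 'abort' if unhandled_failures >= 1 else 'restart'


for case in [(True, 0), (True, 2), (False, 0), (False, 2), (True, None), (False, None)]:
    print(case, '->', decide(*case))
```

Note that in the matrix the process result does not change the outcome once a report exists; it only matters when no handler reported anything.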
- classmethod is_process_handler(process_handler_name: Union[str, function]) bool [source]#
Return whether the given method name corresponds to a process handler of this class.
- Parameters
process_handler_name – string name of the instance method
- Returns
boolean, True if corresponds to process handler, False otherwise
- on_terminated()[source]#
Clean the working directories of all child calculation jobs if clean_workdir=True in the inputs.
- results() Optional[ExitCode] [source]#
Attach the outputs specified in the output specification from the last completed process.
- run_process() dict [source]#
Run the next process, taking the input dictionary from the context at self.ctx.inputs.
- aiida.engine.processes.workchains.restart.validate_handler_overrides(process_class: BaseRestartWorkChain, handler_overrides: Optional[aiida.orm.nodes.data.dict.Dict], ctx: PortNamespace) Optional[str] [source]#
Validator for the handler_overrides input port of the BaseRestartWorkChain.
The handler_overrides should be a dictionary where keys are strings that are the name of a process handler, i.e. an instance method of the process_class that has been decorated with the process_handler decorator. The values should be a dictionary that can specify the keys enabled and priority.
Note
the normal signature of a port validator is (value, ctx) but since for the validation here we need a reference to the process class, we add it and the class is bound to the method in the port declaration in the define method.
- Parameters
process_class – the BaseRestartWorkChain (sub) class
handler_overrides – the input Dict node
ctx – the PortNamespace in which the port is embedded
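The shape of a valid handler_overrides dictionary can be made concrete with a small checker. This is a hedged sketch, not the AiiDA validator: the function `check_overrides`, the handler name `handle_problem` and the exact error messages are invented, and the type checks on enabled (boolean) and priority (integer) are assumptions taken from the process_handler signature. Like a port validator, it returns None when the value is valid and an error message otherwise.

```python
from typing import Optional


def check_overrides(handler_names: set, overrides: Optional[dict]) -> Optional[str]:
    """Return an error message for invalid overrides, or None if they are valid."""
    if not overrides:
        return None
    for name, options in overrides.items():
        if name not in handler_names:
            return f'`{name}` is not a known process handler'
        if not isinstance(options, dict):
            return f'the value for `{name}` should be a dictionary'
        unknown = set(options) - {'enabled', 'priority'}
        if unknown:
            return f'unsupported keys for `{name}`: {sorted(unknown)}'
        if 'enabled' in options and not isinstance(options['enabled'], bool):
            return f'`enabled` for `{name}` should be a boolean'
        priority = options.get('priority', 0)
        # bool is a subclass of int in Python, so it is excluded explicitly.
        if isinstance(priority, bool) or not isinstance(priority, int):
            return f'`priority` for `{name}` should be an integer'
    return None


names = {'handle_problem'}  # hypothetical handler name
print(check_overrides(names, {'handle_problem': {'enabled': False, 'priority': 400}}))
print(check_overrides(names, {'handle_other': {'enabled': True}}))
```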
Utilities for WorkChain implementations.
- class aiida.engine.processes.workchains.utils.ProcessHandlerReport(do_break: bool = False, exit_code: aiida.engine.processes.exit_code.ExitCode = ExitCode(status=0, message=None, invalidates_cache=False))[source]#
Bases:
NamedTuple
A namedtuple to define a process handler report for an aiida.engine.BaseRestartWorkChain.
This namedtuple should be returned by a process handler of a work chain instance if the condition of the handler was met by the completed process. If no further handling should be performed after this method, the do_break field should be set to True. If the handler encountered a fatal error and the work chain needs to be terminated, an ExitCode with non-zero exit status can be set. This exit code is what will be set on the work chain itself. This works because the value of the exit_code field returned by the handler will in turn be returned by the inspect_process step, and returning a non-zero exit code from any work chain step will instruct the engine to abort the work chain.
- Parameters
do_break – boolean, set to True if no further process handlers should be called, default is False
exit_code – an instance of the ExitCode tuple. If not explicitly set, the default ExitCode will be instantiated, which has status 0 meaning that the work chain step will be considered successful and the work chain will continue to the next step.
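The three typical kinds of report can be shown with stand-in named tuples that mirror the fields and defaults listed in the signature above. The classes below are local stand-ins, not imports from AiiDA, and the status 401 with its message is a made-up example of a non-zero exit code.

```python
from typing import NamedTuple, Optional


class ExitCode(NamedTuple):
    """Stand-in mirroring the default ExitCode shown in the signature."""
    status: int = 0
    message: Optional[str] = None
    invalidates_cache: bool = False


class Report(NamedTuple):
    """Stand-in for ProcessHandlerReport with the same two fields."""
    do_break: bool = False
    exit_code: ExitCode = ExitCode()


handled = Report()                                           # handled; later handlers still run
handled_stop = Report(do_break=True)                         # handled; skip remaining handlers
fatal = Report(True, ExitCode(401, 'unrecoverable error'))   # hypothetical code: abort work chain

for report in (handled, handled_stop, fatal):
    print(report.do_break, report.exit_code.status)
```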
- __annotations__ = {'do_break': <class 'bool'>, 'exit_code': <class 'aiida.engine.processes.exit_code.ExitCode'>}#
- __getnewargs__()#
Return self as a plain tuple. Used by copy and pickle.
- __match_args__ = ('do_break', 'exit_code')#
- __module__ = 'aiida.engine.processes.workchains.utils'#
- static __new__(_cls, do_break: bool = False, exit_code: aiida.engine.processes.exit_code.ExitCode = ExitCode(status=0, message=None, invalidates_cache=False))#
Create new instance of ProcessHandlerReport(do_break, exit_code)
- __orig_bases__ = (<function NamedTuple>,)#
- __repr__()#
Return a nicely formatted representation string
- __slots__ = ()#
- _asdict()#
Return a new dict which maps field names to their values.
- _field_defaults = {'do_break': False, 'exit_code': ExitCode(status=0, message=None, invalidates_cache=False)}#
- _fields = ('do_break', 'exit_code')#
- classmethod _make(iterable)#
Make a new ProcessHandlerReport object from a sequence or iterable
- _replace(**kwds)#
Return a new ProcessHandlerReport object replacing specified fields with new values
- exit_code: aiida.engine.processes.exit_code.ExitCode#
Alias for field number 1
- aiida.engine.processes.workchains.utils.process_handler(wrapped: Optional[function] = None, *, priority: int = 0, exit_codes: Union[None, aiida.engine.processes.exit_code.ExitCode, List[aiida.engine.processes.exit_code.ExitCode]] = None, enabled: bool = True) function [source]#
Decorator to register a BaseRestartWorkChain instance method as a process handler.
The decorator will validate the priority and exit_codes optional keyword arguments and then add itself as an attribute to the wrapped instance method. This is used in the inspect_process to return all instance methods of the class that have been decorated by this function and therefore are considered to be process handlers.
Requirements on the function signature of process handling functions. The function to which the decorator is applied needs to take two arguments:
self: This is the instance of the work chain itself
node: This is the process node that finished and is to be investigated
The function body typically consists of a conditional that will check for a particular problem that might have occurred for the sub process. If a particular problem is handled, the process handler should return an instance of the aiida.engine.ProcessHandlerReport tuple. If no other process handlers should be considered, the do_break attribute should be set to True. If the work chain is to be aborted entirely, the exit_code of the report can be set to an ExitCode instance with a non-zero status.
- Parameters
wrapped – the work chain method to register the process handler with
priority – optional integer that defines the order in which registered handlers will be called during the handling of a finished process. Higher priorities will be handled first. Default value is 0. Multiple handlers with the same priority are allowed, but the order of those is not well defined.
exit_codes – single or list of ExitCode instances. If defined, the handler will return None if the exit code set on the node does not appear in the exit_codes. This is useful to have a handler called only when the process failed with a specific exit code.
enabled – boolean, by default True, which will cause the handler to be called during inspect_process. When set to False, the handler will be skipped. This static value can be overridden on a per work chain instance basis through the input handler_overrides.
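How the priority, exit_codes and enabled arguments interact can be sketched with a toy decorator. Everything here is a simplified stand-in rather than the AiiDA implementation: the names `handler`, `Chain` and `handlers_by_priority` are hypothetical, nodes are plain dictionaries with an `exit_status` key, and exit codes are bare integers instead of ExitCode instances.

```python
import functools


def handler(priority=0, exit_codes=None, enabled=True):
    """Toy handler-registering decorator (hypothetical, not aiida's process_handler)."""
    if isinstance(priority, bool) or not isinstance(priority, int):
        raise TypeError('priority should be an integer')
    codes = None if exit_codes is None else list(exit_codes)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, node):
            # Return None when the exit status of the node is not one the handler covers.
            if codes is not None and node['exit_status'] not in codes:
                return None
            return func(self, node)

        # Mark the method so that it can be discovered later.
        wrapper.is_handler = True
        wrapper.priority = priority
        wrapper.enabled = enabled
        return wrapper

    return decorator


class Chain:
    @handler(priority=100, exit_codes=[410])
    def handle_walltime(self, node):
        return 'restart with more walltime'

    @handler(priority=500)
    def handle_any(self, node):
        return None if node['exit_status'] == 0 else 'generic fix'

    @handler(enabled=False)
    def handle_disabled(self, node):
        return 'never called'

    def handlers_by_priority(self):
        """Collect the enabled handlers, highest priority first."""
        methods = [getattr(self, name) for name in dir(self)]
        found = [m for m in methods if getattr(m, 'is_handler', False) and m.enabled]
        return sorted(found, key=lambda m: m.priority, reverse=True)


chain = Chain()
node = {'exit_status': 410}  # hypothetical failed node
order = [m.__name__ for m in chain.handlers_by_priority()]
results = [m(node) for m in chain.handlers_by_priority()]
print(order, results)
```

The filter on exit_codes lives inside the wrapper, so a handler restricted to specific codes is still called for every node but simply returns None when the code does not match.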
Components for the WorkChain concept of the workflow engine.
- class aiida.engine.processes.workchains.workchain.Protect(name, bases, namespace, **kwargs)[source]#
Bases:
plumpy.processes.ProcessStateMachineMeta
Metaclass that allows protecting class methods from being overridden by subclasses.
Usage as follows:
    class SomeClass(metaclass=Protect):

        @Protect.final
        def private_method(self):
            "This method cannot be overridden by a subclass."
If a subclass is imported that overrides a protected method, a RuntimeError is raised.
- __SENTINEL = <object object>#
- __annotations__ = {}#
- classmethod __is_final(method) bool #
Return whether the method has been decorated by the final classmethod.
- Returns
Boolean, True if the method is marked as final, False otherwise.
- __module__ = 'aiida.engine.processes.workchains.workchain'#
- static __new__(cls, name, bases, namespace, **kwargs)[source]#
Collect all methods that were marked as protected and raise if the subclass defines it.
- Raises
RuntimeError – If the new class defines (i.e. overrides) a method that was decorated with final.
- classmethod final(method: Any)[source]#
Decorate a method with this method to protect it from being overridden.
Adds the __SENTINEL object as the __final private attribute to the given method and wraps it in the typing.final decorator. The latter indicates to typing systems that it cannot be overridden in subclasses.
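The mechanism of raising at class-creation time can be illustrated with a reduced metaclass. This sketch derives from plain `type` rather than the plumpy metaclass, uses a simple `_is_final` marker instead of the sentinel attribute, and the names `Protected`, `Base` and `Child` are hypothetical.

```python
class Protected(type):
    """Toy metaclass that forbids overriding methods marked as final."""

    def __new__(mcs, name, bases, namespace, **kwargs):
        # Walk all ancestors and refuse to create the class if it redefines
        # an attribute that one of them marked as final.
        for base in bases:
            for klass in base.__mro__:
                for attr, value in vars(klass).items():
                    if getattr(value, '_is_final', False) and attr in namespace:
                        raise RuntimeError(f'the method `{attr}` is final and cannot be overridden')
        return super().__new__(mcs, name, bases, namespace, **kwargs)

    @staticmethod
    def final(method):
        """Mark a method as protected against overriding."""
        method._is_final = True
        return method


class Base(metaclass=Protected):
    @Protected.final
    def run(self):
        return 'base run'


try:
    class Child(Base):
        def run(self):  # attempts to override a final method
            return 'child run'
except RuntimeError as exc:
    error = str(exc)

print(error)
```

Because the check runs in `__new__` of the metaclass, the error surfaces as soon as the module defining the offending subclass is imported, not when the method is first called.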
- class aiida.engine.processes.workchains.workchain.WorkChain(*args: Any, **kwargs: Any)[source]#
Bases:
aiida.engine.processes.process.Process
The WorkChain class is the principal component to implement workflows in AiiDA.
- _CONTEXT = 'CONTEXT'#
- _STEPPER_STATE = 'stepper_state'#
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_awaitables': 'list[Awaitable]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_stepper': 'Stepper | None', '_uuid': 'Optional[uuid.UUID]'}#
- __init__(inputs: dict | None = None, logger: logging.Logger | None = None, runner: 'Runner' | None = None, enable_persistence: bool = True) None [source]#
Construct a WorkChain instance.
Construct the instance only if it is a sub class of WorkChain, otherwise raise InvalidOperation.
- Parameters
inputs – work chain inputs
logger – aiida logger
runner – work chain runner
enable_persistence – whether to persist this work chain
- __module__ = 'aiida.engine.processes.workchains.workchain'#
- _abc_impl = <_abc._abc_data object>#
- _action_awaitables() None [source]#
Handle the awaitables that are currently registered with the work chain.
Depending on the class type of the awaitable’s target, a different callback function will be bound to the awaitable and the runner will be asked to call it when the target is completed.
- _auto_persist: Optional[Set[str]] = {'_awaitables', '_creation_time', '_enable_persistence', '_future', '_parent_pid', '_paused', '_pid', '_pre_paused_status', '_status'}#
- _do_step() Any [source]#
Execute the next step in the outline and return the result.
If the stepper returns a non-finished status and the return value is of type ToContext, the contents of the ToContext container will be turned into awaitables if necessary. If any awaitables were created, the process will enter in the Wait state, otherwise it will go to Continue. When the stepper returns that it is done, the stepper result will be converted to None and returned, unless it is an integer or instance of ExitCode.
- _insert_awaitable(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable) None [source]#
Insert an awaitable that should be terminated before continuing to the next step.
- Parameters
awaitable – the thing to await
- _node_class#
alias of
aiida.orm.nodes.process.workflow.workchain.WorkChainNode
- _on_awaitable_finished(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable) None [source]#
Callback function, for when an awaitable process instance is completed.
The awaitable will be effectuated on the context of the work chain and removed from the internal list. If all awaitables have been dealt with, the work chain process is resumed.
- Parameters
awaitable – an Awaitable instance
- _resolve_awaitable(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable, value: Any) None [source]#
Resolve an awaitable.
Precondition: must be an awaitable that was previously inserted.
- Parameters
awaitable – the awaitable to resolve
- _resolve_nested_context(key: str) tuple[aiida.common.extendeddicts.AttributeDict, str] [source]#
Returns a reference to a sub-dictionary of the context and the last key, after resolving a potentially segmented key where required sub-dictionaries are created as needed.
- Parameters
key – A key into the context, where words before a dot are interpreted as a key for a sub-dictionary
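The resolution of a dotted key can be sketched with plain dictionaries. This is an illustration only: the real method works on an AttributeDict, the function name `resolve_nested` and the example key are made up, and raising ValueError when an intermediate entry is not a dictionary is an assumption of this sketch.

```python
def resolve_nested(ctx: dict, key: str):
    """Return (sub_dict, last_key) for a dotted key, creating sub-dictionaries as needed."""
    *parents, last = key.split('.')
    current = ctx
    for part in parents:
        # Create the intermediate dictionary if it does not exist yet.
        current = current.setdefault(part, {})
        if not isinstance(current, dict):
            raise ValueError(f'`{part}` in `{key}` is already used for a non-dictionary value')
    return current, last


ctx = {}
sub, last = resolve_nested(ctx, 'relax.final.energy')  # hypothetical context key
sub[last] = -1.5
print(ctx)
```

Returning the parent dictionary together with the last key lets the caller decide whether to assign to the key or append to a list stored under it.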
- _spec_class#
alias of
aiida.engine.processes.workchains.workchain.WorkChainSpec
- _store_nodes(data: Any) None [source]#
Recurse through a data structure and store any unstored nodes that are found along the way
- Parameters
data – a data structure potentially containing unstored nodes
- _update_process_status() None [source]#
Set the process status to a message listing the sub processes that we are currently waiting for.
- property ctx: aiida.common.extendeddicts.AttributeDict#
Get the context.
- load_instance_state(saved_state, load_context)[source]#
Load instance state.
- Parameters
saved_state – saved instance state
load_context –
- property node: aiida.orm.nodes.process.workflow.workchain.WorkChainNode#
Return the ProcessNode used by this process to represent itself in the database.
- Returns
instance of sub class of ProcessNode
- on_exiting() None [source]#
Ensure that any unstored nodes in the context are stored before the state is exited.
After the state is exited the next state will be entered and if persistence is enabled, a checkpoint will be saved. If the context contains unstored nodes, the serialization necessary for checkpointing will fail.
- on_wait(awaitables: Sequence[aiida.engine.processes.workchains.awaitable.Awaitable])[source]#
Entering the WAITING state.
- run() Any [source]#
This function will be run when the process is triggered. It should be overridden by a subclass.
- save_instance_state(out_state, save_context)[source]#
Save instance state.
- Parameters
out_state – state to save in
save_context (plumpy.persistence.LoadSaveContext) –
- classmethod spec() aiida.engine.processes.workchains.workchain.WorkChainSpec [source]#
- to_context(**kwargs: aiida.engine.processes.workchains.awaitable.Awaitable | aiida.orm.nodes.process.process.ProcessNode) None [source]#
Add a dictionary of awaitables to the context.
This is a convenience method that provides syntactic sugar, for a user to add multiple intersteps that will assign a certain value to the corresponding key in the context of the work chain.
- class aiida.engine.processes.workchains.workchain.WorkChainSpec[source]#
Bases:
aiida.engine.processes.process_spec.ProcessSpec, plumpy.workchains.WorkChainSpec
- __annotations__ = {}#
- __module__ = 'aiida.engine.processes.workchains.workchain'#