aiida.engine.processes.workchains package#

Module for the WorkChain process and related utilities.

Submodules#

Enums and a function for the awaitables of Processes.

class aiida.engine.processes.workchains.awaitable.Awaitable[source]#

Bases: AttributesDict

An attribute dictionary that represents an action that a Process could be waiting for to finish.

__module__ = 'aiida.engine.processes.workchains.awaitable'#
class aiida.engine.processes.workchains.awaitable.AwaitableAction(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Enum

Enum that describes the action to be taken for a given awaitable.

APPEND = 'append'#
ASSIGN = 'assign'#
__module__ = 'aiida.engine.processes.workchains.awaitable'#
class aiida.engine.processes.workchains.awaitable.AwaitableTarget(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Enum

Enum that describes the class of the target of a given awaitable.

PROCESS = 'process'#
__module__ = 'aiida.engine.processes.workchains.awaitable'#
aiida.engine.processes.workchains.awaitable.construct_awaitable(target: Awaitable | ProcessNode) -> Awaitable[source]#

Construct an instance of the Awaitable class that will contain the information related to the action to be taken with respect to the context once the awaitable object is completed.

The awaitable is a simple dictionary with the following keys:

  • pk: the pk of the node that is being waited on

  • action: the context action to be performed upon completion

  • outputs: a boolean that toggles whether the node itself

Currently the only awaitable classes are ProcessNode and Workflow. The only awaitable actions are the Assign and Append operators.

Convenience functions to add awaitables to the Context of a WorkChain.

aiida.engine.processes.workchains.context.append_(target: Awaitable | ProcessNode) -> Awaitable[source]#

Convenience function that will construct an Awaitable for a given class instance with the context action set to APPEND. When the awaitable target is completed, it will be appended to a list in the context under a key that is to be defined later.

Parameters:

target – an instance of a Process or Awaitable

Returns:

the awaitable

aiida.engine.processes.workchains.context.assign_(target: Awaitable | ProcessNode) -> Awaitable[source]#

Convenience function that will construct an Awaitable for a given class instance with the context action set to ASSIGN. When the awaitable target is completed, it will be assigned to the context under a key that is to be defined later.

Parameters:

target – an instance of a Process or Awaitable

Returns:

the awaitable
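
As a rough illustration of the difference between the two context actions, the following standalone sketch mimics how a finished awaitable might be written into a context. It does not use AiiDA: Action, FakeAwaitable and resolve are hypothetical stand-ins, and only the assign/append semantics are taken from the descriptions above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict


class Action(Enum):
    """Hypothetical stand-in for AwaitableAction."""

    ASSIGN = 'assign'
    APPEND = 'append'


@dataclass
class FakeAwaitable:
    """Hypothetical stand-in for an Awaitable: a context key plus the action to apply."""

    key: str
    action: Action


def resolve(ctx: Dict[str, Any], awaitable: FakeAwaitable, value: Any) -> None:
    """Write the finished value into the context according to the awaitable's action."""
    if awaitable.action is Action.ASSIGN:
        # ASSIGN: the value is stored directly under the key.
        ctx[awaitable.key] = value
    elif awaitable.action is Action.APPEND:
        # APPEND: the value is added to a list stored under the key.
        ctx.setdefault(awaitable.key, []).append(value)


ctx: Dict[str, Any] = {}
resolve(ctx, FakeAwaitable('relax', Action.ASSIGN), 'node-1')
resolve(ctx, FakeAwaitable('scf_runs', Action.APPEND), 'node-2')
resolve(ctx, FakeAwaitable('scf_runs', Action.APPEND), 'node-3')
```

In this sketch the key is carried by the awaitable itself; in the functions documented above the key is "to be defined later", typically when the awaitable is handed to the work chain context.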

Base implementation of WorkChain class that implements a simple automated restart mechanism for sub processes.

class aiida.engine.processes.workchains.restart.BaseRestartWorkChain(*args: Any, **kwargs: Any)[source]#

Bases: WorkChain

Base restart work chain.

This work chain serves as the starting point for more complex work chains that will be designed to run a sub process that might need multiple restarts to come to a successful end. These restarts may be necessary because a single process run is not sufficient to achieve a fully converged result, or because certain recoverable errors may be encountered.

This work chain implements the most basic functionality to achieve this goal. It will launch the sub process, restarting until it is completed successfully or the maximum number of iterations is reached. After completion, the sub process is inspected and the registered process handlers are called successively. These process handlers are defined as methods of the work chain class that are decorated with process_handler().

The idea is to sub class this work chain and leverage the generic error handling that is implemented in the few outline methods. The minimally required outline would look something like the following:

spec.outline(
    cls.setup,
    while_(cls.should_run_process)(
        cls.run_process,
        cls.inspect_process,
    ),
)

Each of these methods can of course be overridden, but they should be general enough to fit most process cycles. The run_process method takes the inputs for the process from the context under the key inputs. The user should therefore make sure that the inputs to be used are stored under self.ctx.inputs before the run_process method is called. One can update the inputs based on the results of a prior process by calling an outline method just before the run_process step, for example:

spec.outline(
    cls.setup,
    while_(cls.should_run_process)(
        cls.prepare_inputs,
        cls.run_process,
        cls.inspect_process,
    ),
)

Here, the prepare_inputs method updates the inputs dictionary at self.ctx.inputs before the next process is run with those inputs.

The _process_class attribute should be set to the Process class that should be run in the loop. Finally, to define handlers that will be called during inspect_process, simply define a method with the signature (self, node) and decorate it with the process_handler decorator, for example:

@process_handler
def handle_problem(self, node):
    if some_problem:
        self.ctx.inputs = improved_inputs
        return ProcessHandlerReport()

The process_handler and ProcessHandlerReport support various arguments to control the flow of the logic of the inspect_process. Refer to their respective documentation for details.
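
The control flow described above can be sketched without AiiDA as a plain loop. This is only a simplified model under stated assumptions: run_with_restarts, fake_process and double_steps are hypothetical names, the "process" is an ordinary function returning an integer exit status, and handlers simply return True when they handled the problem.

```python
from typing import Callable, Dict, List, Optional


def run_with_restarts(
    process: Callable[[Dict[str, int]], int],
    inputs: Dict[str, int],
    handlers: List[Callable[[Dict[str, int], int], bool]],
    max_iterations: int = 5,
) -> Optional[int]:
    """Run `process` until it returns status 0 or `max_iterations` is reached.

    Returns the number of iterations used on success, or None on failure.
    """
    # Corresponds to `setup`: initialise the context.
    ctx = {'inputs': dict(inputs), 'iteration': 0, 'is_finished': False}

    # Corresponds to `should_run_process`.
    while not ctx['is_finished'] and ctx['iteration'] < max_iterations:
        ctx['iteration'] += 1
        status = process(ctx['inputs'])  # corresponds to `run_process`

        # Corresponds to `inspect_process`: stop on success, otherwise call handlers.
        if status == 0:
            ctx['is_finished'] = True
            continue
        for handler in handlers:
            if handler(ctx['inputs'], status):
                break

    return ctx['iteration'] if ctx['is_finished'] else None


def fake_process(inputs: Dict[str, int]) -> int:
    """Hypothetical process: succeeds only once enough steps are requested."""
    return 0 if inputs['steps'] >= 40 else 400


def double_steps(inputs: Dict[str, int], status: int) -> bool:
    """Hypothetical handler: react to status 400 by doubling the number of steps."""
    if status == 400:
        inputs['steps'] *= 2
        return True
    return False


iterations = run_with_restarts(fake_process, {'steps': 10}, [double_steps])
```

The real work chain additionally tracks unhandled failures and handler priorities; the sketch keeps only the setup, loop condition, run and inspect steps.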

__abstractmethods__ = frozenset({})#
__annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_awaitables': 'list[Awaitable]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_process_class': typing.Optional[typing.Type[ForwardRef('Process')]], '_state': 'Optional[State]', '_status': 'Optional[str]', '_stepper': 'Stepper | None', '_uuid': 'Optional[uuid.UUID]'}#
__init__(*args, **kwargs) -> None[source]#

Construct the instance.

__module__ = 'aiida.engine.processes.workchains.restart'#
_abc_impl = <_abc._abc_data object>#
_attach_outputs(node) -> Mapping[str, Node][source]#

Attach the outputs of the given calculation job to the work chain.

Parameters:

node – The CalcJobNode whose outputs to attach.

Returns:

The mapping of output nodes that were attached.

_considered_handlers_extra = 'considered_handlers'#
_process_class: Type[Process] | None = None#
_wrap_bare_dict_inputs(port_namespace: PortNamespace, inputs: Dict[str, Any]) -> AttributeDict[source]#

Wrap bare dictionaries in inputs in a Dict node if dictated by the corresponding inputs port namespace.

Parameters:
  • port_namespace – a PortNamespace

  • inputs – a dictionary of inputs intended for submission of the process

Returns:

an attribute dictionary with all bare dictionaries wrapped in Dict if dictated by the port namespace

classmethod define(spec: ProcessSpec) -> None[source]#

Define the process specification.

get_outputs(node) -> Mapping[str, Node][source]#

Return a mapping of the outputs that should be attached as outputs to the work chain.

By default this method returns the outputs of the last completed calculation job. This method can be overridden if the implementation wants to update those outputs before attaching them. Make sure that if the content of an output node is modified that this is done through a calcfunction in order to not lose the provenance.

classmethod get_process_handlers() -> List[function][source]#
get_process_handlers_by_priority() -> List[Tuple[int, function]][source]#

Return list of process handlers where overrides from inputs.handler_overrides are taken into account.

inspect_process() -> ExitCode | None[source]#

Analyse the results of the previous process and call the handlers when necessary.

If the process is excepted or killed, the work chain will abort. Otherwise any attached handlers will be called in order of their specified priority. If the process failed and no handler returns a report indicating that the error was handled, it is considered an unhandled process failure and the process is relaunched. If this happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report, the following matrix determines the logic that is followed:

Process  Handler   Handler     Action
result   report?   exit code
-----------------------------------------
Success  yes       == 0        Restart
Success  yes       != 0        Abort
Failed   yes       == 0        Restart
Failed   yes       != 0        Abort

If no handler returned a report and the process finished successfully, the work chain’s work is considered done and it will move on to the next step that directly follows the while conditional, if there is one defined in the outline.
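
The decision logic described for inspect_process can be written down as a small pure function. This is a sketch of the rules as stated in the text, not the library's implementation: decide and its parameters are hypothetical names, and the outcome is reduced to one of three strings.

```python
from typing import Optional


def decide(process_ok: bool, report_exit_status: Optional[int], unhandled_failures: int = 0) -> str:
    """Return 'restart', 'abort' or 'done' for a finished sub process.

    process_ok:          whether the sub process finished successfully.
    report_exit_status:  exit status of the handler report, or None if no
                         handler returned a report.
    unhandled_failures:  number of consecutive unhandled failures so far.
    """
    if report_exit_status is not None:
        # A handler returned a report: the matrix applies regardless of
        # whether the process itself succeeded or failed.
        return 'restart' if report_exit_status == 0 else 'abort'

    if process_ok:
        # No report and a successful process: the loop is done.
        return 'done'

    # Failed process without a report: relaunch once, abort if it happens
    # a second time in a row.
    return 'abort' if unhandled_failures >= 1 else 'restart'
```

For example, decide(False, 0) yields 'restart' (a handler fixed the problem), while decide(False, None, unhandled_failures=1) yields 'abort'.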

classmethod is_process_handler(process_handler_name: str | function) -> bool[source]#

Return whether the given method name corresponds to a process handler of this class.

Parameters:

process_handler_name – string name of the instance method

Returns:

boolean, True if corresponds to process handler, False otherwise

on_terminated()[source]#

Clean the working directories of all child calculation jobs if clean_workdir=True in the inputs.

property process_class: Type[Process]#

Return the process class to run in the loop.

results() -> ExitCode | None[source]#

Attach the outputs specified in the output specification from the last completed process.

run_process() -> dict[source]#

Run the next process, taking the input dictionary from the context at self.ctx.inputs.

setup() -> None[source]#

Initialize context variables that are used during the logical flow of the BaseRestartWorkChain.

should_run_process() -> bool[source]#

Return whether a new process should be run.

This is the case as long as the last process has not finished successfully and the maximum number of restarts has not yet been exceeded.

aiida.engine.processes.workchains.restart.validate_handler_overrides(process_class: BaseRestartWorkChain, handler_overrides: Dict | None, ctx: PortNamespace) -> str | None[source]#

Validator for the handler_overrides input port of the BaseRestartWorkChain.

The handler_overrides should be a dictionary where keys are strings that are the name of a process handler, i.e. an instance method of the process_class that has been decorated with the process_handler decorator. The values should be a dictionary that can specify the keys enabled and priority.

Note

The normal signature of a port validator is (value, ctx), but since the validation here needs a reference to the process class, it is added as an extra argument and the class is bound to the method in the port declaration in the define method.

Parameters:
  • process_class – the BaseRestartWorkChain (sub) class

  • handler_overrides – the input Dict node

  • ctx – the PortNamespace in which the port is embedded
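
To make the expected shape of handler_overrides concrete, here is a minimal standalone sketch of such a validator. It follows the port-validator convention of returning an error message or None, but validate_overrides_sketch, the handler names and the exact checks and messages are assumptions made for illustration; the real validator receives the process class rather than a list of names.

```python
from typing import Any, Dict, Iterable, Optional


def validate_overrides_sketch(
    handler_names: Iterable[str], overrides: Optional[Dict[str, Any]]
) -> Optional[str]:
    """Return an error message for invalid overrides, or None if they look valid."""
    if not overrides:
        return None

    known = set(handler_names)
    for name, override in overrides.items():
        # Keys must be names of registered process handlers.
        if name not in known:
            return f'`{name}` is not a known process handler'
        # Values must be dictionaries ...
        if not isinstance(override, dict):
            return f'the override for `{name}` should be a dictionary'
        # ... that only use the keys `enabled` and `priority`.
        unsupported = set(override) - {'enabled', 'priority'}
        if unsupported:
            return f'the override for `{name}` has unsupported keys: {sorted(unsupported)}'
    return None


# Hypothetical handler names, used only for this example.
handlers = ['handle_out_of_walltime', 'handle_unconverged']
ok = validate_overrides_sketch(handlers, {'handle_unconverged': {'enabled': False, 'priority': 50}})
unknown = validate_overrides_sketch(handlers, {'handle_typo': {'enabled': True}})
bad_key = validate_overrides_sketch(handlers, {'handle_unconverged': {'prio': 1}})
```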

Utilities for WorkChain implementations.

class aiida.engine.processes.workchains.utils.ProcessHandlerReport(do_break: bool = False, exit_code: ExitCode = (0, None, False))[source]#

Bases: NamedTuple

A namedtuple to define a process handler report for an aiida.engine.BaseRestartWorkChain.

This namedtuple should be returned by a process handler of a work chain instance if the condition of the handler was met by the completed process. If no further handling should be performed after this method, the do_break field should be set to True. If the handler encountered a fatal error and the work chain needs to be terminated, an ExitCode with non-zero exit status can be set. This exit code is what will be set on the work chain itself. This works because the value of the exit_code field returned by the handler will in turn be returned by the inspect_process step, and returning a non-zero exit code from any work chain step will instruct the engine to abort the work chain.

Parameters:
  • do_break – boolean, set to True if no further process handlers should be called, default is False

  • exit_code – an instance of the ExitCode tuple. If not explicitly set, the default ExitCode will be instantiated, which has status 0 meaning that the work chain step will be considered successful and the work chain will continue to the next step.
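
The two fields and their defaults can be mirrored with plain NamedTuple classes, which makes it easy to see what a default report and a fatal report look like. ExitCodeSketch and ReportSketch are hypothetical stand-ins that do not import AiiDA; the field names of ExitCodeSketch are an assumption chosen to match the (0, None, False) default shown in the signature.

```python
from typing import NamedTuple, Optional


class ExitCodeSketch(NamedTuple):
    """Hypothetical stand-in for ExitCode; field names are assumed."""

    status: int = 0
    message: Optional[str] = None
    invalidates_cache: bool = False


class ReportSketch(NamedTuple):
    """Hypothetical stand-in for ProcessHandlerReport."""

    do_break: bool = False
    exit_code: ExitCodeSketch = ExitCodeSketch()


# Problem handled, other handlers may still run, work chain continues.
handled = ReportSketch()

# Problem handled, no further handlers should be called.
handled_and_stop = ReportSketch(do_break=True)

# Fatal problem: a non-zero status asks for the work chain to be aborted.
fatal = ReportSketch(do_break=True, exit_code=ExitCodeSketch(300, 'unrecoverable error'))
```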

__annotations__ = {'do_break': <class 'bool'>, 'exit_code': <class 'aiida.engine.processes.exit_code.ExitCode'>}#
__getnewargs__()#

Return self as a plain tuple. Used by copy and pickle.

__match_args__ = ('do_break', 'exit_code')#
__module__ = 'aiida.engine.processes.workchains.utils'#
static __new__(_cls, do_break: bool = False, exit_code: ExitCode = (0, None, False))#

Create new instance of ProcessHandlerReport(do_break, exit_code)

__orig_bases__ = (<function NamedTuple>,)#
__repr__()#

Return a nicely formatted representation string

__slots__ = ()#
_asdict()#

Return a new dict which maps field names to their values.

_field_defaults = {'do_break': False, 'exit_code': (0, None, False)}#
_fields = ('do_break', 'exit_code')#
classmethod _make(iterable)#

Make a new ProcessHandlerReport object from a sequence or iterable

_replace(**kwds)#

Return a new ProcessHandlerReport object replacing specified fields with new values

do_break: bool#

Alias for field number 0

exit_code: ExitCode#

Alias for field number 1

aiida.engine.processes.workchains.utils.process_handler(wrapped: function | None = None, *, priority: int = 0, exit_codes: None | ExitCode | List[ExitCode] = None, enabled: bool = True) -> function[source]#

Decorator to register a BaseRestartWorkChain instance method as a process handler.

The decorator will validate the priority and exit_codes optional keyword arguments and then add itself as an attribute to the wrapped instance method. This is used in the inspect_process to return all instance methods of the class that have been decorated by this function and therefore are considered to be process handlers.

Requirements on the function signature of process handling functions. The function to which the decorator is applied needs to take two arguments:

  • self: This is the instance of the work chain itself

  • node: This is the process node that finished and is to be investigated

The function body typically consists of a conditional that will check for a particular problem that might have occurred for the sub process. If a particular problem is handled, the process handler should return an instance of the aiida.engine.ProcessHandlerReport tuple. If no other process handlers should be considered, the do_break attribute should be set to True. If the work chain is to be aborted entirely, the exit_code of the report can be set to an ExitCode instance with a non-zero status.

Parameters:
  • wrapped – the work chain method to register the process handler with

  • priority – optional integer that defines the order in which registered handlers will be called during the handling of a finished process. Higher priorities will be handled first. Default value is 0. Multiple handlers with the same priority are allowed, but the order of those is not well defined.

  • exit_codes – single or list of ExitCode instances. If defined, the handler will return None if the exit code set on the node does not appear in the exit_codes. This is useful to have a handler called only when the process failed with a specific exit code.

  • enabled – boolean, by default True, which will cause the handler to be called during inspect_process. When set to False, the handler will be skipped. This static value can be overridden on a per work chain instance basis through the input handler_overrides.
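
The registration mechanism described above (a decorator that tags methods, which are later collected and ordered by priority) can be sketched in a few lines. This is a simplified model and not the library's code: handler, handlers_by_priority and the attribute names are hypothetical, and the exit_codes filtering is left out.

```python
from typing import Callable, List, Optional


def handler(wrapped: Optional[Callable] = None, *, priority: int = 0, enabled: bool = True):
    """Mark a method as a handler by attaching metadata attributes to it."""

    def decorate(func: Callable) -> Callable:
        func.is_handler = True
        func.priority = priority
        func.enabled = enabled
        return func

    # Support both `@handler` and `@handler(priority=...)`.
    return decorate if wrapped is None else decorate(wrapped)


def handlers_by_priority(cls: type) -> List[Callable]:
    """Collect the enabled handlers defined on `cls`, highest priority first."""
    found = [
        attr
        for attr in vars(cls).values()
        if getattr(attr, 'is_handler', False) and attr.enabled
    ]
    return sorted(found, key=lambda func: func.priority, reverse=True)


class Chain:
    """Hypothetical class with three tagged methods and one ordinary method."""

    @handler(priority=100)
    def handle_out_of_memory(self, node):
        return None

    @handler
    def handle_generic(self, node):
        return None

    @handler(priority=500, enabled=False)
    def handle_disabled(self, node):
        return None

    def not_a_handler(self):
        return None


order = [func.__name__ for func in handlers_by_priority(Chain)]
```

Here the disabled handler is skipped despite its high priority, and the undecorated method is never considered.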

Components for the WorkChain concept of the workflow engine.

class aiida.engine.processes.workchains.workchain.Protect(name, bases, namespace, **kwargs)[source]#

Bases: ProcessStateMachineMeta

Metaclass that allows protecting class methods from being overridden by subclasses.

Usage as follows:

class SomeClass(metaclass=Protect):

    @Protect.final
    def private_method(self):
        "This method cannot be overridden by a subclass."

If a subclass is imported that overrides the protected method, a RuntimeError is raised.
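
The protection mechanism can be reproduced in a simplified, standalone form. ProtectSketch below derives from type rather than ProcessStateMachineMeta and uses a plain marker attribute instead of the sentinel object, so it is an approximation of the idea under those assumptions; all class and method names are hypothetical.

```python
class ProtectSketch(type):
    """Simplified stand-in: refuse subclasses that override methods marked final."""

    _MARK = '_is_final_sketch'

    def __new__(mcs, name, bases, namespace, **kwargs):
        # Look through all ancestors for methods marked as final and raise
        # if the class being created redefines one of them.
        for base in bases:
            for klass in base.__mro__:
                for attr_name, attr in vars(klass).items():
                    if getattr(attr, mcs._MARK, False) and attr_name in namespace:
                        raise RuntimeError(
                            f'the method `{attr_name}` is protected and cannot be overridden.'
                        )
        return super().__new__(mcs, name, bases, namespace, **kwargs)

    @classmethod
    def final(mcs, method):
        """Mark the method so that subclasses cannot override it."""
        setattr(method, mcs._MARK, True)
        return method


class Base(metaclass=ProtectSketch):
    @ProtectSketch.final
    def run(self):
        return 'base'

    def step(self):
        return 'base step'


class Allowed(Base):
    # Overriding an unprotected method is fine.
    def step(self):
        return 'custom step'


try:
    # The error is raised when the class body is evaluated, i.e. at import time.
    class Forbidden(Base):
        def run(self):
            return 'override'
except RuntimeError as exc:
    error = str(exc)
else:
    error = None
```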

__SENTINEL = <object object>#
__annotations__ = {}#
classmethod __is_final(method) -> bool#

Return whether the method has been decorated by the final classmethod.

Returns:

Boolean, True if the method is marked as final, False otherwise.

__module__ = 'aiida.engine.processes.workchains.workchain'#
static __new__(mcs, name, bases, namespace, **kwargs)[source]#

Collect all methods that were marked as protected and raise if the subclass defines any of them.

Raises:

RuntimeError – If the new class defines (i.e. overrides) a method that was decorated with final.

classmethod final(method: MethodType) -> MethodType[source]#

Decorate a method with this method to protect it from being overridden.

Adds the __SENTINEL object as the __final private attribute to the given method and wraps it in the typing.final decorator. The latter indicates to typing systems that it cannot be overridden in subclasses.

class aiida.engine.processes.workchains.workchain.WorkChain(*args: Any, **kwargs: Any)[source]#

Bases: Process

The WorkChain class is the principal component to implement workflows in AiiDA.

_CONTEXT = 'CONTEXT'#
_STEPPER_STATE = 'stepper_state'#
__abstractmethods__ = frozenset({})#
__annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_awaitables': 'list[Awaitable]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_stepper': 'Stepper | None', '_uuid': 'Optional[uuid.UUID]'}#
__init__(inputs: dict | None = None, logger: logging.Logger | None = None, runner: 'Runner' | None = None, enable_persistence: bool = True) -> None[source]#

Construct a WorkChain instance.

Construct the instance only if it is a sub class of WorkChain, otherwise raise InvalidOperation.

Parameters:
  • inputs – work chain inputs

  • logger – aiida logger

  • runner – work chain runner

  • enable_persistence – whether to persist this work chain

__module__ = 'aiida.engine.processes.workchains.workchain'#
_abc_impl = <_abc._abc_data object>#
_action_awaitables() -> None[source]#

Handle the awaitables that are currently registered with the work chain.

Depending on the class of the awaitable's target, a different callback function will be bound to the awaitable, and the runner will be asked to call it when the target is completed.

_auto_persist: Set[str] | None = {'_awaitables', '_creation_time', '_enable_persistence', '_event_helper', '_future', '_parent_pid', '_paused', '_pid', '_pre_paused_status', '_status'}#
_do_step() -> Any[source]#

Execute the next step in the outline and return the result.

If the stepper returns a non-finished status and the return value is of type ToContext, the contents of the ToContext container will be turned into awaitables if necessary. If any awaitables were created, the process will enter in the Wait state, otherwise it will go to Continue. When the stepper returns that it is done, the stepper result will be converted to None and returned, unless it is an integer or instance of ExitCode.

_insert_awaitable(awaitable: Awaitable) -> None[source]#

Insert an awaitable that should be terminated before continuing to the next step.

Parameters:

awaitable – the thing to await

_node_class#

alias of WorkChainNode

_on_awaitable_finished(awaitable: Awaitable) -> None[source]#

Callback function, for when an awaitable process instance is completed.

The awaitable will be effectuated on the context of the work chain and removed from the internal list. If all awaitables have been dealt with, the work chain process is resumed.

Parameters:

awaitable – an Awaitable instance

_resolve_awaitable(awaitable: Awaitable, value: Any) -> None[source]#

Resolve an awaitable.

Precondition: must be an awaitable that was previously inserted.

Parameters:

awaitable – the awaitable to resolve

_resolve_nested_context(key: str) -> tuple[AttributeDict, str][source]#

Return a reference to a sub-dictionary of the context and the last key, after resolving a potentially segmented key. Required sub-dictionaries are created as needed.

Parameters:

key – A key into the context, where words before a dot are interpreted as a key for a sub-dictionary
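
The resolution of a dotted key can be illustrated with ordinary dictionaries. This is a minimal sketch that uses plain dict objects instead of AttributeDict; resolve_nested is a hypothetical name and the keys are made up for the example.

```python
from typing import Any, Dict, Tuple


def resolve_nested(ctx: Dict[str, Any], key: str) -> Tuple[Dict[str, Any], str]:
    """Return the sub-dictionary that should hold `key` and the final key segment.

    Segments before a dot name nested sub-dictionaries, which are created
    when they do not exist yet.
    """
    *parents, last = key.split('.')
    current = ctx
    for part in parents:
        current = current.setdefault(part, {})
    return current, last


ctx: Dict[str, Any] = {}
sub, last = resolve_nested(ctx, 'workchains.relax.final')
sub[last] = 'node-7'
```

A key without dots resolves to the top-level context itself, so resolve_nested(ctx, 'energy') returns ctx together with 'energy'.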

_spec_class#

alias of WorkChainSpec

_store_nodes(data: Any) -> None[source]#

Recurse through a data structure and store any unstored nodes that are found along the way.

Parameters:

data – a data structure potentially containing unstored nodes

_update_process_status() -> None[source]#

Set the process status with a message listing the sub processes that are currently being waited for.

property ctx: AttributeDict#

Get the context.

load_instance_state(saved_state, load_context)[source]#

Load instance state.

Parameters:
  • saved_state – saved instance state

  • load_context

property node: WorkChainNode#

Return the ProcessNode used by this process to represent itself in the database.

Returns:

instance of sub class of ProcessNode

on_exiting() -> None[source]#

Ensure that any unstored nodes in the context are stored before the state is exited.

After the state is exited the next state will be entered and if persistence is enabled, a checkpoint will be saved. If the context contains unstored nodes, the serialization necessary for checkpointing will fail.

on_run()[source]#
on_wait(awaitables: Sequence[Awaitable])[source]#

Entering the WAITING state.

run() -> Any[source]#

This function will be run when the process is triggered. It should be overridden by a subclass.

save_instance_state(out_state, save_context)[source]#

Save instance state.

Parameters:
  • out_state – state to save in

  • save_context (plumpy.persistence.LoadSaveContext) –

classmethod spec() -> WorkChainSpec[source]#
to_context(**kwargs: Awaitable | ProcessNode) -> None[source]#

Add a dictionary of awaitables to the context.

This is a convenience method that provides syntactic sugar, for a user to add multiple intersteps that will assign a certain value to the corresponding key in the context of the work chain.

class aiida.engine.processes.workchains.workchain.WorkChainSpec[source]#

Bases: ProcessSpec, WorkChainSpec

__annotations__ = {}#
__module__ = 'aiida.engine.processes.workchains.workchain'#