aiida.engine.processes package#
Module for processes and related utilities.
Subpackages#
Submodules#
Convenience classes to help building the input dictionaries for Processes.
- class aiida.engine.processes.builder.PrettyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#
Bases:
json.encoder.JSONEncoder
JSON encoder for returning a pretty representation of an AiiDA
ProcessBuilder
.- __module__ = 'aiida.engine.processes.builder'#
- default(o)[source]#
Implement this method in a subclass such that it returns a serializable object for
o
, or calls the base implementation (to raise aTypeError
).For example, to support arbitrary iterators, you could implement default like this:
def default(self, o): try: iterable = iter(o) except TypeError: pass else: return list(iterable) # Let the base class default method raise the TypeError return JSONEncoder.default(self, o)
- class aiida.engine.processes.builder.ProcessBuilder(process_class: Type[Process])[source]#
Bases:
aiida.engine.processes.builder.ProcessBuilderNamespace
A process builder that helps setting up the inputs for creating a new process.
- __abstractmethods__ = frozenset({})#
- __init__(process_class: Type[Process])[source]#
Construct a ProcessBuilder instance for the given Process class.
- Parameters
process_class – the Process subclass
- __module__ = 'aiida.engine.processes.builder'#
- _abc_impl = <_abc._abc_data object>#
- class aiida.engine.processes.builder.ProcessBuilderNamespace(port_namespace: aiida.engine.processes.ports.PortNamespace)[source]#
Bases:
collections.abc.MutableMapping
Input namespace for the ProcessBuilder.
Dynamically generates the getters and setters for the input ports of a given PortNamespace
- __abstractmethods__ = frozenset({})#
- __annotations__ = {}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.builder', '__doc__': 'Input namespace for the `ProcessBuilder`.\n\n Dynamically generates the getters and setters for the input ports of a given PortNamespace\n ', '__init__': <function ProcessBuilderNamespace.__init__>, '__setattr__': <function ProcessBuilderNamespace.__setattr__>, '__repr__': <function ProcessBuilderNamespace.__repr__>, '__dir__': <function ProcessBuilderNamespace.__dir__>, '__iter__': <function ProcessBuilderNamespace.__iter__>, '__len__': <function ProcessBuilderNamespace.__len__>, '__getitem__': <function ProcessBuilderNamespace.__getitem__>, '__setitem__': <function ProcessBuilderNamespace.__setitem__>, '__delitem__': <function ProcessBuilderNamespace.__delitem__>, '__delattr__': <function ProcessBuilderNamespace.__delattr__>, '_recursive_merge': <function ProcessBuilderNamespace._recursive_merge>, '_merge': <function ProcessBuilderNamespace._merge>, '_update': <function ProcessBuilderNamespace._update>, '_inputs': <function ProcessBuilderNamespace._inputs>, '__dict__': <attribute '__dict__' of 'ProcessBuilderNamespace' objects>, '__weakref__': <attribute '__weakref__' of 'ProcessBuilderNamespace' objects>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc._abc_data object>, '__annotations__': {}})#
- __init__(port_namespace: aiida.engine.processes.ports.PortNamespace) None [source]#
Dynamically construct the get and set properties for the ports of the given port namespace.
For each port in the given port namespace a get and set property will be constructed dynamically and added to the ProcessBuilderNamespace. The docstring for these properties will be defined by calling str() on the Port, which should return the description of the Port.
- Parameters
port_namespace – the inputs PortNamespace for which to construct the builder
- __module__ = 'aiida.engine.processes.builder'#
- __setattr__(attr: str, value: Any) None [source]#
Assign the given value to the port with key attr.
Note
Any attributes without a leading underscore being set correspond to inputs and should hence be validated with respect to the corresponding input port from the process spec
- __weakref__#
list of weak references to the object (if defined)
- _abc_impl = <_abc._abc_data object>#
- _inputs(prune: bool = False) dict [source]#
Return the entire mapping of inputs specified for this builder.
- Parameters
prune – boolean, when True, will prune nested namespaces that contain no actual values whatsoever
- Returns
mapping of inputs ports and their input values.
- _merge(*args, **kwds)[source]#
Merge the content of a dictionary or keyword arguments in .
Note
This method differs in behavior from
_update
in that_merge
will recursively update the existing dictionary with the one that is specified in the arguments. The_update
method will merge only the keys on the top level, but any lower lying nested namespace will be replaced entirely.The method is prefixed with an underscore in order to not reserve the name for a potential port.
- Parameters
args – a single mapping that should be mapped on the namespace.
kwds – keyword value pairs that should be mapped onto the ports.
- _recursive_merge(dictionary, key, value)[source]#
Recursively merge the contents of
dictionary
setting itskey
tovalue
.
- _update(*args, **kwds)[source]#
Update the values of the builder namespace passing a mapping as argument or individual keyword value pairs.
The method functions just as collections.abc.MutableMapping.update and is merely prefixed with an underscore in order to not reserve the name for a potential port.
- Parameters
args – a single mapping that should be mapped on the namespace.
kwds – keyword value pairs that should be mapped onto the ports.
Functions to control and interact with running processes.
- exception aiida.engine.processes.control.ProcessTimeoutException[source]#
Bases:
aiida.common.exceptions.AiidaException
Raised when action to communicate with a process times out.
- __annotations__ = {}#
- __module__ = 'aiida.engine.processes.control'#
- aiida.engine.processes.control._perform_actions(processes: list[aiida.orm.nodes.process.process.ProcessNode], action: Callable, infinitive: str, present: str, timeout: Optional[float] = None, wait: bool = False, **kwargs: Any) None [source]#
Perform an action on a list of processes.
- Parameters
processes – The list of processes to perform the action on.
action – The action to perform.
infinitive – The infinitive of the verb that represents the action.
present – The present tense of the verb that represents the action.
past – The past tense of the verb that represents the action.
timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.kwargs – Keyword arguments that will be passed to the method
action
.
- Raises
ProcessTimeoutException – If the processes do not respond within the timeout.
- aiida.engine.processes.control._resolve_futures(futures: dict[concurrent.futures._base.Future, aiida.orm.nodes.process.process.ProcessNode], infinitive: str, present: str, wait: bool = False, timeout: Optional[float] = None) None [source]#
Process a mapping of futures representing an action on an active process.
This function will echo the correct information strings based on the outcomes of the futures and the given verb conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a timeout to put a maximum wait time on the actions.
- Parameters
futures – The map of action futures and the corresponding processes.
infinitive – The infinitive form of the action verb.
present – The present tense form of the action verb.
wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.
- aiida.engine.processes.control.get_active_processes(paused: bool = False) list[aiida.orm.nodes.process.process.ProcessNode] [source]#
Return all active processes, i.e., those with a process state of created, waiting or running.
- Parameters
paused – Boolean, if True, filter for processes that are paused.
- Returns
A list of process nodes of active processes.
- aiida.engine.processes.control.kill_processes(processes: Optional[list[aiida.orm.nodes.process.process.ProcessNode]] = None, *, message: str = 'Killed through `aiida.engine.processes.control.kill_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None [source]#
Kill running processes.
Note
Requires the daemon to be running, or processes will be unresponsive.
- Parameters
processes – List of processes to play.
all_entries – Kill all active processes.
timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.
- Raises
ProcessTimeoutException – If the processes do not respond within the timeout.
- aiida.engine.processes.control.pause_processes(processes: Optional[list[aiida.orm.nodes.process.process.ProcessNode]] = None, *, message: str = 'Paused through `aiida.engine.processes.control.pause_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None [source]#
Pause running processes.
Note
Requires the daemon to be running, or processes will be unresponsive.
- Parameters
processes – List of processes to play.
all_entries – Pause all playing processes.
timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.
- Raises
ProcessTimeoutException – If the processes do not respond within the timeout.
- aiida.engine.processes.control.play_processes(processes: Optional[list[aiida.orm.nodes.process.process.ProcessNode]] = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None [source]#
Play (unpause) paused processes.
Note
Requires the daemon to be running, or processes will be unresponsive.
- Parameters
processes – List of processes to play.
all_entries – Play all paused processes.
timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.
- Raises
ProcessTimeoutException – If the processes do not respond within the timeout.
- aiida.engine.processes.control.revive_processes(processes: list[aiida.orm.nodes.process.process.ProcessNode], *, wait: bool = False) None [source]#
Revive processes that seem stuck and are no longer reachable.
Warning: Use only as a last resort after you’ve gone through the checklist below.
Does
verdi status
indicate that both daemon and RabbitMQ are running properly? If not, restart the daemon withverdi daemon restart --reset
and restart RabbitMQ.Try to play the process through
play_processes
. If aProcessTimeoutException
is raised use this method to attempt to revive it.
Details: When RabbitMQ loses the process task before the process has completed, the process is never picked up by the daemon and will remain “stuck”. This method recreates the task, which can lead to multiple instances of the task being executed and should thus be used with caution.
Note
Requires the daemon to be running.
- Parameters
processes – List of processes to revive.
wait – Set to
True
to wait for a response, forFalse
the action is fire-and-forget.
A namedtuple and namespace for ExitCodes that can be used to exit from Processes.
- class aiida.engine.processes.exit_code.ExitCode(status: int = 0, message: Optional[str] = None, invalidates_cache: bool = False)[source]#
Bases:
NamedTuple
A simple data class to define an exit code for a
Process
.When an instance of this class is returned from a Process._run() call, it will be interpreted that the Process should be terminated and that the exit status and message of the namedtuple should be set to the corresponding attributes of the node.
- Parameters
status – positive integer exit status, where a non-zero value indicated the process failed, default is 0
message – optional message with more details about the failure mode
invalidates_cache – optional flag, indicating that a process should not be used in caching
- __annotations__ = {'invalidates_cache': <class 'bool'>, 'message': typing.Optional[str], 'status': <class 'int'>}#
- __getnewargs__()#
Return self as a plain tuple. Used by copy and pickle.
- __match_args__ = ('status', 'message', 'invalidates_cache')#
- __module__ = 'aiida.engine.processes.exit_code'#
- static __new__(_cls, status: int = 0, message: Optional[str] = None, invalidates_cache: bool = False)#
Create new instance of ExitCode(status, message, invalidates_cache)
- __orig_bases__ = (<function NamedTuple>,)#
- __repr__()#
Return a nicely formatted representation string
- __slots__ = ()#
- _asdict()#
Return a new dict which maps field names to their values.
- _field_defaults = {'invalidates_cache': False, 'message': None, 'status': 0}#
- _fields = ('status', 'message', 'invalidates_cache')#
- classmethod _make(iterable)#
Make a new ExitCode object from a sequence or iterable
- _replace(**kwds)#
Return a new ExitCode object replacing specified fields with new values
- format(**kwargs: str) aiida.engine.processes.exit_code.ExitCode [source]#
Create a clone of this exit code where the template message is replaced by the keyword arguments.
- Parameters
kwargs – replacement parameters for the template message
- class aiida.engine.processes.exit_code.ExitCodesNamespace(dictionary=None)[source]#
Bases:
aiida.common.extendeddicts.AttributeDict
A namespace of ExitCode instances that can be accessed through getattr as well as getitem.
Additionally, the collection can be called with an identifier, that can either reference the integer status of the ExitCode that needs to be retrieved or the key in the collection.
- __annotations__ = {}#
- __call__(identifier: Union[int, str]) aiida.engine.processes.exit_code.ExitCode [source]#
Return a specific exit code identified by either its exit status or label.
- Parameters
identifier – the identifier of the exit code. If the type is integer, it will be interpreted as the exit code status, otherwise it be interpreted as the exit code label
- Returns
an ExitCode instance
- Raises
ValueError – if no exit code with the given label is defined for this process
- __module__ = 'aiida.engine.processes.exit_code'#
Class and decorators to generate processes out of simple python functions.
- class aiida.engine.processes.functions.FunctionProcess(*args: Any, **kwargs: Any)[source]#
Bases:
aiida.engine.processes.process.Process
Function process class used for turning functions into a Process
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_func_args': 't.Sequence[str]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_uuid': 'Optional[uuid.UUID]', '_varargs': 'str | None'}#
- __init__(*args, **kwargs) None [source]#
Process constructor.
- Parameters
inputs – process inputs
logger – aiida logger
runner – process runner
parent_pid – id of parent process
enable_persistence – whether to persist this process
- __module__ = 'aiida.engine.processes.functions'#
- _abc_impl = <_abc._abc_data object>#
- static _func(*_args, **_kwargs) dict [source]#
This is used internally to store the actual function that is being wrapped and will be replaced by the build method.
- _func_args: t.Sequence[str] = ()#
- classmethod args_to_dict(*args: Any) dict[str, Any] [source]#
Create an input dictionary (of form label -> value) from supplied args.
- Parameters
args – The values to use for the dictionary
- Returns
A label -> value dictionary
- static build(func: aiida.engine.processes.functions.FunctionType, node_class: Type[aiida.orm.nodes.process.process.ProcessNode]) Type[aiida.engine.processes.functions.FunctionProcess] [source]#
Build a Process from the given function.
All function arguments will be assigned as process inputs. If keyword arguments are specified then these will also become inputs.
- Parameters
func – The function to build a process from
node_class – Provide a custom node class to be used, has to be constructable with no arguments. It has to be a sub class of ProcessNode and the mixin
FunctionCalculationMixin
.
- Returns
A Process class that represents the function
- classmethod create_inputs(*args: Any, **kwargs: Any) dict[str, Any] [source]#
Create the input args for the FunctionProcess.
- classmethod get_or_create_db_record() aiida.orm.nodes.process.process.ProcessNode [source]#
Create a process node that represents what happened in this process.
- Returns
A process node
- property process_class: Callable[[...], Any]#
Return the class that represents this Process, for the FunctionProcess this is the function itself.
For a standard Process or sub class of Process, this is the class itself. However, for legacy reasons, the Process class is a wrapper around another class. This function returns that original class, i.e. the class that really represents what was being executed.
- Returns
A Process class that represents the function
- class aiida.engine.processes.functions.ProcessFunctionType(*args, **kwargs)[source]#
Bases:
Protocol
,Generic
[aiida.engine.processes.functions.P
,aiida.engine.processes.functions.R_co
,aiida.engine.processes.functions.N
]Protocol for a decorated process function.
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'is_process_function': 'bool', 'node_class': 't.Type[N]', 'process_class': 't.Type[Process]', 'recreate_from': 't.Callable[[N], Process]', 'spec': 't.Callable[[], ProcessSpec]'}#
- __call__(*args: typing.~P, **kwargs: typing.~P) aiida.engine.processes.functions.R_co [source]#
Call self as a function.
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.functions', '__annotations__': {'is_process_function': 'bool', 'node_class': 't.Type[N]', 'process_class': 't.Type[Process]', 'recreate_from': 't.Callable[[N], Process]', 'spec': 't.Callable[[], ProcessSpec]'}, '__doc__': 'Protocol for a decorated process function.', '__call__': <function ProcessFunctionType.__call__>, 'run': <function ProcessFunctionType.run>, 'run_get_pk': <function ProcessFunctionType.run_get_pk>, 'run_get_node': <function ProcessFunctionType.run_get_node>, '__orig_bases__': (<class 'typing.Protocol'>, typing.Generic[~P, +R_co, ~N]), '__dict__': <attribute '__dict__' of 'ProcessFunctionType' objects>, '__weakref__': <attribute '__weakref__' of 'ProcessFunctionType' objects>, '__parameters__': (~P, +R_co, ~N), '_is_protocol': True, '__subclasshook__': <function Protocol.__init_subclass__.<locals>._proto_hook>, '__init__': <function _no_init_or_replace_init>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc._abc_data object>})#
- __init__(*args, **kwargs)#
- __module__ = 'aiida.engine.processes.functions'#
- __orig_bases__ = (<class 'typing.Protocol'>, typing.Generic[~P, +R_co, ~N])#
- __parameters__ = (~P, +R_co, ~N)#
- __subclasshook__()#
Abstract classes can override this to customize issubclass().
This is invoked early on by abc.ABCMeta.__subclasscheck__(). It should return True, False or NotImplemented. If it returns NotImplemented, the normal algorithm is used. Otherwise, it overrides the normal algorithm (and the outcome is cached).
- __weakref__#
list of weak references to the object (if defined)
- _abc_impl = <_abc._abc_data object>#
- _is_protocol = True#
- node_class: Type[aiida.engine.processes.functions.N]#
- process_class: Type[aiida.engine.processes.process.Process]#
- recreate_from: Callable[[aiida.engine.processes.functions.N], aiida.engine.processes.process.Process]#
- spec: Callable[[], aiida.engine.processes.process_spec.ProcessSpec]#
- aiida.engine.processes.functions.calcfunction(function: Callable[[P], aiida.engine.processes.functions.R_co]) aiida.engine.processes.functions.ProcessFunctionType[P, aiida.engine.processes.functions.R_co, aiida.orm.nodes.process.calculation.calcfunction.CalcFunctionNode] [source]#
A decorator to turn a standard python function into a calcfunction. Example usage:
>>> from aiida.orm import Int >>> >>> # Define the calcfunction >>> @calcfunction >>> def sum(a, b): >>> return a + b >>> # Run it with some input >>> r = sum(Int(4), Int(5)) >>> print(r) 9 >>> r.base.links.get_incoming().all() [Neighbor(link_type='', link_label='result', node=<CalcFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)] >>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes() [4, 5]
- Parameters
function – The function to decorate.
- Returns
The decorated function.
- aiida.engine.processes.functions.get_stack_size(size: int = 2) int [source]#
Return the stack size for the caller’s frame.
This solution is taken from https://stackoverflow.com/questions/34115298/ as a more performant alternative to the naive
len(inspect.stack())` solution. This implementation is about three orders of magnitude faster compared to the naive solution and it scales especially well for larger stacks, which will be usually the case for the usage of ``aiida-core
. However, it does use the internal_getframe
of thesys
standard library. It this ever were to stop working, simply switch to usinglen(inspect.stack())
.- Parameters
size – Hint for the expected stack size.
- Returns
The stack size for caller’s frame.
- aiida.engine.processes.functions.infer_valid_type_from_type_annotation(annotation: Any) tuple[Any, ...] [source]#
Infer the value for the
valid_type
of an input port from the given function argument annotation.- Parameters
annotation – The annotation of a function argument as returned by
inspect.get_annotation
.- Returns
A tuple of valid types. If no valid types were defined or they could not be successfully parsed, an empty tuple is returned.
- aiida.engine.processes.functions.process_function(node_class: Type[aiida.orm.nodes.process.process.ProcessNode]) Callable[[aiida.engine.processes.functions.FunctionType], aiida.engine.processes.functions.FunctionType] [source]#
The base function decorator to create a FunctionProcess out of a normal python function.
- Parameters
node_class – the ORM class to be used as the Node record for the FunctionProcess
- aiida.engine.processes.functions.workfunction(function: Callable[[P], aiida.engine.processes.functions.R_co]) aiida.engine.processes.functions.ProcessFunctionType[P, aiida.engine.processes.functions.R_co, aiida.orm.nodes.process.workflow.workfunction.WorkFunctionNode] [source]#
A decorator to turn a standard python function into a workfunction. Example usage:
>>> from aiida.orm import Int >>> >>> # Define the workfunction >>> @workfunction >>> def select(a, b): >>> return a >>> # Run it with some input >>> r = select(Int(4), Int(5)) >>> print(r) 4 >>> r.base.links.get_incoming().all() [Neighbor(link_type='', link_label='result', node=<WorkFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)] >>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes() [4, 5]
- Parameters
function – The function to decorate.
- Returns
The decorated function.
Futures that can poll or receive broadcasted messages while waiting for a task to be completed.
- class aiida.engine.processes.futures.ProcessFuture(pk: int, loop: Optional[asyncio.events.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, communicator: Optional[kiwipy.communications.Communicator] = None)[source]#
Bases:
_asyncio.Future
Future that waits for a process to complete using both polling and listening for broadcast events if possible.
- __init__(pk: int, loop: Optional[asyncio.events.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, communicator: Optional[kiwipy.communications.Communicator] = None)[source]#
Construct a future for a process node being finished.
If a None poll_interval is supplied polling will not be used. If a communicator is supplied it will be used to listen for broadcast messages.
- Parameters
pk – process pk
loop – An event loop
poll_interval – optional polling interval, if None, polling is not activated.
communicator – optional communicator, if None, will not subscribe to broadcasts.
- __module__ = 'aiida.engine.processes.futures'#
- _filtered = None#
AiiDA specific implementation of plumpy Ports and PortNamespaces for the ProcessSpec.
- class aiida.engine.processes.ports.CalcJobOutputPort(*args, **kwargs)[source]#
Bases:
plumpy.ports.OutputPort
Sub class of plumpy.OutputPort which adds the _pass_to_parser attribute.
- __module__ = 'aiida.engine.processes.ports'#
- class aiida.engine.processes.ports.InputPort(*args, **kwargs)[source]#
Bases:
aiida.engine.processes.ports.WithMetadata
,aiida.engine.processes.ports.WithSerialize
,aiida.engine.processes.ports.WithNonDb
,plumpy.ports.InputPort
Sub class of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic value serialization to database storable types and support non database storable input types as well.
The mixins have to go before the main port class in the superclass order to make sure they have the chance to process their specific keywords.
- __annotations__ = {}#
- __init__(*args, **kwargs) None [source]#
Override the constructor to check the type of the default if set and warn if not immutable.
- __module__ = 'aiida.engine.processes.ports'#
- class aiida.engine.processes.ports.PortNamespace(*args, **kwargs)[source]#
Bases:
aiida.engine.processes.ports.WithMetadata
,aiida.engine.processes.ports.WithNonDb
,plumpy.ports.PortNamespace
Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive serialization of a given mapping onto the ports of the PortNamespace.
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'_explicitly_set': 'bool', '_is_metadata': 'bool', '_non_db': 'bool', '_non_db_explicitly_set': 'bool', '_ports': "Dict[str, Union[Port, 'PortNamespace']]"}#
- __module__ = 'aiida.engine.processes.ports'#
- __setitem__(key: str, port: plumpy.ports.Port) None [source]#
Ensure that a Port being added inherits the non_db attribute if not explicitly defined at construction.
The reasoning is that if a PortNamespace has non_db=True, which is different from the default value, very often all leaves should be also non_db=True. To prevent a user from having to specify it manually everytime we overload the value here, unless it was specifically set during construction.
Note that the non_db attribute is not present for all Port sub classes so we have to check for it first.
- _abc_impl = <_abc._abc_data object>#
- serialize(mapping: Optional[Dict[str, Any]], breadcrumbs: Sequence[str] = ()) Optional[Dict[str, Any]] [source]#
Serialize the given mapping onto this Portnamespace.
It will recursively call this function on any nested PortNamespace or the serialize function on any Ports.
- Parameters
mapping – a mapping of values to be serialized
breadcrumbs – a tuple with the namespaces of parent namespaces
- Returns
the serialized mapping
- static validate_port_name(port_name: str) None [source]#
Validate the given port name.
Valid port names adhere to the following restrictions:
Is a valid link label (see below)
Does not contain two or more consecutive underscores
Valid link labels adhere to the following restrictions:
Has to be a valid python identifier
Can only contain alphanumeric characters and underscores
Can not start or end with an underscore
- Parameters
port_name – the proposed name of the port to be added
- Raises
TypeError – if the port name is not a string type
ValueError – if the port name is invalid
- class aiida.engine.processes.ports.WithMetadata(*args, **kwargs)[source]#
Bases:
object
A mixin that allows an input port to be marked as metadata through the keyword
is_metadata
.A metadata input differs from a normal input as in that it is not linked to the
ProcessNode
as aData
node but rather is stored on theProcessNode
itself (as an attribute, for example).- __annotations__ = {}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': 'A mixin that allows an input port to be marked as metadata through the keyword ``is_metadata``.\n\n A metadata input differs from a normal input as in that it is not linked to the ``ProcessNode`` as a ``Data`` node\n but rather is stored on the ``ProcessNode`` itself (as an attribute, for example).\n ', '__init__': <function WithMetadata.__init__>, 'is_metadata_explicitly_set': <property object>, 'is_metadata': <property object>, '__dict__': <attribute '__dict__' of 'WithMetadata' objects>, '__weakref__': <attribute '__weakref__' of 'WithMetadata' objects>, '__annotations__': {'_explicitly_set': 'bool', '_is_metadata': 'bool'}})#
- __module__ = 'aiida.engine.processes.ports'#
- __weakref__#
list of weak references to the object (if defined)
- class aiida.engine.processes.ports.WithNonDb(*args, **kwargs)[source]#
Bases:
object
A mixin that adds support to a port to flag it should not be stored in the database using the
non_db
flag.- __annotations__ = {}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': 'A mixin that adds support to a port to flag it should not be stored in the database using the ``non_db`` flag.', '__init__': <function WithNonDb.__init__>, 'non_db_explicitly_set': <property object>, 'non_db': <property object>, '__dict__': <attribute '__dict__' of 'WithNonDb' objects>, '__weakref__': <attribute '__weakref__' of 'WithNonDb' objects>, '__annotations__': {'_non_db_explicitly_set': 'bool', '_non_db': 'bool'}})#
- __module__ = 'aiida.engine.processes.ports'#
- __weakref__#
list of weak references to the object (if defined)
- class aiida.engine.processes.ports.WithSerialize(*args, **kwargs)[source]#
Bases:
object
A mixin that adds support for a serialization function which is automatically applied on inputs that are not AiiDA data types.
- __annotations__ = {}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': '\n A mixin that adds support for a serialization function which is automatically applied on inputs\n that are not AiiDA data types.\n ', '__init__': <function WithSerialize.__init__>, 'serialize': <function WithSerialize.serialize>, '__dict__': <attribute '__dict__' of 'WithSerialize' objects>, '__weakref__': <attribute '__weakref__' of 'WithSerialize' objects>, '__annotations__': {'_serializer': "Callable[[Any], 'Data']"}})#
- __module__ = 'aiida.engine.processes.ports'#
- __weakref__#
list of weak references to the object (if defined)
- serialize(value: Any) aiida.orm.nodes.data.data.Data [source]#
Serialize the given value, unless it is
None
, already a Data type, or no serializer function is defined.- Parameters
value – the value to be serialized
- Returns
a serialized version of the value or the unchanged value
The AiiDA process class
- class aiida.engine.processes.process.Process(*args: Any, **kwargs: Any)[source]#
Bases:
plumpy.processes.Process
This class represents an AiiDA process which can be executed and will have full provenance saved in the database.
- class SaveKeys(value)[source]#
Bases:
enum.Enum
Keys used to identify things in the saved instance state bundle.
- __module__ = 'aiida.engine.processes.process'#
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': <class 'str'>, 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_uuid': 'Optional[uuid.UUID]'}#
- __init__(inputs: Optional[Dict[str, Any]] = None, logger: Optional[logging.Logger] = None, runner: Optional[Runner] = None, parent_pid: Optional[int] = None, enable_persistence: bool = True) None [source]#
Process constructor.
- Parameters
inputs – process inputs
logger – aiida logger
runner – process runner
parent_pid – id of parent process
enable_persistence – whether to persist this process
- __module__ = 'aiida.engine.processes.process'#
- _abc_impl = <_abc._abc_data object>#
- _auto_persist: Optional[Set[str]] = {'_creation_time', '_enable_persistence', '_future', '_parent_pid', '_paused', '_pid', '_pre_paused_status', '_status'}#
- _build_process_label() str [source]#
Construct the process label that should be set on
ProcessNode
instances for this process class.Note
By default this returns the name of the process class itself. It can be overridden by
Process
subclasses to provide a more specific label.- Returns
The process label to use for
ProcessNode
instances.
- _create_and_setup_db_record() Union[int, uuid.UUID] [source]#
Create and setup the database record for this process
- Returns
the uuid or pk of the process
- _filter_serializable_metadata(port: Union[None, aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any) Optional[Any] [source]#
Return the inputs that correspond to ports with
is_metadata=True
and that are JSON serializable.The function is called recursively for any port namespaces.
- Parameters
port – An
InputPort
orPortNamespace
. If anInputPort
that specifiesis_metadata=True
theport_value
is returned. For aPortNamespace
this method is called recursively for the keys within the namespace and the resulting dictionary is returned, omittingNone
values. If eitherport
orport_value
isNone
,None
is returned.- Returns
The
port_value
where all inputs that do no correspond to a metadata port or are not JSON serializable, have been filtered out.
- _flat_inputs() Dict[str, Any] [source]#
Return a flattened version of the parsed inputs dictionary.
The eventual keys will be a concatenation of the nested keys. Note that the metadata dictionary, if present, is not passed, as those are dealt with separately in _setup_metadata.
- Returns
flat dictionary of parsed inputs
- _flat_outputs() Dict[str, Any] [source]#
Return a flattened version of the registered outputs dictionary.
The eventual keys will be a concatenation of the nested keys.
- Returns
flat dictionary of parsed outputs
- _flatten_inputs(port: Union[None, aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any, parent_name: str = '', separator: str = '__') List[Tuple[str, Any]] [source]#
Function that will recursively flatten the inputs dictionary, omitting inputs for ports that are marked as being non database storable
- Parameters
port – port against which to map the port value, can be InputPort or PortNamespace
port_value – value for the current port, can be a Mapping
parent_name – the parent key with which to prefix the keys
separator – character to use for the concatenation of keys
- Returns
flat list of inputs
- _flatten_outputs(port: Union[None, plumpy.ports.OutputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any, parent_name: str = '', separator: str = '__') List[Tuple[str, Any]] [source]#
Function that will recursively flatten the outputs dictionary.
- Parameters
port – port against which to map the port value, can be OutputPort or PortNamespace
port_value – value for the current port, can be a Mapping
parent_name – the parent key with which to prefix the keys
separator – character to use for the concatenation of keys
- Returns
flat list of outputs
- static _get_namespace_list(namespace: Optional[str] = None, agglomerate: bool = True) List[Optional[str]] [source]#
Get the list of namespaces in a given namespace.
- Parameters
namespace – name space
agglomerate – If set to true, all parent namespaces of the given
namespace
will also be searched.
- Returns
namespace list
- _node_class#
- _save_checkpoint() None [source]#
Save the current state in a chechpoint if persistence is enabled and the process state is not terminal
If the persistence call excepts with a PersistenceError, it will be caught and a warning will be logged.
- _setup_db_record() None [source]#
Create the database record for this process and the links with respect to its inputs
This function will set various attributes on the node that serve as a proxy for attributes of the Process. This is essential as otherwise this information could only be introspected through the Process itself, which is only available to the interpreter that has it in memory. To make this data introspectable from any interpreter, for example for the command line interface, certain Process attributes are proxied through the calculation node.
In addition, the parent calculation will be setup with a CALL link if applicable and all inputs will be linked up as well.
- _setup_inputs() None [source]#
Create the links between the input nodes and the ProcessNode that represents this process.
- _spec_class#
- classmethod build_process_type() str [source]#
The process type.
- Returns
string of the process type
Note: This could be made into a property ‘process_type’ but in order to have it be a property of the class it would need to be defined in the metaclass, see https://bugs.python.org/issue20659
- decode_input_args(encoded: str) Dict[str, Any] [source]#
Decode saved input arguments as they came from the saved instance state Bundle
- Parameters
encoded – encoded (serialized) inputs
- Returns
The decoded input args
- classmethod define(spec: aiida.engine.processes.process_spec.ProcessSpec) None [source]#
Define the specification of the process, including its inputs, outputs and known exit codes.
A metadata input namespace is defined, with optional ports that are not stored in the database.
- encode_input_args(inputs: Dict[str, Any]) str [source]#
Encode input arguments such that they may be saved in a Bundle
- Parameters
inputs – A mapping of the inputs as passed to the process
- Returns
The encoded (serialized) inputs
- exit_codes = {'ERROR_INVALID_OUTPUT': ExitCode(status=10, message='The process returned an invalid output.', invalidates_cache=True), 'ERROR_LEGACY_FAILURE': ExitCode(status=2, message='The process failed with legacy failure mode.', invalidates_cache=True), 'ERROR_MISSING_OUTPUT': ExitCode(status=11, message='The process did not register a required output.', invalidates_cache=True), 'ERROR_UNSPECIFIED': ExitCode(status=1, message='The process has failed with an unspecified error.', invalidates_cache=True)}#
- exposed_inputs(process_class: Type[aiida.engine.processes.process.Process], namespace: Optional[str] = None, agglomerate: bool = True) aiida.common.extendeddicts.AttributeDict [source]#
Gather a dictionary of the inputs that were exposed for a given Process class under an optional namespace.
- Parameters
process_class – Process class whose inputs to try and retrieve
namespace – PortNamespace in which to look for the inputs
agglomerate – If set to true, all parent namespaces of the given
namespace
will also be searched for inputs. Inputs in lower-lying namespaces take precedence.
- Returns
exposed inputs
- exposed_outputs(node: aiida.orm.nodes.process.process.ProcessNode, process_class: Type[aiida.engine.processes.process.Process], namespace: Optional[str] = None, agglomerate: bool = True) aiida.common.extendeddicts.AttributeDict [source]#
Return the outputs which were exposed from the
process_class
and emitted by the specificnode
- Parameters
node – process node whose outputs to try and retrieve
namespace – Namespace in which to search for exposed outputs.
agglomerate – If set to true, all parent namespaces of the given
namespace
will also be searched for outputs. Outputs in lower-lying namespaces take precedence.
- Returns
exposed outputs
- classmethod get_builder() aiida.engine.processes.builder.ProcessBuilder [source]#
- classmethod get_exit_statuses(exit_code_labels: Iterable[str]) List[int] [source]#
Return the exit status (integers) for the given exit code labels.
- Parameters
exit_code_labels – a list of strings that reference exit code labels of this process class
- Returns
list of exit status integers that correspond to the given exit code labels
- Raises
AttributeError – if at least one of the labels does not correspond to an existing exit code
- classmethod get_or_create_db_record() aiida.orm.nodes.process.process.ProcessNode [source]#
Create a process node that represents what happened in this process.
- Returns
A process node
- get_parent_calc() Optional[aiida.orm.nodes.process.process.ProcessNode] [source]#
Get the parent process node
- Returns
the parent process node if there is one
- get_provenance_inputs_iterator() Iterator[Tuple[str, Union[aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace]]] [source]#
Get provenance input iterator.
- Return type
filter
- property inputs: plumpy.utils.AttributesFrozendict#
Return the inputs attribute dictionary or an empty one.
This overrides the property of the base class because that can also return
None
. This override ensures calling functions that they will always get an instance ofAttributesFrozenDict
.
- classmethod is_valid_cache(node: aiida.orm.nodes.process.process.ProcessNode) bool [source]#
Check if the given node can be cached from.
Overriding this method allows
Process
sub-classes to modify when corresponding process nodes are considered as a cache.Warning
When overriding this method, make sure to return
False
at least in all cases whensuper()._node.base.caching.is_valid_cache(node)
returnsFalse
. Otherwise, theinvalidates_cache
keyword on exit codes may have no effect.
- kill(msg: Optional[str] = None) Union[bool, _asyncio.Future] [source]#
Kill the process and all the children calculations it called
- Parameters
msg – message
- load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) None [source]#
Load instance state.
- Parameters
saved_state – saved instance state
load_context –
- property metadata: aiida.common.extendeddicts.AttributeDict#
Return the metadata that were specified when this process instance was launched.
- Returns
metadata dictionary
- property node: aiida.orm.nodes.process.process.ProcessNode#
Return the ProcessNode used by this process to represent itself in the database.
- Returns
instance of sub class of ProcessNode
- on_entered(from_state: Optional[plumpy.process_states.State]) None [source]#
After entering a new state, save a checkpoint and update the latest process state change timestamp.
- on_except(exc_info: Tuple[Any, Exception, types.TracebackType]) None [source]#
Log the exception by calling the report method with formatted stack trace from exception info object and store the exception string as a node attribute
- Parameters
exc_info – the sys.exc_info() object (type, value, traceback)
- on_finish(result: Optional[Union[int, aiida.engine.processes.exit_code.ExitCode]], successful: bool) None [source]#
Set the finish status on the process node.
- Parameters
result – result of the process
successful – whether execution was successful
- on_output_emitting(output_port: str, value: Any) None [source]#
The process has emitted a value on the given output port.
- Parameters
output_port – The output port name the value was emitted on
value – The value emitted
- on_paused(msg: Optional[str] = None) None [source]#
The Process was paused so set the paused attribute on the process node
- Parameters
msg – message
- on_playing() None [source]#
The Process was unpaused so remove the paused attribute on the process node
- out(output_port: str, value: Optional[Any] = None) None [source]#
Attach output to output port.
The name of the port will be used as the link label.
- Parameters
output_port – name of output port
value – value to put inside output port
- out_many(out_dict: Dict[str, Any]) None [source]#
Attach outputs to multiple output ports.
Keys of the dictionary will be used as output port names, values as outputs.
- Parameters
out_dict (dict) – output dictionary
- report(msg: str, *args, **kwargs) None [source]#
Log a message to the logger, which should get saved to the database through the attached DbLogHandler.
The pk, class name and function name of the caller are prepended to the given message
- Parameters
msg – message to log
args – args to pass to the log call
kwargs – kwargs to pass to the log call
- save_instance_state(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) None [source]#
Save instance state.
See documentation of
plumpy.processes.Process.save_instance_state()
.
- set_status(status: Optional[str]) None [source]#
The status of the Process is about to be changed, so we reflect this is in node’s attribute proxy.
- Parameters
status – the status message
- classmethod spec() aiida.engine.processes.process_spec.ProcessSpec [source]#
- spec_metadata = <aiida.engine.processes.ports.PortNamespace object>#
- submit(process: Type[aiida.engine.processes.process.Process], **kwargs) aiida.orm.nodes.process.process.ProcessNode [source]#
Submit process for execution.
- Parameters
process – process
- Returns
the calculation node of the process
- aiida.engine.processes.process.get_query_string_from_process_type_string(process_type_string: str) str [source]#
Take the process type string of a Node and create the queryable type string.
AiiDA specific implementation of plumpy’s ProcessSpec.
- class aiida.engine.processes.process_spec.CalcJobProcessSpec[source]#
Bases:
aiida.engine.processes.process_spec.ProcessSpec
Process spec intended for the CalcJob process class.
- OUTPUT_PORT_TYPE#
- __annotations__ = {}#
- __module__ = 'aiida.engine.processes.process_spec'#
- class aiida.engine.processes.process_spec.ProcessSpec[source]#
Bases:
plumpy.process_spec.ProcessSpec
Default process spec for process classes defined in aiida-core.
This sub class defines custom classes for input ports and port namespaces. It also adds support for the definition of exit codes and retrieving them subsequently.
- INPUT_PORT_TYPE#
- PORT_NAMESPACE_TYPE#
- __annotations__ = {'METADATA_KEY': <class 'str'>, 'METADATA_OPTIONS_KEY': <class 'str'>, 'NAME_INPUTS_PORT_NAMESPACE': 'str', 'NAME_OUTPUTS_PORT_NAMESPACE': 'str', '_exposed_inputs': 'EXPOSED_TYPE', '_exposed_outputs': 'EXPOSED_TYPE', '_ports': 'PortNamespace', '_sealed': 'bool'}#
- __module__ = 'aiida.engine.processes.process_spec'#
- exit_code(status: int, label: str, message: str, invalidates_cache: bool = False) None [source]#
Add an exit code to the ProcessSpec
- Parameters
status – the exit status integer
label – a label by which the exit code can be addressed
message – a more detailed description of the exit code
invalidates_cache – when set to True, a process exiting with this exit code will not be considered for caching
- property exit_codes: aiida.engine.processes.exit_code.ExitCodesNamespace#
Return the namespace of exit codes defined for this ProcessSpec
- Returns
ExitCodesNamespace of ExitCode named tuples
- property inputs: aiida.engine.processes.ports.PortNamespace#
Get the input port namespace of the process specification
- Returns
the input PortNamespace
- property outputs: aiida.engine.processes.ports.PortNamespace#
Get the output port namespace of the process specification
- Returns
the outputs PortNamespace
- property ports: aiida.engine.processes.ports.PortNamespace#
Module with utilities.
- aiida.engine.processes.utils.prune_mapping(value)[source]#
Prune a nested mapping from all mappings that are completely empty.
Note
A nested mapping that is completely empty means it contains at most other empty mappings. Other null values, such as None or empty lists, should not be pruned.
- Parameters
value – A nested mapping of port values.
- Returns
The same mapping but without any nested namespace that is completely empty.