aiida.engine.processes package#
Module for processes and related utilities.
Subpackages#
- aiida.engine.processes.calcjobs package
- Submodules
CalcJob
CalcJob.__abstractmethods__
CalcJob.__annotations__
CalcJob.__init__()
CalcJob.__module__
CalcJob._abc_impl
CalcJob._node_class
CalcJob._perform_dry_run()
CalcJob._perform_import()
CalcJob._setup_inputs()
CalcJob._setup_metadata()
CalcJob._spec_class
CalcJob.define()
CalcJob.get_importer()
CalcJob.get_state_classes()
CalcJob.link_label_retrieved
CalcJob.node
CalcJob.on_terminated()
CalcJob.options
CalcJob.parse()
CalcJob.parse_retrieved_output()
CalcJob.parse_scheduler_output()
CalcJob.prepare_for_submission()
CalcJob.presubmit()
CalcJob.run()
CalcJob.spec_options
CalcJob.terminate()
validate_additional_retrieve_list()
validate_calc_job()
validate_monitors()
validate_parser()
validate_stash_options()
CalcJobImporter
JobManager
JobsList
JobsList.__dict__
JobsList.__init__()
JobsList.__module__
JobsList.__weakref__
JobsList._ensure_updating()
JobsList._get_jobs_from_scheduler()
JobsList._get_jobs_with_scheduler()
JobsList._get_next_update_delay()
JobsList._has_job_state_changed()
JobsList._update_job_info()
JobsList._update_requests_outstanding()
JobsList.get_minimum_update_interval()
JobsList.last_updated
JobsList.logger
JobsList.request_job_info_update()
CalcJobMonitor
CalcJobMonitor.__annotations__
CalcJobMonitor.__dataclass_fields__
CalcJobMonitor.__dataclass_params__
CalcJobMonitor.__dict__
CalcJobMonitor.__eq__()
CalcJobMonitor.__hash__
CalcJobMonitor.__init__()
CalcJobMonitor.__match_args__
CalcJobMonitor.__module__
CalcJobMonitor.__post_init__()
CalcJobMonitor.__repr__()
CalcJobMonitor.__weakref__
CalcJobMonitor.call_timestamp
CalcJobMonitor.disabled
CalcJobMonitor.entry_point
CalcJobMonitor.kwargs
CalcJobMonitor.load_entry_point()
CalcJobMonitor.minimum_poll_interval
CalcJobMonitor.priority
CalcJobMonitor.validate()
CalcJobMonitorAction
CalcJobMonitorResult
CalcJobMonitorResult.__annotations__
CalcJobMonitorResult.__dataclass_fields__
CalcJobMonitorResult.__dataclass_params__
CalcJobMonitorResult.__dict__
CalcJobMonitorResult.__eq__()
CalcJobMonitorResult.__hash__
CalcJobMonitorResult.__init__()
CalcJobMonitorResult.__match_args__
CalcJobMonitorResult.__module__
CalcJobMonitorResult.__post_init__()
CalcJobMonitorResult.__repr__()
CalcJobMonitorResult.__weakref__
CalcJobMonitorResult.action
CalcJobMonitorResult.key
CalcJobMonitorResult.message
CalcJobMonitorResult.outputs
CalcJobMonitorResult.override_exit_code
CalcJobMonitorResult.parse
CalcJobMonitorResult.retrieve
CalcJobMonitorResult.validate()
CalcJobMonitors
PreSubmitException
Waiting
Waiting.__annotations__
Waiting.__init__()
Waiting.__module__
Waiting._auto_persist
Waiting._kill_job()
Waiting._launch_task()
Waiting._monitor_job()
Waiting.execute()
Waiting.interrupt()
Waiting.load_instance_state()
Waiting.monitors
Waiting.parse()
Waiting.process
Waiting.retrieve()
Waiting.stash()
Waiting.submit()
Waiting.update()
Waiting.upload()
task_kill_job()
task_monitor_job()
task_retrieve_job()
task_stash_job()
task_submit_job()
task_update_job()
task_upload_job()
- Submodules
- aiida.engine.processes.workchains package
- Submodules
Awaitable
AwaitableAction
AwaitableTarget
construct_awaitable()
append_()
assign_()
BaseRestartWorkChain
BaseRestartWorkChain.__abstractmethods__
BaseRestartWorkChain.__annotations__
BaseRestartWorkChain.__init__()
BaseRestartWorkChain.__module__
BaseRestartWorkChain._abc_impl
BaseRestartWorkChain._attach_outputs()
BaseRestartWorkChain._considered_handlers_extra
BaseRestartWorkChain._process_class
BaseRestartWorkChain._wrap_bare_dict_inputs()
BaseRestartWorkChain.define()
BaseRestartWorkChain.get_outputs()
BaseRestartWorkChain.get_process_handlers()
BaseRestartWorkChain.get_process_handlers_by_priority()
BaseRestartWorkChain.inspect_process()
BaseRestartWorkChain.is_process_handler()
BaseRestartWorkChain.on_terminated()
BaseRestartWorkChain.process_class
BaseRestartWorkChain.results()
BaseRestartWorkChain.run_process()
BaseRestartWorkChain.setup()
BaseRestartWorkChain.should_run_process()
validate_handler_overrides()
ProcessHandlerReport
ProcessHandlerReport.__annotations__
ProcessHandlerReport.__getnewargs__()
ProcessHandlerReport.__match_args__
ProcessHandlerReport.__module__
ProcessHandlerReport.__new__()
ProcessHandlerReport.__orig_bases__
ProcessHandlerReport.__repr__()
ProcessHandlerReport.__slots__
ProcessHandlerReport._asdict()
ProcessHandlerReport._field_defaults
ProcessHandlerReport._fields
ProcessHandlerReport._make()
ProcessHandlerReport._replace()
ProcessHandlerReport.do_break
ProcessHandlerReport.exit_code
process_handler()
Protect
WorkChain
WorkChain._CONTEXT
WorkChain._STEPPER_STATE
WorkChain.__abstractmethods__
WorkChain.__annotations__
WorkChain.__init__()
WorkChain.__module__
WorkChain._abc_impl
WorkChain._action_awaitables()
WorkChain._auto_persist
WorkChain._do_step()
WorkChain._insert_awaitable()
WorkChain._node_class
WorkChain._on_awaitable_finished()
WorkChain._resolve_awaitable()
WorkChain._resolve_nested_context()
WorkChain._spec_class
WorkChain._store_nodes()
WorkChain._update_process_status()
WorkChain.ctx
WorkChain.load_instance_state()
WorkChain.node
WorkChain.on_exiting()
WorkChain.on_run()
WorkChain.on_wait()
WorkChain.run()
WorkChain.save_instance_state()
WorkChain.spec()
WorkChain.to_context()
WorkChainSpec
- Submodules
Submodules#
Convenience classes to help building the input dictionaries for Processes.
- class aiida.engine.processes.builder.PrettyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[源代码]#
基类:
JSONEncoder
JSON encoder for returning a pretty representation of an AiiDA
ProcessBuilder
.- __module__ = 'aiida.engine.processes.builder'#
- default(o)[源代码]#
Implement this method in a subclass such that it returns a serializable object for
o
, or calls the base implementation (to raise aTypeError
).For example, to support arbitrary iterators, you could implement default like this:
def default(self, o): try: iterable = iter(o) except TypeError: pass else: return list(iterable) # Let the base class default method raise the TypeError return JSONEncoder.default(self, o)
- class aiida.engine.processes.builder.ProcessBuilder(process_class: Type[Process])[源代码]#
-
A process builder that helps setting up the inputs for creating a new process.
- __abstractmethods__ = frozenset({})#
- __annotations__ = {}#
- __init__(process_class: Type[Process])[源代码]#
Construct a ProcessBuilder instance for the given Process class.
- 参数:
process_class – the Process subclass
- __module__ = 'aiida.engine.processes.builder'#
- _abc_impl = <_abc._abc_data object>#
- class aiida.engine.processes.builder.ProcessBuilderNamespace(port_namespace: PortNamespace)[源代码]#
-
Input namespace for the ProcessBuilder.
Dynamically generates the getters and setters for the input ports of a given PortNamespace
- __abstractmethods__ = frozenset({})#
- __annotations__ = {}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.builder', '__doc__': 'Input namespace for the `ProcessBuilder`.\n\n Dynamically generates the getters and setters for the input ports of a given PortNamespace\n ', '__init__': <function ProcessBuilderNamespace.__init__>, '__setattr__': <function ProcessBuilderNamespace.__setattr__>, '__repr__': <function ProcessBuilderNamespace.__repr__>, '__dir__': <function ProcessBuilderNamespace.__dir__>, '__iter__': <function ProcessBuilderNamespace.__iter__>, '__len__': <function ProcessBuilderNamespace.__len__>, '__getitem__': <function ProcessBuilderNamespace.__getitem__>, '__setitem__': <function ProcessBuilderNamespace.__setitem__>, '__delitem__': <function ProcessBuilderNamespace.__delitem__>, '__delattr__': <function ProcessBuilderNamespace.__delattr__>, '_recursive_merge': <function ProcessBuilderNamespace._recursive_merge>, '_merge': <function ProcessBuilderNamespace._merge>, '_update': <function ProcessBuilderNamespace._update>, '_inputs': <function ProcessBuilderNamespace._inputs>, '__dict__': <attribute '__dict__' of 'ProcessBuilderNamespace' objects>, '__weakref__': <attribute '__weakref__' of 'ProcessBuilderNamespace' objects>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc._abc_data object>, '__annotations__': {}})#
- __init__(port_namespace: PortNamespace) None [源代码]#
Dynamically construct the get and set properties for the ports of the given port namespace.
For each port in the given port namespace a get and set property will be constructed dynamically and added to the ProcessBuilderNamespace. The docstring for these properties will be defined by calling str() on the Port, which should return the description of the Port.
- 参数:
port_namespace – the inputs PortNamespace for which to construct the builder
- __module__ = 'aiida.engine.processes.builder'#
- __setattr__(attr: str, value: Any) None [源代码]#
Assign the given value to the port with key attr.
备注
Any attributes without a leading underscore being set correspond to inputs and should hence be validated with respect to the corresponding input port from the process spec
- __weakref__#
list of weak references to the object (if defined)
- _abc_impl = <_abc._abc_data object>#
- _inputs(prune: bool = False) dict [源代码]#
Return the entire mapping of inputs specified for this builder.
- 参数:
prune – boolean, when True, will prune nested namespaces that contain no actual values whatsoever
- 返回:
mapping of inputs ports and their input values.
- _merge(*args, **kwds)[源代码]#
Merge the content of a dictionary or keyword arguments in .
备注
This method differs in behavior from
_update
in that_merge
will recursively update the existing dictionary with the one that is specified in the arguments. The_update
method will merge only the keys on the top level, but any lower lying nested namespace will be replaced entirely.The method is prefixed with an underscore in order to not reserve the name for a potential port.
- 参数:
args – a single mapping that should be mapped on the namespace.
kwds – keyword value pairs that should be mapped onto the ports.
- _recursive_merge(dictionary, key, value)[源代码]#
Recursively merge the contents of
dictionary
setting itskey
tovalue
.
- _update(*args, **kwds)[源代码]#
Update the values of the builder namespace passing a mapping as argument or individual keyword value pairs.
The method functions just as collections.abc.MutableMapping.update and is merely prefixed with an underscore in order to not reserve the name for a potential port.
- 参数:
args – a single mapping that should be mapped on the namespace.
kwds – keyword value pairs that should be mapped onto the ports.
Functions to control and interact with running processes.
- exception aiida.engine.processes.control.ProcessTimeoutException[源代码]#
-
Raised when action to communicate with a process times out.
- __annotations__ = {}#
- __module__ = 'aiida.engine.processes.control'#
- aiida.engine.processes.control._perform_actions(processes: list[ProcessNode], action: Callable, infinitive: str, present: str, timeout: float | None = None, wait: bool = False, **kwargs: Any) None [源代码]#
Perform an action on a list of processes.
- 参数:
processes – The list of processes to perform the action on.
action – The action to perform.
infinitive – The infinitive of the verb that represents the action.
present – The present tense of the verb that represents the action.
past – The past tense of the verb that represents the action.
timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.kwargs – Keyword arguments that will be passed to the method
action
.
- 抛出:
ProcessTimeoutException – If the processes do not respond within the timeout.
- aiida.engine.processes.control._resolve_futures(futures: dict[Future, ProcessNode], infinitive: str, present: str, wait: bool = False, timeout: float | None = None) None [源代码]#
Process a mapping of futures representing an action on an active process.
This function will echo the correct information strings based on the outcomes of the futures and the given verb conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a timeout to put a maximum wait time on the actions.
- 参数:
futures – The map of action futures and the corresponding processes.
infinitive – The infinitive form of the action verb.
present – The present tense form of the action verb.
wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.
- aiida.engine.processes.control.get_active_processes(paused: bool = False, project: str | list[str] = '*') list[ProcessNode] | list[Any] [源代码]#
Return all active processes, i.e., those with a process state of created, waiting or running.
- 参数:
paused – Boolean, if True, filter for processes that are paused.
project – Single or list of properties to project. By default projects the entire node.
- 返回:
A list of process nodes of active processes.
- aiida.engine.processes.control.get_process_tasks(profile: Profile, communicator: kiwipy.rmq.RmqCommunicator) list[int] [源代码]#
Return the list of process pks that have a process task in the RabbitMQ process queue.
- 返回:
A list of process pks that have a corresponding process task with RabbitMQ.
- aiida.engine.processes.control.iterate_process_tasks(profile: Profile, communicator: kiwipy.rmq.RmqCommunicator) collections.abc.Iterator[kiwipy.rmq.RmqIncomingTask] [源代码]#
Return the list of process pks that have a process task in the RabbitMQ process queue.
- 返回:
A list of process pks that have a corresponding process task with RabbitMQ.
- aiida.engine.processes.control.kill_processes(processes: list[ProcessNode] | None = None, *, message: str = 'Killed through `aiida.engine.processes.control.kill_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None [源代码]#
Kill running processes.
备注
Requires the daemon to be running, or processes will be unresponsive.
- 参数:
processes – List of processes to play.
all_entries – Kill all active processes.
timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.
- 抛出:
ProcessTimeoutException – If the processes do not respond within the timeout.
- aiida.engine.processes.control.pause_processes(processes: list[ProcessNode] | None = None, *, message: str = 'Paused through `aiida.engine.processes.control.pause_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None [源代码]#
Pause running processes.
备注
Requires the daemon to be running, or processes will be unresponsive.
- 参数:
processes – List of processes to play.
all_entries – Pause all playing processes.
timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.
- 抛出:
ProcessTimeoutException – If the processes do not respond within the timeout.
- aiida.engine.processes.control.play_processes(processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False) None [源代码]#
Play (unpause) paused processes.
备注
Requires the daemon to be running, or processes will be unresponsive.
- 参数:
processes – List of processes to play.
all_entries – Play all paused processes.
timeout – Raise a
ProcessTimeoutException
if the process does not respond within this amount of seconds.wait – Set to
True
to wait for process response, forFalse
the action is fire-and-forget.
- 抛出:
ProcessTimeoutException – If the processes do not respond within the timeout.
- aiida.engine.processes.control.revive_processes(processes: list[ProcessNode], *, wait: bool = False) None [源代码]#
Revive processes that seem stuck and are no longer reachable.
Warning: Use only as a last resort after you’ve gone through the checklist below.
Does
verdi status
indicate that both daemon and RabbitMQ are running properly? If not, restart the daemon withverdi daemon restart --reset
and restart RabbitMQ.Try to play the process through
play_processes
. If aProcessTimeoutException
is raised use this method to attempt to revive it.
Details: When RabbitMQ loses the process task before the process has completed, the process is never picked up by the daemon and will remain “stuck”. This method recreates the task, which can lead to multiple instances of the task being executed and should thus be used with caution.
备注
Requires the daemon to be running.
- 参数:
processes – List of processes to revive.
wait – Set to
True
to wait for a response, forFalse
the action is fire-and-forget.
A namedtuple and namespace for ExitCodes that can be used to exit from Processes.
- class aiida.engine.processes.exit_code.ExitCode(status: int = 0, message: str | None = None, invalidates_cache: bool = False)[源代码]#
基类:
NamedTuple
A simple data class to define an exit code for a
Process
.When an instance of this class is returned from a Process._run() call, it will be interpreted that the Process should be terminated and that the exit status and message of the namedtuple should be set to the corresponding attributes of the node.
- 参数:
status – positive integer exit status, where a non-zero value indicated the process failed, default is 0
message – optional message with more details about the failure mode
invalidates_cache – optional flag, indicating that a process should not be used in caching
- __annotations__ = {'invalidates_cache': <class 'bool'>, 'message': typing.Optional[str], 'status': <class 'int'>}#
- __getnewargs__()#
Return self as a plain tuple. Used by copy and pickle.
- __match_args__ = ('status', 'message', 'invalidates_cache')#
- __module__ = 'aiida.engine.processes.exit_code'#
- static __new__(_cls, status: int = 0, message: str | None = None, invalidates_cache: bool = False)#
Create new instance of ExitCode(status, message, invalidates_cache)
- __orig_bases__ = (<function NamedTuple>,)#
- __repr__()#
Return a nicely formatted representation string
- __slots__ = ()#
- _asdict()#
Return a new dict which maps field names to their values.
- _field_defaults = {'invalidates_cache': False, 'message': None, 'status': 0}#
- _fields = ('status', 'message', 'invalidates_cache')#
- classmethod _make(iterable)#
Make a new ExitCode object from a sequence or iterable
- _replace(**kwds)#
Return a new ExitCode object replacing specified fields with new values
- class aiida.engine.processes.exit_code.ExitCodesNamespace(dictionary=None)[源代码]#
-
A namespace of ExitCode instances that can be accessed through getattr as well as getitem.
Additionally, the collection can be called with an identifier, that can either reference the integer status of the ExitCode that needs to be retrieved or the key in the collection.
- __annotations__ = {}#
- __call__(identifier: int | str) ExitCode [源代码]#
Return a specific exit code identified by either its exit status or label.
- 参数:
identifier – the identifier of the exit code. If the type is integer, it will be interpreted as the exit code status, otherwise it be interpreted as the exit code label
- 返回:
an ExitCode instance
- 抛出:
ValueError – if no exit code with the given label is defined for this process
- __module__ = 'aiida.engine.processes.exit_code'#
Class and decorators to generate processes out of simple python functions.
- class aiida.engine.processes.functions.FunctionProcess(*args: Any, **kwargs: Any)[源代码]#
基类:
Process
Function process class used for turning functions into a Process
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_func_args': 't.Sequence[str]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_uuid': 'Optional[uuid.UUID]', '_var_keyword': 'str | None', '_var_positional': 'str | None'}#
- __init__(*args, **kwargs) None [源代码]#
Process constructor.
- 参数:
inputs – process inputs
logger – aiida logger
runner – process runner
parent_pid – id of parent process
enable_persistence – whether to persist this process
- __module__ = 'aiida.engine.processes.functions'#
- _abc_impl = <_abc._abc_data object>#
- static _func(*_args, **_kwargs) dict [源代码]#
This is used internally to store the actual function that is being wrapped and will be replaced by the build method.
- _func_args: t.Sequence[str] = ()#
- static build(func: FunctionType, node_class: Type[ProcessNode]) Type[FunctionProcess] [源代码]#
Build a Process from the given function.
All function arguments will be assigned as process inputs. If keyword arguments are specified then these will also become inputs.
- 参数:
func – The function to build a process from
node_class – Provide a custom node class to be used, has to be constructable with no arguments. It has to be a sub class of ProcessNode and the mixin
FunctionCalculationMixin
.
- 返回:
A Process class that represents the function
- classmethod create_inputs(*args: Any, **kwargs: Any) dict[str, Any] [源代码]#
Create the input dictionary for the
FunctionProcess
.
- classmethod get_or_create_db_record() ProcessNode [源代码]#
Create a process node that represents what happened in this process.
- 返回:
A process node
- property process_class: Callable[[...], Any]#
Return the class that represents this Process, for the FunctionProcess this is the function itself.
For a standard Process or sub class of Process, this is the class itself. However, for legacy reasons, the Process class is a wrapper around another class. This function returns that original class, i.e. the class that really represents what was being executed.
- 返回:
A Process class that represents the function
- class aiida.engine.processes.functions.ProcessFunctionType(*args, **kwargs)[源代码]#
基类:
Protocol
,Generic
[P
,R_co
,N
]Protocol for a decorated process function.
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'is_process_function': 'bool', 'node_class': 't.Type[N]', 'process_class': 't.Type[Process]', 'recreate_from': 't.Callable[[N], Process]', 'spec': 't.Callable[[], ProcessSpec]'}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.functions', '__annotations__': {'is_process_function': 'bool', 'node_class': 't.Type[N]', 'process_class': 't.Type[Process]', 'recreate_from': 't.Callable[[N], Process]', 'spec': 't.Callable[[], ProcessSpec]'}, '__doc__': 'Protocol for a decorated process function.', '__call__': <function ProcessFunctionType.__call__>, 'run': <function ProcessFunctionType.run>, 'run_get_pk': <function ProcessFunctionType.run_get_pk>, 'run_get_node': <function ProcessFunctionType.run_get_node>, '__orig_bases__': (<class 'typing.Protocol'>, typing.Generic[~P, +R_co, ~N]), '__dict__': <attribute '__dict__' of 'ProcessFunctionType' objects>, '__weakref__': <attribute '__weakref__' of 'ProcessFunctionType' objects>, '__parameters__': (~P, +R_co, ~N), '_is_protocol': True, '__subclasshook__': <function Protocol.__init_subclass__.<locals>._proto_hook>, '__init__': <function _no_init_or_replace_init>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc._abc_data object>})#
- __init__(*args, **kwargs)#
- __module__ = 'aiida.engine.processes.functions'#
- __orig_bases__ = (<class 'typing.Protocol'>, typing.Generic[~P, +R_co, ~N])#
- __parameters__ = (~P, +R_co, ~N)#
- __subclasshook__()#
Abstract classes can override this to customize issubclass().
This is invoked early on by abc.ABCMeta.__subclasscheck__(). It should return True, False or NotImplemented. If it returns NotImplemented, the normal algorithm is used. Otherwise, it overrides the normal algorithm (and the outcome is cached).
- __weakref__#
list of weak references to the object (if defined)
- _abc_impl = <_abc._abc_data object>#
- _is_protocol = True#
- spec: Callable[[], ProcessSpec]#
- aiida.engine.processes.functions.calcfunction(function: Callable[[P], R_co]) ProcessFunctionType[P, R_co, CalcFunctionNode] [源代码]#
A decorator to turn a standard python function into a calcfunction. Example usage:
>>> from aiida.orm import Int >>> >>> # Define the calcfunction >>> @calcfunction >>> def sum(a, b): >>> return a + b >>> # Run it with some input >>> r = sum(Int(4), Int(5)) >>> print(r) 9 >>> r.base.links.get_incoming().all() [Neighbor(link_type='', link_label='result', node=<CalcFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)] >>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes() [4, 5]
- 参数:
function – The function to decorate.
- 返回:
The decorated function.
- aiida.engine.processes.functions.get_stack_size(size: int = 2) int [源代码]#
Return the stack size for the caller’s frame.
This solution is taken from https://stackoverflow.com/questions/34115298/ as a more performant alternative to the naive
len(inspect.stack())` solution. This implementation is about three orders of magnitude faster compared to the naive solution and it scales especially well for larger stacks, which will be usually the case for the usage of ``aiida-core
. However, it does use the internal_getframe
of thesys
standard library. It this ever were to stop working, simply switch to usinglen(inspect.stack())
.- 参数:
size – Hint for the expected stack size.
- 返回:
The stack size for caller’s frame.
- aiida.engine.processes.functions.infer_valid_type_from_type_annotation(annotation: Any) tuple[Any, ...] [源代码]#
Infer the value for the
valid_type
of an input port from the given function argument annotation.- 参数:
annotation – The annotation of a function argument as returned by
inspect.get_annotation
.- 返回:
A tuple of valid types. If no valid types were defined or they could not be successfully parsed, an empty tuple is returned.
- aiida.engine.processes.functions.process_function(node_class: Type[ProcessNode]) Callable[[FunctionType], FunctionType] [源代码]#
The base function decorator to create a FunctionProcess out of a normal python function.
- 参数:
node_class – the ORM class to be used as the Node record for the FunctionProcess
- aiida.engine.processes.functions.workfunction(function: Callable[[P], R_co]) ProcessFunctionType[P, R_co, WorkFunctionNode] [源代码]#
A decorator to turn a standard python function into a workfunction. Example usage:
>>> from aiida.orm import Int >>> >>> # Define the workfunction >>> @workfunction >>> def select(a, b): >>> return a >>> # Run it with some input >>> r = select(Int(4), Int(5)) >>> print(r) 4 >>> r.base.links.get_incoming().all() [Neighbor(link_type='', link_label='result', node=<WorkFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)] >>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes() [4, 5]
- 参数:
function – The function to decorate.
- 返回:
The decorated function.
Futures that can poll or receive broadcasted messages while waiting for a task to be completed.
- class aiida.engine.processes.futures.ProcessFuture(pk: int, loop: AbstractEventLoop | None = None, poll_interval: None | int | float = None, communicator: Communicator | None = None)[源代码]#
基类:
Future
Future that waits for a process to complete using both polling and listening for broadcast events if possible.
- __init__(pk: int, loop: AbstractEventLoop | None = None, poll_interval: None | int | float = None, communicator: Communicator | None = None)[源代码]#
Construct a future for a process node being finished.
If a None poll_interval is supplied polling will not be used. If a communicator is supplied it will be used to listen for broadcast messages.
- 参数:
pk – process pk
loop – An event loop
poll_interval – optional polling interval, if None, polling is not activated.
communicator – optional communicator, if None, will not subscribe to broadcasts.
- __module__ = 'aiida.engine.processes.futures'#
- _filtered = None#
AiiDA specific implementation of plumpy Ports and PortNamespaces for the ProcessSpec.
- class aiida.engine.processes.ports.CalcJobOutputPort(*args, **kwargs)[源代码]#
基类:
OutputPort
Sub class of plumpy.OutputPort which adds the _pass_to_parser attribute.
- __module__ = 'aiida.engine.processes.ports'#
- class aiida.engine.processes.ports.InputPort(*args, **kwargs)[源代码]#
基类:
WithMetadata
,WithSerialize
,WithNonDb
,InputPort
Sub class of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic value serialization to database storable types and support non database storable input types as well.
The mixins have to go before the main port class in the superclass order to make sure they have the chance to process their specific keywords.
- __annotations__ = {}#
- __init__(*args, **kwargs) None [源代码]#
Override the constructor to check the type of the default if set and warn if not immutable.
- __module__ = 'aiida.engine.processes.ports'#
- class aiida.engine.processes.ports.PortNamespace(*args, **kwargs)[源代码]#
基类:
WithMetadata
,WithNonDb
,PortNamespace
Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive serialization of a given mapping onto the ports of the PortNamespace.
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'_explicitly_set': 'bool', '_is_metadata': 'bool', '_non_db': 'bool', '_non_db_explicitly_set': 'bool', '_ports': "Dict[str, Union[Port, 'PortNamespace']]"}#
- __module__ = 'aiida.engine.processes.ports'#
- __setitem__(key: str, port: Port) None [源代码]#
Ensure that a Port being added inherits the non_db attribute if not explicitly defined at construction.
The reasoning is that if a PortNamespace has non_db=True, which is different from the default value, very often all leaves should be also non_db=True. To prevent a user from having to specify it manually everytime we overload the value here, unless it was specifically set during construction.
Note that the non_db attribute is not present for all Port sub classes so we have to check for it first.
- _abc_impl = <_abc._abc_data object>#
- serialize(mapping: Dict[str, Any] | None, breadcrumbs: Sequence[str] = ()) Dict[str, Any] | None [源代码]#
Serialize the given mapping onto this Portnamespace.
It will recursively call this function on any nested PortNamespace or the serialize function on any Ports.
- 参数:
mapping – a mapping of values to be serialized
breadcrumbs – a tuple with the namespaces of parent namespaces
- 返回:
the serialized mapping
- static validate_port_name(port_name: str) None [源代码]#
Validate the given port name.
Valid port names adhere to the following restrictions:
Is a valid link label (see below)
Does not contain two or more consecutive underscores
Valid link labels adhere to the following restrictions:
Has to be a valid python identifier
Can only contain alphanumeric characters and underscores
Can not start or end with an underscore
- 参数:
port_name – the proposed name of the port to be added
- 抛出:
TypeError – if the port name is not a string type
ValueError – if the port name is invalid
- class aiida.engine.processes.ports.WithMetadata(*args, **kwargs)[源代码]#
基类:
object
A mixin that allows an input port to be marked as metadata through the keyword
is_metadata
.A metadata input differs from a normal input as in that it is not linked to the
ProcessNode
as aData
node but rather is stored on theProcessNode
itself (as an attribute, for example).- __annotations__ = {}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': 'A mixin that allows an input port to be marked as metadata through the keyword ``is_metadata``.\n\n A metadata input differs from a normal input as in that it is not linked to the ``ProcessNode`` as a ``Data`` node\n but rather is stored on the ``ProcessNode`` itself (as an attribute, for example).\n ', '__init__': <function WithMetadata.__init__>, 'is_metadata_explicitly_set': <property object>, 'is_metadata': <property object>, '__dict__': <attribute '__dict__' of 'WithMetadata' objects>, '__weakref__': <attribute '__weakref__' of 'WithMetadata' objects>, '__annotations__': {'_explicitly_set': 'bool', '_is_metadata': 'bool'}})#
- __module__ = 'aiida.engine.processes.ports'#
- __weakref__#
list of weak references to the object (if defined)
- class aiida.engine.processes.ports.WithNonDb(*args, **kwargs)[源代码]#
基类:
object
A mixin that adds support to a port to flag it should not be stored in the database using the
non_db
flag.- __annotations__ = {}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': 'A mixin that adds support to a port to flag it should not be stored in the database using the ``non_db`` flag.', '__init__': <function WithNonDb.__init__>, 'non_db_explicitly_set': <property object>, 'non_db': <property object>, '__dict__': <attribute '__dict__' of 'WithNonDb' objects>, '__weakref__': <attribute '__weakref__' of 'WithNonDb' objects>, '__annotations__': {'_non_db_explicitly_set': 'bool', '_non_db': 'bool'}})#
- __module__ = 'aiida.engine.processes.ports'#
- __weakref__#
list of weak references to the object (if defined)
- class aiida.engine.processes.ports.WithSerialize(*args, **kwargs)[源代码]#
基类:
object
A mixin that adds support for a serialization function which is automatically applied on inputs that are not AiiDA data types.
- __annotations__ = {}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.processes.ports', '__doc__': 'A mixin that adds support for a serialization function which is automatically applied on inputs\n that are not AiiDA data types.\n ', '__init__': <function WithSerialize.__init__>, 'serialize': <function WithSerialize.serialize>, '__dict__': <attribute '__dict__' of 'WithSerialize' objects>, '__weakref__': <attribute '__weakref__' of 'WithSerialize' objects>, '__annotations__': {'_serializer': "Callable[[Any], 'Data']"}})#
- __module__ = 'aiida.engine.processes.ports'#
- __weakref__#
list of weak references to the object (if defined)
The AiiDA process class
- class aiida.engine.processes.process.Process(*args: Any, **kwargs: Any)[源代码]#
基类:
Process
This class represents an AiiDA process which can be executed and will have full provenance saved in the database.
- class SaveKeys(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[源代码]#
基类:
Enum
Keys used to identify things in the saved instance state bundle.
- __module__ = 'aiida.engine.processes.process'#
- __abstractmethods__ = frozenset({})#
- __annotations__ = {'CLASS_NAME': 'str', 'SINGLE_OUTPUT_LINKNAME': 'str', 'STATES': 'Optional[Sequence[Type[State]]]', '_STATES_MAP': 'Optional[Dict[Hashable, Type[State]]]', '__called': 'bool', '_auto_persist': 'Optional[Set[str]]', '_cleanups': 'Optional[List[Callable[[], None]]]', '_creation_time': 'Optional[float]', '_event_callbacks': 'Dict[Hashable, List[EVENT_CALLBACK_TYPE]]', '_interrupt_action': 'Optional[futures.CancellableAction]', '_killing': 'Optional[futures.CancellableAction]', '_node': 'Optional[orm.ProcessNode]', '_outputs': 'Dict[str, Any]', '_parsed_inputs': 'Optional[utils.AttributesFrozendict]', '_paused': 'Optional[persistence.SavableFuture]', '_pausing': 'Optional[futures.CancellableAction]', '_pre_paused_status': 'Optional[str]', '_state': 'Optional[State]', '_status': 'Optional[str]', '_uuid': 'Optional[uuid.UUID]'}#
- __init__(inputs: Dict[str, Any] | None = None, logger: logging.Logger | None = None, runner: 'Runner' | None = None, parent_pid: int | None = None, enable_persistence: bool = True) None [源代码]#
Process constructor.
- 参数:
inputs – process inputs
logger – aiida logger
runner – process runner
parent_pid – id of parent process
enable_persistence – whether to persist this process
- __module__ = 'aiida.engine.processes.process'#
- _abc_impl = <_abc._abc_data object>#
- _auto_persist: Set[str] | None = {'_creation_time', '_enable_persistence', '_event_helper', '_future', '_parent_pid', '_paused', '_pid', '_pre_paused_status', '_status'}#
- _build_process_label() str [源代码]#
Construct the process label that should be set on
ProcessNode
instances for this process class.备注
By default this returns the name of the process class itself. It can be overridden by
Process
subclasses to provide a more specific label.- 返回:
The process label to use for
ProcessNode
instances.
- _create_and_setup_db_record() int | UUID [源代码]#
Create and setup the database record for this process
- 返回:
the uuid or pk of the process
- _filter_serializable_metadata(port: None | InputPort | PortNamespace, port_value: Any) Any | None [源代码]#
Return the inputs that correspond to ports with
is_metadata=True
and that are JSON serializable.The function is called recursively for any port namespaces.
- 参数:
port – An
InputPort
orPortNamespace
. If anInputPort
that specifiesis_metadata=True
theport_value
is returned. For aPortNamespace
this method is called recursively for the keys within the namespace and the resulting dictionary is returned, omittingNone
values. If eitherport
orport_value
isNone
,None
is returned.- 返回:
The
port_value
where all inputs that do no correspond to a metadata port or are not JSON serializable, have been filtered out.
- _flat_inputs() Dict[str, Any] [源代码]#
Return a flattened version of the parsed inputs dictionary.
The eventual keys will be a concatenation of the nested keys. Note that the metadata dictionary, if present, is not passed, as those are dealt with separately in _setup_metadata.
- 返回:
flat dictionary of parsed inputs
- _flat_outputs() Dict[str, Any] [源代码]#
Return a flattened version of the registered outputs dictionary.
The eventual keys will be a concatenation of the nested keys.
- 返回:
flat dictionary of parsed outputs
- _flatten_inputs(port: None | InputPort | PortNamespace, port_value: Any, parent_name: str = '', separator: str = '__') List[Tuple[str, Any]] [源代码]#
Function that will recursively flatten the inputs dictionary, omitting inputs for ports that are marked as being non database storable
- 参数:
port – port against which to map the port value, can be InputPort or PortNamespace
port_value – value for the current port, can be a Mapping
parent_name – the parent key with which to prefix the keys
separator – character to use for the concatenation of keys
- 返回:
flat list of inputs
- _flatten_outputs(port: None | OutputPort | PortNamespace, port_value: Any, parent_name: str = '', separator: str = '__') List[Tuple[str, Any]] [源代码]#
Function that will recursively flatten the outputs dictionary.
- 参数:
port – port against which to map the port value, can be OutputPort or PortNamespace
port_value – value for the current port, can be a Mapping
parent_name – the parent key with which to prefix the keys
separator – character to use for the concatenation of keys
- 返回:
flat list of outputs
- static _get_namespace_list(namespace: str | None = None, agglomerate: bool = True) List[str | None] [源代码]#
Get the list of namespaces in a given namespace.
- 参数:
namespace – name space
agglomerate – If set to true, all parent namespaces of the given
namespace
will also be searched.
- 返回:
namespace list
- _node_class#
ProcessNode
的别名
- _save_checkpoint() None [源代码]#
Save the current state in a chechpoint if persistence is enabled and the process state is not terminal
If the persistence call excepts with a PersistenceError, it will be caught and a warning will be logged.
- _setup_db_record() None [源代码]#
Create the database record for this process and the links with respect to its inputs
This function will set various attributes on the node that serve as a proxy for attributes of the Process. This is essential as otherwise this information could only be introspected through the Process itself, which is only available to the interpreter that has it in memory. To make this data introspectable from any interpreter, for example for the command line interface, certain Process attributes are proxied through the calculation node.
In addition, the parent calculation will be setup with a CALL link if applicable and all inputs will be linked up as well.
- _setup_inputs() None [源代码]#
Create the links between the input nodes and the ProcessNode that represents this process.
- _spec_class#
ProcessSpec
的别名
- classmethod build_process_type() str [源代码]#
The process type.
- 返回:
string of the process type
Note: This could be made into a property ‘process_type’ but in order to have it be a property of the class it would need to be defined in the metaclass, see https://bugs.python.org/issue20659
- decode_input_args(encoded: str) Dict[str, Any] [源代码]#
Decode saved input arguments as they came from the saved instance state Bundle
- 参数:
encoded – encoded (serialized) inputs
- 返回:
The decoded input args
- classmethod define(spec: ProcessSpec) None [源代码]#
Define the specification of the process, including its inputs, outputs and known exit codes.
A metadata input namespace is defined, with optional ports that are not stored in the database.
- encode_input_args(inputs: Dict[str, Any]) str [源代码]#
Encode input arguments such that they may be saved in a Bundle
- 参数:
inputs – A mapping of the inputs as passed to the process
- 返回:
The encoded (serialized) inputs
- exit_codes = {'ERROR_INVALID_OUTPUT': (10, 'The process returned an invalid output.', True), 'ERROR_LEGACY_FAILURE': (2, 'The process failed with legacy failure mode.', True), 'ERROR_MISSING_OUTPUT': (11, 'The process did not register a required output.', True), 'ERROR_UNSPECIFIED': (1, 'The process has failed with an unspecified error.', True)}#
- exposed_inputs(process_class: Type[Process], namespace: str | None = None, agglomerate: bool = True) AttributeDict [源代码]#
Gather a dictionary of the inputs that were exposed for a given Process class under an optional namespace.
- 参数:
process_class – Process class whose inputs to try and retrieve
namespace – PortNamespace in which to look for the inputs
agglomerate – If set to true, all parent namespaces of the given
namespace
will also be searched for inputs. Inputs in lower-lying namespaces take precedence.
- 返回:
exposed inputs
- exposed_outputs(node: ProcessNode, process_class: Type[Process], namespace: str | None = None, agglomerate: bool = True) AttributeDict [源代码]#
Return the outputs which were exposed from the
process_class
and emitted by the specificnode
- 参数:
node – process node whose outputs to try and retrieve
namespace – Namespace in which to search for exposed outputs.
agglomerate – If set to true, all parent namespaces of the given
namespace
will also be searched for outputs. Outputs in lower-lying namespaces take precedence.
- 返回:
exposed outputs
- classmethod get_builder() ProcessBuilder [源代码]#
- classmethod get_exit_statuses(exit_code_labels: Iterable[str]) List[int] [源代码]#
Return the exit status (integers) for the given exit code labels.
- 参数:
exit_code_labels – a list of strings that reference exit code labels of this process class
- 返回:
list of exit status integers that correspond to the given exit code labels
- 抛出:
AttributeError – if at least one of the labels does not correspond to an existing exit code
- classmethod get_or_create_db_record() ProcessNode [源代码]#
Create a process node that represents what happened in this process.
- 返回:
A process node
- get_parent_calc() ProcessNode | None [源代码]#
Get the parent process node
- 返回:
the parent process node if there is one
- get_provenance_inputs_iterator() Iterator[Tuple[str, InputPort | PortNamespace]] [源代码]#
Get provenance input iterator.
- 返回类型:
filter
- property inputs: AttributesFrozendict#
Return the inputs attribute dictionary or an empty one.
This overrides the property of the base class because that can also return
None
. This override ensures calling functions that they will always get an instance ofAttributesFrozenDict
.
- classmethod is_valid_cache(node: ProcessNode) bool [源代码]#
Check if the given node can be cached from.
Overriding this method allows
Process
sub-classes to modify when corresponding process nodes are considered as a cache.警告
When overriding this method, make sure to return
False
at least in all cases whensuper()._node.base.caching.is_valid_cache(node)
returnsFalse
. Otherwise, theinvalidates_cache
keyword on exit codes may have no effect.
- kill(msg: str | None = None) bool | Future [源代码]#
Kill the process and all the children calculations it called
- 参数:
msg – message
- load_instance_state(saved_state: MutableMapping[str, Any], load_context: LoadSaveContext) None [源代码]#
Load instance state.
- 参数:
saved_state – saved instance state
load_context –
- property metadata: AttributeDict#
Return the metadata that were specified when this process instance was launched.
- 返回:
metadata dictionary
- property node: ProcessNode#
Return the ProcessNode used by this process to represent itself in the database.
- 返回:
instance of sub class of ProcessNode
- on_entered(from_state: State | None) None [源代码]#
After entering a new state, save a checkpoint and update the latest process state change timestamp.
- on_except(exc_info: Tuple[Any, Exception, TracebackType]) None [源代码]#
Log the exception by calling the report method with formatted stack trace from exception info object and store the exception string as a node attribute
- 参数:
exc_info – the sys.exc_info() object (type, value, traceback)
- on_finish(result: int | ExitCode | None, successful: bool) None [源代码]#
Set the finish status on the process node.
- 参数:
result – result of the process
successful – whether execution was successful
- on_output_emitting(output_port: str, value: Any) None [源代码]#
The process has emitted a value on the given output port.
- 参数:
output_port – The output port name the value was emitted on
value – The value emitted
- on_paused(msg: str | None = None) None [源代码]#
The Process was paused so set the paused attribute on the process node
- 参数:
msg – message
- on_playing() None [源代码]#
The Process was unpaused so remove the paused attribute on the process node
- out(output_port: str, value: Any = None) None [源代码]#
Attach output to output port.
The name of the port will be used as the link label.
- 参数:
output_port – name of output port
value – value to put inside output port
- out_many(out_dict: Dict[str, Any]) None [源代码]#
Attach outputs to multiple output ports.
Keys of the dictionary will be used as output port names, values as outputs.
- 参数:
out_dict (dict) – output dictionary
- report(msg: str, *args, **kwargs) None [源代码]#
Log a message to the logger, which should get saved to the database through the attached DbLogHandler.
The pk, class name and function name of the caller are prepended to the given message
- 参数:
msg – message to log
args – args to pass to the log call
kwargs – kwargs to pass to the log call
- save_instance_state(out_state: MutableMapping[str, Any], save_context: LoadSaveContext | None) None [源代码]#
Save instance state.
See documentation of
plumpy.processes.Process.save_instance_state()
.
- set_status(status: str | None) None [源代码]#
The status of the Process is about to be changed, so we reflect this is in node’s attribute proxy.
- 参数:
status – the status message
- classmethod spec() ProcessSpec [源代码]#
- spec_metadata = <aiida.engine.processes.ports.PortNamespace object>#
- submit(process: Type[Process], inputs: dict[str, Any] | None = None, **kwargs) ProcessNode [源代码]#
Submit process for execution.
- 参数:
process – The process class.
inputs – The dictionary of process inputs.
- 返回:
The process node.
- aiida.engine.processes.process.get_query_string_from_process_type_string(process_type_string: str) str [源代码]#
Take the process type string of a Node and create the queryable type string.
AiiDA specific implementation of plumpy’s ProcessSpec.
- class aiida.engine.processes.process_spec.CalcJobProcessSpec[源代码]#
基类:
ProcessSpec
Process spec intended for the CalcJob process class.
- OUTPUT_PORT_TYPE#
- __annotations__ = {}#
- __module__ = 'aiida.engine.processes.process_spec'#
- class aiida.engine.processes.process_spec.ProcessSpec[源代码]#
基类:
ProcessSpec
Default process spec for process classes defined in aiida-core.
This sub class defines custom classes for input ports and port namespaces. It also adds support for the definition of exit codes and retrieving them subsequently.
- PORT_NAMESPACE_TYPE#
PortNamespace
的别名
- __annotations__ = {'METADATA_KEY': <class 'str'>, 'METADATA_OPTIONS_KEY': <class 'str'>, 'NAME_INPUTS_PORT_NAMESPACE': 'str', 'NAME_OUTPUTS_PORT_NAMESPACE': 'str', '_exposed_inputs': 'EXPOSED_TYPE', '_exposed_outputs': 'EXPOSED_TYPE', '_ports': 'PortNamespace', '_sealed': 'bool'}#
- __module__ = 'aiida.engine.processes.process_spec'#
- exit_code(status: int, label: str, message: str, invalidates_cache: bool = False) None [源代码]#
Add an exit code to the ProcessSpec
- 参数:
status – the exit status integer
label – a label by which the exit code can be addressed
message – a more detailed description of the exit code
invalidates_cache – when set to True, a process exiting with this exit code will not be considered for caching
- property exit_codes: ExitCodesNamespace#
Return the namespace of exit codes defined for this ProcessSpec
- 返回:
ExitCodesNamespace of ExitCode named tuples
- property inputs: PortNamespace#
Get the input port namespace of the process specification
- 返回:
the input PortNamespace
- property outputs: PortNamespace#
Get the output port namespace of the process specification
- 返回:
the outputs PortNamespace
- property ports: PortNamespace#
Module with utilities.
- aiida.engine.processes.utils.prune_mapping(value)[源代码]#
Prune a nested mapping from all mappings that are completely empty.
备注
A nested mapping that is completely empty means it contains at most other empty mappings. Other null values, such as None or empty lists, should not be pruned.
- 参数:
value – A nested mapping of port values.
- 返回:
The same mapping but without any nested namespace that is completely empty.