aiida.engine package#
Module with all the internals that make up the engine of aiida-core.
Subpackages#
- aiida.engine.daemon package
- Submodules
ControllerProtocol
DaemonClient
DaemonClient._DAEMON_NAME
DaemonClient._ENDPOINT_PROTOCOL
DaemonClient.__dict__
DaemonClient.__init__()
DaemonClient.__module__
DaemonClient.__weakref__
DaemonClient._await_condition()
DaemonClient._check_pid_file()
DaemonClient._clean_potentially_stale_pid_file()
DaemonClient._is_pid_file_stale
DaemonClient._start_daemon()
DaemonClient._verdi_bin
DaemonClient.call_client()
DaemonClient.circus_log_file
DaemonClient.circus_pid_file
DaemonClient.circus_port_file
DaemonClient.circus_socket_endpoints
DaemonClient.circus_socket_file
DaemonClient.cmd_start_daemon()
DaemonClient.cmd_start_daemon_worker
DaemonClient.daemon_log_file
DaemonClient.daemon_name
DaemonClient.daemon_pid_file
DaemonClient.decrease_workers()
DaemonClient.delete_circus_socket_directory()
DaemonClient.get_available_port()
DaemonClient.get_circus_port()
DaemonClient.get_circus_socket_directory()
DaemonClient.get_client()
DaemonClient.get_controller_endpoint()
DaemonClient.get_daemon_info()
DaemonClient.get_daemon_pid()
DaemonClient.get_env()
DaemonClient.get_ipc_endpoint()
DaemonClient.get_numprocesses()
DaemonClient.get_pubsub_endpoint()
DaemonClient.get_stats_endpoint()
DaemonClient.get_status()
DaemonClient.get_tcp_endpoint()
DaemonClient.get_worker_info()
DaemonClient.increase_workers()
DaemonClient.is_daemon_running
DaemonClient.loglevel
DaemonClient.profile
DaemonClient.restart_daemon()
DaemonClient.start_daemon()
DaemonClient.stop_daemon()
DaemonClient.virtualenv
DaemonException
DaemonNotRunningException
DaemonStalePidException
DaemonTimeoutException
get_daemon_client()
_copy_local_files()
_copy_remote_files()
_copy_sandbox_files()
_find_data_node()
kill_calculation()
retrieve_calculation()
retrieve_files_from_list()
stash_calculation()
submit_calculation()
upload_calculation()
shutdown_worker()
start_daemon_worker()
- Submodules
- aiida.engine.processes package
- Subpackages
- Submodules
PrettyEncoder
ProcessBuilder
ProcessBuilderNamespace
ProcessBuilderNamespace.__abstractmethods__
ProcessBuilderNamespace.__annotations__
ProcessBuilderNamespace.__delattr__()
ProcessBuilderNamespace.__delitem__()
ProcessBuilderNamespace.__dict__
ProcessBuilderNamespace.__dir__()
ProcessBuilderNamespace.__getitem__()
ProcessBuilderNamespace.__init__()
ProcessBuilderNamespace.__iter__()
ProcessBuilderNamespace.__len__()
ProcessBuilderNamespace.__module__
ProcessBuilderNamespace.__repr__()
ProcessBuilderNamespace.__setattr__()
ProcessBuilderNamespace.__setitem__()
ProcessBuilderNamespace.__weakref__
ProcessBuilderNamespace._abc_impl
ProcessBuilderNamespace._inputs()
ProcessBuilderNamespace._merge()
ProcessBuilderNamespace._recursive_merge()
ProcessBuilderNamespace._update()
ProcessTimeoutException
_perform_actions()
_resolve_futures()
get_active_processes()
get_process_tasks()
iterate_process_tasks()
kill_processes()
pause_processes()
play_processes()
revive_processes()
ExitCode
ExitCode.__annotations__
ExitCode.__getnewargs__()
ExitCode.__match_args__
ExitCode.__module__
ExitCode.__new__()
ExitCode.__orig_bases__
ExitCode.__repr__()
ExitCode.__slots__
ExitCode._asdict()
ExitCode._field_defaults
ExitCode._fields
ExitCode._make()
ExitCode._replace()
ExitCode.format()
ExitCode.invalidates_cache
ExitCode.message
ExitCode.status
ExitCodesNamespace
FunctionProcess
FunctionProcess.__abstractmethods__
FunctionProcess.__annotations__
FunctionProcess.__init__()
FunctionProcess.__module__
FunctionProcess._abc_impl
FunctionProcess._func()
FunctionProcess._func_args
FunctionProcess._setup_db_record()
FunctionProcess._var_keyword
FunctionProcess._var_positional
FunctionProcess.build()
FunctionProcess.create_inputs()
FunctionProcess.execute()
FunctionProcess.get_or_create_db_record()
FunctionProcess.process_class
FunctionProcess.run()
FunctionProcess.validate_inputs()
ProcessFunctionType
ProcessFunctionType.__abstractmethods__
ProcessFunctionType.__annotations__
ProcessFunctionType.__call__()
ProcessFunctionType.__dict__
ProcessFunctionType.__init__()
ProcessFunctionType.__module__
ProcessFunctionType.__orig_bases__
ProcessFunctionType.__parameters__
ProcessFunctionType.__subclasshook__()
ProcessFunctionType.__weakref__
ProcessFunctionType._abc_impl
ProcessFunctionType._is_protocol
ProcessFunctionType.is_process_function
ProcessFunctionType.node_class
ProcessFunctionType.process_class
ProcessFunctionType.recreate_from
ProcessFunctionType.run()
ProcessFunctionType.run_get_node()
ProcessFunctionType.run_get_pk()
ProcessFunctionType.spec
calcfunction()
get_stack_size()
infer_valid_type_from_type_annotation()
process_function()
workfunction()
ProcessFuture
ProcessLauncher
CalcJobOutputPort
InputPort
PortNamespace
WithMetadata
WithNonDb
WithSerialize
Process
Process.SINGLE_OUTPUT_LINKNAME
Process.SaveKeys
Process.__abstractmethods__
Process.__annotations__
Process.__init__()
Process.__module__
Process._abc_impl
Process._auto_persist
Process._build_process_label()
Process._create_and_setup_db_record()
Process._filter_serializable_metadata()
Process._flat_inputs()
Process._flat_outputs()
Process._flatten_inputs()
Process._flatten_outputs()
Process._get_namespace_list()
Process._node_class
Process._save_checkpoint()
Process._setup_db_record()
Process._setup_inputs()
Process._setup_metadata()
Process._setup_version_info()
Process._spec_class
Process.build_process_type()
Process.decode_input_args()
Process.define()
Process.encode_input_args()
Process.exit_codes
Process.exposed_inputs()
Process.exposed_outputs()
Process.get_builder()
Process.get_exit_statuses()
Process.get_or_create_db_record()
Process.get_parent_calc()
Process.get_provenance_inputs_iterator()
Process.init()
Process.inputs
Process.is_valid_cache()
Process.kill()
Process.load_instance_state()
Process.metadata
Process.node
Process.on_create()
Process.on_entered()
Process.on_except()
Process.on_finish()
Process.on_output_emitting()
Process.on_paused()
Process.on_playing()
Process.on_terminated()
Process.out()
Process.out_many()
Process.report()
Process.runner
Process.save_instance_state()
Process.set_status()
Process.spec()
Process.spec_metadata
Process.submit()
Process.update_outputs()
Process.uuid
get_query_string_from_process_type_string()
CalcJobProcessSpec
ProcessSpec
ProcessSpec.INPUT_PORT_TYPE
ProcessSpec.METADATA_KEY
ProcessSpec.METADATA_OPTIONS_KEY
ProcessSpec.PORT_NAMESPACE_TYPE
ProcessSpec.__annotations__
ProcessSpec.__init__()
ProcessSpec.__module__
ProcessSpec.exit_code()
ProcessSpec.exit_codes
ProcessSpec.inputs
ProcessSpec.metadata_key
ProcessSpec.options_key
ProcessSpec.outputs
ProcessSpec.ports
prune_mapping()
Submodules#
Exceptions that can be thrown by parts of the workflow engine.
- exception aiida.engine.exceptions.PastException[source]#
Bases:
AiidaException
Raised when an attempt is made to continue a Process that has already excepted before.
- __annotations__ = {}#
- __module__ = 'aiida.engine.exceptions'#
Top level functions that can be used to launch a Process.
- aiida.engine.launch.await_processes(nodes: Sequence[ProcessNode], wait_interval: int = 1) None [source]#
Run a loop until all processes are terminated.
- Parameters:
nodes – Sequence of nodes that represent the processes to await.
wait_interval – The interval between each iteration of checking the status of all processes.
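The waiting loop described above can be pictured with a small standard-library sketch. This is not the aiida-core implementation: `FakeNode` and `await_all` are hypothetical names, and the only thing assumed about a node is that it exposes an `is_terminated` flag.

```python
import time


class FakeNode:
    """Hypothetical stand-in for a process node that terminates after a few status checks."""

    def __init__(self, checks_until_done: int) -> None:
        self._remaining = checks_until_done

    @property
    def is_terminated(self) -> bool:
        # Each status check brings the fake process one step closer to completion.
        if self._remaining > 0:
            self._remaining -= 1
            return False
        return True


def await_all(nodes, wait_interval: float = 1) -> int:
    """Block until every node is terminated and return how many times the loop slept."""
    sleeps = 0
    # Build a list so that every node is checked on each pass before testing them together.
    while not all([node.is_terminated for node in nodes]):
        time.sleep(wait_interval)
        sleeps += 1
    return sleeps
```

Calling `await_all([FakeNode(2), FakeNode(1)], wait_interval=0)` loops until both fake nodes report termination; a larger `wait_interval` trades responsiveness for fewer status checks.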
- aiida.engine.launch.run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) dict[str, Any] [source]#
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
the outputs of the process
- aiida.engine.launch.run_get_node(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) tuple[dict[str, Any], ProcessNode] [source]#
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters:
process – the process class, instance, builder or function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and the process node
- aiida.engine.launch.run_get_pk(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) ResultAndPk [source]#
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters:
process – the process class, instance, builder or function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and process node pk
- aiida.engine.launch.submit(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, *, wait: bool = False, wait_interval: int = 5, **kwargs: Any) ProcessNode [source]#
Submit the process with the supplied inputs to the daemon immediately returning control to the interpreter.
- Parameters:
process – the process class, instance or builder to submit
inputs – the input dictionary to be passed to the process
wait – when set to True, the submission will be blocking and wait for the process to complete, at which point the function returns the calculation node.
wait_interval – the number of seconds to wait between checking the state of the process when wait=True.
kwargs – inputs to be passed to the process. This is an alternative to the positional inputs argument.
- Returns:
the calculation node of the process
Definition of AiiDA’s process persister and the necessary object loaders.
- class aiida.engine.persistence.AiiDAPersister[source]#
Bases:
Persister
Persister that takes saved process instance states and persists them to the database.
- __abstractmethods__ = frozenset({})#
- __module__ = 'aiida.engine.persistence'#
- _abc_impl = <_abc._abc_data object>#
- delete_checkpoint(pid: Hashable, tag: str | None = None) None [source]#
Delete a persisted process checkpoint, where no error will be raised if the checkpoint does not exist.
- Parameters:
pid – the process id of the plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint
- delete_process_checkpoints(pid: Hashable)[source]#
Delete all persisted checkpoints related to the given process id.
- Parameters:
pid – the process id of the aiida.engine.processes.process.Process
- get_checkpoints()[source]#
Return a list of all the current persisted process checkpoints.
- Returns:
list of PersistedCheckpoint tuples, each containing the process id and optional checkpoint tag.
- get_process_checkpoints(pid: Hashable)[source]#
Return a list of all the current persisted process checkpoints for the specified process.
- Parameters:
pid – the process pid
- Returns:
list of PersistedCheckpoint tuples, each containing the process id and optional checkpoint tag.
- load_checkpoint(pid: Hashable, tag: str | None = None) Bundle [source]#
Load a process from a persisted checkpoint by its process id.
- Parameters:
pid – the process id of the plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint
- Returns:
a bundle with the process state
- Return type:
Bundle
- Raises:
PersistenceError
Raised if there was a problem loading the checkpoint
- save_checkpoint(process: Process, tag: str | None = None)[source]#
Persist a Process instance.
- Parameters:
process – the aiida.engine.Process instance to persist
tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process
- Raises:
PersistenceError
Raised if there was a problem saving the checkpoint
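The checkpoint bookkeeping described by these methods can be illustrated with a dictionary keyed by process id and optional tag. The sketch below is a hypothetical in-memory model, not the AiiDAPersister itself: `Checkpoint` and `InMemoryPersister` are invented names, the state is stored as an arbitrary object rather than a bundle, and `save_checkpoint` takes a pid and a state instead of a Process.

```python
from typing import Any, Dict, Hashable, List, NamedTuple, Optional


class Checkpoint(NamedTuple):
    """Key identifying one checkpoint: a process id plus an optional tag."""

    pid: Hashable
    tag: Optional[str] = None


class InMemoryPersister:
    """Hypothetical persister that keeps checkpoints in a dictionary instead of a database."""

    def __init__(self) -> None:
        self._store: Dict[Checkpoint, Any] = {}

    def save_checkpoint(self, pid: Hashable, state: Any, tag: Optional[str] = None) -> None:
        # A later save with the same pid and tag overwrites the earlier state.
        self._store[Checkpoint(pid, tag)] = state

    def load_checkpoint(self, pid: Hashable, tag: Optional[str] = None) -> Any:
        key = Checkpoint(pid, tag)
        if key not in self._store:
            raise LookupError(f'no checkpoint stored for {key}')
        return self._store[key]

    def delete_checkpoint(self, pid: Hashable, tag: Optional[str] = None) -> None:
        # Deleting a checkpoint that does not exist is silently ignored.
        self._store.pop(Checkpoint(pid, tag), None)

    def delete_process_checkpoints(self, pid: Hashable) -> None:
        for key in self.get_process_checkpoints(pid):
            del self._store[key]

    def get_checkpoints(self) -> List[Checkpoint]:
        return list(self._store)

    def get_process_checkpoints(self, pid: Hashable) -> List[Checkpoint]:
        return [key for key in self._store if key.pid == pid]
```

The tag lets one process hold several checkpoints at once, while the untagged entry acts as the default checkpoint for that process id.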
- class aiida.engine.persistence.ObjectLoader[source]#
Bases:
DefaultObjectLoader
Custom object loader for aiida-core.
- __abstractmethods__ = frozenset({})#
- __module__ = 'aiida.engine.persistence'#
- _abc_impl = <_abc._abc_data object>#
- load_object(identifier: str) Any [source]#
Attempt to load the object identified by the given identifier.
Note
We override the plumpy.DefaultObjectLoader to be able to throw an ImportError instead of a ValueError which in the context of aiida-core is not as apt, since we are loading classes.
- Parameters:
identifier – concatenation of module and resource name
- Returns:
loaded object
- Raises:
ImportError – if the object cannot be loaded
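As an illustration of the note above, a loader can funnel every failure mode into a single ImportError. The sketch below is an assumption-laden stand-in, not the aiida-core or plumpy code: the function name `load_object_sketch` is hypothetical, and the `module:name` identifier format is assumed purely for the example.

```python
import importlib
from typing import Any


def load_object_sketch(identifier: str) -> Any:
    """Load an object from a hypothetical 'module:qualified.name' identifier."""
    try:
        # Raises ValueError if the identifier does not contain exactly one colon.
        module_name, qualified_name = identifier.split(':')
        obj = importlib.import_module(module_name)
        # Walk down nested attributes, e.g. 'OuterClass.InnerClass'.
        for part in qualified_name.split('.'):
            obj = getattr(obj, part)
        return obj
    except (ValueError, AttributeError, ImportError) as exception:
        # Report every failure uniformly as an ImportError, keeping the original cause.
        raise ImportError(f"object '{identifier}' could not be loaded") from exception
```

Callers then only need to handle one exception type, whether the identifier was malformed, the module was missing, or the attribute did not exist.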
- aiida.engine.persistence.get_object_loader() ObjectLoader [source]#
Return the global AiiDA object loader.
- Returns:
The global object loader
Runners that can run and submit processes.
- class aiida.engine.runners.ResultAndNode(result, node)[source]#
Bases:
NamedTuple
- __annotations__ = {'node': ForwardRef('ProcessNode'), 'result': ForwardRef('Dict[str, Any]')}#
- __getnewargs__()#
Return self as a plain tuple. Used by copy and pickle.
- __match_args__ = ('result', 'node')#
- __module__ = 'aiida.engine.runners'#
- static __new__(_cls, result: Dict[str, Any], node: ProcessNode)#
Create new instance of ResultAndNode(result, node)
- __orig_bases__ = (<function NamedTuple>,)#
- __repr__()#
Return a nicely formatted representation string
- __slots__ = ()#
- _asdict()#
Return a new dict which maps field names to their values.
- _field_defaults = {}#
- _fields = ('result', 'node')#
- classmethod _make(iterable)#
Make a new ResultAndNode object from a sequence or iterable
- _replace(**kwds)#
Return a new ResultAndNode object replacing specified fields with new values
- node: ProcessNode#
Alias for field number 1
- class aiida.engine.runners.ResultAndPk(result, pk)[source]#
Bases:
NamedTuple
- __annotations__ = {'pk': ForwardRef('int | None'), 'result': ForwardRef('Dict[str, Any]')}#
- __getnewargs__()#
Return self as a plain tuple. Used by copy and pickle.
- __match_args__ = ('result', 'pk')#
- __module__ = 'aiida.engine.runners'#
- static __new__(_cls, result: Dict[str, Any], pk: int | None)#
Create new instance of ResultAndPk(result, pk)
- __orig_bases__ = (<function NamedTuple>,)#
- __repr__()#
Return a nicely formatted representation string
- __slots__ = ()#
- _asdict()#
Return a new dict which maps field names to their values.
- _field_defaults = {}#
- _fields = ('result', 'pk')#
- classmethod _make(iterable)#
Make a new ResultAndPk object from a sequence or iterable
- _replace(**kwds)#
Return a new ResultAndPk object replacing specified fields with new values
- class aiida.engine.runners.Runner(poll_interval: int | float = 0, loop: AbstractEventLoop | None = None, communicator: Communicator | None = None, broker_submit: bool = False, persister: Persister | None = None)[source]#
Bases:
object
Class that can launch processes by running in the current interpreter or by submitting them to the daemon.
- __annotations__ = {'_closed': 'bool', '_communicator': 'Optional[kiwipy.Communicator]', '_controller': 'Optional[RemoteProcessThreadController]', '_persister': 'Optional[Persister]'}#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.runners', '__annotations__': {'_persister': 'Optional[Persister]', '_communicator': 'Optional[kiwipy.Communicator]', '_controller': 'Optional[RemoteProcessThreadController]', '_closed': 'bool'}, '__doc__': 'Class that can launch processes by running in the current interpreter or by submitting them to the daemon.', '_persister': None, '_communicator': None, '_controller': None, '_closed': False, '__init__': <function Runner.__init__>, '__enter__': <function Runner.__enter__>, '__exit__': <function Runner.__exit__>, 'loop': <property object>, 'transport': <property object>, 'persister': <property object>, 'communicator': <property object>, 'plugin_version_provider': <property object>, 'job_manager': <property object>, 'controller': <property object>, 'is_daemon_runner': <property object>, 'is_closed': <function Runner.is_closed>, 'start': <function Runner.start>, 'stop': <function Runner.stop>, 'run_until_complete': <function Runner.run_until_complete>, 'close': <function Runner.close>, 'instantiate_process': <function Runner.instantiate_process>, 'submit': <function Runner.submit>, 'schedule': <function Runner.schedule>, '_run': <function Runner._run>, 'run': <function Runner.run>, 'run_get_node': <function Runner.run_get_node>, 'run_get_pk': <function Runner.run_get_pk>, 'call_on_process_finish': <function Runner.call_on_process_finish>, 'get_process_future': <function Runner.get_process_future>, '_poll_process': <function Runner._poll_process>, '__dict__': <attribute '__dict__' of 'Runner' objects>, '__weakref__': <attribute '__weakref__' of 'Runner' objects>})#
- __init__(poll_interval: int | float = 0, loop: AbstractEventLoop | None = None, communicator: Communicator | None = None, broker_submit: bool = False, persister: Persister | None = None)[source]#
Construct a new runner.
- Parameters:
poll_interval – interval in seconds between polling for status of active sub processes
loop – an asyncio event loop, if none is supplied a new one will be created
communicator – the communicator to use
broker_submit – if True, processes will be submitted to the broker, otherwise they will be scheduled here
persister – the persister to use to persist processes
- __module__ = 'aiida.engine.runners'#
- __weakref__#
list of weak references to the object
- _communicator: Communicator | None = None#
- _controller: RemoteProcessThreadController | None = None#
- _poll_process(node, callback)[source]#
Check whether the process state of the node is terminated and call the callback or reschedule it.
- Parameters:
node – the process node
callback – callback to be called when process is terminated
- _run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) Tuple[Dict[str, Any], ProcessNode] [source]#
Run the process with the supplied inputs in this runner that will block until the process is completed.
The return value will be the results of the completed process
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and the calculation node
- call_on_process_finish(pk: int, callback: Callable[[], Any]) None [source]#
Schedule a callback when the process of the given pk is terminated.
This method will add a broadcast subscriber that will listen for state changes of the target process to be terminated. As a fail-safe, a polling mechanism is used to check the state of the process, should the broadcast message be missed by the subscriber, to prevent the caller from waiting indefinitely.
- Parameters:
pk – pk of the process
callback – function to be called upon process termination
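The combination of a broadcast notification with a polling fail-safe can be sketched with plain asyncio. Everything here is hypothetical (`fire_once`, `poll_until_terminated` and `demo` are invented names, and the broadcast is simulated by a direct call); the point is only that the callback must be guarded so it runs once, whichever path notices the termination first.

```python
import asyncio


def fire_once(callback):
    """Wrap a callback so that only the first invocation has any effect."""
    fired = False

    def wrapper():
        nonlocal fired
        if fired:
            return
        fired = True
        callback()

    return wrapper


async def poll_until_terminated(is_terminated, callback, interval=0.01):
    """Fail-safe path: keep checking the state and call back once it is terminated."""
    while not is_terminated():
        await asyncio.sleep(interval)
    callback()


async def demo():
    calls = []
    state = {'terminated': False}
    notify = fire_once(lambda: calls.append('finished'))

    # Start the polling fail-safe in the background.
    poller = asyncio.ensure_future(poll_until_terminated(lambda: state['terminated'], notify))

    await asyncio.sleep(0.03)
    state['terminated'] = True
    notify()  # Simulated broadcast message arriving first.

    await poller  # The poller also notices the termination, but the guard swallows it.
    return calls
```

Running `asyncio.run(demo())` exercises both paths; the guard ensures the callback is recorded a single time.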
- property communicator: Communicator | None#
Get the communicator used by this runner.
- property controller: RemoteProcessThreadController | None#
Get the controller used by this runner.
- get_process_future(pk: int) ProcessFuture [source]#
Return a future for a process.
The future will have the process node as the result when finished.
- Returns:
A future representing the completion of the process node
- property is_daemon_runner: bool#
Return whether the runner is a daemon runner, which means it submits processes over a broker.
- Returns:
True if the runner is a daemon runner
- property job_manager: JobManager#
- property loop: AbstractEventLoop#
Get the event loop of this runner.
- property plugin_version_provider: PluginVersionProvider#
- run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) Dict[str, Any] [source]#
Run the process with the supplied inputs in this runner that will block until the process is completed.
The return value will be the results of the completed process
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
the outputs of the process
- run_get_node(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) ResultAndNode [source]#
Run the process with the supplied inputs in this runner that will block until the process is completed.
The return value will be the results of the completed process
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and the calculation node
- run_get_pk(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) ResultAndPk [source]#
Run the process with the supplied inputs in this runner that will block until the process is completed.
The return value will be the results of the completed process
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and process node pk
- run_until_complete(future: Future) Any [source]#
Run the loop until the future has finished and return the result.
- schedule(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) ProcessNode [source]#
Schedule a process to be executed by this runner.
- Parameters:
process – the process class to submit
inputs – the inputs to be passed to the process
- Returns:
the calculation node of the process
- submit(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any)[source]#
Submit the process with the supplied inputs to this runner immediately returning control to the interpreter.
The return value will be the calculation node of the submitted process
- Parameters:
process – the process class to submit
inputs – the inputs to be passed to the process
- Returns:
the calculation node of the process
- property transport: TransportQueue#
A transport queue to batch process multiple tasks that require a Transport.
- class aiida.engine.transports.TransportQueue(loop: AbstractEventLoop | None = None)[source]#
Bases:
object
A queue to get transport objects from authinfo. This class allows clients to register their interest in a transport object which will be provided at some point in the future.
Internally the class will wait for a specific interval at the end of which it will open the transport and give it to all the clients that asked for it up to that point. This way opening of transports (a costly operation) can be minimised.
- __dict__ = mappingproxy({'__module__': 'aiida.engine.transports', '__doc__': 'A queue to get transport objects from authinfo. This class allows clients\n to register their interest in a transport object which will be provided at\n some point in the future.\n\n Internally the class will wait for a specific interval at the end of which\n it will open the transport and give it to all the clients that asked for it\n up to that point. This way opening of transports (a costly operation) can\n be minimised.\n ', '__init__': <function TransportQueue.__init__>, 'loop': <property object>, 'request_transport': <function TransportQueue.request_transport>, '__dict__': <attribute '__dict__' of 'TransportQueue' objects>, '__weakref__': <attribute '__weakref__' of 'TransportQueue' objects>, '__annotations__': {'_transport_requests': 'Dict[Hashable, TransportRequest]'}})#
- __init__(loop: AbstractEventLoop | None = None)[source]#
- Parameters:
loop – An asyncio event loop; asyncio.get_event_loop() is used if not supplied
- __module__ = 'aiida.engine.transports'#
- __weakref__#
list of weak references to the object
- property loop: AbstractEventLoop#
Get the loop being used by this transport queue
- request_transport(authinfo: AuthInfo) Iterator[Awaitable[Transport]] [source]#
Request a transport from an authinfo. Because the client is not allowed to request a transport immediately they will instead be given back a future that can be awaited to get the transport:
async def transport_task(transport_queue, authinfo):
    with transport_queue.request_transport(authinfo) as request:
        transport = await request
        # Do some work with the transport
- Parameters:
authinfo – The authinfo to be used to get transport
- Returns:
A future that can be yielded to give the transport
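The batching idea behind the queue (wait a short interval, open once, hand the result to everyone who asked in the meantime) can be sketched with asyncio futures. This is a simplified model under stated assumptions, not the TransportQueue implementation: `BatchedOpener`, `open_fn` and `delay` are hypothetical, and the sketch leaves out everything else a real transport queue must handle, such as closing the transport and propagating errors.

```python
import asyncio
import contextlib


class BatchedOpener:
    """Hypothetical queue that opens a costly resource once per key for all pending requests."""

    def __init__(self, open_fn, delay=0.01):
        self._open_fn = open_fn  # Costly operation, e.g. opening a connection.
        self._delay = delay      # How long to collect requests before opening.
        self._pending = {}       # Maps a key to the future shared by its requesters.

    @contextlib.contextmanager
    def request(self, key):
        loop = asyncio.get_running_loop()
        future = self._pending.get(key)
        if future is None:
            # First request for this key: schedule a single delayed open.
            future = loop.create_future()
            self._pending[key] = future
            loop.call_later(self._delay, self._open, key, future)
        yield future

    def _open(self, key, future):
        # Requests arriving after this point will trigger a new open.
        self._pending.pop(key, None)
        if not future.cancelled():
            future.set_result(self._open_fn(key))


async def demo():
    opened = []

    def open_fn(key):
        opened.append(key)
        return f'transport-for-{key}'

    queue = BatchedOpener(open_fn)

    async def task():
        with queue.request('computer-a') as request:
            return await request

    results = await asyncio.gather(task(), task(), task())
    return opened, results
```

In `demo`, three tasks request the same key within the delay window, so `open_fn` is invoked once and all three receive the same object.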
- class aiida.engine.transports.TransportRequest[source]#
Bases:
object
Information kept about request for a transport object
- __dict__ = mappingproxy({'__module__': 'aiida.engine.transports', '__doc__': 'Information kept about request for a transport object', '__init__': <function TransportRequest.__init__>, '__dict__': <attribute '__dict__' of 'TransportRequest' objects>, '__weakref__': <attribute '__weakref__' of 'TransportRequest' objects>, '__annotations__': {'future': 'asyncio.Future'}})#
- __module__ = 'aiida.engine.transports'#
- __weakref__#
list of weak references to the object
Utilities for the workflow engine.
- class aiida.engine.utils.InterruptableFuture(*, loop=None)[source]#
Bases:
Future
A future that can be interrupted by calling interrupt.
- __module__ = 'aiida.engine.utils'#
- interrupt(reason: Exception) None [source]#
This method should be called to interrupt the coroutine represented by this InterruptableFuture.
- async with_interrupt(coro: Awaitable[Any]) Any [source]#
Return result of a coroutine which will be interrupted if this future is interrupted
import asyncio
from aiida.engine.utils import InterruptableFuture
loop = asyncio.get_event_loop()
interruptable = InterruptableFuture()
loop.call_soon(interruptable.interrupt, RuntimeError("STOP"))
loop.run_until_complete(interruptable.with_interrupt(asyncio.sleep(2.)))
>>> RuntimeError: STOP
- Parameters:
coro – The coroutine that can be interrupted
- Returns:
The result of the coroutine
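One way such an interruptable future could work is to race the future against the wrapped coroutine and act on whichever finishes first. The sketch below is a hypothetical re-creation using only asyncio (`InterruptableSketch` and `demo` are invented names) and should not be read as the aiida-core implementation.

```python
import asyncio


class InterruptableSketch(asyncio.Future):
    """Hypothetical future whose interruption aborts a coroutine that is raced against it."""

    def interrupt(self, reason: Exception) -> None:
        # Store the reason as the exception of this future.
        self.set_exception(reason)

    async def with_interrupt(self, coro):
        task = asyncio.ensure_future(coro)
        # Wait until either the interruption or the wrapped coroutine completes.
        done, _ = await asyncio.wait({self, task}, return_when=asyncio.FIRST_COMPLETED)
        if self in done:
            task.cancel()
            return self.result()  # Re-raises the interruption reason.
        return task.result()


async def demo():
    loop = asyncio.get_running_loop()
    interruptable = InterruptableSketch(loop=loop)
    # Interrupt on the next loop iteration, long before the sleep would finish.
    loop.call_soon(interruptable.interrupt, RuntimeError('STOP'))
    try:
        await interruptable.with_interrupt(asyncio.sleep(2.0))
    except RuntimeError as exception:
        return str(exception)
    return 'not interrupted'
```

The future is created inside a running loop here, which avoids relying on an implicit current event loop.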
- aiida.engine.utils.ensure_coroutine(fct: Callable[[...], Any]) Callable[[...], Awaitable[Any]] [source]#
Ensure that the given function fct is a coroutine.
If the passed function is not already a coroutine, it will be made to be a coroutine.
- Parameters:
fct – the function
- Returns:
the coroutine
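A minimal sketch of this kind of wrapper, assuming nothing beyond the standard library, is shown below. The name `ensure_coroutine_sketch` is hypothetical and the code is an illustration of the idea rather than the library's source.

```python
import asyncio
import functools
import inspect


def ensure_coroutine_sketch(fct):
    """Return fct unchanged if it is a coroutine function, otherwise wrap it in one."""
    if inspect.iscoroutinefunction(fct):
        return fct

    @functools.wraps(fct)
    async def wrapper(*args, **kwargs):
        # The plain function is called synchronously inside the coroutine.
        return fct(*args, **kwargs)

    return wrapper


def add_one(value):
    return value + 1


async def add_two(value):
    return value + 2
```

Both `ensure_coroutine_sketch(add_one)(1)` and `ensure_coroutine_sketch(add_two)(1)` can then be awaited in the same way, which is what lets calling code treat synchronous and asynchronous functions uniformly.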
- async aiida.engine.utils.exponential_backoff_retry(fct: Callable[[...], Any], initial_interval: int | float = 10.0, max_attempts: int = 5, logger: Logger | None = None, ignore_exceptions: None | Type[Exception] | Tuple[Type[Exception], ...] = None) Any [source]#
Coroutine to call a function, recalling it with an exponential backoff in the case of an exception
This coroutine will loop max_attempts times, calling the fct function, breaking immediately when the call finishes without raising an exception, at which point the result will be returned. If an exception is caught, the function will await an asyncio.sleep with a time interval equal to the initial_interval multiplied by 2 ** (N - 1) where N is the number of excepted calls.
- Parameters:
fct – the function to call, which will be turned into a coroutine first if it is not already
initial_interval – the time to wait after the first caught exception before calling the coroutine again
max_attempts – the maximum number of times to call the coroutine before re-raising the exception
ignore_exceptions – exceptions to ignore, i.e. when caught do nothing and simply re-raise
- Returns:
result if the coro call completes within max_attempts retries without raising
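The retry schedule described above can be sketched as follows. This is a simplified illustration, not the library function: `backoff_retry` is a hypothetical name, the `sleep` parameter is an addition made only so the waits can be observed without actually sleeping, and the `logger` and `ignore_exceptions` options are left out.

```python
import asyncio
import inspect


async def backoff_retry(fct, initial_interval=10.0, max_attempts=5, sleep=asyncio.sleep):
    """Call fct up to max_attempts times, doubling the wait after each failed call."""
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            result = fct()
            # Support both plain functions and coroutine functions.
            if inspect.isawaitable(result):
                result = await result
            return result
        except Exception:
            if attempt == max_attempts:
                # Out of attempts: let the last exception propagate.
                raise
            # After the N-th failure the wait is initial_interval * 2 ** (N - 1).
            await sleep(interval)
            interval *= 2
```

With `initial_interval=1`, a function that fails twice before succeeding is retried after waits of 1 and then 2 seconds; with the defaults, the longest possible total wait is 10 + 20 + 40 + 80 = 150 seconds before the fifth and final attempt.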
- aiida.engine.utils.get_process_state_change_timestamp(process_type: str | None = None) datetime | None [source]#
Get the global setting that reflects the last time a process of the given process type changed its state. The returned value will be the corresponding timestamp or None if the setting does not exist.
- Parameters:
process_type – optional process type for which to get the latest state change timestamp. Valid process types are either ‘calculation’ or ‘work’. If not specified, last timestamp for all known process types will be returned.
- Returns:
a timestamp or None
- aiida.engine.utils.instantiate_process(runner: Runner, process: 'Process' | Type['Process'] | 'ProcessBuilder', **inputs) Process [source]#
Return an instance of the process with the given inputs. The function can deal with various types of the process:
Process instance: will simply return the instance
ProcessBuilder instance: will instantiate the Process from the class and inputs defined within it
Process class: will instantiate with the specified inputs
If anything else is passed, a ValueError will be raised
- Parameters:
process – Process instance or class, CalcJobNode class or ProcessBuilder instance
inputs – the inputs for the process to be instantiated with
- aiida.engine.utils.interruptable_task(coro: Callable[[InterruptableFuture], Awaitable[Any]], loop: AbstractEventLoop | None = None) InterruptableFuture [source]#
Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it.
- Parameters:
coro – the coroutine that should be made interruptable, taking an InterruptableFuture object as its last parameter
loop – the event loop in which to run the coroutine, by default uses asyncio.get_event_loop()
- Returns:
an InterruptableFuture
- aiida.engine.utils.is_process_function(function: Any) bool [source]#
Return whether the given function is a process function
- Parameters:
function – a function
- Returns:
True if the function is a wrapped process function, False otherwise
- aiida.engine.utils.is_process_scoped() bool [source]#
Return whether the current scope is within a process.
- Returns:
True if the current scope is within a nested process, False otherwise
- aiida.engine.utils.loop_scope(loop) Iterator[None] [source]#
Make an event loop current for the scope of the context
- Parameters:
loop – The event loop to make current for the duration of the scope
- aiida.engine.utils.prepare_inputs(inputs: dict[str, Any] | None = None, **kwargs: Any) dict[str, Any] [source]#
Prepare inputs for launch of a process.
This is a utility function to pre-process inputs for the process that can be specified both through keyword arguments as well as through the explicit inputs argument. When both are specified, a ValueError is raised.
- Parameters:
inputs – Inputs dictionary.
kwargs – Inputs defined as keyword arguments.
- Raises:
ValueError – If both kwargs and inputs are defined.
- Returns:
The dictionary of inputs for the process.
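The rule described above (accept inputs either as a dictionary or as keyword arguments, but never both) can be written in a few lines. The function below is a sketch under that reading of the documentation, with the hypothetical name `merge_inputs`; it is not copied from the library.

```python
from typing import Any, Dict, Optional


def merge_inputs(inputs: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
    """Return the inputs given either as a dictionary or as keyword arguments."""
    if inputs is not None and kwargs:
        # Mixing both styles is ambiguous, so it is rejected outright.
        raise ValueError('Cannot specify both `inputs` and `kwargs`.')
    if kwargs:
        return kwargs
    # Fall back to an empty dictionary when nothing was passed at all.
    return inputs or {}
```

So `merge_inputs({'x': 1})` and `merge_inputs(x=1)` describe the same inputs, while `merge_inputs({'x': 1}, y=2)` raises a ValueError.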
- aiida.engine.utils.set_process_state_change_timestamp(node: ProcessNode) None [source]#
Set the global setting that reflects the last time a process changed state, for the process type of the given process, to the current timestamp. The process type will be determined based on the class of the calculation node it has as its database container.
- Parameters:
node – the process node of the process that changed its state