aiida.engine.daemon package#
Module with resources for the daemon.
Submodules#
Client to interact with the daemon.
- class aiida.engine.daemon.client.ControllerProtocol(value)[source]#
Bases:
enum.Enum
The protocol to use for the controller of the Circus daemon.
- IPC = 0#
- TCP = 1#
- __module__ = 'aiida.engine.daemon.client'#
- class aiida.engine.daemon.client.DaemonClient(profile: aiida.manage.configuration.profile.Profile)[source]#
Bases:
object
Client to interact with the daemon.
- DAEMON_ERROR_NOT_RUNNING = 'daemon-error-not-running'#
- DAEMON_ERROR_TIMEOUT = 'daemon-error-timeout'#
- _DAEMON_NAME = 'aiida-{name}'#
- _ENDPOINT_PROTOCOL = 0#
- __dict__ = mappingproxy({'__module__': 'aiida.engine.daemon.client', '__doc__': 'Client to interact with the daemon.', 'DAEMON_ERROR_NOT_RUNNING': 'daemon-error-not-running', 'DAEMON_ERROR_TIMEOUT': 'daemon-error-timeout', '_DAEMON_NAME': 'aiida-{name}', '_ENDPOINT_PROTOCOL': <ControllerProtocol.IPC: 0>, '__init__': <function DaemonClient.__init__>, 'profile': <property object>, 'daemon_name': <property object>, '_verdi_bin': <property object>, 'cmd_start_daemon': <function DaemonClient.cmd_start_daemon>, 'cmd_start_daemon_worker': <property object>, 'loglevel': <property object>, 'virtualenv': <property object>, 'circus_log_file': <property object>, 'circus_pid_file': <property object>, 'circus_port_file': <property object>, 'circus_socket_file': <property object>, 'circus_socket_endpoints': <property object>, 'daemon_log_file': <property object>, 'daemon_pid_file': <property object>, 'get_circus_port': <function DaemonClient.get_circus_port>, 'get_env': <staticmethod object>, 'get_circus_socket_directory': <function DaemonClient.get_circus_socket_directory>, 'get_daemon_pid': <function DaemonClient.get_daemon_pid>, 'is_daemon_running': <property object>, 'delete_circus_socket_directory': <function DaemonClient.delete_circus_socket_directory>, 'get_available_port': <classmethod object>, 'get_controller_endpoint': <function DaemonClient.get_controller_endpoint>, 'get_pubsub_endpoint': <function DaemonClient.get_pubsub_endpoint>, 'get_stats_endpoint': <function DaemonClient.get_stats_endpoint>, 'get_ipc_endpoint': <function DaemonClient.get_ipc_endpoint>, 'get_tcp_endpoint': <function DaemonClient.get_tcp_endpoint>, 'get_client': <function DaemonClient.get_client>, 'call_client': <function DaemonClient.call_client>, 'get_status': <function DaemonClient.get_status>, 'get_numprocesses': <function DaemonClient.get_numprocesses>, 'get_worker_info': <function DaemonClient.get_worker_info>, 'get_daemon_info': <function DaemonClient.get_daemon_info>, 'increase_workers': <function DaemonClient.increase_workers>, 'decrease_workers': <function DaemonClient.decrease_workers>, 'stop_daemon': <function DaemonClient.stop_daemon>, 'restart_daemon': <function DaemonClient.restart_daemon>, 'start_daemon': <function DaemonClient.start_daemon>, '_await_condition': <staticmethod object>, '_start_daemon': <function DaemonClient._start_daemon>, '__dict__': <attribute '__dict__' of 'DaemonClient' objects>, '__weakref__': <attribute '__weakref__' of 'DaemonClient' objects>, '__annotations__': {'_SOCKET_DIRECTORY': 'str | None', '_DAEMON_TIMEOUT': 'int'}})#
- __init__(profile: aiida.manage.configuration.profile.Profile)[source]#
Construct an instance for a given profile.
- Parameters
profile – The profile instance.
- __module__ = 'aiida.engine.daemon.client'#
- __weakref__#
list of weak references to the object (if defined)
- static _await_condition(condition: Callable, exception: Exception, timeout: int = 5, interval: float = 0.1)[source]#
Await a condition to evaluate to
True
or raise the exception if the timeout is reached.- Parameters
condition – A callable that is waited for to return
True
.exception – Raise this exception if
condition
does not returnTrue
aftertimeout
seconds.timeout – Wait this number of seconds for
condition
to returnTrue
before raising.interval – The time in seconds to wait between invocations of
condition
.
- Raises
The exception provided by
exception
if timeout is reached.
- _start_daemon(number_workers: int = 1, foreground: bool = False) None [source]#
Start the daemon.
Warning
This will daemonize the current process and put it in the background. It is most likely not what you want to call if you want to start the daemon from the Python API. Instead you probably will want to use the
aiida.engine.daemon.client.DaemonClient.start_daemon()
function instead.- Parameters
number_workers – Number of daemon workers to start.
foreground – Whether to launch the subprocess in the background or not.
- property _verdi_bin: str#
Return the absolute path to the
verdi
binary.- Raises
ConfigurationError – If the path to
verdi
could not be found
- call_client(command: Dict[str, Any]) Dict[str, Any] [source]#
Call the client with a specific command.
Will check whether the daemon is running first by checking for the pid file. When the pid is found yet the call still fails with a timeout, this means the daemon was actually not running and it was terminated unexpectedly causing the pid file to not be cleaned up properly.
- Parameters
command – Command to call the circus client with.
- Returns
The result of the circus client call.
- cmd_start_daemon(number_workers: int = 1, foreground: bool = False) list[str] [source]#
Return the command to start the daemon.
- Parameters
number_workers – Number of daemon workers to start.
foreground – Whether to launch the subprocess in the background or not.
- decrease_workers(number: int) Dict[str, Any] [source]#
Decrease the number of workers.
- Parameters
number – The number of workers to remove.
- Returns
The client call response.
- delete_circus_socket_directory() None [source]#
Attempt to delete the directory used to store the circus endpoint sockets.
Will not raise if the directory does not exist.
- classmethod get_available_port()[source]#
Get an available port from the operating system.
- Returns
A currently available port.
- get_circus_port() int [source]#
Retrieve the port for the circus controller, which should be written to the circus port file.
If the daemon is running, the port file should exist and contain the port to which the controller is connected. If it cannot be read, a RuntimeError will be thrown. If the daemon is not running, an available port will be requested from the operating system, written to the port file and returned.
- Returns
The port for the circus controller.
- get_circus_socket_directory() str [source]#
Retrieve the absolute path of the directory where the circus sockets are stored.
If the daemon is running, the sockets file should exist and contain the absolute path of the directory that contains the sockets of the circus endpoints. If it cannot be read, a
RuntimeError
will be thrown. If the daemon is not running, a temporary directory will be created and its path will be written to the sockets file and returned.Note
A temporary folder needs to be used for the sockets because UNIX limits the filepath length to 107 bytes. Placing the socket files in the AiiDA config folder might seem like the more logical choice but that folder can be placed in an arbitrarily nested directory, the socket filename will exceed the limit. The solution is therefore to always store them in the temporary directory of the operation system whose base path is typically short enough as to not exceed the limit
- Returns
The absolute path of directory to write the sockets to.
- get_client() CircusClient [source]#
Return an instance of the CircusClient.
The endpoint is defined by the controller endpoint, which used the port that was written to the port file upon starting of the daemon.
- Returns
CircusClient instance
- get_controller_endpoint()[source]#
Get the endpoint string for the circus controller.
For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol an available port will be found and saved in the profile specific port file.
- Returns
The endpoint string.
- get_daemon_info() Dict[str, Any] [source]#
Get statistics about this daemon itself.
- Returns
The client call response. If successful, will contain ‘info’ key.
- get_daemon_pid() int | None [source]#
Get the daemon pid which should be written in the daemon pid file specific to the profile.
- Returns
The pid of the circus daemon process or None if not found.
- static get_env() dict[str, str] [source]#
Return the environment for this current process.
This method is used to pass variables from the environment of the current process to a subprocess that is spawned when the daemon or a daemon worker is started.
It replicates the
PATH
,PYTHONPATH` and the ``AIIDA_PATH
environment variables. ThePYTHONPATH
variable ensures that all Python modules that can be imported by the parent process, are also importable by the subprocess. TheAIIDA_PATH
variable ensures that the subprocess will use the same AiiDA configuration directory as used by the current process.
- get_ipc_endpoint(endpoint)[source]#
Get the ipc endpoint string for a circus daemon endpoint for a given socket.
- Parameters
endpoint – The circus endpoint for which to return a socket.
- Returns
The ipc endpoint string.
- get_numprocesses() Dict[str, Any] [source]#
Get the number of running daemon processes.
- Returns
The client call response. If successful, will contain ‘numprocesses’ key.
- get_pubsub_endpoint()[source]#
Get the endpoint string for the circus pubsub endpoint.
For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol any available port will be used.
- Returns
The endpoint string.
- get_stats_endpoint()[source]#
Get the endpoint string for the circus stats endpoint.
For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol any available port will be used.
- Returns
The endpoint string.
- get_status() Dict[str, Any] [source]#
Get the daemon running status.
- Returns
The client call response. If successful, will will contain ‘status’ key.
- get_tcp_endpoint(port=None)[source]#
Get the tcp endpoint string for a circus daemon endpoint.
If the port is unspecified, the operating system will be asked for a currently available port.
- Parameters
port – A port to use for the endpoint.
- Returns
The tcp endpoint string.
- get_worker_info() Dict[str, Any] [source]#
Get workers statistics for this daemon.
- Returns
The client call response. If successful, will contain ‘info’ key.
- increase_workers(number: int) Dict[str, Any] [source]#
Increase the number of workers.
- Parameters
number – The number of workers to add.
- Returns
The client call response.
- property is_daemon_running: bool#
Return whether the daemon is running, which is determined by seeing if the daemon pid file is present.
- Returns
True if daemon is running, False otherwise.
- property profile: aiida.manage.configuration.profile.Profile#
- restart_daemon(wait: bool) Dict[str, Any] [source]#
Restart the daemon.
- Parameters
wait – Boolean to indicate whether to wait for the result of the command.
- Returns
The client call response.
- start_daemon(number_workers: int = 1, foreground: bool = False, timeout: int = 5) None [source]#
Start the daemon in a sub process running in the background.
- Parameters
number_workers – Number of daemon workers to start.
foreground – Whether to launch the subprocess in the background or not.
timeout – Wait this number of seconds for the
is_daemon_running
to returnTrue
before raising.
- Raises
DaemonException – If the daemon fails to start.
DaemonException – If the daemon starts but then is unresponsive or in an unexpected state.
DaemonException – If
is_daemon_running
returnsFalse
after thetimeout
has passed.
- stop_daemon(wait: bool = True, timeout: int = 5) Dict[str, Any] [source]#
Stop the daemon.
- Parameters
wait – Boolean to indicate whether to wait for the result of the command.
timeout – Wait this number of seconds for the
is_daemon_running
to returnFalse
before raising.
- Returns
The client call response.
- Raises
DaemonException – If
is_daemon_running
returnsTrue
after thetimeout
has passed.
- exception aiida.engine.daemon.client.DaemonException[source]#
Bases:
aiida.common.exceptions.AiidaException
Raised when the starting of the daemon failed.
- __module__ = 'aiida.engine.daemon.client'#
- aiida.engine.daemon.client.get_daemon_client(profile_name: str | None = None) DaemonClient [source]#
Return the daemon client for the given profile or the current profile if not specified.
- Parameters
profile_name – Optional profile name to load.
- Returns
The daemon client.
- Raises
aiida.common.MissingConfigurationError – if the configuration file cannot be found.
aiida.common.ProfileConfigurationError – if the given profile does not exist.
This file contains the main routines to submit, check and retrieve calculation results. These are general and contain only the main logic; where appropriate, the routines make reference to the suitable plugins for all plugin-specific operations.
- aiida.engine.daemon.execmanager._find_data_node(inputs: Mapping[str, Any], uuid: str) Optional[aiida.orm.nodes.node.Node] [source]#
Find and return the node with the given UUID from a nested mapping of input nodes.
- Parameters
inputs – (nested) mapping of nodes
uuid – UUID of the node to find
- Returns
instance of Node or None if not found
- aiida.engine.daemon.execmanager.kill_calculation(calculation: aiida.orm.nodes.process.calculation.calcjob.CalcJobNode, transport: aiida.transports.transport.Transport) None [source]#
Kill the calculation through the scheduler
- Parameters
calculation – the instance of CalcJobNode to kill.
transport – an already opened transport to use to address the scheduler
- aiida.engine.daemon.execmanager.retrieve_calculation(calculation: aiida.orm.nodes.process.calculation.calcjob.CalcJobNode, transport: aiida.transports.transport.Transport, retrieved_temporary_folder: str) None [source]#
Retrieve all the files of a completed job calculation using the given transport.
If the job defined anything in the retrieve_temporary_list, those entries will be stored in the retrieved_temporary_folder. The caller is responsible for creating and destroying this folder.
- Parameters
calculation – the instance of CalcJobNode to update.
transport – an already opened transport to use for the retrieval.
retrieved_temporary_folder – the absolute path to a directory in which to store the files listed, if any, in the retrieved_temporary_folder of the jobs CalcInfo
- aiida.engine.daemon.execmanager.retrieve_files_from_list(calculation: aiida.orm.nodes.process.calculation.calcjob.CalcJobNode, transport: aiida.transports.transport.Transport, folder: str, retrieve_list: List[Union[str, Tuple[str, str, int], list]]) None [source]#
Retrieve all the files in the retrieve_list from the remote into the local folder instance through the transport. The entries in the retrieve_list can be of two types:
a string
a list
If it is a string, it represents the remote absolute filepath of the file. If the item is a list, the elements will correspond to the following:
remotepath
localpath
depth
If the remotepath contains file patterns with wildcards, the localpath will be treated as the work directory of the folder and the depth integer determines upto what level of the original remotepath nesting the files will be copied.
- Parameters
transport – the Transport instance.
folder – an absolute path to a folder that contains the files to copy.
retrieve_list – the list of files to retrieve.
- aiida.engine.daemon.execmanager.stash_calculation(calculation: aiida.orm.nodes.process.calculation.calcjob.CalcJobNode, transport: aiida.transports.transport.Transport) None [source]#
Stash files from the working directory of a completed calculation to a permanent remote folder.
After a calculation has been completed, optionally stash files from the work directory to a storage location on the same remote machine. This is useful if one wants to keep certain files from a completed calculation to be removed from the scratch directory, because they are necessary for restarts, but that are too heavy to retrieve. Instructions of which files to copy where are retrieved from the stash.source_list option.
- Parameters
calculation – the calculation job node.
transport – an already opened transport.
- aiida.engine.daemon.execmanager.submit_calculation(calculation: CalcJobNode, transport: Transport) str | ExitCode [source]#
Submit a previously uploaded CalcJob to the scheduler.
- Parameters
calculation – the instance of CalcJobNode to submit.
transport – an already opened transport to use to submit the calculation.
- Returns
the job id as returned by the scheduler submit_from_script call
- aiida.engine.daemon.execmanager.upload_calculation(node: aiida.orm.nodes.process.calculation.calcjob.CalcJobNode, transport: aiida.transports.transport.Transport, calc_info: aiida.common.datastructures.CalcInfo, folder: aiida.common.folders.SandboxFolder, inputs: Optional[Mapping[str, Any]] = None, dry_run: bool = False) None [source]#
Upload a CalcJob instance
- Parameters
node – the CalcJobNode.
transport – an already opened transport to use to submit the calculation.
calc_info – the calculation info datastructure returned by CalcJob.presubmit
folder – temporary local file system folder containing the inputs written by CalcJob.prepare_for_submission
Function that starts a daemon worker.
- async aiida.engine.daemon.worker.shutdown_worker(runner: aiida.engine.runners.Runner) None [source]#
Cleanup tasks tied to the service’s shutdown.