
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""Transport tasks for calculation jobs."""

from __future__ import annotations

import asyncio
import functools
import logging
import tempfile
from typing import TYPE_CHECKING, Any, Callable, Optional

import plumpy
import plumpy.futures
import plumpy.persistence
import plumpy.process_states

from aiida.common.datastructures import CalcJobState
from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
from aiida.common.folders import SandboxFolder
from aiida.engine.daemon import execmanager
from aiida.engine.processes.exit_code import ExitCode
from aiida.engine.transports import TransportQueue
from aiida.engine.utils import InterruptableFuture, exponential_backoff_retry, interruptable_task
from aiida.manage.configuration import get_config_option
from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode
from aiida.schedulers.datastructures import JobState

from ..process import ProcessState
from .monitors import CalcJobMonitorAction, CalcJobMonitorResult, CalcJobMonitors

if TYPE_CHECKING:
    from .calcjob import CalcJob

UPLOAD_COMMAND = 'upload'
SUBMIT_COMMAND = 'submit'
UPDATE_COMMAND = 'update'
RETRIEVE_COMMAND = 'retrieve'
STASH_COMMAND = 'stash'
KILL_COMMAND = 'kill'

RETRY_INTERVAL_OPTION = 'transport.task_retry_initial_interval'
MAX_ATTEMPTS_OPTION = 'transport.task_maximum_attempts'
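# Both options are read from the AiiDA configuration each time a task runs, so the retry behaviour of the
# transport tasks below can be tuned per profile (for example with ``verdi config set
# transport.task_maximum_attempts 10``; the exact CLI invocation may differ between AiiDA versions).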

logger = logging.getLogger(__name__)


class PreSubmitException(Exception):  # noqa: N818
    """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`."""


async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, cancellable: InterruptableFuture):
    """Transport task that will attempt to upload the files of a job calculation to the remote.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException

    :param process: the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    node = process.node

    if node.get_state() == CalcJobState.SUBMITTING:
        logger.warning(f'CalcJob<{node.pk}> already marked as SUBMITTING, skipping task_upload_job')
        return

    initial_interval = get_config_option(RETRY_INTERVAL_OPTION)
    max_attempts = get_config_option(MAX_ATTEMPTS_OPTION)
    filepath_sandbox = get_config_option('storage.sandbox') or None

    authinfo = node.get_authinfo()

    async def do_upload():
        with transport_queue.request_transport(authinfo) as request:
            transport = await cancellable.with_interrupt(request)

            with SandboxFolder(filepath_sandbox) as folder:
                # Any exception thrown in `presubmit` call is not transient so we circumvent the exponential backoff
                try:
                    calc_info = process.presubmit(folder)
                except Exception as exception:
                    raise PreSubmitException('exception occurred in presubmit call') from exception
                else:
                    remote_folder = execmanager.upload_calculation(node, transport, calc_info, folder)
                    if remote_folder is not None:
                        process.out('remote_folder', remote_folder)
                    skip_submit = calc_info.skip_submit or False

            return skip_submit

    try:
        logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
        ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
        skip_submit = await exponential_backoff_retry(
            do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
        )
    except PreSubmitException:
        raise
    except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
        raise
    except Exception as exception:
        logger.warning(f'uploading CalcJob<{node.pk}> failed')
        raise TransportTaskException(f'upload_calculation failed {max_attempts} times consecutively') from exception
    else:
        logger.info(f'uploading CalcJob<{node.pk}> successful')
        node.set_state(CalcJobState.SUBMITTING)
        return skip_submit
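

# The helper below is a minimal, illustrative sketch (it is not used anywhere in this module) of the retry
# pattern that ``exponential_backoff_retry`` provides to every transport task in this file: await the coroutine
# and, on an exception that is not in ``ignore_exceptions``, sleep for an interval that grows with every attempt,
# up to ``max_attempts``. The name, the growth factor and the exact signature are assumptions for illustration.
async def _exponential_backoff_sketch(coro, initial_interval, max_attempts, ignore_exceptions=()):
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro()
        except ignore_exceptions:
            # Cancellations and interruptions are propagated immediately instead of being retried.
            raise
        except Exception:
            logger.warning('attempt %d out of %d failed', attempt, max_attempts)
            if attempt == max_attempts:
                raise
            await asyncio.sleep(interval)
            interval *= 2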


async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture):
    """Transport task that will attempt to submit a job calculation.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    if node.get_state() == CalcJobState.WITHSCHEDULER:
        assert node.get_job_id() is not None, 'job is WITHSCHEDULER, however, it does not have a job id'
        logger.warning(f'CalcJob<{node.pk}> already marked as WITHSCHEDULER, skipping task_submit_job')
        return node.get_job_id()

    initial_interval = get_config_option(RETRY_INTERVAL_OPTION)
    max_attempts = get_config_option(MAX_ATTEMPTS_OPTION)

    authinfo = node.get_authinfo()

    async def do_submit():
        with transport_queue.request_transport(authinfo) as request:
            transport = await cancellable.with_interrupt(request)
            return execmanager.submit_calculation(node, transport)

    try:
        logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
        ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
        result = await exponential_backoff_retry(
            do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
        )
    except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
        raise
    except Exception as exception:
        logger.warning(f'submitting CalcJob<{node.pk}> failed')
        raise TransportTaskException(f'submit_calculation failed {max_attempts} times consecutively') from exception
    else:
        logger.info(f'submitting CalcJob<{node.pk}> successful')
        node.set_state(CalcJobState.WITHSCHEDULER)
        return result


async def task_update_job(node: CalcJobNode, job_manager, cancellable: InterruptableFuture):
    """Transport task that will attempt to update the scheduler status of the job calculation.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException

    :param node: the node that represents the job calculation
    :param job_manager: The job manager
    :param cancellable: A cancel flag
    :return: True if the task was successfully completed, False otherwise
    """
    state = node.get_state()

    if state in [CalcJobState.RETRIEVING, CalcJobState.STASHING]:
        logger.warning(f'CalcJob<{node.pk}> already marked as `{state}`, skipping task_update_job')
        return True

    initial_interval = get_config_option(RETRY_INTERVAL_OPTION)
    max_attempts = get_config_option(MAX_ATTEMPTS_OPTION)

    authinfo = node.get_authinfo()
    job_id = node.get_job_id()

    async def do_update():
        # Get the update request
        with job_manager.request_job_info_update(authinfo, job_id) as update_request:
            job_info = await cancellable.with_interrupt(update_request)

        if job_info is None:
            # If the job is completed or not found, assume it's done
            node.set_scheduler_state(JobState.DONE)
            job_done = True
        else:
            node.set_last_job_info(job_info)
            node.set_scheduler_state(job_info.job_state)
            job_done = job_info.job_state == JobState.DONE

        return job_done

    try:
        logger.info(f'scheduled request to update CalcJob<{node.pk}>')
        ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
        job_done = await exponential_backoff_retry(
            do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
        )
    except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
        raise
    except Exception as exception:
        logger.warning(f'updating CalcJob<{node.pk}> failed')
        raise TransportTaskException(f'update_calculation failed {max_attempts} times consecutively') from exception
    else:
        logger.info(f'updating CalcJob<{node.pk}> successful')

        if job_done:
            node.set_state(CalcJobState.STASHING)

        return job_done


async def task_monitor_job(
    node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture, monitors: CalcJobMonitors
):
    """Transport task that will monitor the job calculation if any monitors have been defined.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: A cancel flag
    :param monitors: An instance of ``CalcJobMonitors`` holding the collection of monitors to process.
    :return: The ``CalcJobMonitorResult`` if any monitor triggered, ``None`` otherwise
    """
    state = node.get_state()

    if state in [CalcJobState.RETRIEVING, CalcJobState.STASHING]:
        logger.warning(f'CalcJob<{node.pk}> already marked as `{state}`, skipping task_monitor_job')
        return None

    initial_interval = get_config_option(RETRY_INTERVAL_OPTION)
    max_attempts = get_config_option(MAX_ATTEMPTS_OPTION)

    authinfo = node.get_authinfo()

    async def do_monitor():
        with transport_queue.request_transport(authinfo) as request:
            transport = await cancellable.with_interrupt(request)
            transport.chdir(node.get_remote_workdir())
            return monitors.process(node, transport)

    try:
        logger.info(f'scheduled request to monitor CalcJob<{node.pk}>')
        ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
        monitor_result = await exponential_backoff_retry(
            do_monitor, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
        )
    except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
        raise
    except Exception as exception:
        logger.warning(f'monitoring CalcJob<{node.pk}> failed')
        raise TransportTaskException(f'monitor_calculation failed {max_attempts} times consecutively') from exception
    else:
        logger.info(f'monitoring CalcJob<{node.pk}> successful')
        return monitor_result


async def task_retrieve_job(
    process: 'CalcJob',
    transport_queue: TransportQueue,
    retrieved_temporary_folder: str,
    cancellable: InterruptableFuture,
):
    """Transport task that will attempt to retrieve all files of a completed job calculation.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException

    :param process: the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param retrieved_temporary_folder: the absolute path to a directory to store files
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    node = process.node
    if node.get_state() == CalcJobState.PARSING:
        logger.warning(f'CalcJob<{node.pk}> already marked as PARSING, skipping task_retrieve_job')
        return

    initial_interval = get_config_option(RETRY_INTERVAL_OPTION)
    max_attempts = get_config_option(MAX_ATTEMPTS_OPTION)

    authinfo = node.get_authinfo()

    async def do_retrieve():
        with transport_queue.request_transport(authinfo) as request:
            transport = await cancellable.with_interrupt(request)

            # Perform the job accounting and set it on the node if successful. If the scheduler does not implement
            # this, still set the attribute but set it to `None`. This way we can distinguish calculation jobs for
            # which the accounting was called but could not be set.
            scheduler = node.computer.get_scheduler()  # type: ignore[union-attr]
            scheduler.set_transport(transport)

            if node.get_job_id() is None:
                logger.warning(f'there is no job id for CalcJobNode<{node.pk}>: skipping `get_detailed_job_info`')
                retrieved = execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder)
            else:
                try:
                    detailed_job_info = scheduler.get_detailed_job_info(node.get_job_id())
                except FeatureNotAvailable:
                    logger.info(f'detailed job info not available for scheduler of CalcJob<{node.pk}>')
                    node.set_detailed_job_info(None)
                else:
                    node.set_detailed_job_info(detailed_job_info)

                retrieved = execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder)

            if retrieved is not None:
                process.out(node.link_label_retrieved, retrieved)

            return retrieved

    try:
        logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>')
        ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
        result = await exponential_backoff_retry(
            do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
        )
    except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
        raise
    except Exception as exception:
        logger.warning(f'retrieving CalcJob<{node.pk}> failed')
        raise TransportTaskException(f'retrieve_calculation failed {max_attempts} times consecutively') from exception
    else:
        node.set_state(CalcJobState.PARSING)
        logger.info(f'retrieving CalcJob<{node.pk}> successful')
        return result


async def task_stash_job(node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture):
    """Transport task that will optionally stash files of a completed job calculation on the remote.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :return: ``None`` if the task was successfully completed
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    if node.get_state() == CalcJobState.RETRIEVING:
        logger.warning(f'calculation<{node.pk}> already marked as RETRIEVING, skipping task_stash_job')
        return

    initial_interval = get_config_option(RETRY_INTERVAL_OPTION)
    max_attempts = get_config_option(MAX_ATTEMPTS_OPTION)

    authinfo = node.get_authinfo()

    async def do_stash():
        with transport_queue.request_transport(authinfo) as request:
            transport = await cancellable.with_interrupt(request)

            logger.info(f'stashing calculation<{node.pk}>')
            return execmanager.stash_calculation(node, transport)

    try:
        await exponential_backoff_retry(
            do_stash,
            initial_interval,
            max_attempts,
            logger=node.logger,
            ignore_exceptions=plumpy.process_states.Interruption,
        )
    except plumpy.process_states.Interruption:
        raise
    except Exception as exception:
        logger.warning(f'stashing calculation<{node.pk}> failed')
        raise TransportTaskException(f'stash_calculation failed {max_attempts} times consecutively') from exception
    else:
        node.set_state(CalcJobState.RETRIEVING)
        logger.info(f'stashing calculation<{node.pk}> successful')
        return


async def task_kill_job(node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture):
    """Transport task that will attempt to kill a job calculation.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    initial_interval = get_config_option(RETRY_INTERVAL_OPTION)
    max_attempts = get_config_option(MAX_ATTEMPTS_OPTION)

    if node.get_state() in [CalcJobState.UPLOADING, CalcJobState.SUBMITTING]:
        logger.warning(f'CalcJob<{node.pk}> killed, it was in the {node.get_state()} state')
        return True

    authinfo = node.get_authinfo()

    async def do_kill():
        with transport_queue.request_transport(authinfo) as request:
            transport = await cancellable.with_interrupt(request)
            return execmanager.kill_calculation(node, transport)

    try:
        logger.info(f'scheduled request to kill CalcJob<{node.pk}>')
        result = await exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger)
    except plumpy.process_states.Interruption:
        raise
    except Exception as exception:
        logger.warning(f'killing CalcJob<{node.pk}> failed')
        raise TransportTaskException(f'kill_calculation failed {max_attempts} times consecutively') from exception
    else:
        logger.info(f'killing CalcJob<{node.pk}> successful')
        node.set_scheduler_state(JobState.DONE)
        return result


@plumpy.persistence.auto_persist('msg', 'data', '_command', '_monitor_result')
class Waiting(plumpy.process_states.Waiting):
    """The waiting state for the `CalcJob` process."""

    def __init__(
        self,
        process: 'CalcJob',
        done_callback: Optional[Callable[..., Any]],
        msg: Optional[str] = None,
        data: Optional[Any] = None,
    ):
        """:param process: The process this state belongs to"""
        super().__init__(process, done_callback, msg, data)
        self._task: InterruptableFuture | None = None
        self._killing: plumpy.futures.Future | None = None
        self._command: Callable[..., Any] | None = None
        self._monitor_result: CalcJobMonitorResult | None = None
        self._monitors: CalcJobMonitors | None = None

        if isinstance(self.data, dict):
            self._command = self.data['command']
            self._monitor_result = self.data.get('monitor_result', None)
        else:
            self._command = self.data

    @property
    def monitors(self) -> CalcJobMonitors | None:
        """Return the collection of monitors if specified in the inputs.

        :return: Instance of ``CalcJobMonitors`` containing monitors if specified in the process' inputs.
        """
        if not hasattr(self, '_monitors'):
            self._monitors = None

        if self._monitors is None and 'monitors' in self.process.node.inputs:
            self._monitors = CalcJobMonitors(self.process.node.inputs.monitors)

        return self._monitors

    @property
    def process(self) -> 'CalcJob':
        """:return: The process"""
        return self.state_machine  # type: ignore[return-value]
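
    # For reference, monitors reach the ``monitors`` property above through the ``monitors`` input namespace of
    # the ``CalcJob``, a mapping of ``Dict`` nodes that each declare at least the entry point of the monitor to
    # run. A hypothetical example (the entry point name below is made up for illustration):
    #
    #     builder = SomeCalcJob.get_builder()
    #     builder.monitors = {'output_watcher': orm.Dict({'entry_point': 'some_plugin.output_watcher'})}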

    def load_instance_state(self, saved_state, load_context):
        super().load_instance_state(saved_state, load_context)
        self._task = None
        self._killing = None

    async def execute(self) -> plumpy.process_states.State:  # type: ignore[override]
        """Override the execute coroutine of the base `Waiting` state."""
        node = self.process.node
        transport_queue = self.process.runner.transport
        result: plumpy.process_states.State = self

        process_status = f'Waiting for transport task: {self._command}'
        node.set_process_status(process_status)

        try:
            if self._command == UPLOAD_COMMAND:
                skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue)
                if skip_submit:
                    result = self.retrieve(monitor_result=self._monitor_result)
                else:
                    result = self.submit()

            elif self._command == SUBMIT_COMMAND:
                result = await self._launch_task(task_submit_job, node, transport_queue)
                if isinstance(result, ExitCode):
                    # The scheduler plugin returned an exit code from ``Scheduler.submit_from_script`` indicating the
                    # job submission failed due to a non-transient problem and the job should be terminated.
                    return self.create_state(ProcessState.RUNNING, self.process.terminate, result)
                result = self.update()

            elif self._command == UPDATE_COMMAND:
                job_done = False

                while not job_done:
                    scheduler_state = node.get_scheduler_state()
                    scheduler_state_string = scheduler_state.name if scheduler_state else 'UNKNOWN'
                    process_status = f'Monitoring scheduler: job state {scheduler_state_string}'
                    node.set_process_status(process_status)
                    job_done = await self._launch_task(task_update_job, node, self.process.runner.job_manager)
                    monitor_result = await self._monitor_job(node, transport_queue, self.monitors)

                    if monitor_result and monitor_result.action is CalcJobMonitorAction.KILL:
                        await self._kill_job(node, transport_queue)
                        job_done = True

                    if monitor_result and not monitor_result.retrieve:
                        exit_code = self.process.exit_codes.STOPPED_BY_MONITOR.format(message=monitor_result.message)
                        return self.create_state(ProcessState.RUNNING, self.process.terminate, exit_code)  # type: ignore[return-value]

                result = self.stash(monitor_result=monitor_result)

            elif self._command == STASH_COMMAND:
                if node.get_option('stash') is not None:
                    await self._launch_task(task_stash_job, node, transport_queue)
                result = self.retrieve(monitor_result=self._monitor_result)

            elif self._command == RETRIEVE_COMMAND:
                temp_folder = tempfile.mkdtemp()
                await self._launch_task(task_retrieve_job, self.process, transport_queue, temp_folder)
                if not self._monitor_result:
                    result = self.parse(temp_folder)

                elif self._monitor_result.parse is False:
                    exit_code = self.process.exit_codes.STOPPED_BY_MONITOR.format(message=self._monitor_result.message)
                    result = self.create_state(  # type: ignore[assignment]
                        ProcessState.RUNNING, self.process.terminate, exit_code
                    )

                elif self._monitor_result.override_exit_code:
                    exit_code = self.process.exit_codes.STOPPED_BY_MONITOR.format(message=self._monitor_result.message)
                    result = self.parse(temp_folder, exit_code)
                else:
                    result = self.parse(temp_folder)

            else:
                raise RuntimeError('Unknown waiting command')

        except TransportTaskException as exception:
            raise plumpy.process_states.PauseInterruption(f'Pausing after failed transport task: {exception}')
        except plumpy.process_states.KillInterruption as exception:
            await self._kill_job(node, transport_queue)
            node.set_process_status(str(exception))
            return self.retrieve(monitor_result=self._monitor_result)
        except (plumpy.futures.CancelledError, asyncio.CancelledError):
            node.set_process_status(f'Transport task {self._command} was cancelled')
            raise
        except plumpy.process_states.Interruption:
            node.set_process_status(f'Transport task {self._command} was interrupted')
            raise
        else:
            node.set_process_status(None)
            return result
        finally:
            # If we were trying to kill but we didn't deal with it, make sure it's set here
            if self._killing and not self._killing.done():
                self._killing.set_result(False)
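
    # Summary of the command flow handled by ``execute`` above (each step schedules the next ``Waiting`` state):
    #
    #   UPLOAD   -> task_upload_job   -> SUBMIT, or straight to RETRIEVE when ``calc_info.skip_submit`` is set
    #   SUBMIT   -> task_submit_job   -> UPDATE, or terminate if the scheduler plugin returned an ``ExitCode``
    #   UPDATE   -> task_update_job   -> loops, running monitors between polls, until the job is done -> STASH
    #   STASH    -> task_stash_job    -> RETRIEVE (the stash task only runs if the ``stash`` option is set)
    #   RETRIEVE -> task_retrieve_job -> parse in the ``Running`` state, possibly with a monitor exit code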

    async def _monitor_job(self, node, transport_queue, monitors) -> CalcJobMonitorResult | None:
        """Process job monitors if any were specified as inputs."""
        if monitors is None:
            return None

        if self._monitor_result and self._monitor_result.action == CalcJobMonitorAction.DISABLE_ALL:
            return None

        monitor_result = await self._launch_task(task_monitor_job, node, transport_queue, monitors=monitors)

        if monitor_result and monitor_result.outputs:
            for label, output in monitor_result.outputs.items():
                self.process.out(label, output)
            self.process.update_outputs()

        if monitor_result and monitor_result.action == CalcJobMonitorAction.DISABLE_SELF:
            monitors.monitors[monitor_result.key].disabled = True

        if monitor_result is not None:
            self._monitor_result = monitor_result

        return monitor_result

    async def _kill_job(self, node, transport_queue) -> None:
        """Kill the job."""
        await self._launch_task(task_kill_job, node, transport_queue)
        if self._killing is not None:
            self._killing.set_result(True)
        else:
            logger.info(f'killed CalcJob<{node.pk}> but async future was None')

    async def _launch_task(self, coro, *args, **kwargs):
        """Launch a coroutine as a task, making sure to make it interruptable."""
        task_fn = functools.partial(coro, *args, **kwargs)
        try:
            self._task = interruptable_task(task_fn)
            result = await self._task
            return result
        finally:
            self._task = None

    def upload(self) -> 'Waiting':
        """Return the `Waiting` state that will `upload` the `CalcJob`."""
        msg = 'Waiting for calculation folder upload'
        return self.create_state(  # type: ignore[return-value]
            ProcessState.WAITING, None, msg=msg, data={'command': UPLOAD_COMMAND}
        )

    def submit(self) -> 'Waiting':
        """Return the `Waiting` state that will `submit` the `CalcJob`."""
        msg = 'Waiting for scheduler submission'
        return self.create_state(  # type: ignore[return-value]
            ProcessState.WAITING, None, msg=msg, data={'command': SUBMIT_COMMAND}
        )

    def update(self) -> 'Waiting':
        """Return the `Waiting` state that will `update` the `CalcJob`."""
        msg = 'Waiting for scheduler update'
        return self.create_state(  # type: ignore[return-value]
            ProcessState.WAITING, None, msg=msg, data={'command': UPDATE_COMMAND}
        )

    def stash(self, monitor_result: CalcJobMonitorResult | None = None) -> 'Waiting':
        """Return the `Waiting` state that will `stash` the `CalcJob`."""
        msg = 'Waiting to stash'
        return self.create_state(  # type: ignore[return-value]
            ProcessState.WAITING, None, msg=msg, data={'command': STASH_COMMAND, 'monitor_result': monitor_result}
        )

    def retrieve(self, monitor_result: CalcJobMonitorResult | None = None) -> 'Waiting':
        """Return the `Waiting` state that will `retrieve` the `CalcJob`."""
        msg = 'Waiting to retrieve'
        return self.create_state(  # type: ignore[return-value]
            ProcessState.WAITING, None, msg=msg, data={'command': RETRIEVE_COMMAND, 'monitor_result': monitor_result}
        )

    def parse(
        self, retrieved_temporary_folder: str, exit_code: ExitCode | None = None
    ) -> plumpy.process_states.Running:
        """Return the `Running` state that will parse the `CalcJob`.

        :param retrieved_temporary_folder: temporary folder used in retrieving that can be used during parsing.
        """
        return self.create_state(  # type: ignore[return-value]
            ProcessState.RUNNING, self.process.parse, retrieved_temporary_folder, exit_code
        )

    def interrupt(self, reason: Any) -> Optional[plumpy.futures.Future]:  # type: ignore[override]
        """Interrupt the `Waiting` state by calling interrupt on the transport task `InterruptableFuture`."""
        if self._task is not None:
            self._task.interrupt(reason)

        if isinstance(reason, plumpy.process_states.KillInterruption):
            if self._killing is None:
                self._killing = plumpy.futures.Future()
            return self._killing

        return None
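

# Kill handling in summary: ``interrupt`` is called with a ``KillInterruption`` reason, which interrupts the
# in-flight transport task and creates the ``_killing`` future. ``execute`` catches the ``KillInterruption``,
# runs ``_kill_job`` (which resolves ``_killing`` with ``True``) and transitions to the retrieve step. If the
# interruption is never acted upon, the ``finally`` clause of ``execute`` resolves ``_killing`` with ``False``.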