# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
import functools
import logging
import tempfile
from tornado.gen import coroutine, Return
import plumpy
from aiida.common.datastructures import CalcJobState
from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
from aiida.common.folders import SandboxFolder
from aiida.engine.daemon import execmanager
from aiida.engine.utils import exponential_backoff_retry, interruptable_task
from aiida.schedulers.datastructures import JobState
from ..process import ProcessState
UPLOAD_COMMAND = 'upload'
SUBMIT_COMMAND = 'submit'
UPDATE_COMMAND = 'update'
RETRIEVE_COMMAND = 'retrieve'
KILL_COMMAND = 'kill'
TRANSPORT_TASK_RETRY_INITIAL_INTERVAL = 20
TRANSPORT_TASK_MAXIMUM_ATTEMTPS = 5
logger = logging.getLogger(__name__)
[docs]class PreSubmitException(Exception):
"""Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`."""
[docs]@coroutine
def task_upload_job(process, transport_queue, cancellable):
"""Transport task that will attempt to upload the files of a job calculation to the remote.
The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
If all retries fail, the task will raise a TransportTaskException
:param node: the node that represents the job calculation
:param transport_queue: the TransportQueue from which to request a Transport
:param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
:type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
:raises: Return if the tasks was successfully completed
:raises: TransportTaskException if after the maximum number of retries the transport task still excepted
"""
node = process.node
if node.get_state() == CalcJobState.SUBMITTING:
logger.warning('CalcJob<{}> already marked as SUBMITTING, skipping task_update_job'.format(node.pk))
raise Return
initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
authinfo = node.computer.get_authinfo(node.user)
@coroutine
def do_upload():
with transport_queue.request_transport(authinfo) as request:
transport = yield cancellable.with_interrupt(request)
with SandboxFolder() as folder:
# Any exception thrown in `presubmit` call is not transient so we circumvent the exponential backoff
try:
calc_info = process.presubmit(folder)
except Exception as exception: # pylint: disable=broad-except
raise PreSubmitException('exception occurred in presubmit call') from exception
else:
execmanager.upload_calculation(node, transport, calc_info, folder)
raise Return
try:
logger.info('scheduled request to upload CalcJob<{}>'.format(node.pk))
ignore_exceptions = (plumpy.CancelledError, PreSubmitException)
result = yield exponential_backoff_retry(
do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions)
except PreSubmitException:
raise
except plumpy.CancelledError:
pass
except Exception:
logger.warning('uploading CalcJob<{}> failed'.format(node.pk))
raise TransportTaskException('upload_calculation failed {} times consecutively'.format(max_attempts))
else:
logger.info('uploading CalcJob<{}> successful'.format(node.pk))
node.set_state(CalcJobState.SUBMITTING)
raise Return(result)
[docs]@coroutine
def task_submit_job(node, transport_queue, cancellable):
"""Transport task that will attempt to submit a job calculation.
The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
If all retries fail, the task will raise a TransportTaskException
:param node: the node that represents the job calculation
:param transport_queue: the TransportQueue from which to request a Transport
:param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
:type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
:raises: Return if the tasks was successfully completed
:raises: TransportTaskException if after the maximum number of retries the transport task still excepted
"""
if node.get_state() == CalcJobState.WITHSCHEDULER:
assert node.get_job_id() is not None, 'job is WITHSCHEDULER, however, it does not have a job id'
logger.warning('CalcJob<{}> already marked as WITHSCHEDULER, skipping task_submit_job'.format(node.pk))
raise Return(node.get_job_id())
initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
authinfo = node.computer.get_authinfo(node.user)
@coroutine
def do_submit():
with transport_queue.request_transport(authinfo) as request:
transport = yield cancellable.with_interrupt(request)
raise Return(execmanager.submit_calculation(node, transport))
try:
logger.info('scheduled request to submit CalcJob<{}>'.format(node.pk))
result = yield exponential_backoff_retry(
do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=plumpy.Interruption)
except plumpy.Interruption:
pass
except Exception:
logger.warning('submitting CalcJob<{}> failed'.format(node.pk))
raise TransportTaskException('submit_calculation failed {} times consecutively'.format(max_attempts))
else:
logger.info('submitting CalcJob<{}> successful'.format(node.pk))
node.set_state(CalcJobState.WITHSCHEDULER)
raise Return(result)
[docs]@coroutine
def task_update_job(node, job_manager, cancellable):
"""Transport task that will attempt to update the scheduler status of the job calculation.
The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
If all retries fail, the task will raise a TransportTaskException
:param node: the node that represents the job calculation
:type node: :class:`aiida.orm.nodes.process.calculation.calcjob.CalcJobNode`
:param job_manager: The job manager
:type job_manager: :class:`aiida.engine.processes.calcjobs.manager.JobManager`
:param cancellable: A cancel flag
:type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
:raises: Return containing True if the tasks was successfully completed, False otherwise
"""
if node.get_state() == CalcJobState.RETRIEVING:
logger.warning('CalcJob<{}> already marked as RETRIEVING, skipping task_update_job'.format(node.pk))
raise Return(True)
initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
authinfo = node.computer.get_authinfo(node.user)
job_id = node.get_job_id()
@coroutine
def do_update():
# Get the update request
with job_manager.request_job_info_update(authinfo, job_id) as update_request:
job_info = yield cancellable.with_interrupt(update_request)
if job_info is None:
# If the job is computed or not found assume it's done
node.set_scheduler_state(JobState.DONE)
job_done = True
else:
node.set_last_job_info(job_info)
node.set_scheduler_state(job_info.job_state)
job_done = job_info.job_state == JobState.DONE
raise Return(job_done)
try:
logger.info('scheduled request to update CalcJob<{}>'.format(node.pk))
job_done = yield exponential_backoff_retry(
do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=plumpy.Interruption)
except plumpy.Interruption:
raise
except Exception:
logger.warning('updating CalcJob<{}> failed'.format(node.pk))
raise TransportTaskException('update_calculation failed {} times consecutively'.format(max_attempts))
else:
logger.info('updating CalcJob<{}> successful'.format(node.pk))
if job_done:
node.set_state(CalcJobState.RETRIEVING)
raise Return(job_done)
[docs]@coroutine
def task_retrieve_job(node, transport_queue, retrieved_temporary_folder, cancellable):
"""Transport task that will attempt to retrieve all files of a completed job calculation.
The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
If all retries fail, the task will raise a TransportTaskException
:param node: the node that represents the job calculation
:param transport_queue: the TransportQueue from which to request a Transport
:param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
:type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
:raises: Return if the tasks was successfully completed
:raises: TransportTaskException if after the maximum number of retries the transport task still excepted
"""
if node.get_state() == CalcJobState.PARSING:
logger.warning('CalcJob<{}> already marked as PARSING, skipping task_retrieve_job'.format(node.pk))
raise Return
initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
authinfo = node.computer.get_authinfo(node.user)
@coroutine
def do_retrieve():
with transport_queue.request_transport(authinfo) as request:
transport = yield cancellable.with_interrupt(request)
# Perform the job accounting and set it on the node if successful. If the scheduler does not implement this
# still set the attribute but set it to `None`. This way we can distinguish calculation jobs for which the
# accounting was called but could not be set.
scheduler = node.computer.get_scheduler()
scheduler.set_transport(transport)
try:
detailed_job_info = scheduler.get_detailed_job_info(node.get_job_id())
except FeatureNotAvailable:
logger.info('detailed job info not available for scheduler of CalcJob<{}>'.format(node.pk))
node.set_detailed_job_info(None)
else:
node.set_detailed_job_info(detailed_job_info)
raise Return(execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder))
try:
logger.info('scheduled request to retrieve CalcJob<{}>'.format(node.pk))
result = yield exponential_backoff_retry(
do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=plumpy.Interruption)
except plumpy.Interruption:
raise
except Exception:
logger.warning('retrieving CalcJob<{}> failed'.format(node.pk))
raise TransportTaskException('retrieve_calculation failed {} times consecutively'.format(max_attempts))
else:
node.set_state(CalcJobState.PARSING)
logger.info('retrieving CalcJob<{}> successful'.format(node.pk))
raise Return
[docs]@coroutine
def task_kill_job(node, transport_queue, cancellable):
"""Transport task that will attempt to kill a job calculation.
The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
If all retries fail, the task will raise a TransportTaskException
:param node: the node that represents the job calculation
:param transport_queue: the TransportQueue from which to request a Transport
:param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
:type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
:raises: Return if the tasks was successfully completed
:raises: TransportTaskException if after the maximum number of retries the transport task still excepted
"""
initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
if node.get_state() in [CalcJobState.UPLOADING, CalcJobState.SUBMITTING]:
logger.warning('CalcJob<{}> killed, it was in the {} state'.format(node.pk, node.get_state()))
raise Return(True)
authinfo = node.computer.get_authinfo(node.user)
@coroutine
def do_kill():
with transport_queue.request_transport(authinfo) as request:
transport = yield cancellable.with_interrupt(request)
raise Return(execmanager.kill_calculation(node, transport))
try:
logger.info('scheduled request to kill CalcJob<{}>'.format(node.pk))
result = yield exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger)
except plumpy.Interruption:
raise
except Exception:
logger.warning('killing CalcJob<{}> failed'.format(node.pk))
raise TransportTaskException('kill_calculation failed {} times consecutively'.format(max_attempts))
else:
logger.info('killing CalcJob<{}> successful'.format(node.pk))
node.set_scheduler_state(JobState.DONE)
raise Return(result)
[docs]class Waiting(plumpy.Waiting):
"""The waiting state for the `CalcJob` process."""
[docs] def __init__(self, process, done_callback, msg=None, data=None):
"""
:param :class:`~plumpy.base.state_machine.StateMachine` process: The process this state belongs to
"""
super().__init__(process, done_callback, msg, data)
self._task = None
self._killing = None
[docs] def load_instance_state(self, saved_state, load_context):
super().load_instance_state(saved_state, load_context)
self._task = None
self._killing = None
[docs] @coroutine
def execute(self):
node = self.process.node
transport_queue = self.process.runner.transport
command = self.data
process_status = 'Waiting for transport task: {}'.format(command)
try:
if command == UPLOAD_COMMAND:
node.set_process_status(process_status)
yield self._launch_task(task_upload_job, self.process, transport_queue)
raise Return(self.submit())
elif command == SUBMIT_COMMAND:
node.set_process_status(process_status)
yield self._launch_task(task_submit_job, node, transport_queue)
raise Return(self.update())
elif self.data == UPDATE_COMMAND:
job_done = False
while not job_done:
scheduler_state = node.get_scheduler_state()
scheduler_state_string = scheduler_state.name if scheduler_state else 'UNKNOWN'
process_status = 'Monitoring scheduler: job state {}'.format(scheduler_state_string)
node.set_process_status(process_status)
job_done = yield self._launch_task(task_update_job, node, self.process.runner.job_manager)
raise Return(self.retrieve())
elif self.data == RETRIEVE_COMMAND:
node.set_process_status(process_status)
# Create a temporary folder that has to be deleted by JobProcess.retrieved after successful parsing
temp_folder = tempfile.mkdtemp()
yield self._launch_task(task_retrieve_job, node, transport_queue, temp_folder)
raise Return(self.parse(temp_folder))
else:
raise RuntimeError('Unknown waiting command')
except TransportTaskException as exception:
raise plumpy.PauseInterruption('Pausing after failed transport task: {}'.format(exception))
except plumpy.KillInterruption:
yield self._launch_task(task_kill_job, node, transport_queue)
self._killing.set_result(True)
raise
except Return:
node.set_process_status(None)
raise
except (plumpy.Interruption, plumpy.CancelledError):
node.set_process_status('Transport task {} was interrupted'.format(command))
raise
finally:
# If we were trying to kill but we didn't deal with it, make sure it's set here
if self._killing and not self._killing.done():
self._killing.set_result(False)
[docs] @coroutine
def _launch_task(self, coro, *args, **kwargs):
task_fn = functools.partial(coro, *args, **kwargs)
try:
self._task = interruptable_task(task_fn)
result = yield self._task
raise Return(result)
finally:
self._task = None
[docs] def upload(self):
"""Return the `Waiting` state that will `upload` the `CalcJob`."""
msg = 'Waiting for calculation folder upload'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPLOAD_COMMAND)
[docs] def submit(self):
"""Return the `Waiting` state that will `submit` the `CalcJob`."""
msg = 'Waiting for scheduler submission'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=SUBMIT_COMMAND)
[docs] def update(self):
"""Return the `Waiting` state that will `update` the `CalcJob`."""
msg = 'Waiting for scheduler update'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPDATE_COMMAND)
[docs] def retrieve(self):
"""Return the `Waiting` state that will `retrieve` the `CalcJob`."""
msg = 'Waiting to retrieve'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=RETRIEVE_COMMAND)
[docs] def parse(self, retrieved_temporary_folder):
"""Return the `Running` state that will parse the `CalcJob`.
:param retrieved_temporary_folder: temporary folder used in retrieving that can be used during parsing.
"""
return self.create_state(ProcessState.RUNNING, self.process.parse, retrieved_temporary_folder)
[docs] def interrupt(self, reason):
"""Interrupt the `Waiting` state by calling interrupt on the transport task `InterruptableFuture`."""
if self._task is not None:
self._task.interrupt(reason)
if isinstance(reason, plumpy.KillInterruption):
if self._killing is None:
self._killing = plumpy.Future()
return self._killing