# Source code for aiida.engine.processes.calcjobs.manager

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""
Module containing utilities and classes relating to job calculations running
on systems that require transport.
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import contextlib
from functools import partial
import time

from six import iteritems, itervalues
from tornado import concurrent, gen

from aiida import schedulers
from aiida.common import exceptions
from ...utils import RefObjectStore

# Names exported by ``from ... import *``: the two public classes of this module.
__all__ = 'JobsList', 'JobManager'


class JobsList(object):  # pylint: disable=useless-object-inheritance
    """
    A list of submitted jobs on a machine connected to by transport based on the authorisation information.

    Callers ask for information about a job through :meth:`request_job_info_update`, which hands out a future.
    While there are pending requests the list periodically queries the scheduler. The delay between queries is
    governed by the computer's minimum job poll interval. Each query resolves all pending futures.
    """

    def __init__(self, authinfo, transport_queue):
        """
        :param authinfo: The authinfo used to check the jobs list
        :type authinfo: :class:`aiida.orm.AuthInfo`
        :param transport_queue: A transport queue
        :type transport_queue: :class:`aiida.engine.transports.TransportQueue`
        """
        self._authinfo = authinfo
        self._transport_queue = transport_queue
        self._loop = transport_queue.loop()

        self._jobs_cache = {}
        self._last_updated = None  # type: float  # `time.time()` of the last successful scheduler query
        self._job_update_requests = {}  # Mapping: {job_id: Future}
        self._update_handle = None  # handle of the scheduled update call, None when no update is scheduled

    def get_minimum_update_interval(self):
        """
        Get the minimum interval that can be expected between updates of the list

        :return: The minimum interval
        :rtype: float
        """
        return self._authinfo.computer.get_minimum_job_poll_interval()

    def get_last_updated(self):
        """
        Get the timestamp of when the list was last updated as produced by `time.time()`

        :return: The last update point, or None if the list has never been updated
        :rtype: float
        """
        return self._last_updated

    @gen.coroutine
    def _get_jobs_from_scheduler(self):
        """
        Get the current jobs list from the scheduler

        For jobs in the DONE state the detailed job info is also retrieved. It is attached to the job info object as
        the `detailedJobinfo` attribute, which is set to None for every other job.

        :return: A dictionary of {job_id: job info}
        :rtype: dict
        """
        with self._transport_queue.request_transport(self._authinfo) as request:
            transport = yield request

            scheduler = self._authinfo.computer.get_scheduler()
            scheduler.set_transport(transport)

            kwargs = {'as_dict': True}
            if scheduler.get_feature('can_query_by_user'):
                kwargs['user'] = "$USER"
            else:
                kwargs['jobs'] = self._get_jobs_with_scheduler()

            scheduler_response = scheduler.get_jobs(**kwargs)

            # Record when the scheduler was last queried. `_get_next_update_delay` relies on this to respect the
            # computer's minimum job poll interval; if it is never set, every update is scheduled immediately.
            self._last_updated = time.time()

            jobs_cache = {}
            for job_id, job_info in iteritems(scheduler_response):
                # If the job is done then get detailed job information
                detailed_job_info = None
                if job_info.job_state == schedulers.JobState.DONE:
                    try:
                        detailed_job_info = scheduler.get_detailed_jobinfo(job_id)
                    except exceptions.FeatureNotAvailable:
                        detailed_job_info = 'This scheduler does not implement get_detailed_jobinfo'

                job_info.detailedJobinfo = detailed_job_info
                jobs_cache[job_id] = job_info

            raise gen.Return(jobs_cache)

    @gen.coroutine
    def _update_job_info(self):
        """
        Update all of the job information objects for a given authinfo, that is to say for all the jobs on a particular
        machine for a particular user.

        On success, every pending (not yet done) update request is resolved with the newly retrieved info of its job.
        A job that is not in the retrieved list resolves to None. If retrieving the list fails, the exception is set
        on every pending request and then re-raised. In all cases the mapping of requests is cleared afterwards.
        """
        try:
            if not self._update_requests_outstanding():
                return

            # Update our cache of the job states
            self._jobs_cache = yield self._get_jobs_from_scheduler()
        except Exception as exception:
            # Set the exception on all the update futures
            for future in itervalues(self._job_update_requests):
                if not future.done():
                    future.set_exception(exception)

            # The `updating` coroutine of `_ensure_updating` only resets the handle after a successful update. Since the
            # exception is re-raised here, that code is never reached. Without this reset the handle would stay set,
            # `_ensure_updating` would never schedule another update, and new requests would never be resolved.
            self._update_handle = None

            raise
        else:
            for job_id, future in iteritems(self._job_update_requests):
                if not future.done():
                    future.set_result(self._jobs_cache.get(job_id, None))
        finally:
            self._job_update_requests = {}

    @contextlib.contextmanager
    def request_job_info_update(self, job_id):
        """
        Request job info about a job at the next update of the jobs list.

        If the job is not found in the jobs list at the update the future will resolve to None.

        :param job_id: The job identifier
        :return: A future that will resolve to a JobInfo object (or None) at the next update
        """
        request = self._job_update_requests.get(job_id)
        if request is None or request.done():
            # No pending request for this job yet. Alternatively, the previous one is already done, e.g. cancelled by
            # an earlier caller that left its context while the future was still in the mapping. A done future can
            # never be resolved again, so create a fresh one instead of handing out the stale one.
            request = concurrent.Future()
            self._job_update_requests[job_id] = request

        self._ensure_updating()
        yield request

    def _ensure_updating(self):
        """
        Ensure that we are updating the job list from the remote resource.

        This will automatically stop if there are no outstanding requests.
        """

        @gen.coroutine
        def updating():
            """
            Do the actual update, stop if not requests left
            """
            yield self._update_job_info()
            # Any outstanding requests?
            if self._update_requests_outstanding():
                self._update_handle = self._loop.call_later(self._get_next_update_delay(), updating)
            else:
                self._update_handle = None

        # Check if we're already updating
        if self._update_handle is None:
            self._update_handle = self._loop.call_later(self._get_next_update_delay(), updating)

    @staticmethod
    def _has_job_state_changed(old, new):
        """
        Return whether the job state or substate differs between two job infos, treating None as "no job".

        :type old: :class:`aiida.schedulers.JobInfo`
        :type new: :class:`aiida.schedulers.JobInfo`
        :rtype: bool
        """
        if old is None and new is None:
            return False

        if old is None or new is None:
            # One is None and the other isn't
            return True

        return old.job_state != new.job_state or old.job_substate != new.job_substate

    def _get_next_update_delay(self):
        """
        Calculate when we are next allowed to call the scheduler get jobs command. This is based on when we last
        called it, how much time has elapsed since then, and the minimum given update interval.

        :return: The delay (in seconds) for when it's safe to call the get jobs command
        :rtype: float
        """
        if self._last_updated is None:
            # Never updated, so do it straight away
            return 0.

        # Make sure to actually 'get' it here, so that if the user changed it
        # between times we use the current value
        minimum_interval = self._authinfo.computer.get_minimum_job_poll_interval()
        elapsed = time.time() - self._last_updated

        return max(minimum_interval - elapsed, 0.)

    def _update_requests_outstanding(self):
        """
        Return whether any of the update requests is still pending (i.e. its future is not done).

        :rtype: bool
        """
        return any(not request.done() for request in itervalues(self._job_update_requests))

    def _get_jobs_with_scheduler(self):
        """
        Get all the jobs that are currently with scheduler for this authinfo

        :return: the list of jobs with the scheduler, as the string job ids of the current update requests
        :rtype: list
        """
        return [str(job_id) for job_id in self._job_update_requests]
class JobManager(object):  # pylint: disable=useless-object-inheritance
    """
    A manager for jobs on a (usually) remote resource such as a supercomputer.

    Job info requests are delegated to one :class:`JobsList` per authinfo. These lists are created on demand and
    shared through a :class:`RefObjectStore` keyed on ``authinfo.id``.
    """

    def __init__(self, transport_queue):
        # Store of JobsList instances, one per authinfo id
        self._job_lists = RefObjectStore()
        self._transport_queue = transport_queue

    @contextlib.contextmanager
    def request_job_info_update(self, authinfo, job_id):
        """
        Get a future that will resolve to information about a given job.

        This is a context manager so that if the user leaves the context the request is automatically cancelled,
        unless it has already been resolved.

        :param authinfo: the authinfo whose jobs list should be queried
        :param job_id: the identifier of the job
        :return: a future that resolves to the JobInfo of the job at the next update of the jobs list (the info
            carries a ``detailedJobinfo`` attribute), or to None if the job was not found in the list. It is resolved
            with an exception if querying the scheduler failed.
        :rtype: :class:`tornado.concurrent.Future`
        """
        # Factory used by the store only if no JobsList exists yet for this authinfo
        make_jobs_list = partial(JobsList, authinfo, self._transport_queue)

        with self._job_lists.get(authinfo.id, make_jobs_list) as jobs_list, \
                jobs_list.request_job_info_update(job_id) as future:
            try:
                yield future
            finally:
                # The caller left before the update arrived: withdraw the request
                if not future.done():
                    future.cancel()