Source code for aiida.engine.runners

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
# pylint: disable=global-statement
"""Runners that can run and submit processes."""

import collections
import logging
import signal
import tornado.ioloop

import plumpy

from aiida.common import exceptions
from aiida.orm import load_node
from aiida.plugins.utils import PluginVersionProvider

from .processes import futures
from .processes.calcjobs import manager
from . import transports
from . import utils

# Names exported by a star-import of this module: only the ``Runner`` class is part of the public API.
__all__ = ('Runner',)

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Lightweight return types pairing the outputs of a completed process with
# either its node (``Runner.run_get_node``) or the pk of its node (``Runner.run_get_pk``).
ResultAndNode = collections.namedtuple('ResultAndNode', 'result node')
ResultAndPk = collections.namedtuple('ResultAndPk', 'result pk')


class Runner:  # pylint: disable=too-many-public-methods
    """Class that can launch processes by running in the current interpreter or by submitting them to the daemon."""

    # Class-level defaults, overridden per instance in ``__init__`` where applicable.
    _persister = None
    _communicator = None
    _controller = None
    _closed = False

    def __init__(self, poll_interval=0, loop=None, communicator=None, rmq_submit=False, persister=None):
        """
        Construct a new runner

        :param poll_interval: interval in seconds between polling for status of active calculations
        :param loop: an event loop to use, if none is supplied a new one will be created
        :type loop: :class:`tornado.ioloop.IOLoop`
        :param communicator: the communicator to use
        :type communicator: :class:`kiwipy.Communicator`
        :param rmq_submit: if True, processes will be submitted to RabbitMQ, otherwise they will be scheduled here
        :param persister: the persister to use to persist processes
        :type persister: :class:`plumpy.Persister`
        """
        assert not (rmq_submit and persister is None), \
            'Must supply a persister if you want to submit using communicator'

        self._loop = loop if loop is not None else tornado.ioloop.IOLoop()
        self._poll_interval = poll_interval
        self._rmq_submit = rmq_submit
        self._transport = transports.TransportQueue(self._loop)
        self._job_manager = manager.JobManager(self._transport)
        self._persister = persister
        self._plugin_version_provider = PluginVersionProvider()

        if communicator is not None:
            self._communicator = communicator
            self._controller = plumpy.RemoteProcessThreadController(communicator)
        elif self._rmq_submit:
            # Submission over RabbitMQ needs a communicator: fall back to scheduling on the local loop.
            LOGGER.warning('Disabling RabbitMQ submission, no communicator provided')
            self._rmq_submit = False

    def __enter__(self):
        """Enter the context manager, returning the runner itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager by closing the runner."""
        self.close()

    @property
    def loop(self):
        """
        Get the event loop of this runner

        :return: the event loop
        :rtype: :class:`tornado.ioloop.IOLoop`
        """
        return self._loop

    @property
    def transport(self):
        """Get the transport queue that was created for the event loop of this runner.

        :return: the transport queue
        """
        return self._transport

    @property
    def persister(self):
        """Get the persister that was passed to the constructor.

        :return: the persister, or None if none was supplied
        """
        return self._persister

    @property
    def communicator(self):
        """
        Get the communicator used by this runner

        :return: the communicator
        :rtype: :class:`kiwipy.Communicator`
        """
        return self._communicator

    @property
    def plugin_version_provider(self):
        """Get the plugin version provider instance that was created by the constructor.

        :return: the plugin version provider
        """
        return self._plugin_version_provider

    @property
    def job_manager(self):
        """Get the job manager that was created on top of the transport queue of this runner.

        :return: the job manager
        """
        return self._job_manager

    @property
    def controller(self):
        """Get the remote process controller, which is only created when a communicator was supplied.

        :return: the controller, or None if the runner was constructed without a communicator
        """
        return self._controller

    @property
    def is_daemon_runner(self):
        """Return whether the runner is a daemon runner, which means it submits processes over RabbitMQ.

        :return: True if the runner is a daemon runner
        :rtype: bool
        """
        return self._rmq_submit

    def is_closed(self):
        """Return whether :meth:`close` has been called on this runner.

        :return: True if the runner has been closed
        :rtype: bool
        """
        return self._closed

    def start(self):
        """Start the internal event loop."""
        self._loop.start()

    def stop(self):
        """Stop the internal event loop."""
        self._loop.stop()

    def run_until_complete(self, future):
        """Run the loop until the future has finished and return the result."""
        with utils.loop_scope(self._loop):
            return self._loop.run_sync(lambda: future)

    def close(self):
        """Close the runner by stopping the loop."""
        assert not self._closed
        self.stop()
        self._closed = True

    def instantiate_process(self, process, *args, **inputs):
        """Instantiate the given process for this runner by delegating to ``utils.instantiate_process``.

        :param process: the process to instantiate
        :param inputs: the inputs to be passed to the process
        :return: whatever ``utils.instantiate_process`` returns for this runner and the given arguments
        """
        from .utils import instantiate_process
        return instantiate_process(self, process, *args, **inputs)

    def submit(self, process, *args, **inputs):
        """
        Submit the process with the supplied inputs to this runner immediately returning control to
        the interpreter. The return value will be the calculation node of the submitted process

        :param process: the process class to submit
        :param inputs: the inputs to be passed to the process
        :return: the calculation node of the process
        :raises aiida.common.exceptions.InvalidOperation: if the process has `store_provenance=False` or
            `dry_run=True` in its metadata
        """
        assert not utils.is_process_function(process), 'Cannot submit a process function'
        assert not self._closed

        process = self.instantiate_process(process, *args, **inputs)

        if not process.metadata.store_provenance:
            raise exceptions.InvalidOperation('cannot submit a process with `store_provenance=False`')

        if process.metadata.get('dry_run', False):
            raise exceptions.InvalidOperation('cannot submit a process from within another with `dry_run=True`')

        if self._rmq_submit:
            # Persist the process, close the local instance and ask the controller to continue it remotely.
            self.persister.save_checkpoint(process)
            process.close()
            self.controller.continue_process(process.pid, nowait=False, no_reply=True)
        else:
            # No RabbitMQ submission: let the event loop of this runner step the process.
            self.loop.add_callback(process.step_until_terminated)

        return process.node

    def schedule(self, process, *args, **inputs):
        """
        Schedule a process to be executed by this runner

        :param process: the process class to submit
        :param inputs: the inputs to be passed to the process
        :return: the calculation node of the process
        """
        assert not utils.is_process_function(process), 'Cannot submit a process function'
        assert not self._closed

        process = self.instantiate_process(process, *args, **inputs)
        self.loop.add_callback(process.step_until_terminated)
        return process.node

    def _run(self, process, *args, **inputs):
        """
        Run the process with the supplied inputs in this runner that will block until the process is completed.
        The return value will be the results of the completed process

        While the process executes, SIGINT and SIGTERM are redirected to a handler that kills the process. The
        handlers that were installed before are put back once execution ends, whether it returned or raised.

        :param process: the process class or process function to run
        :param inputs: the inputs to be passed to the process
        :return: tuple of the outputs of the process and the calculation node
        """
        assert not self._closed

        if utils.is_process_function(process):
            # Process functions bring their own blocking entry point.
            result, node = process.run_get_node(*args, **inputs)
            return result, node

        with utils.loop_scope(self.loop):
            process = self.instantiate_process(process, *args, **inputs)

            def kill_process(_num, _frame):
                """Send the kill signal to the process in the current scope."""
                LOGGER.critical('runner received interrupt, killing process %s', process.pid)
                process.kill(msg='Process was killed because the runner received an interrupt')

            # Remember the current handlers so they can be reinstated: without this the closure above, which
            # references a finished process, would stay installed for the rest of the interpreter's lifetime.
            previous_handlers = {signum: signal.getsignal(signum) for signum in (signal.SIGINT, signal.SIGTERM)}

            try:
                for signum in previous_handlers:
                    signal.signal(signum, kill_process)
                process.execute()
            finally:
                for signum, handler in previous_handlers.items():
                    # ``signal.getsignal`` returns None for handlers that were not installed from Python;
                    # those cannot be passed back to ``signal.signal`` and are therefore left untouched.
                    if handler is not None:
                        signal.signal(signum, handler)

            return process.outputs, process.node

    def run(self, process, *args, **inputs):
        """
        Run the process with the supplied inputs in this runner that will block until the process is completed.
        The return value will be the results of the completed process

        :param process: the process class or process function to run
        :param inputs: the inputs to be passed to the process
        :return: the outputs of the process
        """
        result, _ = self._run(process, *args, **inputs)
        return result

    def run_get_node(self, process, *args, **inputs):
        """
        Run the process with the supplied inputs in this runner that will block until the process is completed.
        The return value will be the results of the completed process

        :param process: the process class or process function to run
        :param inputs: the inputs to be passed to the process
        :return: tuple of the outputs of the process and the calculation node
        """
        result, node = self._run(process, *args, **inputs)
        return ResultAndNode(result, node)

    def run_get_pk(self, process, *args, **inputs):
        """
        Run the process with the supplied inputs in this runner that will block until the process is completed.
        The return value will be the results of the completed process

        :param process: the process class or process function to run
        :param inputs: the inputs to be passed to the process
        :return: tuple of the outputs of the process and process node pk
        """
        result, node = self._run(process, *args, **inputs)
        return ResultAndPk(result, node.pk)

    def call_on_calculation_finish(self, pk, callback):
        """
        Callback to be called when the calculation of the given pk is terminated

        :param pk: the pk of the calculation
        :param callback: the function to be called upon calculation termination
        """
        calculation = load_node(pk=pk)
        self._poll_calculation(calculation, callback)

    def get_calculation_future(self, pk):
        """
        Get a future for an orm Calculation. The future will have the calculation node as the result when finished.

        :param pk: the pk of the calculation
        :return: A future representing the completion of the calculation node
        """
        return futures.CalculationFuture(pk, self._loop, self._poll_interval, self._communicator)

    def _poll_calculation(self, calc_node, callback):
        """Schedule the callback if the calculation node is terminated, otherwise poll again later.

        :param calc_node: the calculation node whose ``is_terminated`` attribute is checked
        :param callback: the function that the loop will call with the pk of the node once it is terminated
        """
        if calc_node.is_terminated:
            self._loop.add_callback(callback, calc_node.pk)
        else:
            # Not finished yet: check again after the configured poll interval.
            self._loop.call_later(self._poll_interval, self._poll_calculation, calc_node, callback)