# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""AiiDA manager for global settings"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import functools
from .configuration import get_config
__all__ = ('get_manager', 'reset_manager')
MANAGER = None
class Manager(object): # pylint: disable=useless-object-inheritance
"""
Manager singleton to provide global versions of commonly used profile/settings related objects
and methods to facilitate their construction.
In AiiDA the settings of many objects are tied to options defined in the current profile. This
means that certain objects should be constructed in a way that depends on the profile. Instead of
having disparate parts of the code accessing the profile we put together here the profile and methods
to create objects based on the current settings.
It is also a useful place to put objects where there can be a single 'global' (per profile) instance.
Future plans:
* reset manager cache when loading a new profile
"""
def _load_backend(self, schema_check=True):
"""Load the backend for the currently configured profile and return it.
:param schema_check: force a database schema check if the database environment has not yet been loaded
:return: the database backend
:rtype: :class:`aiida.orm.Backend`
"""
from aiida.backends.profile import BACKEND_DJANGO, BACKEND_SQLA
from aiida.backends.utils import is_dbenv_loaded, load_dbenv, _load_dbenv_noschemacheck
profile = self.get_profile()
if not is_dbenv_loaded():
if schema_check:
load_dbenv(profile.name)
else:
_load_dbenv_noschemacheck(profile.name)
backend_type = profile.dictionary.AIIDADB_BACKEND
if backend_type == BACKEND_DJANGO:
from aiida.orm.implementation.django.backend import DjangoBackend
self._backend = DjangoBackend()
elif backend_type == BACKEND_SQLA:
from aiida.orm.implementation.sqlalchemy.backend import SqlaBackend
self._backend = SqlaBackend()
else:
raise RuntimeError('Invalid backend type {} in profile: {}'.format(backend_type, profile.name))
return self._backend
def get_backend(self):
"""
Get the database backend
:return: the database backend
:rtype: :class:`aiida.orm.Backend`
"""
if self._backend is None:
self._load_backend()
return self._backend
def get_profile(self):
"""
Get the current profile
:return: current loaded profile instance
:rtype: :class:`~aiida.manage.configuration.profile.Profile`
"""
if self._profile is None:
config = get_config()
self._profile = config.current_profile
return self._profile
def get_persister(self):
"""
Get the persister
:return: the current persister instance
:rtype: :class:`plumpy.Persister`
"""
from aiida.engine import persistence
if self._persister is None:
self._persister = persistence.AiiDAPersister()
return self._persister
def get_communicator(self):
"""
Get the communicator
:return: a global communicator instance
:rtype: :class:`kiwipy.Communicator`
"""
if self._communicator is None:
self._communicator = self.create_communicator()
return self._communicator
def create_communicator(self, task_prefetch_count=None, with_orm=True):
"""
Create a Communicator
:param task_prefetch_count: optional specify how many tasks this communicator take simultaneously
:param with_orm: if True, use ORM (de)serializers. If false, use json.
This is used by verdi status to get a communicator without needing to load the dbenv.
:return: the communicator instance
:rtype: :class:`~kiwipy.rmq.communicator.RmqThreadCommunicator`
"""
from aiida.manage.external import rmq
import kiwipy.rmq
profile = self.get_profile()
if task_prefetch_count is None:
task_prefetch_count = rmq._RMQ_TASK_PREFETCH_COUNT # pylint: disable=protected-access
url = rmq.get_rmq_url()
prefix = profile.rmq_prefix
# This needs to be here, because the verdi commands will call this function and when called in unit tests the
# testing_mode cannot be set.
testing_mode = profile.is_test_profile
message_exchange = rmq.get_message_exchange_name(prefix)
task_exchange = rmq.get_task_exchange_name(prefix)
task_queue = rmq.get_launch_queue_name(prefix)
if with_orm:
from aiida.orm.utils import serialize
encoder = functools.partial(serialize.serialize, encoding='utf-8')
decoder = serialize.deserialize
else:
# used by verdi status to get a communicator without needing to load the dbenv
from aiida.common import json
encoder = functools.partial(json.dumps, encoding='utf-8')
decoder = json.loads
return kiwipy.rmq.RmqThreadCommunicator.connect(
connection_params={'url': url},
message_exchange=message_exchange,
encoder=encoder,
decoder=decoder,
task_exchange=task_exchange,
task_queue=task_queue,
task_prefetch_count=task_prefetch_count,
testing_mode=testing_mode,
)
def get_daemon_client(self):
"""
Return the daemon client for the current profile.
:return: the daemon client
:rtype: :class:`aiida.daemon.client.DaemonClient`
:raises aiida.common.MissingConfigurationError: if the configuration file cannot be found
:raises aiida.common.ProfileConfigurationError: if the given profile does not exist
"""
from aiida.engine.daemon.client import DaemonClient
if self._daemon_client is None:
self._daemon_client = DaemonClient(self.get_profile())
return self._daemon_client
def get_process_controller(self):
"""
Get a process controller
:return: the process controller instance
:rtype: :class:`plumpy.RemoteProcessThreadController`
"""
import plumpy
if self._process_controller is None:
self._process_controller = plumpy.RemoteProcessThreadController(self.get_communicator())
return self._process_controller
def get_runner(self):
"""
Get a runner that is based on the current profile settings and can be used globally by the code.
:return: the global runner
:rtype: :class:`aiida.engine.runners.Runner`
"""
if self._runner is None:
self._runner = self.create_runner()
return self._runner
def set_runner(self, new_runner):
"""
Set the currently used runner
:param new_runner: the new runner to use
:type new_runner: :class:`aiida.engine.runners.Runner`
"""
if self._runner is not None:
self._runner.close()
self._runner = new_runner
def create_runner(self, with_persistence=True, **kwargs):
"""
Create a new runner
:param with_persistence: create a runner with persistence enabled
:type with_persistence: bool
:return: a new runner instance
:rtype: :class:`aiida.engine.runners.Runner`
"""
from aiida.engine import runners
config = get_config()
profile = self.get_profile()
poll_interval = 0.0 if profile.is_test_profile else config.option_get('runner.poll.interval')
settings = {'rmq_submit': False, 'poll_interval': poll_interval}
settings.update(kwargs)
if 'communicator' not in settings:
# Only call get_communicator if we have to as it will lazily create
settings['communicator'] = self.get_communicator()
if with_persistence and 'persister' not in settings:
settings['persister'] = self.get_persister()
return runners.Runner(**settings)
def create_daemon_runner(self, loop=None):
"""
Create a new daemon runner. This is used by workers when the daemon is running and in testing.
:param loop: the (optional) tornado event loop to use
:type loop: :class:`tornado.ioloop.IOLoop`
:return: a runner configured to work in the daemon configuration
:rtype: :class:`aiida.engine.runners.Runner`
"""
import plumpy
from aiida.engine import persistence
from aiida.manage.external import rmq
runner = self.create_runner(rmq_submit=True, loop=loop)
runner_loop = runner.loop
# Listen for incoming launch requests
task_receiver = rmq.ProcessLauncher(
loop=runner_loop,
persister=self.get_persister(),
load_context=plumpy.LoadSaveContext(runner=runner),
loader=persistence.get_object_loader())
def callback(*args, **kwargs):
return plumpy.create_task(functools.partial(task_receiver, *args, **kwargs), loop=runner_loop)
runner.communicator.add_task_subscriber(callback)
return runner
def close(self):
"""
Reset the global settings entirely and release any global objects
"""
if self._communicator is not None:
self._communicator.stop()
if self._runner is not None:
self._runner.stop()
self._backend = None
self._config = None
self._profile = None
self._communicator = None
self._daemon_client = None
self._process_controller = None
self._persister = None
self._runner = None
def __init__(self):
super(Manager, self).__init__()
self._backend = None # type: aiida.orm.Backend
self._config = None # type: aiida.manage.configuration.config.Config
self._daemon_client = None # type: aiida.daemon.client.DaemonClient
self._profile = None # type: aiida.manage.configuration.profile.Profile
self._communicator = None # type: kiwipy.rmq.RmqThreadCommunicator
self._process_controller = None # type: plumpy.RemoteProcessThreadController
self._persister = None # type: aiida.engine.persistence.AiiDAPersister
self._runner = None # type: aiida.engine.runners.Runner
[docs]def get_manager():
global MANAGER # pylint: disable=global-statement
if MANAGER is None:
MANAGER = Manager()
return MANAGER
[docs]def reset_manager():
global MANAGER # pylint: disable=global-statement
if MANAGER is not None:
MANAGER.close()
MANAGER = None