Source code for aiida.brokers.rabbitmq.utils

"""Utilites for RabbitMQ."""

from . import defaults

__all__ = ('get_rmq_url', 'get_launch_queue_name', 'get_message_exchange_name', 'get_task_exchange_name')


[docs] def get_rmq_url(protocol=None, username=None, password=None, host=None, port=None, virtual_host=None, **kwargs): """Return the URL to connect to RabbitMQ. .. note:: The default of the ``host`` is set to ``127.0.0.1`` instead of ``localhost`` because on some computers localhost resolves first to IPv6 with address ::1 and if RMQ is not running on IPv6 one gets an annoying warning. For more info see: https://github.com/aiidateam/aiida-core/issues/1142 :param protocol: the protocol to use, `amqp` or `amqps`. :param username: the username for authentication. :param password: the password for authentication. :param host: the hostname of the RabbitMQ server. :param port: the port of the RabbitMQ server. :param virtual_host: the virtual host to connect to. :param kwargs: remaining keyword arguments that will be encoded as query parameters. :returns: the connection URL string. """ from urllib.parse import urlencode, urlunparse if 'heartbeat' not in kwargs: kwargs['heartbeat'] = defaults.BROKER_DEFAULTS.heartbeat scheme = protocol or defaults.BROKER_DEFAULTS.protocol netloc = '{username}:{password}@{host}:{port}'.format( username=username or defaults.BROKER_DEFAULTS.username, password=password or defaults.BROKER_DEFAULTS.password, host=host or defaults.BROKER_DEFAULTS.host, port=port or defaults.BROKER_DEFAULTS.port, ) path = virtual_host or defaults.BROKER_DEFAULTS.virtual_host parameters = '' query = urlencode(kwargs) fragment = '' # The virtual host is optional but if it is specified it needs to start with a forward slash. If the virtual host # itself contains forward slashes, they need to be encoded. if path and not path.startswith('/'): path = f'/{path}' return urlunparse((scheme, netloc, path, parameters, query, fragment))
[docs] def get_launch_queue_name(prefix=None): """Return the launch queue name with an optional prefix. :returns: launch queue name """ if prefix is not None: return f'{prefix}.{defaults.LAUNCH_QUEUE}' return defaults.LAUNCH_QUEUE
[docs] def get_message_exchange_name(prefix): """Return the message exchange name for a given prefix. :returns: message exchange name """ return f'{prefix}.{defaults.MESSAGE_EXCHANGE}'
[docs] def get_task_exchange_name(prefix): """Return the task exchange name for a given prefix. :returns: task exchange name """ return f'{prefix}.{defaults.TASK_EXCHANGE}'