Source code for aiida.brokers.rabbitmq.defaults
"""Defaults related to RabbitMQ."""
from __future__ import annotations
import os
import typing as t
from aiida.common.extendeddicts import AttributeDict
from aiida.common.log import AIIDA_LOGGER
__all__ = ('BROKER_DEFAULTS',)
LOGGER = AIIDA_LOGGER.getChild('brokers.rabbitmq.defaults')
LAUNCH_QUEUE = 'process.queue'
MESSAGE_EXCHANGE = 'messages'
TASK_EXCHANGE = 'tasks'
BROKER_DEFAULTS = AttributeDict(
{
'protocol': 'amqp',
'username': 'guest',
'password': 'guest',
'host': '127.0.0.1',
'port': 5672,
'virtual_host': '',
'heartbeat': 600,
}
)
[docs]
def detect_rabbitmq_config(
protocol: str | None = None,
username: str | None = None,
password: str | None = None,
host: str | None = None,
port: int | None = None,
virtual_host: str | None = None,
) -> dict[str, t.Any]:
"""Try to connect to a RabbitMQ server with the default connection parameters.
:raises ConnectionError: If the connection failed with the provided connection parameters
:returns: The connection parameters if the RabbitMQ server was successfully connected to, or ``None`` otherwise.
"""
from kiwipy.rmq.threadcomms import connect
connection_params = {
'protocol': protocol or os.getenv('AIIDA_BROKER_PROTOCOL', BROKER_DEFAULTS['protocol']),
'username': username or os.getenv('AIIDA_BROKER_USERNAME', BROKER_DEFAULTS['username']),
'password': password or os.getenv('AIIDA_BROKER_PASSWORD', BROKER_DEFAULTS['password']),
'host': host or os.getenv('AIIDA_BROKER_HOST', BROKER_DEFAULTS['host']),
'port': port or int(os.getenv('AIIDA_BROKER_PORT', BROKER_DEFAULTS['port'])),
'virtual_host': virtual_host or os.getenv('AIIDA_BROKER_VIRTUAL_HOST', BROKER_DEFAULTS['virtual_host']),
}
LOGGER.info(f'Attempting to connect to RabbitMQ with parameters: {connection_params}')
try:
connect(connection_params=connection_params)
except ConnectionError:
raise ConnectionError(f'Failed to connect with following connection parameters: {connection_params}')
# The profile configuration expects the keys of the broker config to be prefixed with ``broker_``.
return {f'broker_{key}': value for key, value in connection_params.items()}