# -*- coding: utf-8 -*-
"""Client for RabbitMQ Management HTTP API."""
from __future__ import annotations
import typing as t
from urllib.parse import quote
import requests
from aiida.common.exceptions import AiidaException
__all__ = ('RabbitmqManagementClient', 'ManagementApiConnectionError')
[docs]class ManagementApiConnectionError(AiidaException):
"""Raised when no connection can be made to the management HTTP API."""
[docs]class RabbitmqManagementClient:
"""Client for RabbitMQ Management HTTP API.
This requires the ``rabbitmq_management`` plugin (https://www.rabbitmq.com/management.html) to be enabled. Typically
this is enabled by running ``rabbitmq-plugins enable rabbitmq_management``.
"""
[docs] def __init__(self, username: str, password: str, hostname: str, virtual_host: str):
"""Construct a new instance.
:param username: The username to authenticate with.
:param password: The password to authenticate with.
:param hostname: The hostname of the RabbitMQ server.
:param virtual_host: The virtual host.
"""
self._username = username
self._password = password
self._hostname = hostname
self._virtual_host = virtual_host
self._authentication = requests.auth.HTTPBasicAuth(username, password)
[docs] def request(
self,
url: str,
url_params: dict[str, str] = None,
method: str = 'GET',
params: dict[str, t.Any] = None,
) -> requests.Response:
"""Make a request.
:param url: The resource path with placeholders, e.g., ``queues/{virtual_host}/{queue}``.
:param url_params: Dictionary with values for the placeholders in the ``url``. The ``virtual_host`` value is
automatically inserted and should not be specified.
:param method: The HTTP method.
:param params: Query parameters to add to the URL.
:returns: The response of the request.
:raises `ManagementApiConnectionError`: If connection to the API cannot be made.
"""
url = self.format_url(url, url_params)
try:
return requests.request(method, url, auth=self._authentication, params=params or {})
except requests.exceptions.ConnectionError as exception:
raise ManagementApiConnectionError(
'Could not connect to the management API. Make sure RabbitMQ is running and the management plugin is '
'installed using `sudo rabbitmq-plugins enable rabbitmq_management`.'
) from exception
@property
def is_connected(self) -> bool:
"""Return whether the API server can be connected to.
.. note:: Tries to reach the server at the ``/api/cluster-name`` end-point.
:returns: ``True`` if the server can be reached, ``False`` otherwise.
"""
try:
self.request('cluster-name')
except ManagementApiConnectionError:
return False
else:
return True