Source code for kiwipy.communications

from __future__ import absolute_import
import abc
import concurrent.futures
import sys

import shortuuid
import six

from . import futures

__all__ = [
    'Communicator', 'CommunicatorHelper', 'RemoteException', 'DeliveryFailed', 'TaskRejected', 'UnroutableError',
    'TimeoutError', 'DuplicateSubscriberIdentifier'
]


[docs]class RemoteException(Exception): """An exception occurred at the remote end of the call """
[docs]class DeliveryFailed(Exception): """Failed to deliver a message """
class UnroutableError(DeliveryFailed): """The messages was unroutable """ class TaskRejected(Exception): """ A task was rejected at the remote end """ class DuplicateSubscriberIdentifier(Exception): """Failed to add a subscriber because the identifier supplied is already in use""" TimeoutError = concurrent.futures.TimeoutError # pylint: disable=redefined-builtin @six.add_metaclass(abc.ABCMeta) class Communicator(object): """ The interface for a communicator used to both send and receive various types of message. """ @abc.abstractmethod def add_rpc_subscriber(self, subscriber, identifier=None): pass @abc.abstractmethod def remove_rpc_subscriber(self, identifier): """ Remove an RPC subscriber given the identifier. Raises a `ValueError` if there is no such subscriber. :param identifier: The RPC subscriber identifier """ pass @abc.abstractmethod def add_task_subscriber(self, subscriber): pass @abc.abstractmethod def remove_task_subscriber(self, subscriber): pass @abc.abstractmethod def add_broadcast_subscriber(self, subscriber, identifier=None): """ Add a broadcast subscriber that will receive all broadcast messages :param subscriber: the subscriber function to be called :param identifier: an optional identifier for the subscriber :return: an identifier for the subscriber and can be subsequently used to remove it """ pass @abc.abstractmethod def remove_broadcast_subscriber(self, identifier): """ Remove a broadcast subscriber :param identifier: the identifier of the subscriber to remove """ pass @abc.abstractmethod def task_send(self, task, no_reply=False): """ Send a task messages, this will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task. :param task: The task message :param no_reply: Do not send a reply containing the result of the task :type no_reply: bool :return: A future corresponding to the outcome of the task :rtype: :class:`kiwipy.Future` """ @abc.abstractmethod def rpc_send(self, recipient_id, msg): """ Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call. :param recipient_id: The recipient identifier :param msg: The body of the message :return: A future corresponding to the outcome of the call :rtype: :class:`kiwipy.Future` """ pass @abc.abstractmethod def broadcast_send(self, body, sender=None, subject=None, correlation_id=None): pass class CommunicatorHelper(Communicator): # Have to disable this linter because this class remains abstract and it is # just used by calsses that will themselves be concrete # pylint: disable=abstract-method def __init__(self): self._task_subscribers = [] self._broadcast_subscribers = {} self._rpc_subscribers = {} def add_rpc_subscriber(self, subscriber, identifier=None): identifier = identifier or shortuuid.uuid() if identifier in self._rpc_subscribers: raise DuplicateSubscriberIdentifier("RPC identifier '{}'".format(identifier)) self._rpc_subscribers[identifier] = subscriber def remove_rpc_subscriber(self, identifier): try: self._rpc_subscribers.pop(identifier) except KeyError: raise ValueError("Unknown subscriber '{}'".format(identifier)) def add_task_subscriber(self, subscriber): """ Register a task subscriber :param subscriber: The task callback function """ self._task_subscribers.append(subscriber) def remove_task_subscriber(self, subscriber): """ Remove a task subscriber :param subscriber: The task callback function """ try: self._task_subscribers.remove(subscriber) except ValueError: raise ValueError("Unknown subscriber: '{}'".format(subscriber)) def add_broadcast_subscriber(self, subscriber, identifier=None): identifier = identifier or shortuuid.uuid() if identifier in self._broadcast_subscribers: raise DuplicateSubscriberIdentifier("Broadcast identifier '{}'".format(identifier)) self._broadcast_subscribers[identifier] = subscriber return identifier def remove_broadcast_subscriber(self, identifier): try: del self._broadcast_subscribers[identifier] except KeyError: raise ValueError("Broadcast subscriber '{}' unknown".format(identifier)) def fire_task(self, msg, no_reply=False): future = futures.Future() handled = False for subscriber in self._task_subscribers: try: result = subscriber(self, msg) future.set_result(result) handled = True break except TaskRejected: pass except Exception: # pylint: disable=broad-except future.set_exception(RemoteException(sys.exc_info())) handled = True break if not handled: future.set_exception(TaskRejected("Rejected by all subscribers")) if no_reply: return None return future def fire_rpc(self, recipient_id, msg): try: subscriber = self._rpc_subscribers[recipient_id] except KeyError: raise UnroutableError("Unknown rpc recipient '{}'".format(recipient_id)) else: future = futures.Future() try: future.set_result(subscriber(self, msg)) except Exception: # pylint: disable=broad-except future.set_exception(RemoteException(sys.exc_info())) return future def fire_broadcast(self, body, sender=None, subject=None, correlation_id=None): for subscriber in self._broadcast_subscribers.values(): subscriber(self, body=body, sender=sender, subject=subject, correlation_id=correlation_id) return True