Skip to content

message_queues#

Message queue module.

AbstractMessageQueue #

Bases: ABC

Message broker interface between publisher and consumer.

Source code in llama_deploy/message_queues/base.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class AbstractMessageQueue(ABC):
    """Message broker interface between publisher and consumer.

    Concrete queues implement the abstract hooks (`_publish`,
    `register_consumer`, `deregister_consumer`, `cleanup`, `as_config`).
    `publish` is the shared entry point: it logs, stamps the publish time
    on the message, delegates to `_publish`, and then runs an optional
    callback.
    """

    @abstractmethod
    async def _publish(self, message: QueueMessage, topic: str) -> Any:
        """Subclasses implement publish logic here."""

    async def publish(
        self,
        message: QueueMessage,
        topic: str,
        callback: PublishCallback | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send message to a consumer.

        Args:
            message: The message to publish. Its `stats.publish_time` is
                overwritten with a fresh timestamp before delegation.
            topic: The topic passed through to `_publish`.
            callback: Optional callable invoked as
                `callback(message, **kwargs)` after `_publish` completes.
                It may be sync or async.
            **kwargs: Forwarded to `callback` only; `_publish` does not
                receive them.

        Returns:
            None. The result of `_publish` is not propagated.
        """
        logger.info(
            f"Publishing message of type '{message.type}' with action '{message.action}' to topic '{topic}'"
        )
        logger.debug(f"Message: {message.model_dump()}")

        # Stamp the message right before handing it to the backend.
        message.stats.publish_time = message.stats.timestamp_str()
        await self._publish(message, topic)

        if callback:
            # Inspect the *result* rather than the callable:
            # `inspect.iscoroutinefunction` is False for objects with an
            # `async def __call__` and for sync wrappers that return a
            # coroutine, which would leave that coroutine un-awaited.
            outcome = callback(message, **kwargs)
            if inspect.iscoroutine(outcome):
                await outcome

    @abstractmethod
    async def register_consumer(
        self, consumer: BaseMessageQueueConsumer, topic: str
    ) -> StartConsumingCallable:
        """Register consumer to start consuming messages."""

    @abstractmethod
    async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
        """Deregister consumer to stop publishing messages."""

    async def get_consumers(
        self, message_type: str
    ) -> Sequence[BaseMessageQueueConsumer]:
        """Gets list of consumers according to a message type.

        Raises:
            NotImplementedError: Always, in this base implementation.
                The method is not abstract, so subclasses may override it
                but are not required to.
        """
        raise NotImplementedError(
            "`get_consumers()` is not implemented for this class."
        )

    # NOTE(review): `**kwargs: dict[str, Any]` types every keyword *value* as
    # a dict; `**kwargs: Any` may have been intended. Confirm before changing,
    # since subclasses may mirror this signature.
    @abstractmethod
    async def cleanup(self, *args: Any, **kwargs: dict[str, Any]) -> None:
        """Perform any cleanup before shutting down."""

    @abstractmethod
    def as_config(self) -> BaseModel:
        """Returns the config (a `BaseModel`) to reconstruct the message queue."""

publish async #

publish(message: QueueMessage, topic: str, callback: PublishCallback | None = None, **kwargs: Any) -> Any

Send message to a consumer.

Source code in llama_deploy/message_queues/base.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def publish(
    self,
    message: QueueMessage,
    topic: str,
    callback: PublishCallback | None = None,
    **kwargs: Any,
) -> Any:
    """Send message to a consumer.

    Args:
        message: The message to publish. Its `stats.publish_time` is
            overwritten with a fresh timestamp before delegation.
        topic: The topic passed through to `_publish`.
        callback: Optional callable invoked as
            `callback(message, **kwargs)` after `_publish` completes.
            It may be sync or async.
        **kwargs: Forwarded to `callback` only; `_publish` does not
            receive them.

    Returns:
        None. The result of `_publish` is not propagated.
    """
    logger.info(
        f"Publishing message of type '{message.type}' with action '{message.action}' to topic '{topic}'"
    )
    logger.debug(f"Message: {message.model_dump()}")

    # Stamp the message right before handing it to the backend.
    message.stats.publish_time = message.stats.timestamp_str()
    await self._publish(message, topic)

    if callback:
        # Inspect the *result* rather than the callable:
        # `inspect.iscoroutinefunction` is False for objects with an
        # `async def __call__` and for sync wrappers that return a
        # coroutine, which would leave that coroutine un-awaited.
        outcome = callback(message, **kwargs)
        if inspect.iscoroutine(outcome):
            await outcome

register_consumer abstractmethod async #

register_consumer(consumer: BaseMessageQueueConsumer, topic: str) -> StartConsumingCallable

Register consumer to start consuming messages.

Source code in llama_deploy/message_queues/base.py
58
59
60
61
62
@abstractmethod
async def register_consumer(
    self, consumer: BaseMessageQueueConsumer, topic: str
) -> StartConsumingCallable:
    """Register consumer to start consuming messages.

    Abstract hook with no default behavior; concrete message queues must
    override it.

    Args:
        consumer: The consumer to register.
        topic: The topic to associate with the consumer.

    Returns:
        A `StartConsumingCallable`. Presumably invoking it begins the
        actual consumption; confirm against concrete implementations.
    """

deregister_consumer abstractmethod async #

deregister_consumer(consumer: BaseMessageQueueConsumer) -> Any

Deregister consumer to stop publishing messages.

Source code in llama_deploy/message_queues/base.py
64
65
66
@abstractmethod
async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
    """Deregister consumer to stop publishing messages.

    Abstract hook with no default behavior; concrete message queues must
    override it.

    Args:
        consumer: The consumer to deregister.
    """

get_consumers async #

get_consumers(message_type: str) -> Sequence[BaseMessageQueueConsumer]

Gets list of consumers according to a message type.

Source code in llama_deploy/message_queues/base.py
68
69
70
71
72
73
74
async def get_consumers(
    self, message_type: str
) -> Sequence[BaseMessageQueueConsumer]:
    """Look up the consumers associated with a message type.

    This default implementation offers no lookup and always fails; a
    queue that supports the operation overrides it.

    Raises:
        NotImplementedError: Unconditionally.
    """
    reason = "`get_consumers()` is not implemented for this class."
    raise NotImplementedError(reason)

cleanup abstractmethod async #

cleanup(*args: Any, **kwargs: dict[str, Any]) -> None

Perform any cleanup before shutting down.

Source code in llama_deploy/message_queues/base.py
76
77
78
# NOTE(review): `**kwargs: dict[str, Any]` types every keyword *value* as a
# dict; `**kwargs: Any` may have been intended. Confirm before changing,
# since subclasses may mirror this signature.
@abstractmethod
async def cleanup(self, *args: Any, **kwargs: dict[str, Any]) -> None:
    """Perform any cleanup before shutting down.

    Abstract hook with no default behavior; concrete message queues must
    override it. The meaning of the positional and keyword arguments is
    left to each implementation.
    """

as_config abstractmethod #

as_config() -> BaseModel

Returns the config (a `BaseModel` instance) needed to reconstruct the message queue.

Source code in llama_deploy/message_queues/base.py
80
81
82
@abstractmethod
def as_config(self) -> BaseModel:
    """Returns the config (a `BaseModel`) to reconstruct the message queue.

    Abstract hook with no default behavior; concrete message queues must
    override it. Unlike the other hooks on this interface, this one is
    synchronous.
    """