Skip to content

message_queues#

Message queue module.

BaseMessageQueue #

Bases: BaseModel, ABC

Message broker interface between publisher and consumer.

Source code in llama_deploy/message_queues/base.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class BaseMessageQueue(BaseModel, ABC):
    """Message broker interface between publisher and consumer.

    Concrete subclasses implement the transport-specific pieces
    (``_publish``, consumer (de)registration, the processing loop, and
    the launch/cleanup hooks); this base class supplies the shared
    ``publish()`` entry point with logging, publish-time stamping, and
    an optional post-publish callback.
    """

    # Subclasses may hold non-pydantic members (e.g. raw client handles).
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # NOTE: the previous no-op ``__init__`` that only forwarded to
    # ``super().__init__`` has been removed — behavior is unchanged.

    @abstractmethod
    async def _publish(self, message: QueueMessage) -> Any:
        """Subclasses implement publish logic here."""
        ...

    async def publish(
        self,
        message: QueueMessage,
        callback: Optional[PublishCallback] = None,
        **kwargs: Any,
    ) -> Any:
        """Send message to a consumer.

        Args:
            message: The message to publish.
            callback: Optional sync or async callable invoked with the
                message (and ``**kwargs``) after a successful publish.
            **kwargs: Extra arguments forwarded to ``callback``.
        """
        logger.info(
            f"Publishing message to '{message.type}' with action '{message.action}'"
        )
        logger.debug(f"Message: {message.model_dump()}")

        # Stamp the publish time just before handing off to the transport.
        message.stats.publish_time = message.stats.timestamp_str()
        await self._publish(message)

        if callback:
            # Support both async and plain callables transparently.
            if inspect.iscoroutinefunction(callback):
                await callback(message, **kwargs)
            else:
                callback(message, **kwargs)

    @abstractmethod
    async def register_consumer(
        self,
        consumer: "BaseMessageQueueConsumer",
    ) -> "StartConsumingCallable":
        """Register consumer to start consuming messages."""

    @abstractmethod
    async def deregister_consumer(self, consumer: "BaseMessageQueueConsumer") -> Any:
        """Deregister a consumer so it stops receiving published messages."""

    async def get_consumers(
        self,
        message_type: str,
    ) -> Sequence["BaseMessageQueueConsumer"]:
        """Gets list of consumers according to a message type.

        Optional hook: transports that track consumers locally may
        override this; the base implementation always raises.
        """
        raise NotImplementedError(
            "`get_consumers()` is not implemented for this class."
        )

    @abstractmethod
    async def processing_loop(self) -> None:
        """The processing loop for the service."""
        ...

    @abstractmethod
    async def launch_local(self) -> asyncio.Task:
        """Launch the service in-process.

        Returns:
            asyncio.Task: the in-process task running the queue.
        """
        ...

    @abstractmethod
    async def launch_server(self) -> None:
        """Launch the service as a server."""
        ...

    @abstractmethod
    async def cleanup_local(
        self, message_types: List[str], *args: Any, **kwargs: Any
    ) -> None:
        """Perform any cleanup before shutting down.

        Args:
            message_types: The message types whose resources should be
                cleaned up.
        """
        ...

    @abstractmethod
    def as_config(self) -> BaseModel:
        """Returns the config object used to reconstruct the message queue."""
        ...

publish async #

publish(message: QueueMessage, callback: Optional[PublishCallback] = None, **kwargs: Any) -> Any

Send message to a consumer.

Source code in llama_deploy/message_queues/base.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
async def publish(
    self,
    message: QueueMessage,
    callback: Optional[PublishCallback] = None,
    **kwargs: Any,
) -> Any:
    """Publish *message* to its consumer, then fire the optional callback.

    Args:
        message: The message to hand off to the transport.
        callback: Optional sync or async callable invoked with the
            message (and ``**kwargs``) once the publish succeeds.
        **kwargs: Extra arguments forwarded to ``callback``.
    """
    logger.info(
        f"Publishing message to '{message.type}' with action '{message.action}'"
    )
    logger.debug(f"Message: {message.model_dump()}")

    # Record the publish timestamp, then delegate to the transport layer.
    message.stats.publish_time = message.stats.timestamp_str()
    await self._publish(message)

    if not callback:
        return
    # Invoke the post-publish hook, awaiting it when it is a coroutine fn.
    if inspect.iscoroutinefunction(callback):
        await callback(message, **kwargs)
    else:
        callback(message, **kwargs)

register_consumer abstractmethod async #

register_consumer(consumer: BaseMessageQueueConsumer) -> StartConsumingCallable

Register consumer to start consuming messages.

Source code in llama_deploy/message_queues/base.py
84
85
86
87
88
89
@abstractmethod
async def register_consumer(
    self,
    consumer: "BaseMessageQueueConsumer",
) -> "StartConsumingCallable":
    """Register consumer to start consuming messages.

    Returns:
        StartConsumingCallable: presumably invoked to start the consume
        loop — confirm against the concrete queue implementations.
    """

deregister_consumer abstractmethod async #

deregister_consumer(consumer: BaseMessageQueueConsumer) -> Any

Deregister consumer to stop publishing messages.

Source code in llama_deploy/message_queues/base.py
91
92
93
@abstractmethod
async def deregister_consumer(self, consumer: "BaseMessageQueueConsumer") -> Any:
    """Deregister a consumer so it stops receiving published messages."""

get_consumers async #

get_consumers(message_type: str) -> Sequence[BaseMessageQueueConsumer]

Gets list of consumers according to a message type.

Source code in llama_deploy/message_queues/base.py
 95
 96
 97
 98
 99
100
101
102
async def get_consumers(self, message_type: str) -> Sequence["BaseMessageQueueConsumer"]:
    """Return the consumers registered for *message_type*.

    Optional hook; this base implementation always raises
    ``NotImplementedError``. Transports that track consumers locally
    override it.
    """
    raise NotImplementedError(
        "`get_consumers()` is not implemented for this class."
    )

processing_loop abstractmethod async #

processing_loop() -> None

The processing loop for the service.

Source code in llama_deploy/message_queues/base.py
104
105
106
107
@abstractmethod
async def processing_loop(self) -> None:
    """The processing loop for the service.

    NOTE(review): presumably a long-running coroutine that drives
    message handling — confirm against concrete implementations.
    """
    ...

launch_local abstractmethod async #

launch_local() -> Task

Launch the service in-process.

Source code in llama_deploy/message_queues/base.py
109
110
111
112
@abstractmethod
async def launch_local(self) -> asyncio.Task:
    """Launch the service in-process.

    Returns:
        asyncio.Task: the in-process task running the queue.
    """
    ...

launch_server abstractmethod async #

launch_server() -> None

Launch the service as a server.

Source code in llama_deploy/message_queues/base.py
114
115
116
117
@abstractmethod
async def launch_server(self) -> None:
    """Launch the service as a server (standalone, as opposed to in-process)."""
    ...

cleanup_local abstractmethod async #

cleanup_local(message_types: List[str], *args: Any, **kwargs: Dict[str, Any]) -> None

Perform any cleanup before shutting down.

Source code in llama_deploy/message_queues/base.py
119
120
121
122
123
124
@abstractmethod
async def cleanup_local(
    self, message_types: List[str], *args: Any, **kwargs: Any
) -> None:
    """Perform any cleanup before shutting down.

    Args:
        message_types: The message types whose resources should be
            cleaned up.
    """
    ...

as_config abstractmethod #

as_config() -> BaseModel

Returns the config dict to reconstruct the message queue.

Source code in llama_deploy/message_queues/base.py
126
127
128
129
@abstractmethod
def as_config(self) -> BaseModel:
    """Returns the config object (a ``BaseModel``) used to reconstruct the message queue."""
    ...