Message queue module.

MessageProcessor #

Bases: Protocol

Protocol for a callable that processes messages.

Source code in llama_deploy/llama_deploy/message_queues/base.py
class MessageProcessor(Protocol):
    """Protocol for a callable that processes messages."""

    def __call__(self, message: QueueMessage, **kwargs: Any) -> None:
        ...
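
Because this is a Protocol, any callable with a matching signature conforms; no subclassing is required. A minimal sketch (the function name is illustrative, and the QueueMessage import path is an assumption):

from typing import Any

from llama_deploy.message_queues.base import MessageProcessor
from llama_deploy.messages import QueueMessage  # import path assumed


def log_message(message: QueueMessage, **kwargs: Any) -> None:
    """A plain function satisfies the MessageProcessor protocol."""
    print(f"got '{message.type}': {message.model_dump()}")


processor: MessageProcessor = log_message  # passes a structural type check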

PublishCallback #

Bases: Protocol

Protocol for a callable invoked after a message is published.

TODO: Variant for Async Publish Callback.

Source code in llama_deploy/llama_deploy/message_queues/base.py
class PublishCallback(Protocol):
    """Protocol for a callable that processes messages.

    TODO: Variant for Async Publish Callback.
    """

    def __call__(self, message: QueueMessage, **kwargs: Any) -> None:
        ...
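
This callback is passed to publish() on BaseMessageQueue (below), which accepts both sync and async callables. A sketch of a sync variant (the function name is illustrative, and the QueueMessage import path is an assumption):

from typing import Any

from llama_deploy.messages import QueueMessage  # import path assumed


def record_publish(message: QueueMessage, **kwargs: Any) -> None:
    # Invoked after _publish(); stats.publish_time has already been stamped.
    print(f"published '{message.type}' at {message.stats.publish_time}")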

BaseMessageQueue #

Bases: BaseModel, ABC

Message broker interface between publisher and consumer.

Source code in llama_deploy/llama_deploy/message_queues/base.py
class BaseMessageQueue(BaseModel, ABC):
    """Message broker interface between publisher and consumer."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @abstractmethod
    async def _publish(self, message: QueueMessage) -> Any:
        """Subclasses implement publish logic here."""
        ...

    async def publish(
        self,
        message: QueueMessage,
        callback: Optional[PublishCallback] = None,
        **kwargs: Any,
    ) -> Any:
        """Send message to a consumer."""
        logger.info(
            f"Publishing message to '{message.type}' with action '{message.action}'"
        )
        logger.debug(f"Message: {message.model_dump()}")

        message.stats.publish_time = message.stats.timestamp_str()
        await self._publish(message)

        if callback:
            if inspect.iscoroutinefunction(callback):
                await callback(message, **kwargs)
            else:
                callback(message, **kwargs)

    @abstractmethod
    async def register_consumer(
        self,
        consumer: "BaseMessageQueueConsumer",
    ) -> "StartConsumingCallable":
        """Register consumer to start consuming messages."""

    @abstractmethod
    async def deregister_consumer(self, consumer: "BaseMessageQueueConsumer") -> Any:
        """Deregister consumer to stop publishing messages)."""

    async def get_consumers(
        self,
        message_type: str,
    ) -> List["BaseMessageQueueConsumer"]:
        """Gets list of consumers according to a message type."""
        raise NotImplementedError(
            "`get_consumers()` is not implemented for this class."
        )

    @abstractmethod
    async def processing_loop(self) -> None:
        """The processing loop for the service."""
        ...

    @abstractmethod
    async def launch_local(self) -> asyncio.Task:
        """Launch the service in-process."""
        ...

    @abstractmethod
    async def launch_server(self) -> None:
        """Launch the service as a server."""
        ...

    @abstractmethod
    async def cleanup_local(
        self, message_types: List[str], *args: Any, **kwargs: Any
    ) -> None:
        """Perform any cleanup before shutting down."""
        ...

    @abstractmethod
    def as_config(self) -> BaseModel:
        """Returns the config dict to reconstruct the message queue."""
        ...
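
To make the contract concrete, here is a minimal in-memory subclass. This is an illustrative sketch, not the library's own queue implementation; it assumes consumers expose a message_type attribute and an async process_message() method, and that QueueMessage imports as shown:

import asyncio
from typing import Any, Dict, List

from pydantic import BaseModel, PrivateAttr

from llama_deploy.message_queues.base import BaseMessageQueue
from llama_deploy.messages import QueueMessage  # import path assumed


class InMemoryMessageQueue(BaseMessageQueue):
    """Illustrative broker that delivers messages directly to consumers."""

    _consumers: Dict[str, List["BaseMessageQueueConsumer"]] = PrivateAttr(
        default_factory=dict
    )

    async def _publish(self, message: QueueMessage) -> Any:
        # Push the message to every consumer registered for its type.
        for consumer in self._consumers.get(message.type, []):
            await consumer.process_message(message)  # consumer API assumed

    async def register_consumer(
        self, consumer: "BaseMessageQueueConsumer"
    ) -> "StartConsumingCallable":
        self._consumers.setdefault(consumer.message_type, []).append(consumer)

        async def start_consuming() -> None:
            ...  # delivery is push-based here, so there is nothing to poll

        return start_consuming

    async def deregister_consumer(self, consumer: "BaseMessageQueueConsumer") -> Any:
        # Raises ValueError if the consumer was never registered.
        self._consumers.get(consumer.message_type, []).remove(consumer)

    async def get_consumers(
        self, message_type: str
    ) -> List["BaseMessageQueueConsumer"]:
        return self._consumers.get(message_type, [])

    async def processing_loop(self) -> None:
        ...  # no background work needed for direct delivery

    async def launch_local(self) -> asyncio.Task:
        return asyncio.create_task(self.processing_loop())

    async def launch_server(self) -> None:
        raise NotImplementedError("sketch only; no server mode")

    async def cleanup_local(
        self, message_types: List[str], *args: Any, **kwargs: Any
    ) -> None:
        for message_type in message_types:
            self._consumers.pop(message_type, None)

    def as_config(self) -> BaseModel:
        return BaseModel()  # a real queue returns its own config model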

publish async #

publish(message: QueueMessage, callback: Optional[PublishCallback] = None, **kwargs: Any) -> Any

Send a message to a consumer.

Source code in llama_deploy/llama_deploy/message_queues/base.py
async def publish(
    self,
    message: QueueMessage,
    callback: Optional[PublishCallback] = None,
    **kwargs: Any,
) -> Any:
    """Send message to a consumer."""
    logger.info(
        f"Publishing message to '{message.type}' with action '{message.action}'"
    )
    logger.debug(f"Message: {message.model_dump()}")

    message.stats.publish_time = message.stats.timestamp_str()
    await self._publish(message)

    if callback:
        if inspect.iscoroutinefunction(callback):
            await callback(message, **kwargs)
        else:
            callback(message, **kwargs)
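
Both sync and async callbacks are accepted because publish() checks the callable with inspect.iscoroutinefunction() at runtime. A hedged usage sketch (QueueMessage construction details and the import path are assumptions):

from typing import Any

from llama_deploy.message_queues.base import BaseMessageQueue
from llama_deploy.messages import QueueMessage  # import path assumed


async def on_published(message: QueueMessage, **kwargs: Any) -> None:
    # Runs after _publish(); publish_time has already been stamped.
    print(f"sent '{message.type}' at {message.stats.publish_time}")


async def send(queue: BaseMessageQueue) -> None:
    msg = QueueMessage(type="my_service")  # remaining fields assumed to default
    await queue.publish(msg, callback=on_published)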

register_consumer abstractmethod async #

register_consumer(consumer: BaseMessageQueueConsumer) -> StartConsumingCallable

Register a consumer to start consuming messages.

Source code in llama_deploy/llama_deploy/message_queues/base.py
@abstractmethod
async def register_consumer(
    self,
    consumer: "BaseMessageQueueConsumer",
) -> "StartConsumingCallable":
    """Register consumer to start consuming messages."""

deregister_consumer abstractmethod async #

deregister_consumer(consumer: BaseMessageQueueConsumer) -> Any

Deregister a consumer to stop publishing messages to it.

Source code in llama_deploy/llama_deploy/message_queues/base.py
@abstractmethod
async def deregister_consumer(self, consumer: "BaseMessageQueueConsumer") -> Any:
    """Deregister consumer to stop publishing messages)."""

get_consumers async #

get_consumers(message_type: str) -> List[BaseMessageQueueConsumer]

Get the list of consumers for a given message type.

Source code in llama_deploy/llama_deploy/message_queues/base.py
async def get_consumers(
    self,
    message_type: str,
) -> List["BaseMessageQueueConsumer"]:
    """Gets list of consumers according to a message type."""
    raise NotImplementedError(
        "`get_consumers()` is not implemented for this class."
    )

processing_loop abstractmethod async #

processing_loop() -> None

The processing loop for the service.

Source code in llama_deploy/llama_deploy/message_queues/base.py
@abstractmethod
async def processing_loop(self) -> None:
    """The processing loop for the service."""
    ...

launch_local abstractmethod async #

launch_local() -> Task

Launch the service in-process.

Source code in llama_deploy/llama_deploy/message_queues/base.py
@abstractmethod
async def launch_local(self) -> asyncio.Task:
    """Launch the service in-process."""
    ...
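
In-process deployments pair launch_local() with cleanup_local() on shutdown. A lifecycle sketch (the message-type name is illustrative):

import asyncio

from llama_deploy.message_queues.base import BaseMessageQueue


async def run(queue: BaseMessageQueue) -> None:
    task = await queue.launch_local()  # runs processing_loop() in the background
    try:
        ...  # publish and consume messages here
    finally:
        await queue.cleanup_local(["my_service"])  # illustrative message type
        task.cancel()  # stop the background loop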

launch_server abstractmethod async #

launch_server() -> None

Launch the service as a server.

Source code in llama_deploy/llama_deploy/message_queues/base.py
@abstractmethod
async def launch_server(self) -> None:
    """Launch the service as a server."""
    ...

cleanup_local abstractmethod async #

cleanup_local(message_types: List[str], *args: Any, **kwargs: Any) -> None

Perform any cleanup before shutting down.

Source code in llama_deploy/llama_deploy/message_queues/base.py
@abstractmethod
async def cleanup_local(
    self, message_types: List[str], *args: Any, **kwargs: Any
) -> None:
    """Perform any cleanup before shutting down."""
    ...

as_config abstractmethod #

as_config() -> BaseModel

Return the config object used to reconstruct the message queue.

Source code in llama_deploy/llama_deploy/message_queues/base.py
@abstractmethod
def as_config(self) -> BaseModel:
    """Returns the config dict to reconstruct the message queue."""
    ...
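
The config model lets a deployment serialize the queue's settings and rebuild an equivalent instance elsewhere. A sketch (the helper name is illustrative):

from llama_deploy.message_queues.base import BaseMessageQueue


def snapshot(queue: BaseMessageQueue) -> str:
    config = queue.as_config()        # a pydantic model, not a plain dict
    return config.model_dump_json()   # serialize for transport or storage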
