Skip to content

message_consumers#

BaseMessageQueueConsumer #

Bases: BaseModel, ABC

Consumer of a MessageQueue.

Process messages from a MessageQueue for a specific message type.

Parameters:

Name Type Description Default
id_ str

Identifier of the consumer.

generated per instance by generate_id (e.g. '104dd9d3-47bd-44af-bd8e-d95d6801b8e2')
message_type str

Type of the message to consume.

'default'
channel Any

The channel if any for which to receive messages.

None
consuming_callable Callable[..., Coroutine[Any, Any, None]]

Callable awaited by start_consuming.

default_start_consuming_callable
Source code in llama_deploy/message_consumers/base.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class BaseMessageQueueConsumer(BaseModel, ABC):
    """Consumer of a MessageQueue.

    Process messages from a MessageQueue for a specific message type.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Identifier of this consumer; produced by ``generate_id`` when not supplied.
    id_: str = Field(default_factory=generate_id)
    # ``process_message`` only accepts messages whose ``type`` equals this value.
    message_type: str = Field(
        description="Type of the message to consume.",
        default="default",
    )
    channel: Any = Field(
        description="The channel if any for which to receive messages.",
        default=None,
    )
    # Awaited without arguments by ``start_consuming``.
    consuming_callable: StartConsumingCallable = Field(
        default=default_start_consuming_callable,
    )

    @abstractmethod
    async def _process_message(self, message: QueueMessage, **kwargs: Any) -> Any:
        """Handle a single message; concrete consumers provide the logic."""

    async def process_message(self, message: QueueMessage, **kwargs: Any) -> Any:
        """Validate the message type, then delegate to ``_process_message``.

        Args:
            message: The queue message to handle.
            **kwargs: Forwarded unchanged to ``_process_message``.

        Returns:
            Whatever ``_process_message`` returns.

        Raises:
            ValueError: If ``message.type`` differs from ``self.message_type``.
        """
        type_mismatch = message.type != self.message_type
        if type_mismatch:
            raise ValueError(
                f"Consumer cannot process messages of type '{message.type}'."
            )
        return await self._process_message(message, **kwargs)

    async def start_consuming(self) -> None:
        """Start consuming by awaiting ``consuming_callable`` with no arguments."""
        consume = self.consuming_callable
        await consume()

process_message async #

process_message(message: QueueMessage, **kwargs: Any) -> Any

Logic for processing message.

Source code in llama_deploy/message_consumers/base.py
43
44
45
46
47
48
async def process_message(self, message: QueueMessage, **kwargs: Any) -> Any:
    """Validate the message type, then delegate to ``_process_message``.

    Args:
        message: The queue message to handle.
        **kwargs: Forwarded unchanged to ``_process_message``.

    Returns:
        Whatever ``_process_message`` returns.

    Raises:
        ValueError: If ``message.type`` differs from ``self.message_type``.
    """
    type_mismatch = message.type != self.message_type
    if type_mismatch:
        raise ValueError(
            f"Consumer cannot process messages of type '{message.type}'."
        )
    return await self._process_message(message, **kwargs)

start_consuming async #

start_consuming() -> None

Begin consuming messages.

Source code in llama_deploy/message_consumers/base.py
50
51
52
53
54
async def start_consuming(self) -> None:
    """Start consuming by awaiting ``consuming_callable`` with no arguments."""
    consume = self.consuming_callable
    await consume()

CallableMessageConsumer #

Bases: BaseMessageQueueConsumer

Message consumer for a callable handler.

For a given message, it will call the handler with the message as input.

Parameters:

Name Type Description Default
handler Callable

Callable invoked with each message (and any extra keyword arguments); may be sync or async.

required
Source code in llama_deploy/message_consumers/callable.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class CallableMessageConsumer(BaseMessageQueueConsumer):
    """Message consumer for a callable handler.

    For a given message, it will call the handler with the message as input.
    """

    # Invoked as ``handler(message, **kwargs)``; may be synchronous or async.
    handler: Callable

    async def _process_message(self, message: QueueMessage, **kwargs: Any) -> None:
        """Call ``handler`` with the message and await its result when async.

        Args:
            message: The queue message passed to the handler as first argument.
            **kwargs: Extra keyword arguments forwarded to the handler unchanged.
        """
        result = self.handler(message, **kwargs)
        # Coroutine functions are awaited exactly as before. Additionally await
        # when the call handed back a coroutine object even though the handler
        # is not itself a coroutine function (e.g. an object with
        # ``async def __call__`` or a sync wrapper around an async function);
        # otherwise that coroutine would be dropped and the handler never run.
        if asyncio.iscoroutinefunction(self.handler) or asyncio.iscoroutine(result):
            await result