Skip to content

simple#

SimpleMessageQueue #

Bases: AbstractMessageQueue

Remote client to be used with a SimpleMessageQueue server.

Source code in llama_deploy/message_queues/simple/client.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class SimpleMessageQueue(AbstractMessageQueue):
    """Remote client to be used with a SimpleMessageQueue server.

    Publishes messages and registers polling consumers against the HTTP
    endpoints exposed by `SimpleMessageQueueServer`.
    """

    def __init__(self, config: SimpleMessageQueueConfig | None = None) -> None:
        """Create a client.

        Args:
            config: Connection settings for the server. A fresh default config
                is built when omitted — avoids sharing one mutable default
                instance across every client created without arguments.
        """
        self._config = config if config is not None else SimpleMessageQueueConfig()
        # topic -> {consumer id -> consumer}
        self._consumers: dict[str, dict[str, BaseMessageQueueConsumer]] = {}

    async def _publish(self, message: QueueMessage, topic: str) -> Any:
        """Send a message to the SimpleMessageQueueServer.

        Returns the raw `httpx.Response`; the status is deliberately not
        checked here — callers inspect the result themselves.
        """
        url = f"{self._config.base_url}messages/{topic}"
        async with httpx.AsyncClient(**self._config.client_kwargs) as client:
            result = await client.post(url, json=message.model_dump())
        return result

    async def register_consumer(
        self, consumer: BaseMessageQueueConsumer, topic: str
    ) -> StartConsumingCallable:
        """Register a new consumer for `topic`.

        Args:
            consumer: The consumer that will process incoming messages.
            topic: The topic to consume from; created server-side on first use.

        Returns:
            A coroutine function the consumer must run to start polling.

        Raises:
            ValueError: If this consumer is already registered for the topic.
            httpx.HTTPStatusError: If topic creation fails on the server.
        """
        if topic not in self._consumers:
            # Ask the server to create the topic. A 409 CONFLICT means the
            # topic already exists (e.g. created by another client) — that is
            # not an error for registration purposes.
            url = f"{self._config.base_url}topics/{topic}"
            async with httpx.AsyncClient(**self._config.client_kwargs) as client:
                result = await client.post(url)
                if result.status_code != httpx.codes.CONFLICT:
                    result.raise_for_status()

            self._consumers[topic] = {}

        if consumer.id_ in self._consumers[topic]:
            msg = f"Consumer {consumer.id_} already registered for topic {topic}"
            raise ValueError(msg)

        self._consumers[topic][consumer.id_] = consumer
        logger.info(
            f"Consumer '{consumer.id_}' for type '{consumer.message_type}' on topic '{topic}' has been registered."
        )

        async def start_consuming_callable() -> None:
            """StartConsumingCallable.

            Consumer of this queue should call this in order to start consuming.
            Polls the server every 0.1s until the surrounding task is cancelled.
            """
            url = f"{self._config.base_url}messages/{topic}"
            async with httpx.AsyncClient(**self._config.client_kwargs) as client:
                while True:
                    try:
                        result = await client.get(url)
                        result.raise_for_status()
                        # Parse the body once; the server returns null when
                        # the topic queue is empty.
                        payload = result.json()
                        if payload:
                            message = QueueMessage.model_validate(payload)
                            await consumer.process_message(message)
                        await asyncio.sleep(0.1)
                    except asyncio.CancelledError:
                        break

        return start_consuming_callable

    async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
        """Remove the consumer from every topic it was registered on."""
        # Iterate over a copy so we can delete from the live mapping safely.
        for topic, consumers in self._consumers.copy().items():
            if consumer.id_ in consumers:
                del self._consumers[topic][consumer.id_]

    async def cleanup(self, *args: Any, **kwargs: Any) -> None:
        """Part of the AbstractMessageQueue interface — nothing to clean up."""
        pass

    def as_config(self) -> SimpleMessageQueueConfig:
        """Return the configuration this client was built with."""
        return self._config

register_consumer async #

register_consumer(consumer: BaseMessageQueueConsumer, topic: str) -> StartConsumingCallable

Register a new consumer.

Source code in llama_deploy/message_queues/simple/client.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def register_consumer(
    self, consumer: BaseMessageQueueConsumer, topic: str
) -> StartConsumingCallable:
    """Register a new consumer for `topic`.

    Args:
        consumer: The consumer that will process incoming messages.
        topic: The topic to consume from; created server-side on first use.

    Returns:
        A coroutine function the consumer must run to start polling.

    Raises:
        ValueError: If this consumer is already registered for the topic.
        httpx.HTTPStatusError: If topic creation fails on the server.
    """
    if topic not in self._consumers:
        # Ask the server to create the topic. A 409 CONFLICT means the topic
        # already exists (e.g. created by another client) — not an error here.
        url = f"{self._config.base_url}topics/{topic}"
        async with httpx.AsyncClient(**self._config.client_kwargs) as client:
            result = await client.post(url)
            if result.status_code != httpx.codes.CONFLICT:
                result.raise_for_status()

        self._consumers[topic] = {}

    if consumer.id_ in self._consumers[topic]:
        msg = f"Consumer {consumer.id_} already registered for topic {topic}"
        raise ValueError(msg)

    self._consumers[topic][consumer.id_] = consumer
    logger.info(
        f"Consumer '{consumer.id_}' for type '{consumer.message_type}' on topic '{topic}' has been registered."
    )

    async def start_consuming_callable() -> None:
        """StartConsumingCallable.

        Consumer of this queue should call this in order to start consuming.
        Polls the server every 0.1s until the surrounding task is cancelled.
        """
        url = f"{self._config.base_url}messages/{topic}"
        async with httpx.AsyncClient(**self._config.client_kwargs) as client:
            while True:
                try:
                    result = await client.get(url)
                    result.raise_for_status()
                    # Parse the body once; the server returns null when empty.
                    payload = result.json()
                    if payload:
                        message = QueueMessage.model_validate(payload)
                        await consumer.process_message(message)
                    await asyncio.sleep(0.1)
                except asyncio.CancelledError:
                    break

    return start_consuming_callable

SimpleMessageQueueConfig #

Bases: BaseSettings

Simple message queue configuration.

Parameters:

Name Type Description Default
type Literal['simple']
'simple'
host str
'127.0.0.1'
port int
8001
raise_exceptions bool
False
use_ssl bool
False
Source code in llama_deploy/message_queues/simple/config.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class SimpleMessageQueueConfig(BaseSettings):
    """Simple message queue configuration.

    Every field can be overridden through environment variables prefixed
    with ``SIMPLE_MESSAGE_QUEUE_``.
    """

    model_config = SettingsConfigDict(env_prefix="SIMPLE_MESSAGE_QUEUE_")

    # Discriminator used when selecting a queue implementation; excluded
    # from serialized output.
    type: Literal["simple"] = Field(default="simple", exclude=True)
    host: str = "127.0.0.1"
    port: int = 8001
    # Extra keyword arguments forwarded to httpx.AsyncClient by the client.
    client_kwargs: dict[str, Any] = Field(default_factory=dict)
    raise_exceptions: bool = False
    use_ssl: bool = False

    @property
    def base_url(self) -> str:
        """Root URL of the server, always ending with a trailing slash.

        The port is omitted only when it matches the scheme's default
        (80 for http, 443 for https); previously the port was dropped
        whenever it was 80, which produced a wrong URL for
        ``use_ssl=True, port=80``.
        """
        protocol = "https" if self.use_ssl else "http"
        default_port = 443 if self.use_ssl else 80
        if self.port == default_port:
            return f"{protocol}://{self.host}/"
        return f"{protocol}://{self.host}:{self.port}/"

SimpleMessageQueueServer #

SimpleMessageQueueServer.

An in-memory message queue exposed over HTTP, with a pull (polling) model for consumers.

One FIFO queue is kept per topic. When a message is published, it is appended to the queue for the given topic; consumers retrieve messages by polling.

When launched as a server, exposes the following endpoints: - GET /: Home endpoint - POST /topics/{topic}: Create a topic - POST /messages/{topic}: Publish a message to a topic - GET /messages/{topic}: Pop the oldest message from a topic

Source code in llama_deploy/message_queues/simple/server.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class SimpleMessageQueueServer:
    """SimpleMessageQueueServer.

    An in-memory message queue exposed over HTTP. One FIFO queue is kept per
    topic; consumers pull messages by polling the GET endpoint.

    When launched as a server, exposes the following endpoints:
    - GET `/`: Home endpoint
    - POST `/topics/{topic}`: Create a topic
    - POST `/messages/{topic}`: Publish a message to a topic
    - GET `/messages/{topic}`: Pop the oldest message from a topic
    """

    def __init__(self, config: SimpleMessageQueueConfig | None = None):
        """Build the FastAPI app and wire up the routes.

        Args:
            config: Server settings. A fresh default config is built when
                omitted — avoids sharing one mutable default instance across
                every server created without arguments.
        """
        self._config = config if config is not None else SimpleMessageQueueConfig()
        # Kept for interface parity with the client; not used by the routes.
        self._consumers: dict[str, dict[str, BaseMessageQueueConsumer]] = {}
        # topic -> FIFO queue of pending messages
        self._queues: dict[str, deque[QueueMessage]] = {}
        self._running = False
        self._app = FastAPI()

        self._app.add_api_route(
            "/",
            self._home,
            methods=["GET"],
        )
        self._app.add_api_route(
            "/topics/{topic}",
            self._create_topic,
            methods=["POST"],
        )
        self._app.add_api_route(
            "/messages/{topic}",
            self._publish,
            methods=["POST"],
        )
        self._app.add_api_route(
            "/messages/{topic}",
            self._get_messages,
            methods=["GET"],
        )

    async def launch_server(self) -> None:
        """Launch the message queue as a FastAPI server.

        Runs until the surrounding task is cancelled, then shuts uvicorn
        down gracefully.
        """
        logger.info(f"Launching message queue server at {self._config.base_url}")
        self._running = True

        cfg = uvicorn.Config(
            self._app, host=self._config.host, port=self._config.port or 80
        )
        server = uvicorn.Server(cfg)

        try:
            await server.serve()
        except asyncio.CancelledError:
            self._running = False
            # return_exceptions=True: never let a shutdown error mask the
            # cancellation we are unwinding from.
            await asyncio.gather(server.shutdown(), return_exceptions=True)

    #
    # HTTP API endpoints
    #

    async def _home(self) -> Dict[str, str]:
        """Service metadata for health checks and discovery."""
        return {
            "service_name": "message_queue",
            "description": "Message queue for multi-agent system",
        }

    async def _create_topic(self, topic: str) -> Any:
        """Create an empty queue for `topic`; 409 if it already exists."""
        if topic in self._queues:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                # Fixed copy-paste error: the old detail wrongly talked about
                # a consumer URL being registered twice.
                detail=f"topic {topic} already exists",
            )

        self._queues[topic] = deque()

    async def _publish(self, message: QueueMessage, topic: str) -> Any:
        """Publish message to a queue; 404 if the topic does not exist."""
        if topic not in self._queues:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail=f"topic {topic} not found"
            )

        self._queues[topic].append(message)

    async def _get_messages(self, topic: str) -> QueueMessage | None:
        """Pop and return the oldest message for `topic`, or None when empty.

        Raises a 404 if the topic does not exist.
        """
        if topic not in self._queues:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail=f"topic {topic} not found"
            )
        if queue := self._queues[topic]:
            message: QueueMessage = queue.popleft()
            return message

        return None

launch_server async #

launch_server() -> None

Launch the message queue as a FastAPI server.

Source code in llama_deploy/message_queues/simple/server.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
async def launch_server(self) -> None:
    """Launch the message queue as a FastAPI server.

    Serves the app with uvicorn until the surrounding task is cancelled,
    then performs a graceful shutdown.
    """
    logger.info(f"Launching message queue server at {self._config.base_url}")
    self._running = True

    uv_config = uvicorn.Config(
        self._app,
        host=self._config.host,
        port=self._config.port or 80,
    )
    uv_server = uvicorn.Server(uv_config)

    try:
        await uv_server.serve()
    except asyncio.CancelledError:
        # Mark the server stopped and let uvicorn wind down; gather with
        # return_exceptions so a shutdown error cannot mask the cancellation.
        self._running = False
        await asyncio.gather(uv_server.shutdown(), return_exceptions=True)