Skip to content

Redis

Redis Message Queue.

RedisMessageQueueConfig #

Bases: BaseSettings

Redis message queue configuration.

Source code in llama-agents/llama_deploy/message_queues/redis.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class RedisMessageQueueConfig(BaseSettings):
    """Redis message queue configuration.

    Settings are declared with the ``REDIS_`` environment prefix (see
    ``model_config``). When both ``host`` and ``port`` are provided, ``url``
    is rebuilt from the individual fields after initialisation; otherwise
    ``url`` is left exactly as given.
    """

    model_config = SettingsConfigDict(env_prefix="REDIS_")

    url: str = DEFAULT_URL
    host: Optional[str] = None
    port: Optional[int] = None
    db: Optional[int] = None
    username: Optional[str] = None
    password: Optional[str] = None
    ssl: Optional[bool] = None

    def model_post_init(self, __context: Any) -> None:
        """Derive ``url`` from ``host``/``port`` and the optional fields.

        Does nothing unless both ``host`` and ``port`` are set. Credentials
        are percent-encoded so that reserved characters (``@``, ``/``, ``:``,
        ...) in a username or password cannot corrupt the URL, and a password
        without a username is emitted as ``:password@``. A username without a
        password is ignored.
        """
        if not (self.host and self.port):
            return

        # Local import keeps this block self-contained.
        from urllib.parse import quote

        scheme = "rediss" if self.ssl else "redis"
        auth = ""
        if self.password:
            user = quote(self.username or "", safe="")
            secret = quote(self.password, safe="")
            auth = f"{user}:{secret}@"
        # A missing (or zero) db yields an empty path segment.
        self.url = f"{scheme}://{auth}{self.host}:{self.port}/{self.db or ''}"

RedisMessageQueue #

Bases: BaseMessageQueue

Redis integration for message queue.

This class uses Redis Pub/Sub functionality for message distribution.

Attributes:

Name Type Description
url str

The Redis URL string to connect to the Redis server

redis Redis

The Redis connection

Examples:

from llama_deploy.message_queues.redis import RedisMessageQueue

message_queue = RedisMessageQueue()  # uses the default url
Source code in llama-agents/llama_deploy/message_queues/redis.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class RedisMessageQueue(BaseMessageQueue):
    """Redis integration for message queue.

    This class uses Redis Pub/Sub functionality for message distribution.

    Attributes:
        url (str): The Redis URL string to connect to the Redis server
        redis (redis.Redis): The Redis connection

    Examples:
        ```python
        from llama_deploy.message_queues.redis import RedisMessageQueue

        message_queue = RedisMessageQueue()  # uses the default url
        ```
    """

    url: str = DEFAULT_URL
    _redis: Optional["redis.Redis"] = PrivateAttr(None)

    def __init__(
        self,
        url: str = DEFAULT_URL,
        redis: Optional["redis.Redis"] = None,
        **kwargs: Any,
    ) -> None:
        """Create the queue.

        Args:
            url: Redis URL used when a connection has to be established.
            redis: Optional existing client to use instead of connecting.
            **kwargs: Accepted but not used by this constructor.
        """
        super().__init__(url=url)
        self._redis = redis
        # Registered consumers, keyed by consumer id.
        self._consumers: Dict[str, RedisConsumerMetadata] = {}

    @classmethod
    def from_url_params(
        cls,
        host: str,
        port: int = 6379,
        db: int = 0,
        username: Optional[str] = None,
        password: Optional[str] = None,
        ssl: bool = False,
    ) -> "RedisMessageQueue":
        """Convenience constructor from url params.

        Credentials are percent-encoded so reserved characters (``@``, ``/``,
        ``:``, ...) cannot corrupt the URL. A password without a username is
        emitted as ``:password@``; a username without a password is ignored.

        Args:
            host: Redis server host name.
            port: Redis server port.
            db: Database index placed in the URL path.
            username: Optional user name.
            password: Optional password.
            ssl: Use the ``rediss`` scheme when true, ``redis`` otherwise.

        Returns:
            A queue instance configured with the assembled URL.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import quote

        scheme = "rediss" if ssl else "redis"
        auth = ""
        if password:
            user = quote(username or "", safe="")
            secret = quote(password, safe="")
            auth = f"{user}:{secret}@"
        url = f"{scheme}://{auth}{host}:{port}/{db}"
        return cls(url=url)

    async def new_connection(self) -> "redis.Redis":
        """Return the Redis connection, establishing it on first use.

        The connection is stored on the instance and reused by later calls.
        """
        if self._redis is None:
            self._redis = await _establish_connection(self.url)
        return self._redis

    async def _publish(self, message: QueueMessage) -> Any:
        """Publish message to the Redis channel.

        The message is JSON-serialised and published on the channel named
        after ``message.type``. Returns whatever ``redis.publish`` returns.
        """
        redis = await self.new_connection()
        message_json = json.dumps(message.model_dump())
        result = await redis.publish(message.type, message_json)
        logger.info(
            f"Published message {message.id_} to {message.type} channel with {result} subscribers"
        )
        return result

    async def register_consumer(
        self, consumer: BaseMessageQueueConsumer
    ) -> StartConsumingCallable:
        """Register a new consumer.

        Subscribes a pub/sub object to the channel named after
        ``consumer.message_type`` and returns a coroutine function that polls
        it forever. Registering an already-known consumer id returns the
        callable stored at the first registration.
        """
        if consumer.id_ in self._consumers:
            logger.debug(
                f"Consumer {consumer.id_} already registered for {consumer.message_type} messages",
            )
            return self._consumers[consumer.id_].start_consuming_callable

        redis = await self.new_connection()
        pubsub = redis.pubsub()
        await pubsub.subscribe(consumer.message_type)

        async def start_consuming_callable() -> None:
            """StartConsumingCallable.

            Consumer of this queue should call this in order to start consuming.
            """
            while True:
                message = await pubsub.get_message(ignore_subscribe_messages=True)
                if message:
                    decoded_message = json.loads(message["data"])
                    queue_message = QueueMessage.model_validate(decoded_message)
                    await consumer.process_message(queue_message)
                # Short pause between polls.
                await asyncio.sleep(0.01)

        logger.info(
            f"Registered consumer {consumer.id_} for {consumer.message_type} messages",
        )

        self._consumers[consumer.id_] = RedisConsumerMetadata(
            message_type=consumer.message_type,
            start_consuming_callable=start_consuming_callable,
            pubsub=pubsub,
        )

        return start_consuming_callable

    async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
        """Deregister a consumer.

        Unsubscribes the consumer's pub/sub object from its channel and
        forgets the consumer. Unknown consumers are ignored.
        """
        if consumer.id_ in self._consumers:
            await self._consumers[consumer.id_].pubsub.unsubscribe(
                consumer.message_type
            )
            del self._consumers[consumer.id_]
            logger.info(
                f"Deregistered consumer {consumer.id_} for {consumer.message_type} messages",
            )

    async def processing_loop(self) -> None:
        """A loop for getting messages from queues and sending to consumer.

        Not relevant for this class as Redis uses pub/sub model.
        """
        pass

    async def launch_local(self) -> asyncio.Task:
        """Launch the message queue locally, in-process.

        Launches a dummy task.
        """
        return asyncio.create_task(self.processing_loop())

    async def launch_server(self) -> None:
        """Launch the message queue server.

        Not relevant for this class. Redis server should be running separately."""
        pass

    async def cleanup_local(
        self, message_types: List[str], *args: Any, **kwargs: Dict[str, Any]
    ) -> None:
        """Perform any cleanup before shutting down.

        Closes the Redis connection if one exists, then clears the stored
        connection and the consumer registry. The arguments are not used.
        """
        if self._redis:
            await self._redis.close()

        self._redis = None
        self._consumers = {}

    def as_config(self) -> BaseModel:
        """Return a ``RedisMessageQueueConfig`` carrying this queue's URL."""
        return RedisMessageQueueConfig(url=self.url)

from_url_params classmethod #

from_url_params(host: str, port: int = 6379, db: int = 0, username: Optional[str] = None, password: Optional[str] = None, ssl: bool = False) -> RedisMessageQueue

Convenience constructor from url params.

Source code in llama-agents/llama_deploy/message_queues/redis.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@classmethod
def from_url_params(
    cls,
    host: str,
    port: int = 6379,
    db: int = 0,
    username: Optional[str] = None,
    password: Optional[str] = None,
    ssl: bool = False,
) -> "RedisMessageQueue":
    """Convenience constructor from url params.

    Credentials are percent-encoded so reserved characters (``@``, ``/``,
    ``:``, ...) cannot corrupt the URL. A password without a username is
    emitted as ``:password@``; a username without a password is ignored.

    Args:
        host: Redis server host name.
        port: Redis server port.
        db: Database index placed in the URL path.
        username: Optional user name.
        password: Optional password.
        ssl: Use the ``rediss`` scheme when true, ``redis`` otherwise.

    Returns:
        An instance of ``cls`` configured with the assembled URL.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    scheme = "rediss" if ssl else "redis"
    auth = ""
    if password:
        user = quote(username or "", safe="")
        secret = quote(password, safe="")
        auth = f"{user}:{secret}@"
    url = f"{scheme}://{auth}{host}:{port}/{db}"
    return cls(url=url)

new_connection async #

new_connection() -> Redis

Returns the connection to the Redis server, establishing it on the first call and reusing it afterwards.

Source code in llama-agents/llama_deploy/message_queues/redis.py
112
113
114
115
116
async def new_connection(self) -> "redis.Redis":
    """Return the Redis connection, establishing it on first use.

    An already-stored connection is returned unchanged; otherwise one is
    created from ``self.url``, stored on the instance and returned.
    """
    if self._redis is not None:
        return self._redis

    self._redis = await _establish_connection(self.url)
    return self._redis

register_consumer async #

register_consumer(consumer: BaseMessageQueueConsumer) -> StartConsumingCallable

Register a new consumer.

Source code in llama-agents/llama_deploy/message_queues/redis.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
async def register_consumer(
    self, consumer: BaseMessageQueueConsumer
) -> StartConsumingCallable:
    """Subscribe a consumer to its channel and return its consuming loop.

    A consumer id that is already registered yields the callable stored at
    its first registration. Otherwise a pub/sub object is subscribed to the
    channel named after ``consumer.message_type`` and a coroutine function
    that polls it forever is stored and returned.
    """
    if consumer.id_ in self._consumers:
        logger.debug(
            f"Consumer {consumer.id_} already registered for {consumer.message_type} messages",
        )
        known = self._consumers[consumer.id_]
        return known.start_consuming_callable

    connection = await self.new_connection()
    subscription = connection.pubsub()
    await subscription.subscribe(consumer.message_type)

    async def start_consuming_callable() -> None:
        """Poll the subscription forever, handing each message to the consumer."""
        while True:
            raw = await subscription.get_message(ignore_subscribe_messages=True)
            if raw:
                payload = json.loads(raw["data"])
                await consumer.process_message(QueueMessage.model_validate(payload))
            # Short pause between polls.
            await asyncio.sleep(0.01)

    logger.info(
        f"Registered consumer {consumer.id_} for {consumer.message_type} messages",
    )

    registration = RedisConsumerMetadata(
        message_type=consumer.message_type,
        start_consuming_callable=start_consuming_callable,
        pubsub=subscription,
    )
    self._consumers[consumer.id_] = registration

    return start_consuming_callable

deregister_consumer async #

deregister_consumer(consumer: BaseMessageQueueConsumer) -> Any

Deregister a consumer.

Source code in llama-agents/llama_deploy/message_queues/redis.py
167
168
169
170
171
172
173
174
175
176
async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
    """Unsubscribe a registered consumer and forget it.

    Consumers that were never registered are ignored.
    """
    if consumer.id_ not in self._consumers:
        return None

    registration = self._consumers[consumer.id_]
    await registration.pubsub.unsubscribe(consumer.message_type)
    # Remove the entry only after the unsubscribe call has completed.
    del self._consumers[consumer.id_]
    logger.info(
        f"Deregistered consumer {consumer.id_} for {consumer.message_type} messages",
    )
    return None

processing_loop async #

processing_loop() -> None

A loop for getting messages from queues and sending to consumer.

Not relevant for this class as Redis uses pub/sub model.

Source code in llama-agents/llama_deploy/message_queues/redis.py
178
179
180
181
182
183
async def processing_loop(self) -> None:
    """Placeholder processing loop; returns immediately.

    Not relevant for this class as Redis uses pub/sub model.
    """
    return None

launch_local async #

launch_local() -> Task

Launch the message queue locally, in-process.

Launches a dummy task.

Source code in llama-agents/llama_deploy/message_queues/redis.py
185
186
187
188
189
190
async def launch_local(self) -> asyncio.Task:
    """Launch the message queue locally, in-process.

    Schedules ``processing_loop`` as a task on the running event loop and
    returns that task.
    """
    placeholder = self.processing_loop()
    return asyncio.create_task(placeholder)

launch_server async #

launch_server() -> None

Launch the message queue server.

Not relevant for this class. Redis server should be running separately.

Source code in llama-agents/llama_deploy/message_queues/redis.py
192
193
194
195
196
async def launch_server(self) -> None:
    """Launch the message queue server (no-op).

    Not relevant for this class. Redis server should be running separately.
    """
    return None

cleanup_local async #

cleanup_local(message_types: List[str], *args: Any, **kwargs: Dict[str, Any]) -> None

Perform any cleanup before shutting down.

Source code in llama-agents/llama_deploy/message_queues/redis.py
198
199
200
201
202
203
204
205
206
async def cleanup_local(
    self, message_types: List[str], *args: Any, **kwargs: Dict[str, Any]
) -> None:
    """Close the Redis connection, if any, and reset local state.

    The stored connection and the consumer registry are cleared afterwards.
    The arguments are not used.
    """
    connection = self._redis
    if connection:
        await connection.close()

    self._redis, self._consumers = None, {}

options: members: - RedisMessageQueueConfig - RedisMessageQueue