Skip to content

redis#

Redis Message Queue.

RedisMessageQueueConfig #

Bases: BaseSettings

Redis message queue configuration.

Parameters:

Name Type Description Default
type Literal['redis']
'redis'
url str
'redis://localhost:6379'
host str | None
None
port int | None
None
db int | None
None
username str | None
None
password str | None
None
ssl bool | None
None
Source code in llama_deploy/message_queues/redis.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class RedisMessageQueueConfig(BaseSettings):
    """Redis message queue configuration.

    Settings are read with the ``REDIS_`` environment prefix. When both
    ``host`` and ``port`` are provided, ``url`` is rebuilt from the individual
    fields after initialisation; otherwise ``url`` is used as given.
    """

    model_config = SettingsConfigDict(env_prefix="REDIS_")

    type: Literal["redis"] = Field(default="redis", exclude=True)
    url: str = "redis://localhost:6379"
    host: str | None = None
    port: int | None = None
    db: int | None = None
    username: str | None = None
    password: str | None = None
    ssl: bool | None = None

    def model_post_init(self, __context: Any) -> None:
        """Derive ``url`` from ``host``/``port`` and the optional fields.

        Nothing happens unless both ``host`` and ``port`` are set. The scheme
        is ``rediss`` when ``ssl`` is truthy and ``redis`` otherwise.
        Credentials are added whenever a password is present: with a username
        as ``user:password@``, without one as ``:password@``.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import quote

        if not (self.host and self.port):
            return

        scheme = "rediss" if self.ssl else "redis"

        auth = ""
        if self.password:
            # Percent-encode both parts so characters such as "/", "?", "#"
            # or "%" in a credential cannot be mistaken for URL structure.
            user = quote(self.username, safe="") if self.username else ""
            auth = f"{user}:{quote(self.password, safe='')}@"

        self.url = f"{scheme}://{auth}{self.host}:{self.port}/{self.db or ''}"

RedisConsumerMetadata #

Bases: BaseModel

Parameters:

Name Type Description Default
message_type str
required
start_consuming_callable Callable[..., Coroutine[Any, Any, None]]
required
pubsub Any
None
topic str
required
Source code in llama_deploy/message_queues/redis.py
46
47
48
49
50
class RedisConsumerMetadata(BaseModel):
    """Bookkeeping record kept for each consumer registered on the Redis queue."""

    # Taken from the consumer's ``message_type`` when it is registered.
    message_type: str
    # Coroutine function that runs the consumer's message polling loop.
    start_consuming_callable: StartConsumingCallable
    # Pub/Sub handle obtained from the Redis client's ``pubsub()``; defaults to None.
    pubsub: Any = None
    # Name of the channel the pubsub handle was subscribed to.
    topic: str

RedisMessageQueue #

Bases: AbstractMessageQueue

Redis integration for message queue.

This class uses Redis Pub/Sub functionality for message distribution.

Examples:

from llama_deploy.message_queues.redis import RedisMessageQueue

message_queue = RedisMessageQueue()  # uses the default url
Source code in llama_deploy/message_queues/redis.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class RedisMessageQueue(AbstractMessageQueue):
    """Redis integration for message queue.

    This class uses Redis Pub/Sub functionality for message distribution.

    Examples:
        ```python
        from llama_deploy.message_queues.redis import RedisMessageQueue

        message_queue = RedisMessageQueue()  # uses the default url
        ```
    """

    def __init__(self, config: RedisMessageQueueConfig | None = None) -> None:
        """Create the queue and its Redis client.

        Args:
            config: Connection settings. A default ``RedisMessageQueueConfig``
                is created when omitted.

        Raises:
            ValueError: If the optional ``redis`` dependency is not installed.
        """
        self._config = config or RedisMessageQueueConfig()
        self._consumers: dict[str, RedisConsumerMetadata] = {}

        # Only the import is guarded, so the "missing dependency" message
        # cannot mask an unrelated failure while building the client.
        try:
            from redis.asyncio import Redis
        except ImportError as err:
            msg = "Missing redis optional dependency. Please install by running `pip install llama-deploy[redis]`."
            raise ValueError(msg) from err

        self._redis: Redis = Redis.from_url(self._config.url)

    async def _publish(self, message: QueueMessage, topic: str) -> Any:
        """Publish message to the Redis channel.

        Args:
            message: Message to send; it is dumped to a dict and JSON-encoded.
            topic: Name of the channel to publish on.

        Returns:
            The value returned by the Redis client's ``publish`` call.
        """
        message_json = json.dumps(message.model_dump())
        result = await self._redis.publish(topic, message_json)
        logger.info(
            f"Published message {message.id_} to topic {topic} with {result} subscribers"
        )
        return result

    async def register_consumer(
        self, consumer: BaseMessageQueueConsumer, topic: str
    ) -> StartConsumingCallable:
        """Register a new consumer.

        Registering an already known consumer id is a no-op that returns the
        callable created by the first registration.

        Args:
            consumer: Consumer whose ``process_message`` receives the messages.
            topic: Name of the channel to subscribe the consumer to.

        Returns:
            A coroutine function that runs the consumer's polling loop.
        """
        if consumer.id_ in self._consumers:
            logger.debug(
                f"Consumer {consumer.id_} already registered for topic {topic}",
            )
            return self._consumers[consumer.id_].start_consuming_callable

        pubsub = self._redis.pubsub()
        await pubsub.subscribe(topic)

        async def start_consuming_callable() -> None:
            """StartConsumingCallable.

            Consumer of this queue should call this in order to start consuming.
            """
            try:
                while True:
                    message = await pubsub.get_message(ignore_subscribe_messages=True)
                    if message:
                        decoded_message = json.loads(message["data"])
                        queue_message = QueueMessage.model_validate(decoded_message)
                        await consumer.process_message(queue_message)
                    # Yield to the event loop between polls.
                    await asyncio.sleep(0.01)
            except asyncio.CancelledError:
                # Cancellation ends the loop quietly, as it always has.
                return
            except Exception:
                # The loop still stops on any error, but a failure is no
                # longer invisible while the consumer is registered. Once the
                # consumer has been deregistered or cleaned up, an error from
                # polling its pubsub is expected and stays silent.
                registered = self._consumers.get(consumer.id_)
                if registered is not None and registered.pubsub is pubsub:
                    logger.exception(
                        f"Consumer {consumer.id_} stopped consuming topic {topic} after an error",
                    )
                return

        logger.info(
            f"Registered consumer {consumer.id_} for topic {topic}",
        )

        self._consumers[consumer.id_] = RedisConsumerMetadata(
            message_type=consumer.message_type,
            start_consuming_callable=start_consuming_callable,
            pubsub=pubsub,
            topic=topic,
        )

        return start_consuming_callable

    async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
        """Deregister a consumer.

        Unknown consumers are ignored. For a known consumer the pubsub handle
        is unsubscribed from its topic and then closed.

        Args:
            consumer: Consumer to remove from the registry.
        """
        consumer_metadata = self._consumers.pop(consumer.id_, None)
        if consumer_metadata is None:
            return

        pubsub = consumer_metadata.pubsub
        try:
            await pubsub.unsubscribe(consumer_metadata.topic)
        finally:
            # Close the handle as cleanup() does, so it is not left open
            # after the consumer is gone.
            await pubsub.aclose()

        logger.info(
            f"Deregistered consumer {consumer.id_} for topic {consumer_metadata.topic}",
        )

    async def cleanup(self, *args: Any, **kwargs: dict[str, Any]) -> None:
        """Perform any cleanup before shutting down.

        Every registered pubsub handle is unsubscribed and closed, the
        consumer registry is emptied and the main Redis client is closed. The
        main client is closed even if tearing down a pubsub handle fails; the
        error is still raised afterwards.
        """
        # Detach the registry first so it ends up empty whatever happens below.
        consumers, self._consumers = self._consumers, {}

        try:
            for consumer_metadata in consumers.values():
                if consumer_metadata.pubsub:
                    await consumer_metadata.pubsub.unsubscribe()
                    await consumer_metadata.pubsub.aclose()
        finally:
            # Close main Redis connection
            await self._redis.aclose()  # type: ignore  # mypy doesn't see the async method for some reason

    def as_config(self) -> BaseModel:
        """Return the configuration object this queue was created with."""
        return self._config

register_consumer async #

register_consumer(consumer: BaseMessageQueueConsumer, topic: str) -> StartConsumingCallable

Register a new consumer.

Source code in llama_deploy/message_queues/redis.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
async def register_consumer(
    self, consumer: BaseMessageQueueConsumer, topic: str
) -> StartConsumingCallable:
    """Register a new consumer.

    Registering an already known consumer id is a no-op that returns the
    callable created by the first registration.

    Args:
        consumer: Consumer whose ``process_message`` receives the messages.
        topic: Name of the channel to subscribe the consumer to.

    Returns:
        A coroutine function that runs the consumer's polling loop.
    """
    if consumer.id_ in self._consumers:
        logger.debug(
            f"Consumer {consumer.id_} already registered for topic {topic}",
        )
        return self._consumers[consumer.id_].start_consuming_callable

    pubsub = self._redis.pubsub()
    await pubsub.subscribe(topic)

    async def start_consuming_callable() -> None:
        """StartConsumingCallable.

        Consumer of this queue should call this in order to start consuming.
        """
        try:
            while True:
                message = await pubsub.get_message(ignore_subscribe_messages=True)
                if message:
                    decoded_message = json.loads(message["data"])
                    queue_message = QueueMessage.model_validate(decoded_message)
                    await consumer.process_message(queue_message)
                # Yield to the event loop between polls.
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            # Cancellation ends the loop quietly, as it always has.
            return
        except Exception:
            # The loop still stops on any error, but a failure is no longer
            # invisible while the consumer is registered with this pubsub.
            registered = self._consumers.get(consumer.id_)
            if registered is not None and registered.pubsub is pubsub:
                logger.exception(
                    f"Consumer {consumer.id_} stopped consuming topic {topic} after an error",
                )
            return

    logger.info(
        f"Registered consumer {consumer.id_} for topic {topic}",
    )

    self._consumers[consumer.id_] = RedisConsumerMetadata(
        message_type=consumer.message_type,
        start_consuming_callable=start_consuming_callable,
        pubsub=pubsub,
        topic=topic,
    )

    return start_consuming_callable

deregister_consumer async #

deregister_consumer(consumer: BaseMessageQueueConsumer) -> Any

Deregister a consumer.

Source code in llama_deploy/message_queues/redis.py
129
130
131
132
133
134
135
136
async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
    """Deregister a consumer.

    Unknown consumers are ignored. For a known consumer the pubsub handle is
    unsubscribed from its topic and then closed.

    Args:
        consumer: Consumer to remove from the registry.
    """
    consumer_metadata = self._consumers.pop(consumer.id_, None)
    if consumer_metadata is None:
        return

    pubsub = consumer_metadata.pubsub
    try:
        await pubsub.unsubscribe(consumer_metadata.topic)
    finally:
        # Close the handle so it is not left open after the consumer is gone.
        await pubsub.aclose()

    logger.info(
        f"Deregistered consumer {consumer.id_} for topic {consumer_metadata.topic}",
    )

cleanup async #

cleanup(*args: Any, **kwargs: dict[str, Any]) -> None

Perform any cleanup before shutting down.

Source code in llama_deploy/message_queues/redis.py
138
139
140
141
142
143
144
145
146
147
148
149
async def cleanup(self, *args: Any, **kwargs: dict[str, Any]) -> None:
    """Perform any cleanup before shutting down.

    Every registered pubsub handle is unsubscribed and closed, the consumer
    registry is emptied and the main Redis client is closed. The main client
    is closed even if tearing down a pubsub handle fails; the error is still
    raised afterwards.
    """
    # Detach the registry first so it ends up empty whatever happens below.
    consumers, self._consumers = self._consumers, {}

    try:
        for consumer_metadata in consumers.values():
            if consumer_metadata.pubsub:
                await consumer_metadata.pubsub.unsubscribe()
                await consumer_metadata.pubsub.aclose()
    finally:
        # Close main Redis connection
        await self._redis.aclose()  # type: ignore  # mypy doesn't see the async method for some reason