Skip to content

Redis

RedisKVStore #

Bases: BaseKVStore

Redis KV Store.

Parameters:

Name Type Description Default
redis_client Any

Redis client

required
redis_uri Optional[str]

Redis server URI, used to create a client when no redis_client is supplied

'redis://127.0.0.1:6379'

Raises:

Type Description
ValueError

If neither redis_client nor redis_uri is provided, or if a client cannot be created from redis_uri

Examples:

>>> from llama_index.storage.kvstore.redis import RedisKVStore
>>> # Create a RedisKVStore
>>> redis_kv_store = RedisKVStore(
>>>     redis_uri="redis://127.0.0.1:6379")
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class RedisKVStore(BaseKVStore):
    """Redis KV Store.

    Each collection is kept in one Redis hash: the collection name is the
    hash name, each key is a hash field and each value is stored as a JSON
    string.

    Args:
        redis_uri (Optional[str]): Redis server URI used to create a client
            when no ``redis_client`` is supplied. Defaults to
            ``"redis://127.0.0.1:6379"``.
        **kwargs (Any): If ``redis_client`` is present, it is used as the
            client and ``redis_uri`` is ignored. Otherwise every keyword
            argument is forwarded to ``Redis.from_url``.

    Raises:
        ValueError: If no ``redis_client`` is given and ``redis_uri`` is
            ``None``, or if ``Redis.from_url`` raises ``ValueError``.

    Examples:
        >>> from llama_index.storage.kvstore.redis import RedisKVStore
        >>> # Create a RedisKVStore
        >>> redis_kv_store = RedisKVStore(
        >>>     redis_uri="redis://127.0.0.1:6379")

    """

    def __init__(
        self,
        redis_uri: Optional[str] = "redis://127.0.0.1:6379",
        **kwargs: Any,
    ) -> None:
        # user could inject customized redis client.
        # for instance, redis have specific TLS connection, etc.
        if "redis_client" in kwargs:
            self._redis_client = cast(Redis, kwargs["redis_client"])
        elif redis_uri is not None:
            # otherwise, try initializing redis client
            try:
                # connect to redis from url
                self._redis_client = Redis.from_url(redis_uri, **kwargs)
            except ValueError as e:
                # Chain the cause so the original traceback is preserved.
                raise ValueError(f"Redis failed to connect: {e}") from e
        else:
            raise ValueError("Either 'redis_client' or 'redis_uri' must be provided.")

    def put(self, key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None:
        """Put a key-value pair into the store.

        The value is serialized with ``json.dumps`` and written to the hash
        named ``collection``.

        Args:
            key (str): key
            val (dict): value
            collection (str): collection name

        """
        self._redis_client.hset(name=collection, key=key, value=json.dumps(val))

    async def aput(
        self, key: str, val: dict, collection: str = DEFAULT_COLLECTION
    ) -> None:
        """Put a key-value pair into the store (async; not implemented).

        Args:
            key (str): key
            val (dict): value
            collection (str): collection name

        Raises:
            NotImplementedError: Always.

        """
        raise NotImplementedError

    def put_all(
        self,
        kv_pairs: List[Tuple[str, dict]],
        collection: str = DEFAULT_COLLECTION,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> None:
        """Put a list of key-value pairs into the store.

        Writes are queued on a pipeline and flushed every ``batch_size``
        items; any remainder is flushed at the end.

        Args:
            kv_pairs (List[Tuple[str, dict]]): key-value pairs
            collection (str): collection name
            batch_size (int): number of queued writes per pipeline flush

        """
        with self._redis_client.pipeline() as pipe:
            cur_batch = 0
            for key, val in kv_pairs:
                pipe.hset(name=collection, key=key, value=json.dumps(val))
                cur_batch += 1

                if cur_batch >= batch_size:
                    cur_batch = 0
                    pipe.execute()

            # Flush whatever is left from the last, partially filled batch.
            if cur_batch > 0:
                pipe.execute()

    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
        """Get a value from the store.

        Args:
            key (str): key
            collection (str): collection name

        Returns:
            Optional[dict]: the JSON-decoded value, or ``None`` when the
            hash lookup returns nothing.

        """
        val_str = self._redis_client.hget(name=collection, key=key)
        if val_str is None:
            return None
        return json.loads(val_str)

    async def aget(
        self, key: str, collection: str = DEFAULT_COLLECTION
    ) -> Optional[dict]:
        """Get a value from the store (async; not implemented).

        Args:
            key (str): key
            collection (str): collection name

        Raises:
            NotImplementedError: Always.

        """
        raise NotImplementedError

    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Get all values from the store.

        Args:
            collection (str): collection name

        Returns:
            Dict[str, dict]: every key in the collection mapped to its
            JSON-decoded value.

        """
        collection_kv_dict = {}
        for key, val_str in self._redis_client.hscan_iter(name=collection):
            # An injected client may already return str keys (for example one
            # created with decode_responses=True); only decode raw bytes.
            field = key.decode() if isinstance(key, bytes) else key
            collection_kv_dict[field] = dict(json.loads(val_str))
        return collection_kv_dict

    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Get all values from the store (async; not implemented).

        Raises:
            NotImplementedError: Always.

        """
        raise NotImplementedError

    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Delete a value from the store.

        Args:
            key (str): key
            collection (str): collection name

        Returns:
            bool: ``True`` if the delete call reports at least one removed
            field, ``False`` otherwise.

        """
        deleted_num = self._redis_client.hdel(collection, key)
        return bool(deleted_num > 0)

    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Delete a value from the store (async; not implemented).

        Args:
            key (str): key
            collection (str): collection name

        Raises:
            NotImplementedError: Always.

        """
        raise NotImplementedError

    @classmethod
    def from_host_and_port(
        cls,
        host: str,
        port: int,
    ) -> "RedisKVStore":
        """Load a RedisKVStore from a Redis host and port.

        Args:
            host (str): Redis host
            port (int): Redis port

        Returns:
            RedisKVStore: a store built from ``redis://{host}:{port}``.
        """
        # The f-string already interpolates host and port; no .format() needed.
        url = f"redis://{host}:{port}"
        return cls(redis_uri=url)

    @classmethod
    def from_redis_client(cls, redis_client: Any) -> "RedisKVStore":
        """Load a RedisKVStore from a Redis Client.

        Args:
            redis_client (Redis): Redis client

        Returns:
            RedisKVStore: a store that uses ``redis_client`` directly.
        """
        return cls(redis_client=redis_client)

put #

put(key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None

Put a key-value pair into the store.

Parameters:

Name Type Description Default
key str

key

required
val dict

value

required
collection str

collection name

DEFAULT_COLLECTION
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
50
51
52
53
54
55
56
57
58
59
def put(self, key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None:
    """Store ``val`` under ``key`` in the given collection.

    The value is JSON-encoded and written as a field of the Redis hash
    named ``collection``.

    Args:
        key (str): key
        val (dict): value
        collection (str): collection name

    """
    serialized = json.dumps(val)
    self._redis_client.hset(name=collection, key=key, value=serialized)

aput async #

aput(key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None

Put a key-value pair into the store.

Parameters:

Name Type Description Default
key str

key

required
val dict

value

required
collection str

collection name

DEFAULT_COLLECTION
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
61
62
63
64
65
66
67
68
69
70
71
72
async def aput(
    self, key: str, val: dict, collection: str = DEFAULT_COLLECTION
) -> None:
    """Async counterpart of ``put``; not implemented.

    Args:
        key (str): key
        val (dict): value
        collection (str): collection name

    Raises:
        NotImplementedError: Always.

    """
    raise NotImplementedError()

put_all #

put_all(kv_pairs: List[Tuple[str, dict]], collection: str = DEFAULT_COLLECTION, batch_size: int = DEFAULT_BATCH_SIZE) -> None

Put a list of key-value pairs into the store.

Parameters:

Name Type Description Default
kv_pairs List[Tuple[str, dict]]

key-value pairs

required
collection str

collection name

DEFAULT_COLLECTION
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def put_all(
    self,
    kv_pairs: List[Tuple[str, dict]],
    collection: str = DEFAULT_COLLECTION,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> None:
    """Write many key-value pairs to a collection through a pipeline.

    Each value is JSON-encoded and queued; the pipeline is executed once
    ``batch_size`` writes are queued and again at the end for any remainder.

    Args:
        kv_pairs (List[Tuple[str, dict]]): key-value pairs
        collection (str): collection name
        batch_size (int): number of queued writes per pipeline execution

    """
    with self._redis_client.pipeline() as pipeline:
        pending = 0
        for field, payload in kv_pairs:
            pipeline.hset(name=collection, key=field, value=json.dumps(payload))
            pending += 1
            if pending < batch_size:
                continue
            # Batch is full: reset the counter and send the queued writes.
            pending = 0
            pipeline.execute()

        # Send the final, partially filled batch (if any).
        if pending > 0:
            pipeline.execute()

get #

get(key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]

Get a value from the store.

Parameters:

Name Type Description Default
key str

key

required
collection str

collection name

DEFAULT_COLLECTION
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
100
101
102
103
104
105
106
107
108
109
110
111
def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
    """Look up ``key`` in the given collection.

    Args:
        key (str): key
        collection (str): collection name

    Returns:
        Optional[dict]: the JSON-decoded value, or ``None`` when the hash
        lookup returns nothing.

    """
    raw = self._redis_client.hget(name=collection, key=key)
    return None if raw is None else json.loads(raw)

aget async #

aget(key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]

Get a value from the store.

Parameters:

Name Type Description Default
key str

key

required
collection str

collection name

DEFAULT_COLLECTION
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
113
114
115
116
117
118
119
120
121
122
123
async def aget(
    self, key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]:
    """Async counterpart of ``get``; not implemented.

    Args:
        key (str): key
        collection (str): collection name

    Raises:
        NotImplementedError: Always.

    """
    raise NotImplementedError()

get_all #

get_all(collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]

Get all values from the store.

Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
125
126
127
128
129
130
131
def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
    """Get all values from the store.

    Args:
        collection (str): collection name

    Returns:
        Dict[str, dict]: every key in the collection mapped to its
        JSON-decoded value.

    """
    collection_kv_dict = {}
    for key, val_str in self._redis_client.hscan_iter(name=collection):
        # A client may already return str keys (for example one created with
        # decode_responses=True); calling .decode() on those would raise
        # AttributeError, so only decode raw bytes.
        field = key.decode() if isinstance(key, bytes) else key
        collection_kv_dict[field] = dict(json.loads(val_str))
    return collection_kv_dict

aget_all async #

aget_all(collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]

Get all values from the store.

Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
133
134
135
async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
    """Async counterpart of ``get_all``; not implemented.

    Raises:
        NotImplementedError: Always.

    """
    raise NotImplementedError()

delete #

delete(key: str, collection: str = DEFAULT_COLLECTION) -> bool

Delete a value from the store.

Parameters:

Name Type Description Default
key str

key

required
collection str

collection name

DEFAULT_COLLECTION
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
137
138
139
140
141
142
143
144
145
146
def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
    """Remove ``key`` from the given collection.

    Args:
        key (str): key
        collection (str): collection name

    Returns:
        bool: ``True`` if the delete call reports at least one removed
        field, ``False`` otherwise.

    """
    removed_count = self._redis_client.hdel(collection, key)
    anything_removed = removed_count > 0
    return bool(anything_removed)

adelete async #

adelete(key: str, collection: str = DEFAULT_COLLECTION) -> bool

Delete a value from the store.

Parameters:

Name Type Description Default
key str

key

required
collection str

collection name

DEFAULT_COLLECTION
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
148
149
150
151
152
153
154
155
156
async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
    """Async counterpart of ``delete``; not implemented.

    Args:
        key (str): key
        collection (str): collection name

    Raises:
        NotImplementedError: Always.

    """
    raise NotImplementedError()

from_host_and_port classmethod #

from_host_and_port(host: str, port: int) -> RedisKVStore

Load a RedisKVStore from a Redis host and port.

Parameters:

Name Type Description Default
host str

Redis host

required
port int

Redis port

required
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@classmethod
def from_host_and_port(
    cls,
    host: str,
    port: int,
) -> "RedisKVStore":
    """Load a RedisKVStore from a Redis host and port.

    Args:
        host (str): Redis host
        port (int): Redis port

    Returns:
        RedisKVStore: a store built from ``redis://{host}:{port}``.
    """
    # The f-string already interpolates host and port. A trailing .format()
    # is redundant and would re-interpret any braces in the result.
    url = f"redis://{host}:{port}"
    return cls(redis_uri=url)

from_redis_client classmethod #

from_redis_client(redis_client: Any) -> RedisKVStore

Load a RedisKVStore from a Redis Client.

Parameters:

Name Type Description Default
redis_client Redis

Redis client

required
Source code in llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-redis/llama_index/storage/kvstore/redis/base.py
173
174
175
176
177
178
179
180
@classmethod
def from_redis_client(cls, redis_client: Any) -> "RedisKVStore":
    """Build a RedisKVStore around an existing Redis client.

    Args:
        redis_client (Redis): Redis client

    Returns:
        RedisKVStore: an instance constructed with ``redis_client`` passed
        as the ``redis_client`` keyword argument.
    """
    store = cls(redis_client=redis_client)
    return store