Skip to content

Async sql

AsyncDBChatStore #

Bases: BaseModel

Base class for DB-based chat stores.

Meant to implement a FIFO queue to manage short-term memory and general conversation history.

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class AsyncDBChatStore(BaseModel):
    """
    Abstract async interface for database-backed chat stores.

    Implementations are meant to behave as a FIFO queue over the messages
    kept under a key, serving short-term memory as well as general
    conversation history. Every operation except ``class_name`` is an
    abstract coroutine that a concrete store must provide.
    """

    @abstractmethod
    async def get_messages(
        self,
        key: str,
        status: Optional[MessageStatus] = MessageStatus.ACTIVE,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> List[ChatMessage]:
        """
        Fetch the messages stored under ``key`` (async).

        Args:
            key: Identifier the messages are stored under.
            status: Status the messages must have; defaults to ACTIVE.
            limit: Optional cap on the number of messages.
            offset: Optional number of messages to skip.

        Returns:
            The matching messages as a list.

        """

    @abstractmethod
    async def count_messages(
        self,
        key: str,
        status: Optional[MessageStatus] = MessageStatus.ACTIVE,
    ) -> int:
        """Return how many messages under ``key`` have the given status (async)."""

    @abstractmethod
    async def add_message(
        self,
        key: str,
        message: ChatMessage,
        status: MessageStatus = MessageStatus.ACTIVE,
    ) -> None:
        """Store one message under ``key`` with the given status (async)."""

    @abstractmethod
    async def add_messages(
        self,
        key: str,
        messages: List[ChatMessage],
        status: MessageStatus = MessageStatus.ACTIVE,
    ) -> None:
        """Store a batch of messages under ``key`` with the given status (async)."""

    @abstractmethod
    async def set_messages(
        self,
        key: str,
        messages: List[ChatMessage],
        status: MessageStatus = MessageStatus.ACTIVE,
    ) -> None:
        """Replace everything stored under ``key`` with ``messages`` (async)."""

    @abstractmethod
    async def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
        """Remove the message identified by ``idx`` under ``key`` and return it (async)."""

    @abstractmethod
    async def delete_messages(
        self, key: str, status: Optional[MessageStatus] = None
    ) -> None:
        """Remove the messages under ``key`` that have the given status (async)."""

    @abstractmethod
    async def delete_oldest_messages(self, key: str, n: int) -> List[ChatMessage]:
        """Remove the ``n`` oldest messages under ``key`` and return them (async)."""

    @abstractmethod
    async def archive_oldest_messages(self, key: str, n: int) -> List[ChatMessage]:
        """Archive the ``n`` oldest messages under ``key`` and return them (async)."""

    @abstractmethod
    async def get_keys(self) -> List[str]:
        """Return every distinct key present in the store (async)."""

    @classmethod
    def class_name(cls) -> str:
        """Return the fixed identifier of this class."""
        return "AsyncDBChatStore"

get_messages abstractmethod async #

get_messages(key: str, status: Optional[MessageStatus] = ACTIVE, limit: Optional[int] = None, offset: Optional[int] = None) -> List[ChatMessage]

Get all messages for a key with the specified status (async).

Returns a list of messages.

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
27
28
29
30
31
32
33
34
35
36
37
38
39
@abstractmethod
async def get_messages(
    self,
    key: str,
    status: Optional[MessageStatus] = MessageStatus.ACTIVE,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> List[ChatMessage]:
    """
    Fetch the messages stored under ``key`` (async).

    Args:
        key: Identifier the messages are stored under.
        status: Status the messages must have; defaults to ACTIVE.
        limit: Optional cap on the number of messages.
        offset: Optional number of messages to skip.

    Returns:
        The matching messages as a list.

    """

count_messages abstractmethod async #

count_messages(key: str, status: Optional[MessageStatus] = ACTIVE) -> int

Count messages for a key with the specified status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
41
42
43
44
45
46
47
@abstractmethod
async def count_messages(
    self,
    key: str,
    status: Optional[MessageStatus] = MessageStatus.ACTIVE,
) -> int:
    """Return how many messages under ``key`` have the given status (async)."""

add_message abstractmethod async #

add_message(key: str, message: ChatMessage, status: MessageStatus = ACTIVE) -> None

Add a message for a key with the specified status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
49
50
51
52
53
54
55
56
@abstractmethod
async def add_message(
    self,
    key: str,
    message: ChatMessage,
    status: MessageStatus = MessageStatus.ACTIVE,
) -> None:
    """Store one message under ``key`` with the given status (async)."""

add_messages abstractmethod async #

add_messages(key: str, messages: List[ChatMessage], status: MessageStatus = ACTIVE) -> None

Add a list of messages in batch for the specified key and status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
58
59
60
61
62
63
64
65
@abstractmethod
async def add_messages(
    self,
    key: str,
    messages: List[ChatMessage],
    status: MessageStatus = MessageStatus.ACTIVE,
) -> None:
    """Store a batch of messages under ``key`` with the given status (async)."""

set_messages abstractmethod async #

set_messages(key: str, messages: List[ChatMessage], status: MessageStatus = ACTIVE) -> None

Set all messages for a key (replacing existing ones) with the specified status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
67
68
69
70
71
72
73
74
@abstractmethod
async def set_messages(
    self,
    key: str,
    messages: List[ChatMessage],
    status: MessageStatus = MessageStatus.ACTIVE,
) -> None:
    """Replace everything stored under ``key`` with ``messages`` (async)."""

delete_message abstractmethod async #

delete_message(key: str, idx: int) -> Optional[ChatMessage]

Delete a specific message by ID and return it (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
76
77
78
@abstractmethod
async def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
    """Remove the message identified by ``idx`` under ``key`` and return it (async)."""

delete_messages abstractmethod async #

delete_messages(key: str, status: Optional[MessageStatus] = None) -> None

Delete all messages for a key with the specified status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
80
81
82
83
84
@abstractmethod
async def delete_messages(
    self, key: str, status: Optional[MessageStatus] = None
) -> None:
    """Remove the messages under ``key`` that have the given status (async)."""

delete_oldest_messages abstractmethod async #

delete_oldest_messages(key: str, n: int) -> List[ChatMessage]

Delete the oldest n messages for a key and return them (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
86
87
88
@abstractmethod
async def delete_oldest_messages(self, key: str, n: int) -> List[ChatMessage]:
    """Remove the ``n`` oldest messages under ``key`` and return them (async)."""

archive_oldest_messages abstractmethod async #

archive_oldest_messages(key: str, n: int) -> List[ChatMessage]

Archive the oldest n messages for a key and return them (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
90
91
92
@abstractmethod
async def archive_oldest_messages(self, key: str, n: int) -> List[ChatMessage]:
    """Archive the ``n`` oldest messages under ``key`` and return them (async)."""

get_keys abstractmethod async #

get_keys() -> List[str]

Get all unique keys in the store (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
94
95
96
@abstractmethod
async def get_keys(self) -> List[str]:
    """Return every distinct key present in the store (async)."""

class_name classmethod #

class_name() -> str

Return the class name.

Source code in llama-index-core/llama_index/core/storage/chat_store/base_db.py
 98
 99
100
101
@classmethod
def class_name(cls) -> str:
    """Return the fixed identifier of this class."""
    identifier = "AsyncDBChatStore"
    return identifier

SQLAlchemyChatStore #

Bases: AsyncDBChatStore

Base class for SQLAlchemy-based chat stores.

This class provides a foundation for creating chat stores that use SQLAlchemy to interact with SQL databases. It handles common operations like managing sessions, creating tables, and CRUD operations on chat messages.

Enhanced with status tracking for better FIFO queue management for short-term memory.

This class is meant to replace all other chat store classes.

Parameters:

Name Type Description Default
table_name str

Name of the table to store messages

required
async_database_uri str

SQLAlchemy async connection URI

'sqlite+aiosqlite:///:memory:'
Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
class SQLAlchemyChatStore(AsyncDBChatStore):
    """
    Base class for SQLAlchemy-based chat stores.

    This class provides a foundation for creating chat stores that use SQLAlchemy
    to interact with SQL databases. It handles common operations like managing
    sessions, creating tables, and CRUD operations on chat messages.

    Enhanced with status tracking for better FIFO queue management for short-term memory.

    This class is meant to replace all other chat store classes.

    The engine, session factory and table are created lazily on the first
    database operation (see ``_initialize``), not in the constructor.
    """

    table_name: str = Field(description="Name of the table to store messages")
    async_database_uri: str = Field(
        default=DEFAULT_ASYNC_DATABASE_URI,
        description="SQLAlchemy async connection URI",
    )

    # Lazily populated connection state.
    _async_engine: Optional[AsyncEngine] = PrivateAttr(default=None)
    _async_session_factory: Optional[async_sessionmaker] = PrivateAttr(default=None)
    _metadata: MetaData = PrivateAttr(default_factory=MetaData)
    _table: Optional[Table] = PrivateAttr(default=None)
    # Rows to re-insert on first initialization (produced by ``dump_store``).
    _db_data: Optional[List[Dict[str, Any]]] = PrivateAttr(default=None)

    def __init__(
        self,
        table_name: str,
        async_database_uri: Optional[str] = DEFAULT_ASYNC_DATABASE_URI,
        async_engine: Optional[AsyncEngine] = None,
        db_data: Optional[List[Dict[str, Any]]] = None,
    ):
        """
        Initialize the SQLAlchemy chat store.

        Args:
            table_name: Name of the table to store messages.
            async_database_uri: SQLAlchemy async connection URI; a falsy value
                falls back to ``DEFAULT_ASYNC_DATABASE_URI``.
            async_engine: Optional pre-built engine to use instead of creating
                one from the URI.
            db_data: Optional rows to insert into the table the first time the
                store is initialized.

        """
        super().__init__(
            table_name=table_name,
            async_database_uri=async_database_uri or DEFAULT_ASYNC_DATABASE_URI,
        )
        self._async_engine = async_engine
        self._db_data = db_data

    @staticmethod
    def _is_in_memory_uri(uri: Optional[str]) -> bool:
        """Check if the URI points to an in-memory SQLite database."""
        # Handles both :memory: and empty path which also means in-memory for sqlite
        return uri in ("sqlite+aiosqlite:///:memory:", "sqlite+aiosqlite://")

    async def _initialize(self) -> Tuple[async_sessionmaker[AsyncSession], Table]:
        """
        Lazily set up the session factory and table.

        Kept out of the constructor so that creating a store opens no
        database connection. Once both the session factory and the table
        exist, they are returned without further work.

        Returns:
            The session factory and the messages table.

        """
        if self._async_session_factory is not None and self._table is not None:
            return self._async_session_factory, self._table

        async_engine, async_session_factory = await self._setup_connections()
        table = await self._setup_tables(async_engine)

        # Restore data from in-memory database if provided
        if self._db_data:
            async with async_session_factory() as session:
                await session.execute(insert(table).values(self._db_data))
                await session.commit()

                # clear the data after it's inserted
                self._db_data = None

        return async_session_factory, table

    async def _setup_connections(
        self,
    ) -> Tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
        """
        Set up database connections and session factories.

        Returns:
            The async engine and the session factory bound to it.

        Raises:
            ValueError: If neither a database URI nor an engine is available.

        """
        if self._async_session_factory is not None and self._async_engine is not None:
            return self._async_engine, self._async_session_factory
        elif self.async_database_uri or self._async_engine:
            # Prefer an engine supplied by the caller; otherwise build one
            # from the configured URI.
            self._async_engine = self._async_engine or create_async_engine(
                self.async_database_uri
            )
            if self.async_database_uri is None:
                self.async_database_uri = self._async_engine.url

            self._async_session_factory = async_sessionmaker(
                bind=self._async_engine, class_=AsyncSession
            )

            return self._async_engine, self._async_session_factory
        else:
            raise ValueError(
                "No async database URI or engine provided, cannot initialize DB sessionmaker"
            )

    async def _setup_tables(self, async_engine: AsyncEngine) -> Table:
        """
        Define the messages table and create it in the database.

        Args:
            async_engine: Engine used to run ``create_all``.

        Returns:
            The table object, also stored on ``self._table``.

        """
        # Create messages table with status column
        self._table = Table(
            f"{self.table_name}",
            self._metadata,
            Column("id", Integer, primary_key=True, autoincrement=True),
            Column("key", String, nullable=False, index=True),
            Column("timestamp", Integer, nullable=False, index=True),
            Column("role", String, nullable=False),
            Column(
                "status",
                String,
                nullable=False,
                default=MessageStatus.ACTIVE.value,
                index=True,
            ),
            Column("data", JSON, nullable=False),
        )

        # Create tables in the database
        async with async_engine.begin() as conn:
            await conn.run_sync(self._metadata.create_all)

        return self._table

    async def get_messages(
        self,
        key: str,
        status: Optional[MessageStatus] = MessageStatus.ACTIVE,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> List[ChatMessage]:
        """
        Get all messages for a key with the specified status (async).

        Args:
            key: Key the messages are stored under.
            status: Status to filter on; ``None`` disables the status filter.
            limit: Optional maximum number of rows to return.
            offset: Optional number of rows to skip.

        Returns:
            The messages ordered by timestamp, then by row id.

        """
        session_factory, table = await self._initialize()

        query = select(table).where(table.c.key == key)

        if status is not None:
            query = query.where(table.c.status == status.value)

        query = query.order_by(table.c.timestamp, table.c.id)

        if limit is not None:
            query = query.limit(limit)

        if offset is not None:
            query = query.offset(offset)

        async with session_factory() as session:
            result = await session.execute(query)
            rows = result.fetchall()

            return [ChatMessage.model_validate(row.data) for row in rows]

    async def count_messages(
        self,
        key: str,
        status: Optional[MessageStatus] = MessageStatus.ACTIVE,
    ) -> int:
        """
        Count messages for a key with the specified status (async).

        Args:
            key: Key the messages are stored under.
            status: Status to filter on; ``None`` counts every status.

        Returns:
            The number of matching rows.

        """
        from sqlalchemy import func

        session_factory, table = await self._initialize()

        # Count in SQL rather than fetching every id and measuring the list.
        query = select(func.count()).select_from(table).where(table.c.key == key)

        if status is not None:
            query = query.where(table.c.status == status.value)

        async with session_factory() as session:
            result = await session.execute(query)
            return result.scalar_one()

    async def add_message(
        self,
        key: str,
        message: ChatMessage,
        status: MessageStatus = MessageStatus.ACTIVE,
    ) -> None:
        """
        Add a message for a key with the specified status (async).

        Args:
            key: Key to store the message under.
            message: Message to store; serialized with ``model_dump(mode="json")``.
            status: Status recorded for the new row.

        """
        session_factory, table = await self._initialize()

        async with session_factory() as session:
            await session.execute(
                insert(table).values(
                    key=key,
                    timestamp=int(time.time()),
                    role=message.role,
                    status=status.value,
                    data=message.model_dump(mode="json"),
                )
            )
            await session.commit()

    async def add_messages(
        self,
        key: str,
        messages: List[ChatMessage],
        status: MessageStatus = MessageStatus.ACTIVE,
    ) -> None:
        """
        Add a list of messages in batch for the specified key and status (async).

        An empty ``messages`` list is a no-op.

        Args:
            key: Key to store the messages under.
            messages: Messages to store, in order.
            status: Status recorded for every new row.

        """
        # An empty list would not yield an empty multi-row insert; it would
        # build an insert without values for the NOT NULL columns.
        if not messages:
            return

        session_factory, table = await self._initialize()

        # All rows of one batch share a timestamp.
        now = int(time.time())
        rows = [
            {
                "key": key,
                "timestamp": now,
                "role": message.role,
                "status": status.value,
                "data": message.model_dump(mode="json"),
            }
            for message in messages
        ]

        async with session_factory() as session:
            await session.execute(insert(table).values(rows))
            await session.commit()

    async def set_messages(
        self,
        key: str,
        messages: List[ChatMessage],
        status: MessageStatus = MessageStatus.ACTIVE,
    ) -> None:
        """
        Set all messages for a key (replacing existing ones) with the specified status (async).

        Existing rows for ``key`` (of any status) are deleted and the new
        rows inserted in a single transaction, so a failed insert does not
        leave the key emptied.

        Args:
            key: Key whose messages are replaced.
            messages: New messages, in order; may be empty.
            status: Status recorded for every new row.

        """
        session_factory, table = await self._initialize()

        # Use one current timestamp for all rows. Stamping row i with
        # ``now + i`` would place rows in the future, so a message added
        # shortly afterwards would sort before them. Reads order by
        # (timestamp, id), so this relies on ids being assigned in list order.
        now = int(time.time())
        rows = [
            {
                "key": key,
                "timestamp": now,
                "role": message.role,
                "status": status.value,
                "data": message.model_dump(mode="json"),
            }
            for message in messages
        ]

        async with session_factory() as session:
            await session.execute(delete(table).where(table.c.key == key))
            if rows:
                await session.execute(insert(table).values(rows))
            await session.commit()

    async def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
        """
        Delete a specific message by ID and return it (async).

        Args:
            key: Key the message is stored under.
            idx: Value of the row's ``id`` column.

        Returns:
            The deleted message, or ``None`` if no row matches both ``key``
            and ``idx``.

        """
        session_factory, table = await self._initialize()

        async with session_factory() as session:
            # First get the message
            result = await session.execute(
                select(table).where(table.c.key == key, table.c.id == idx)
            )
            row = result.fetchone()

            if not row:
                return None

            # Store the message we're about to delete
            message = ChatMessage.model_validate(row.data)

            # Delete the message
            await session.execute(delete(table).where(table.c.id == idx))
            await session.commit()

            return message

    async def delete_messages(
        self, key: str, status: Optional[MessageStatus] = None
    ) -> None:
        """
        Delete all messages for a key with the specified status (async).

        Args:
            key: Key whose messages are deleted.
            status: Status to restrict the delete to; ``None`` deletes the
                key's messages regardless of status.

        """
        session_factory, table = await self._initialize()

        query = delete(table).where(table.c.key == key)

        if status is not None:
            query = query.where(table.c.status == status.value)

        async with session_factory() as session:
            await session.execute(query)
            await session.commit()

    async def delete_oldest_messages(self, key: str, n: int) -> List[ChatMessage]:
        """
        Delete the oldest n messages for a key and return them (async).

        Only rows with ACTIVE status are considered.

        Args:
            key: Key whose messages are trimmed.
            n: Number of messages to delete; values below 1 delete nothing.

        Returns:
            The deleted messages, oldest first.

        """
        # A negative LIMIT is not portable (SQLite treats it as "no limit"),
        # so refuse to touch anything for non-positive n.
        if n <= 0:
            return []

        session_factory, table = await self._initialize()

        oldest_messages = []

        async with session_factory() as session:
            # First get the oldest n messages
            result = await session.execute(
                select(table)
                .where(
                    table.c.key == key,
                    table.c.status == MessageStatus.ACTIVE.value,
                )
                .order_by(table.c.timestamp, table.c.id)
                .limit(n)
            )
            rows = result.fetchall()

            if not rows:
                return []

            # Store the messages we're about to delete
            oldest_messages = [ChatMessage.model_validate(row.data) for row in rows]

            # Get the IDs to delete
            ids_to_delete = [row.id for row in rows]

            # Delete the messages
            await session.execute(delete(table).where(table.c.id.in_(ids_to_delete)))
            await session.commit()

        return oldest_messages

    async def archive_oldest_messages(self, key: str, n: int) -> List[ChatMessage]:
        """
        Archive the oldest n messages for a key and return them (async).

        Only rows with ACTIVE status are considered; their status is set to
        ARCHIVED.

        Args:
            key: Key whose messages are archived.
            n: Number of messages to archive; values below 1 archive nothing.

        Returns:
            The archived messages, oldest first.

        """
        # Same guard as delete_oldest_messages: never pass a negative LIMIT.
        if n <= 0:
            return []

        session_factory, table = await self._initialize()

        async with session_factory() as session:
            # First get the oldest n messages
            result = await session.execute(
                select(table)
                .where(
                    table.c.key == key,
                    table.c.status == MessageStatus.ACTIVE.value,
                )
                .order_by(table.c.timestamp, table.c.id)
                .limit(n)
            )
            rows = result.fetchall()

            if not rows:
                return []

            # Store the messages we're about to archive
            archived_messages = [ChatMessage.model_validate(row.data) for row in rows]

            # Get the IDs to archive
            ids_to_archive = [row.id for row in rows]

            # Update message status to archived
            await session.execute(
                update(table)
                .where(table.c.id.in_(ids_to_archive))
                .values(status=MessageStatus.ARCHIVED.value)
            )
            await session.commit()

        return archived_messages

    async def get_keys(self) -> List[str]:
        """Get all unique keys in the store (async)."""
        session_factory, table = await self._initialize()

        async with session_factory() as session:
            result = await session.execute(select(table.c.key).distinct())
            return [row[0] for row in result.fetchall()]

    async def _dump_db_data(self) -> List[Dict[str, Any]]:
        """
        Dump the data from the database.

        Returns:
            One dict per row with the ``key``, ``timestamp``, ``role``,
            ``status`` and ``data`` columns (the ``id`` column is omitted),
            ordered by row id so re-inserting keeps the original order.

        """
        session_factory, table = await self._initialize()

        async with session_factory() as session:
            result = await session.execute(select(table).order_by(table.c.id))
            rows = result.fetchall()
            return [
                {
                    "key": row.key,
                    "timestamp": row.timestamp,
                    "role": row.role,
                    "status": row.status,
                    "data": row.data,
                }
                for row in rows
            ]

    @model_serializer()
    def dump_store(self) -> dict:
        """
        Dump the store's configuration and data (if in-memory).

        Returns:
            A dictionary containing the store's configuration and potentially its data.

        """
        dump_data = {
            "table_name": self.table_name,
            "async_database_uri": self.async_database_uri,
        }

        if self._is_in_memory_uri(self.async_database_uri):
            # An in-memory database does not outlive the process, so its rows
            # are embedded in the dump; the async dump is run to completion
            # from this synchronous serializer.
            dump_data["db_data"] = asyncio_run(self._dump_db_data())

        return dump_data

    @classmethod
    def class_name(cls) -> str:
        """Return the class name."""
        return "SQLAlchemyChatStore"

get_messages async #

get_messages(key: str, status: Optional[MessageStatus] = ACTIVE, limit: Optional[int] = None, offset: Optional[int] = None) -> List[ChatMessage]

Get all messages for a key with the specified status (async).

Returns a list of messages.

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
async def get_messages(
    self,
    key: str,
    status: Optional[MessageStatus] = MessageStatus.ACTIVE,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> List[ChatMessage]:
    """
    Fetch the messages stored under ``key`` (async).

    Args:
        key: Key the messages are stored under.
        status: Status to filter on; ``None`` disables the status filter.
        limit: Optional maximum number of rows to return.
        offset: Optional number of rows to skip.

    Returns:
        The messages ordered by timestamp, then by row id.

    """
    session_factory, table = await self._initialize()

    # Collect the filter conditions first, then assemble the statement.
    conditions = [table.c.key == key]
    if status is not None:
        conditions.append(table.c.status == status.value)

    stmt = select(table).where(*conditions).order_by(table.c.timestamp, table.c.id)
    if limit is not None:
        stmt = stmt.limit(limit)
    if offset is not None:
        stmt = stmt.offset(offset)

    async with session_factory() as session:
        fetched = (await session.execute(stmt)).fetchall()

        return [ChatMessage.model_validate(record.data) for record in fetched]

count_messages async #

count_messages(key: str, status: Optional[MessageStatus] = ACTIVE) -> int

Count messages for a key with the specified status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
async def count_messages(
    self,
    key: str,
    status: Optional[MessageStatus] = MessageStatus.ACTIVE,
) -> int:
    """
    Count messages for a key with the specified status (async).

    Args:
        key: Key the messages are stored under.
        status: Status to filter on; ``None`` counts every status.

    Returns:
        The number of matching rows.

    """
    from sqlalchemy import func

    session_factory, table = await self._initialize()

    # Count in SQL rather than fetching every id and measuring the list.
    query = select(func.count()).select_from(table).where(table.c.key == key)

    if status is not None:
        query = query.where(table.c.status == status.value)

    async with session_factory() as session:
        result = await session.execute(query)
        return result.scalar_one()

add_message async #

add_message(key: str, message: ChatMessage, status: MessageStatus = ACTIVE) -> None

Add a message for a key with the specified status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
async def add_message(
    self,
    key: str,
    message: ChatMessage,
    status: MessageStatus = MessageStatus.ACTIVE,
) -> None:
    """
    Store one message under ``key`` with the given status (async).

    Args:
        key: Key to store the message under.
        message: Message to store; serialized with ``model_dump(mode="json")``.
        status: Status recorded for the new row.

    """
    session_factory, table = await self._initialize()

    async with session_factory() as session:
        # Build the column values inside the session, then insert them.
        row_values = {
            "key": key,
            "timestamp": int(time.time()),
            "role": message.role,
            "status": status.value,
            "data": message.model_dump(mode="json"),
        }
        stmt = insert(table).values(**row_values)
        await session.execute(stmt)
        await session.commit()

add_messages async #

add_messages(key: str, messages: List[ChatMessage], status: MessageStatus = ACTIVE) -> None

Add a list of messages in batch for the specified key and status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
async def add_messages(
    self,
    key: str,
    messages: List[ChatMessage],
    status: MessageStatus = MessageStatus.ACTIVE,
) -> None:
    """
    Add a list of messages in batch for the specified key and status (async).

    An empty ``messages`` list is a no-op.

    Args:
        key: Key to store the messages under.
        messages: Messages to store, in order.
        status: Status recorded for every new row.

    """
    # An empty list would not yield an empty multi-row insert; it would
    # build an insert without values for the NOT NULL columns.
    if not messages:
        return

    session_factory, table = await self._initialize()

    # All rows of one batch share a timestamp.
    now = int(time.time())
    rows = [
        {
            "key": key,
            "timestamp": now,
            "role": message.role,
            "status": status.value,
            "data": message.model_dump(mode="json"),
        }
        for message in messages
    ]

    async with session_factory() as session:
        await session.execute(insert(table).values(rows))
        await session.commit()

set_messages async #

set_messages(key: str, messages: List[ChatMessage], status: MessageStatus = ACTIVE) -> None

Set all messages for a key (replacing existing ones) with the specified status (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
async def set_messages(
    self,
    key: str,
    messages: List[ChatMessage],
    status: MessageStatus = MessageStatus.ACTIVE,
) -> None:
    """
    Set all messages for a key (replacing existing ones) with the specified status (async).

    Existing rows for ``key`` (of any status) are deleted and the new rows
    inserted in a single transaction, so a failed insert does not leave the
    key emptied.

    Args:
        key: Key whose messages are replaced.
        messages: New messages, in order; may be empty.
        status: Status recorded for every new row.

    """
    session_factory, table = await self._initialize()

    # Use one current timestamp for all rows. Stamping row i with ``now + i``
    # would place rows in the future, so a message added shortly afterwards
    # would sort before them. Reads order by (timestamp, id), so this relies
    # on ids being assigned in list order.
    now = int(time.time())
    rows = [
        {
            "key": key,
            "timestamp": now,
            "role": message.role,
            "status": status.value,
            "data": message.model_dump(mode="json"),
        }
        for message in messages
    ]

    async with session_factory() as session:
        await session.execute(delete(table).where(table.c.key == key))
        if rows:
            await session.execute(insert(table).values(rows))
        await session.commit()

delete_message async #

delete_message(key: str, idx: int) -> Optional[ChatMessage]

Delete a specific message by ID and return it (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
async def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
    """
    Remove one message by row id and hand it back (async).

    Args:
        key: Key the message is stored under.
        idx: Value of the row's ``id`` column.

    Returns:
        The deleted message, or ``None`` if no row matches both ``key`` and
        ``idx``.

    """
    session_factory, table = await self._initialize()

    lookup = select(table).where(table.c.key == key, table.c.id == idx)
    removal = delete(table).where(table.c.id == idx)

    async with session_factory() as session:
        # Load the row first so its payload can be returned after deletion.
        found = (await session.execute(lookup)).fetchone()
        if not found:
            return None

        removed = ChatMessage.model_validate(found.data)

        await session.execute(removal)
        await session.commit()

        return removed

delete_messages async #

delete_messages(key: str, status: Optional[MessageStatus] = None) -> None

Delete all messages for a key, or only those with the specified status when one is given (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
async def delete_messages(
    self, key: str, status: Optional[MessageStatus] = None
) -> None:
    """
    Remove the messages stored under a key (async).

    When ``status`` is given, only rows carrying that status are removed;
    when it is ``None`` every row for the key is removed regardless of
    status. The deletion is committed before returning.
    """
    session_factory, table = await self._initialize()

    # Always filter on the key; the status filter is optional.
    conditions = [table.c.key == key]
    if status is not None:
        conditions.append(table.c.status == status.value)
    stmt = delete(table).where(*conditions)

    async with session_factory() as session:
        await session.execute(stmt)
        await session.commit()

delete_oldest_messages async #

delete_oldest_messages(key: str, n: int) -> List[ChatMessage]

Delete the oldest n messages for a key and return them (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
async def delete_oldest_messages(self, key: str, n: int) -> List[ChatMessage]:
    """
    Delete the oldest n active messages for a key and return them (async).

    Only rows whose status is ``MessageStatus.ACTIVE`` are considered. Rows
    are ordered by ``timestamp`` and then ``id``, so rows sharing a timestamp
    are taken in ID order.

    Args:
        key: The key whose messages should be trimmed.
        n: Maximum number of messages to delete. Values of zero or less
            delete nothing.

    Returns:
        The deleted messages, oldest first; an empty list when ``n <= 0`` or
        when the key has no active messages.

    """
    # A non-positive count must be a no-op. Passing it through to LIMIT is
    # backend-dependent: SQLite treats a negative LIMIT as "no upper bound",
    # which would delete every active message for the key.
    if n <= 0:
        return []

    session_factory, table = await self._initialize()

    async with session_factory() as session:
        # First get the oldest n messages
        result = await session.execute(
            select(table)
            .where(
                table.c.key == key,
                table.c.status == MessageStatus.ACTIVE.value,
            )
            .order_by(table.c.timestamp, table.c.id)
            .limit(n)
        )
        rows = result.fetchall()

        if not rows:
            return []

        # Store the messages we're about to delete
        oldest_messages = [ChatMessage.model_validate(row.data) for row in rows]

        # Delete exactly the rows that were selected, by primary ID
        ids_to_delete = [row.id for row in rows]
        await session.execute(delete(table).where(table.c.id.in_(ids_to_delete)))
        await session.commit()

    return oldest_messages

archive_oldest_messages async #

archive_oldest_messages(key: str, n: int) -> List[ChatMessage]

Archive the oldest n messages for a key and return them (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
async def archive_oldest_messages(self, key: str, n: int) -> List[ChatMessage]:
    """
    Archive the oldest n active messages for a key and return them (async).

    Only rows whose status is ``MessageStatus.ACTIVE`` are considered. Rows
    are ordered by ``timestamp`` and then ``id``; the selected rows have
    their status set to ``MessageStatus.ARCHIVED`` rather than being removed.

    Args:
        key: The key whose messages should be archived.
        n: Maximum number of messages to archive. Values of zero or less
            archive nothing.

    Returns:
        The archived messages, oldest first; an empty list when ``n <= 0``
        or when the key has no active messages.

    """
    # A non-positive count must be a no-op. Passing it through to LIMIT is
    # backend-dependent: SQLite treats a negative LIMIT as "no upper bound",
    # which would archive every active message for the key.
    if n <= 0:
        return []

    session_factory, table = await self._initialize()

    async with session_factory() as session:
        # First get the oldest n messages
        result = await session.execute(
            select(table)
            .where(
                table.c.key == key,
                table.c.status == MessageStatus.ACTIVE.value,
            )
            .order_by(table.c.timestamp, table.c.id)
            .limit(n)
        )
        rows = result.fetchall()

        if not rows:
            return []

        # Store the messages we're about to archive
        archived_messages = [ChatMessage.model_validate(row.data) for row in rows]

        # Flip the status of exactly the rows that were selected, by primary ID
        ids_to_archive = [row.id for row in rows]
        await session.execute(
            update(table)
            .where(table.c.id.in_(ids_to_archive))
            .values(status=MessageStatus.ARCHIVED.value)
        )
        await session.commit()

    return archived_messages

get_keys async #

get_keys() -> List[str]

Get all unique keys in the store (async).

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
384
385
386
387
388
389
390
async def get_keys(self) -> List[str]:
    """
    List every distinct key present in the store (async).

    Returns:
        One entry per distinct value of the table's ``key`` column.

    """
    session_factory, table = await self._initialize()

    distinct_keys_stmt = select(table.c.key).distinct()

    async with session_factory() as session:
        outcome = await session.execute(distinct_keys_stmt)
        records = outcome.fetchall()

    # Each record holds a single column: the key itself.
    return [record[0] for record in records]

dump_store #

dump_store() -> dict

Dump the store's configuration and data (if in-memory).

Returns:

Type Description
dict

A dictionary containing the store's configuration and potentially its data.

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
@model_serializer()
def dump_store(self) -> dict:
    """
    Serialize the store's configuration, plus its data when in-memory.

    Returns:
        A dictionary with ``table_name`` and ``async_database_uri``. A
        ``db_data`` entry is added when the URI is recognised as in-memory.

    """
    payload = {
        "table_name": self.table_name,
        "async_database_uri": self.async_database_uri,
    }

    if not self._is_in_memory_uri(self.async_database_uri):
        return payload

    # This serializer is synchronous, so the async dump coroutine is driven
    # to completion through asyncio_run.
    # NOTE(review): presumably the rows are embedded because an in-memory
    # database cannot be reopened from its URI alone — confirm.
    payload["db_data"] = asyncio_run(self._dump_db_data())
    return payload

class_name classmethod #

class_name() -> str

Return the class name.

Source code in llama-index-core/llama_index/core/storage/chat_store/sql.py
430
431
432
433
@classmethod
def class_name(cls) -> str:
    """Return the string name that identifies this chat store class."""
    name = "SQLAlchemyChatStore"
    return name