Skip to content

Myscale

MyScaleVectorStore #

Bases: BasePydanticVectorStore

MyScale Vector Store.

In this vector store, embeddings and docs are stored within an existing MyScale cluster.

During query time, the index uses MyScale to query for the top k most similar nodes.

Parameters:

Name Type Description Default
myscale_client httpclient

clickhouse-connect httpclient of an existing MyScale cluster.

None
table str

The name of the MyScale table where data will be stored. Defaults to "llama_index".

'llama_index'
database str

The name of the MyScale database where data will be stored. Defaults to "default".

'default'
index_type str

The type of the MyScale vector index. Defaults to "IVFFLAT".

'MSTG'
metric str

The metric type of the MyScale vector index. Defaults to "cosine".

'cosine'
batch_size int

the size of documents to insert. Defaults to 32.

32
index_params dict

The index parameters for MyScale. Defaults to None.

None
search_params dict

The search parameters for a MyScale query. Defaults to None.

None
embed_dims embed_dims

Embedding dimensions. Defaults to None.

None

Examples:

pip install llama-index-vector-stores-myscale

from llama_index.vector_stores.myscale import MyScaleVectorStore
import clickhouse_connect

# initialize client
client = clickhouse_connect.get_client(
    host="YOUR_CLUSTER_HOST",
    port=8443,
    username="YOUR_USERNAME",
    password="YOUR_CLUSTER_PASSWORD",
)

vector_store = MyScaleVectorStore(myscale_client=client)
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-myscale/llama_index/vector_stores/myscale/base.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
class MyScaleVectorStore(BasePydanticVectorStore):
    """MyScale Vector Store.

    In this vector store, embeddings and docs are stored within an existing
    MyScale cluster.

    During query time, the index uses MyScale to query for the top
    k most similar nodes.

    Args:
        myscale_client (httpclient): clickhouse-connect httpclient of
            an existing MyScale cluster.
        table (str, optional): The name of the MyScale table
            where data will be stored. Defaults to "llama_index".
        database (str, optional): The name of the MyScale database
            where data will be stored. Defaults to "default".
        index_type (str, optional): The type of the MyScale vector index.
            Defaults to "IVFFLAT".
        metric (str, optional): The metric type of the MyScale vector index.
            Defaults to "cosine".
        batch_size (int, optional): the size of documents to insert. Defaults to 32.
        index_params (dict, optional): The index parameters for MyScale.
            Defaults to None.
        search_params (dict, optional): The search parameters for a MyScale query.
            Defaults to None.
        embed_dims (embed_dims, optional): Embedding dimensions. Defaults to None.

    Examples:
        `pip install llama-index-vector-stores-myscale`

        ```python
        from llama_index.vector_stores.myscale import MyScaleVectorStore
        import clickhouse_connect

        # initialize client
        client = clickhouse_connect.get_client(
            host="YOUR_CLUSTER_HOST",
            port=8443,
            username="YOUR_USERNAME",
            password="YOUR_CLUSTER_PASSWORD",
        )

        vector_store = MyScaleVectorStore(myscale_client=client)
        ```

    """

    stores_text: bool = True
    metadata_column: str = "metadata"
    AMPLIFY_RATIO_LE5: int = 100
    AMPLIFY_RATIO_GT5: int = 20
    AMPLIFY_RATIO_GT50: int = 10

    _index_existed: bool = PrivateAttr(False)
    _client: Any = PrivateAttr()
    _config: MyScaleSettings = PrivateAttr()
    _column_config: Dict = PrivateAttr()
    _dim: int = PrivateAttr()

    def __init__(
        self,
        myscale_client: Optional[Any] = None,
        table: str = "llama_index",
        database: str = "default",
        index_type: str = "MSTG",
        metric: str = "cosine",
        batch_size: int = 32,
        index_params: Optional[dict] = None,
        search_params: Optional[dict] = None,
        embed_dims: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        import_err_msg = """
            `clickhouse_connect` package not found,
            please run `pip install clickhouse-connect`
        """
        super().__init__()

        try:
            from clickhouse_connect.driver.httpclient import HttpClient
        except ImportError:
            raise ImportError(import_err_msg)

        if myscale_client is None:
            raise ValueError("Missing MyScale client!")

        self._client = cast(HttpClient, myscale_client)
        self._config = MyScaleSettings(
            table=table,
            database=database,
            index_type=index_type,
            metric=metric,
            batch_size=batch_size,
            index_params=index_params,
            search_params=search_params,
            **kwargs,
        )

        # schema column name, type, and construct format method
        self._column_config: Dict = {
            "id": {"type": "String", "extract_func": lambda x: x.node_id},
            "doc_id": {"type": "String", "extract_func": lambda x: x.ref_doc_id},
            "text": {
                "type": "String",
                "extract_func": lambda x: escape_str(
                    x.get_content(metadata_mode=MetadataMode.NONE) or ""
                ),
            },
            "vector": {
                "type": "Array(Float32)",
                "extract_func": lambda x: format_list_to_string(x.get_embedding()),
            },
            "node_info": {
                "type": "JSON",
                "extract_func": lambda x: json.dumps(x.node_info),
            },
            "metadata": {
                "type": "JSON",
                "extract_func": lambda x: json.dumps(x.metadata),
            },
        }

        if embed_dims is not None:
            self._create_index(embed_dims)

    @classmethod
    def class_name(cls) -> str:
        """Get class name."""
        return "MyScaleVectorStore"

    @property
    def client(self) -> Any:
        """Get client."""
        return self._client

    def _create_index(self, dimension: int) -> None:
        index_params = (
            ", "
            + ",".join([f"'{k}={v}'" for k, v in self._config.index_params.items()])
            if self._config.index_params
            else ""
        )
        schema_ = f"""
            CREATE TABLE IF NOT EXISTS {self._config.database}.{self._config.table}(
                {",".join([f'{k} {v["type"]}' for k, v in self._column_config.items()])},
                CONSTRAINT vector_length CHECK length(vector) = {dimension},
                VECTOR INDEX {self._config.table}_index vector TYPE
                {self._config.index_type}('metric_type={self._config.metric}'{index_params})
            ) ENGINE = MergeTree ORDER BY id
            """
        self._dim = dimension
        self._client.command("SET allow_experimental_object_type=1")
        self._client.command(schema_)
        self._index_existed = True

    def _build_insert_statement(
        self,
        values: List[BaseNode],
    ) -> str:
        _data = []
        for item in values:
            item_value_str = ",".join(
                [
                    f"'{column['extract_func'](item)}'"
                    for column in self._column_config.values()
                ]
            )
            _data.append(f"({item_value_str})")

        return f"""
                INSERT INTO TABLE
                    {self._config.database}.{self._config.table}({",".join(self._column_config.keys())})
                VALUES
                    {','.join(_data)}
                """

    def _build_hybrid_search_statement(
        self, stage_one_sql: str, query_str: str, similarity_top_k: int
    ) -> str:
        terms_pattern = [f"(?i){x}" for x in query_str.split(" ")]
        column_keys = self._column_config.keys()
        return (
            f"SELECT {','.join(filter(lambda k: k != 'vector', column_keys))}, "
            f"dist FROM ({stage_one_sql}) tempt "
            f"ORDER BY length(multiMatchAllIndices(text, {terms_pattern})) "
            f"AS distance1 DESC, "
            f"log(1 + countMatches(text, '(?i)({query_str.replace(' ', '|')})')) "
            f"AS distance2 DESC limit {similarity_top_k}"
        )

    def _append_meta_filter_condition(
        self, where_str: Optional[str], exact_match_filter: list
    ) -> str:
        filter_str = " AND ".join(
            f"JSONExtractString(toJSONString("
            f"{self.metadata_column}), '{filter_item.key}') "
            f"= '{filter_item.value}'"
            for filter_item in exact_match_filter
        )
        if where_str is None:
            where_str = filter_str
        else:
            where_str = " AND " + filter_str
        return where_str

    def add(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """Add nodes to index.

        Args:
            nodes: List[BaseNode]: list of nodes with embeddings

        """
        if not nodes:
            return []

        if not self._index_existed:
            self._create_index(len(nodes[0].get_embedding()))

        for result_batch in iter_batch(nodes, self._config.batch_size):
            insert_statement = self._build_insert_statement(values=result_batch)
            self._client.command(insert_statement)

        return [result.node_id for result in nodes]

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """
        Delete nodes using with ref_doc_id.

        Args:
            ref_doc_id (str): The doc_id of the document to delete.

        """
        self._client.command(
            f"DELETE FROM {self._config.database}.{self._config.table} "
            f"where doc_id='{ref_doc_id}'"
        )

    def drop(self) -> None:
        """Drop MyScale Index and table."""
        self._client.command(
            f"DROP TABLE IF EXISTS {self._config.database}.{self._config.table}"
        )

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """Query index for top k most similar nodes.

        Args:
            query (VectorStoreQuery): query

        """
        query_embedding = cast(List[float], query.query_embedding)
        where_str = (
            f"doc_id in {format_list_to_string(query.doc_ids)}"
            if query.doc_ids
            else None
        )
        if query.filters is not None and len(query.filters.legacy_filters()) > 0:
            where_str = self._append_meta_filter_condition(
                where_str, query.filters.legacy_filters()
            )

        # build query sql
        query_statement = self._config.build_query_statement(
            query_embed=query_embedding,
            where_str=where_str,
            limit=query.similarity_top_k,
        )
        if query.mode == VectorStoreQueryMode.HYBRID and query.query_str is not None:
            amplify_ratio = self.AMPLIFY_RATIO_LE5
            if 5 < query.similarity_top_k < 50:
                amplify_ratio = self.AMPLIFY_RATIO_GT5
            if query.similarity_top_k > 50:
                amplify_ratio = self.AMPLIFY_RATIO_GT50
            query_statement = self._build_hybrid_search_statement(
                self._config.build_query_statement(
                    query_embed=query_embedding,
                    where_str=where_str,
                    limit=query.similarity_top_k * amplify_ratio,
                ),
                query.query_str,
                query.similarity_top_k,
            )
            logger.debug(f"hybrid query_statement={query_statement}")
        nodes = []
        ids = []
        similarities = []
        for r in self._client.query(query_statement).named_results():
            start_char_idx = None
            end_char_idx = None

            if isinstance(r["node_info"], dict):
                start_char_idx = r["node_info"].get("start", None)
                end_char_idx = r["node_info"].get("end", None)
            node = TextNode(
                id_=r["id"],
                text=r["text"],
                metadata=r["metadata"],
                start_char_idx=start_char_idx,
                end_char_idx=end_char_idx,
                relationships={
                    NodeRelationship.SOURCE: RelatedNodeInfo(node_id=r["id"])
                },
            )

            nodes.append(node)
            similarities.append(r["dist"])
            ids.append(r["id"])
        return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)

client property #

client: Any

Get client.

class_name classmethod #

class_name() -> str

Get class name.

Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-myscale/llama_index/vector_stores/myscale/base.py
161
162
163
164
@classmethod
def class_name(cls) -> str:
    """Get class name."""
    return "MyScaleVectorStore"

add #

add(nodes: List[BaseNode], **add_kwargs: Any) -> List[str]

Add nodes to index.

Parameters:

Name Type Description Default
nodes List[BaseNode]

List[BaseNode]: list of nodes with embeddings

required
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-myscale/llama_index/vector_stores/myscale/base.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def add(
    self,
    nodes: List[BaseNode],
    **add_kwargs: Any,
) -> List[str]:
    """Add nodes to index.

    Args:
        nodes: List[BaseNode]: list of nodes with embeddings

    """
    if not nodes:
        return []

    if not self._index_existed:
        self._create_index(len(nodes[0].get_embedding()))

    for result_batch in iter_batch(nodes, self._config.batch_size):
        insert_statement = self._build_insert_statement(values=result_batch)
        self._client.command(insert_statement)

    return [result.node_id for result in nodes]

delete #

delete(ref_doc_id: str, **delete_kwargs: Any) -> None

Delete nodes using with ref_doc_id.

Parameters:

Name Type Description Default
ref_doc_id str

The doc_id of the document to delete.

required
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-myscale/llama_index/vector_stores/myscale/base.py
264
265
266
267
268
269
270
271
272
273
274
275
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
    """
    Delete nodes using with ref_doc_id.

    Args:
        ref_doc_id (str): The doc_id of the document to delete.

    """
    self._client.command(
        f"DELETE FROM {self._config.database}.{self._config.table} "
        f"where doc_id='{ref_doc_id}'"
    )

drop #

drop() -> None

Drop MyScale Index and table.

Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-myscale/llama_index/vector_stores/myscale/base.py
277
278
279
280
281
def drop(self) -> None:
    """Drop MyScale Index and table."""
    self._client.command(
        f"DROP TABLE IF EXISTS {self._config.database}.{self._config.table}"
    )

query #

query(query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult

Query index for top k most similar nodes.

Parameters:

Name Type Description Default
query VectorStoreQuery

query

required
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-myscale/llama_index/vector_stores/myscale/base.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
    """Query index for top k most similar nodes.

    Args:
        query (VectorStoreQuery): query

    """
    query_embedding = cast(List[float], query.query_embedding)
    where_str = (
        f"doc_id in {format_list_to_string(query.doc_ids)}"
        if query.doc_ids
        else None
    )
    if query.filters is not None and len(query.filters.legacy_filters()) > 0:
        where_str = self._append_meta_filter_condition(
            where_str, query.filters.legacy_filters()
        )

    # build query sql
    query_statement = self._config.build_query_statement(
        query_embed=query_embedding,
        where_str=where_str,
        limit=query.similarity_top_k,
    )
    if query.mode == VectorStoreQueryMode.HYBRID and query.query_str is not None:
        amplify_ratio = self.AMPLIFY_RATIO_LE5
        if 5 < query.similarity_top_k < 50:
            amplify_ratio = self.AMPLIFY_RATIO_GT5
        if query.similarity_top_k > 50:
            amplify_ratio = self.AMPLIFY_RATIO_GT50
        query_statement = self._build_hybrid_search_statement(
            self._config.build_query_statement(
                query_embed=query_embedding,
                where_str=where_str,
                limit=query.similarity_top_k * amplify_ratio,
            ),
            query.query_str,
            query.similarity_top_k,
        )
        logger.debug(f"hybrid query_statement={query_statement}")
    nodes = []
    ids = []
    similarities = []
    for r in self._client.query(query_statement).named_results():
        start_char_idx = None
        end_char_idx = None

        if isinstance(r["node_info"], dict):
            start_char_idx = r["node_info"].get("start", None)
            end_char_idx = r["node_info"].get("end", None)
        node = TextNode(
            id_=r["id"],
            text=r["text"],
            metadata=r["metadata"],
            start_char_idx=start_char_idx,
            end_char_idx=end_char_idx,
            relationships={
                NodeRelationship.SOURCE: RelatedNodeInfo(node_id=r["id"])
            },
        )

        nodes.append(node)
        similarities.append(r["dist"])
        ids.append(r["id"])
    return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)