class BaiduVectorDB(VectorStore):
    """Baidu VectorDB as a vector store.

    In order to use this you need to have a database instance.
    See the following documentation for details:
    https://cloud.baidu.com/doc/VDB/index.html

    Args:
        endpoint (str): endpoint of Baidu VectorDB
        api_key (str): The Api-Key for Baidu VectorDB
        account (str): The account for Baidu VectorDB.
            Defaults to ``DEFAULT_ACCOUNT``.
        database_name (str): The database name for Baidu VectorDB.
            Defaults to ``DEFAULT_DATABASE_NAME``.
        table_params (TableParams): The table parameters for BaiduVectorDB
        batch_size (int): Number of rows buffered before each upsert in
            ``add``.

    """

    user_defined_fields: List[TableField] = []

    def __init__(
        self,
        endpoint: str,
        api_key: str,
        account: str = DEFAULT_ACCOUNT,
        database_name: str = DEFAULT_DATABASE_NAME,
        table_params: TableParams = TableParams(dimension=1536),
        batch_size: int = 1000,
        **kwargs: Any,
    ):
        """Init params."""
        self._init_client(endpoint, account, api_key)
        self._create_database_if_not_exists(database_name)
        self._create_table(table_params)
        self.batch_size = batch_size
        # Per-instance copy of the filterable fields; shadows the class-level
        # default list.
        self.user_defined_fields = table_params.filter_fields

    @classmethod
    def class_name(cls) -> str:
        """Return the name of this vector store class."""
        return "BaiduVectorDB"

    @classmethod
    def from_params(
        cls,
        endpoint: str,
        api_key: str,
        account: str = DEFAULT_ACCOUNT,
        database_name: str = DEFAULT_DATABASE_NAME,
        table_params: TableParams = TableParams(dimension=1536),
        batch_size: int = 1000,
        **kwargs: Any,
    ) -> "BaiduVectorDB":
        """Alternate constructor that runs ``_try_import()`` first."""
        _try_import()
        return cls(
            endpoint=endpoint,
            account=account,
            api_key=api_key,
            database_name=database_name,
            table_params=table_params,
            batch_size=batch_size,
            **kwargs,
        )

    def _init_client(self, endpoint: str, account: str, api_key: str) -> None:
        """Create the pymochow client and store it on ``self.vdb_client``."""
        import pymochow
        from pymochow.configuration import Configuration
        from pymochow.auth.bce_credentials import BceCredentials

        config = Configuration(
            credentials=BceCredentials(account, api_key),
            endpoint=endpoint,
            connection_timeout_in_mills=DEFAULT_TIMEOUT_IN_MILLS,
        )
        self.vdb_client = pymochow.MochowClient(config)

    def _create_database_if_not_exists(self, database_name: str) -> None:
        """Bind ``self.database`` to ``database_name``, creating it if absent."""
        db_list = self.vdb_client.list_databases()

        if any(db.database_name == database_name for db in db_list):
            self.database = self.vdb_client.database(database_name)
        else:
            self.database = self.vdb_client.create_database(database_name)

    def _create_table(self, table_params: TableParams) -> None:
        """Bind ``self.table``, creating (or re-creating) the table as needed.

        Raises:
            ValueError: If ``table_params`` is None.

        """
        import pymochow

        if table_params is None:
            raise ValueError(VALUE_NONE_ERROR.format("table_params"))

        try:
            self.table = self.database.describe_table(table_params.table_name)
            if table_params.drop_exists:
                self.database.drop_table(table_params.table_name)
                # wait db release resource
                time.sleep(5)
                self._create_table_in_db(table_params)
        except pymochow.exception.ServerError:
            # describe_table failed; treated here as "table does not exist".
            self._create_table_in_db(table_params)

    def _create_table_in_db(
        self,
        table_params: TableParams,
    ) -> None:
        """Create the table with its schema and indexes; set ``self.table``."""
        from pymochow.model.enum import FieldType
        from pymochow.model.schema import Field, Schema, SecondaryIndex, VectorIndex
        from pymochow.model.table import Partition

        index_type = self._get_index_type(table_params.index_type)
        metric_type = self._get_metric_type(table_params.metric_type)
        vector_params = self._get_index_params(index_type, table_params)

        fields = [
            Field(
                FIELD_ID,
                FieldType.STRING,
                primary_key=True,
                partition_key=True,
                auto_increment=False,
                not_null=True,
            ),
            Field(DEFAULT_DOC_ID_KEY, FieldType.STRING),
            Field(FIELD_METADATA, FieldType.STRING),
            Field(DEFAULT_TEXT_KEY, FieldType.STRING),
            Field(
                FIELD_VECTOR,
                FieldType.FLOAT_VECTOR,
                dimension=table_params.dimension,
            ),
        ]
        # User-declared filter fields become real, non-null columns.
        fields.extend(
            Field(field.name, FieldType(field.data_type), not_null=True)
            for field in table_params.filter_fields
        )

        indexes = [
            VectorIndex(
                index_name=INDEX_VECTOR,
                index_type=index_type,
                field=FIELD_VECTOR,
                metric_type=metric_type,
                params=vector_params,
            )
        ]
        # One secondary index per filter field.
        indexes.extend(
            SecondaryIndex(index_name=field.name + INDEX_SUFFIX, field=field.name)
            for field in table_params.filter_fields
        )

        schema = Schema(fields=fields, indexes=indexes)
        self.table = self.database.create_table(
            table_name=table_params.table_name,
            replication=table_params.replication,
            partition=Partition(partition_num=table_params.partition),
            schema=schema,
            enable_dynamic_field=True,
        )
        # need wait 10s to wait proxy sync meta
        time.sleep(10)

    @staticmethod
    def _get_index_params(index_type: Any, table_params: TableParams) -> Any:
        """Return ``HNSWParams`` for an HNSW index, otherwise None."""
        from pymochow.model.enum import IndexType
        from pymochow.model.schema import HNSWParams

        vector_params = (
            {} if table_params.vector_params is None else table_params.vector_params
        )

        if index_type == IndexType.HNSW:
            return HNSWParams(
                m=vector_params.get("M", DEFAULT_HNSW_M),
                efconstruction=vector_params.get(
                    "efConstruction", DEFAULT_HNSW_EF_CONSTRUCTION
                ),
            )
        return None

    @staticmethod
    def _get_index_type(index_type_value: str) -> Any:
        """Convert a string to a pymochow ``IndexType`` (HNSW when empty).

        Raises:
            ValueError: If the value is not a supported index type.

        """
        from pymochow.model.enum import IndexType

        index_type_value = index_type_value or IndexType.HNSW
        try:
            return IndexType(index_type_value)
        except ValueError:
            support_index_types = [d.value for d in IndexType.__members__.values()]
            raise ValueError(
                NOT_SUPPORT_INDEX_TYPE_ERROR.format(
                    index_type_value, support_index_types
                )
            )

    @staticmethod
    def _get_metric_type(metric_type_value: str) -> Any:
        """Convert a string to a pymochow ``MetricType`` (L2 when empty).

        The lookup is case-insensitive: the value is upper-cased first.

        Raises:
            ValueError: If the value is not a supported metric type.

        """
        from pymochow.model.enum import MetricType

        if not metric_type_value:
            # Return the default member directly rather than calling
            # ``.upper()`` on it, which only works if MetricType is str-based
            # (NOTE(review): confirm against pymochow).
            return MetricType.L2
        try:
            return MetricType(metric_type_value.upper())
        except ValueError:
            support_metric_types = [d.value for d in MetricType.__members__.values()]
            raise ValueError(
                NOT_SUPPORT_METRIC_TYPE_ERROR.format(
                    metric_type_value, support_metric_types
                )
            )

    @property
    def client(self) -> Any:
        """Get client."""
        # The client is stored as ``vdb_client`` by ``_init_client``.
        return self.vdb_client

    def add(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """Add nodes to index.

        Rows are upserted in batches of ``self.batch_size``; afterwards the
        vector index is rebuilt and this call blocks until the index state is
        ``IndexState.NORMAL``.

        Args:
            nodes: List[BaseNode]: list of nodes with embeddings

        Returns:
            List[str]: ids of the nodes that were written, in input order.

        """
        if not nodes:
            # Nothing to write: skip the index rebuild and polling loop.
            return []

        from pymochow.model.table import Row
        from pymochow.model.enum import IndexState

        ids = []
        rows = []
        for node in nodes:
            row = Row(id=node.node_id, vector=node.get_embedding())
            if node.ref_doc_id is not None:
                row._data[DEFAULT_DOC_ID_KEY] = node.ref_doc_id
            if node.metadata is not None:
                row._data[FIELD_METADATA] = json.dumps(node.metadata)
                # Mirror filterable metadata into their dedicated columns.
                for field in self.user_defined_fields:
                    v = node.metadata.get(field.name)
                    if v is not None:
                        row._data[field.name] = v
            if isinstance(node, TextNode) and node.text is not None:
                row._data[DEFAULT_TEXT_KEY] = node.text

            rows.append(row)
            ids.append(node.node_id)

            if len(rows) >= self.batch_size:
                # Flush a full batch to the table.
                self.table.upsert(rows=rows)
                rows = []

        if rows:
            self.table.upsert(rows=rows)

        self.table.rebuild_index(INDEX_VECTOR)
        # Poll until the rebuilt index reports NORMAL.
        while True:
            time.sleep(2)
            index = self.table.describe_index(INDEX_VECTOR)
            if index.state == IndexState.NORMAL:
                break

        return ids

    # Baidu VectorDB does not support delete with filter right now; support
    # is planned for later.
    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """Delete nodes by ref_doc_id (not supported yet).

        Args:
            ref_doc_id (str): The doc_id of the document to delete.

        Raises:
            NotImplementedError: Always.

        """
        raise NotImplementedError("Not support.")

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """Query index for top k most similar nodes.

        Args:
            query (VectorStoreQuery): contains
                query_embedding (List[float]): query embedding
                similarity_top_k (int): top k most similar nodes
                filters (Optional[MetadataFilters]): filter result

        Returns:
            VectorStoreQueryResult: matching nodes, the ``distance`` value of
            each row as its similarity, and ids. All lists are empty when the
            search returns no rows.

        """
        from pymochow.model.table import AnnSearch, HNSWSearchParams

        search_filter = None
        if query.filters is not None:
            # _build_filter_condition only accepts the filters object, so
            # extra kwargs must not be forwarded to it.
            search_filter = self._build_filter_condition(query.filters)
        anns = AnnSearch(
            vector_field=FIELD_VECTOR,
            vector_floats=query.query_embedding,
            params=HNSWSearchParams(ef=DEFAULT_HNSW_EF, limit=query.similarity_top_k),
            filter=search_filter,
        )
        res = self.table.search(anns=anns, retrieve_vector=True)
        rows = res.rows
        if rows is None or len(rows) == 0:
            return VectorStoreQueryResult(nodes=[], similarities=[], ids=[])

        nodes = []
        similarities = []
        ids = []
        for row in rows:
            similarities.append(row.get("distance"))
            row_data = row.get("row", {})
            ids.append(row_data.get(FIELD_ID))

            meta_str = row_data.get(FIELD_METADATA)
            meta = {} if meta_str is None else json.loads(meta_str)
            doc_id = row_data.get(DEFAULT_DOC_ID_KEY)

            node = TextNode(
                id_=row_data.get(FIELD_ID),
                text=row_data.get(DEFAULT_TEXT_KEY),
                embedding=row_data.get(FIELD_VECTOR),
                metadata=meta,
            )
            if doc_id is not None:
                node.relationships = {
                    NodeRelationship.SOURCE: RelatedNodeInfo(node_id=doc_id)
                }

            nodes.append(node)

        return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)

    @staticmethod
    def _build_filter_condition(standard_filters: MetadataFilters) -> str:
        """Translate metadata filters into a filter expression string.

        Raises:
            ValueError: If a filter uses an unsupported operator.

        """
        comparison_ops = ("<", ">", "<=", ">=", "!=")
        conditions = []

        for metadata_filter in standard_filters.filters:
            key = metadata_filter.key
            value = metadata_filter.value
            # Use the enum's raw value: f-string formatting of a str-mixin
            # Enum renders the member name on Python 3.11+.
            operator = getattr(
                metadata_filter.operator, "value", metadata_filter.operator
            )

            if not operator:
                condition = f"{key}={value}"
            elif operator in comparison_ops:
                condition = f"{key}{operator}{value}"
            elif operator == "==":
                # NOTE(review): string values are interpolated unescaped;
                # a quote inside the value would break the expression.
                if isinstance(value, str):
                    condition = f"{key}='{value}'"
                else:
                    condition = f"{key}=={value}"
            else:
                raise ValueError(f"Filter operator {operator} not supported.")

            conditions.append(condition)

        # Pad the joiner with spaces so clauses do not fuse ("a=1andb=2").
        joiner = getattr(
            standard_filters.condition, "value", standard_filters.condition
        )
        return f" {joiner} ".join(conditions)
def add(
    self,
    nodes: List[BaseNode],
    **add_kwargs: Any,
) -> List[str]:
    """Add nodes to index.

    Rows are upserted in batches of ``self.batch_size``; afterwards the
    vector index is rebuilt and this call blocks until the index state is
    ``IndexState.NORMAL``.

    Args:
        nodes: List[BaseNode]: list of nodes with embeddings

    Returns:
        List[str]: ids of the nodes that were written, in input order.

    """
    if not nodes:
        # Nothing to write: skip the index rebuild and polling loop.
        return []

    from pymochow.model.table import Row
    from pymochow.model.enum import IndexState

    ids = []
    rows = []
    for node in nodes:
        row = Row(id=node.node_id, vector=node.get_embedding())
        if node.ref_doc_id is not None:
            row._data[DEFAULT_DOC_ID_KEY] = node.ref_doc_id
        if node.metadata is not None:
            row._data[FIELD_METADATA] = json.dumps(node.metadata)
            # Mirror filterable metadata into their dedicated columns.
            for field in self.user_defined_fields:
                v = node.metadata.get(field.name)
                if v is not None:
                    row._data[field.name] = v
        if isinstance(node, TextNode) and node.text is not None:
            row._data[DEFAULT_TEXT_KEY] = node.text

        rows.append(row)
        ids.append(node.node_id)

        if len(rows) >= self.batch_size:
            # Full batches go to the same table handle as the final partial
            # batch below.
            self.table.upsert(rows=rows)
            rows = []

    if rows:
        self.table.upsert(rows=rows)

    self.table.rebuild_index(INDEX_VECTOR)
    # Poll until the rebuilt index reports NORMAL.
    while True:
        time.sleep(2)
        index = self.table.describe_index(INDEX_VECTOR)
        if index.state == IndexState.NORMAL:
            break

    return ids
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-baiduvectordb/llama_index/vector_stores/baiduvectordb/base.py
Lines 379–387
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
    """Delete nodes by ref_doc_id (not supported yet).

    Args:
        ref_doc_id (str): The doc_id of the document to delete.

    Raises:
        NotImplementedError: Always.

    """
    raise NotImplementedError("Not support.")
contains
query_embedding (List[float]): query embedding
similarity_top_k (int): top k most similar nodes
filters (Optional[MetadataFilters]): filter result
required
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-baiduvectordb/llama_index/vector_stores/baiduvectordb/base.py
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
    """Query index for top k most similar nodes.

    Args:
        query (VectorStoreQuery): contains
            query_embedding (List[float]): query embedding
            similarity_top_k (int): top k most similar nodes
            filters (Optional[MetadataFilters]): filter result

    Returns:
        VectorStoreQueryResult: matching nodes, the ``distance`` value of
        each row as its similarity, and ids. All lists are empty when the
        search returns no rows.

    """
    from pymochow.model.table import AnnSearch, HNSWSearchParams

    search_filter = None
    if query.filters is not None:
        # _build_filter_condition only accepts the filters object, so extra
        # kwargs must not be forwarded to it.
        search_filter = self._build_filter_condition(query.filters)
    anns = AnnSearch(
        vector_field=FIELD_VECTOR,
        vector_floats=query.query_embedding,
        params=HNSWSearchParams(ef=DEFAULT_HNSW_EF, limit=query.similarity_top_k),
        filter=search_filter,
    )
    res = self.table.search(anns=anns, retrieve_vector=True)
    rows = res.rows
    if rows is None or len(rows) == 0:
        return VectorStoreQueryResult(nodes=[], similarities=[], ids=[])

    nodes = []
    similarities = []
    ids = []
    for row in rows:
        similarities.append(row.get("distance"))
        row_data = row.get("row", {})
        ids.append(row_data.get(FIELD_ID))

        meta_str = row_data.get(FIELD_METADATA)
        meta = {} if meta_str is None else json.loads(meta_str)
        doc_id = row_data.get(DEFAULT_DOC_ID_KEY)

        node = TextNode(
            id_=row_data.get(FIELD_ID),
            text=row_data.get(DEFAULT_TEXT_KEY),
            embedding=row_data.get(FIELD_VECTOR),
            metadata=meta,
        )
        if doc_id is not None:
            node.relationships = {
                NodeRelationship.SOURCE: RelatedNodeInfo(node_id=doc_id)
            }

        nodes.append(node)

    return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)