In this vector store we store the text, its embedding, and
its metadata in a KDB.AI table. This implementation
requires an already existing KDB.AI table.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `Table` | The KDB.AI table to use as storage. | *required* |
| `batch_size` | `int` | Batch size used when inserting data. | `100` |
Returns:

| Type | Description |
| --- | --- |
| `KDBAIVectorStore` | Vector store that supports add and query. |
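A minimal usage sketch, under assumptions: a table named `documents` already exists in the `default` database with the columns the store expects (`document_id`, `text`, `embeddings`), and the `kdbai_client` session/table calls follow the current client API, which may vary by client version:

```python
import kdbai_client as kdbai
from llama_index.core.schema import TextNode
from llama_index.vector_stores.kdbai import KDBAIVectorStore

# Connect to a running KDB.AI instance (endpoint is a placeholder).
session = kdbai.Session(endpoint="http://localhost:8082")

# The store wraps an existing table; it does not create one for you.
table = session.database("default").table("documents")

vector_store = KDBAIVectorStore(table=table, batch_size=100)

# Nodes must carry precomputed embeddings whose dimension matches the
# table's dense index (384 here is a placeholder).
nodes = [TextNode(text="KDB.AI is a vector database.", embedding=[0.1] * 384)]
ids = vector_store.add(nodes)  # returns the inserted document IDs
```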
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-kdbai/llama_index/vector_stores/kdbai/base.py
```python
class KDBAIVectorStore(BasePydanticVectorStore):
    """The KDBAI Vector Store.

    In this vector store we store the text, its embedding and
    its metadata in a KDBAI vector store table. This implementation
    allows the use of an already existing table.

    Args:
        table kdbai.Table: The KDB.AI table to use as storage.
        batch (int, optional): batch size to insert data. Default is 100.

    Returns:
        KDBAIVectorStore: Vectorstore that supports add and query.
    """

    stores_text: bool = True
    flat_metadata: bool = True
    hybrid_search: bool = False
    batch_size: int

    _table: Any = PrivateAttr()
    _sparse_encoder: Optional[Callable] = PrivateAttr()

    def __init__(
        self,
        table: Any = None,
        hybrid_search: bool = False,
        sparse_encoder: Optional[Callable] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        try:
            import kdbai_client as kdbai

            logger.info("KDBAI client version: " + kdbai.__version__)
        except ImportError:
            raise ValueError(
                "Could not import kdbai_client package. "
                "Please add it to the dependencies."
            )

        super().__init__(batch_size=batch_size, hybrid_search=hybrid_search)

        if table is None:
            raise ValueError("Must provide an existing KDB.AI table.")
        else:
            self._table = table

        if hybrid_search:
            if sparse_encoder is None:
                self._sparse_encoder = default_sparse_encoder
            else:
                self._sparse_encoder = sparse_encoder

    @property
    def client(self) -> Any:
        """Return KDB.AI client."""
        return self._table

    @classmethod
    def class_name(cls) -> str:
        return "KDBAIVectorStore"

    def add(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """Add nodes to the KDBAI Vector Store.

        Args:
            nodes (List[BaseNode]): List of nodes to be added.

        Returns:
            List[str]: List of document IDs that were added.
        """
        try:
            import kdbai_client as kdbai

            logger.info("KDBAI client version: " + kdbai.__version__)
        except ImportError:
            raise ValueError(
                "Could not import kdbai_client package. "
                "Please add it to the dependencies."
            )

        df = pd.DataFrame()
        docs = []
        schema = self._table.schema
        if self.hybrid_search:
            schema = [item for item in schema if item["name"] != "sparseVectors"]

        try:
            for node in nodes:
                doc = {
                    "document_id": node.node_id.encode("utf-8"),
                    "text": node.text.encode("utf-8"),
                    "embeddings": node.embedding,
                }

                if self.hybrid_search:
                    doc["sparseVectors"] = self._sparse_encoder(node.get_content())

                # handle metadata columns
                if len(schema) > len(DEFAULT_COLUMN_NAMES):
                    for column in [
                        item
                        for item in schema
                        if item["name"] not in DEFAULT_COLUMN_NAMES
                    ]:
                        try:
                            doc[column["name"]] = node.metadata[column["name"]]
                        except Exception as e:
                            logger.error(
                                f"Error writing column {column['name']} as type {column['type']}: {e}."
                            )

                docs.append(doc)

            df = pd.DataFrame(docs)
            for i in range((len(df) - 1) // self.batch_size + 1):
                batch = df.iloc[i * self.batch_size : (i + 1) * self.batch_size]
                try:
                    self._table.insert(batch)
                    logger.info(f"inserted batch {i}")
                except Exception as e:
                    logger.exception(
                        f"Failed to insert batch {i} of documents into the datastore: {e}"
                    )

            return [x.decode("utf-8") for x in df["document_id"].tolist()]
        except Exception as e:
            logger.error(f"Error preparing data for KDB.AI: {e}.")

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        try:
            import kdbai_client as kdbai

            logger.info("KDBAI client version: " + kdbai.__version__)
        except ImportError:
            raise ValueError(
                "Could not import kdbai_client package. "
                "Please add it to the dependencies."
            )

        if query.alpha:
            raise ValueError(
                "Could not run hybrid search. "
                "Please remove alpha and provide KDBAI weights for the two indexes "
                "through the vector_store_kwargs."
            )

        if query.filters:
            filter = query.filters
            if kwargs.get("filter"):
                filter.extend(kwargs.pop("filter"))
            kwargs["filter"] = filter

        if kwargs.get("index"):
            index = kwargs.pop("index")
            if self.hybrid_search:
                indexSparse = kwargs.pop("indexSparse", None)
                indexWeight = kwargs.pop("indexWeight", None)
                indexSparseWeight = kwargs.pop("indexSparseWeight", None)
                if not all([indexSparse, indexWeight, indexSparseWeight]):
                    raise ValueError(
                        "Could not run hybrid search. "
                        "Please provide KDBAI sparse index name and weights."
                    )
        else:
            raise ValueError(
                "Could not run the search. "
                "Please provide KDBAI index name."
            )

        if self.hybrid_search:
            sparse_vectors = [self._sparse_encoder(query.query_str)]
            qry = {index: [query.query_embedding], indexSparse: sparse_vectors}
            index_params = {
                index: {"weight": indexWeight},
                indexSparse: {"weight": indexSparseWeight},
            }
            results = self._table.search(
                vectors=qry,
                index_params=index_params,
                n=query.similarity_top_k,
                **kwargs,
            )[0]
        else:
            results = self._table.search(
                vectors={index: [query.query_embedding]},
                n=query.similarity_top_k,
                **kwargs,
            )[0]

        top_k_nodes = []
        top_k_ids = []
        top_k_scores = []

        for result in results.to_dict(orient="records"):
            metadata = {x: result[x] for x in result if x not in DEFAULT_COLUMN_NAMES}
            node = TextNode(
                text=result["text"], id_=result["document_id"], metadata=metadata
            )
            top_k_ids.append(result["document_id"])
            top_k_nodes.append(node)
            top_k_scores.append(result["__nn_distance"])

        return VectorStoreQueryResult(
            nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids
        )

    def delete(self, **delete_kwargs: Any) -> None:
        raise Exception("Not implemented.")
```