class PGVectorStore(BasePydanticVectorStore):
    """Postgres Vector Store.

    Examples:
        `pip install llama-index-vector-stores-postgres`

        ```python
        from llama_index.vector_stores.postgres import PGVectorStore

        # Create PGVectorStore instance
        vector_store = PGVectorStore.from_params(
            database="vector_db",
            host="localhost",
            password="password",
            port=5432,
            user="postgres",
            table_name="paul_graham_essay",
            embed_dim=1536 # openai embedding dimension
        )
        ```
    """

    from sqlalchemy.sql.selectable import Select

    stores_text = True
    flat_metadata = False

    connection_string: str
    async_connection_string: str
    table_name: str
    schema_name: str
    embed_dim: int
    hybrid_search: bool
    text_search_config: str
    cache_ok: bool
    perform_setup: bool
    debug: bool
    use_jsonb: bool

    # Lazily-created sqlalchemy machinery; populated by _connect()/_initialize().
    _base: Any = PrivateAttr()
    _table_class: Any = PrivateAttr()
    _engine: Any = PrivateAttr()
    _session: Any = PrivateAttr()
    _async_engine: Any = PrivateAttr()
    _async_session: Any = PrivateAttr()
    _is_initialized: bool = PrivateAttr(default=False)

    def __init__(
        self,
        connection_string: str,
        async_connection_string: str,
        table_name: str,
        schema_name: str,
        hybrid_search: bool = False,
        text_search_config: str = "english",
        embed_dim: int = 1536,
        cache_ok: bool = False,
        perform_setup: bool = True,
        debug: bool = False,
        use_jsonb: bool = False,
    ) -> None:
        """Build the store and its declarative table model.

        Args:
            connection_string: Sync (psycopg2) SQLAlchemy URL.
            async_connection_string: Async (asyncpg) SQLAlchemy URL.
            table_name: Base table name; the actual table is ``data_<table_name>``.
            schema_name: Postgres schema to create the table in.
            hybrid_search: Also build a tsvector column for text search.
            text_search_config: Postgres text-search configuration (regconfig).
            embed_dim: Dimensionality of stored embeddings.
            cache_ok: Passed through to the data model (ts vector caching).
            perform_setup: Create extension/schema/tables on first use.
            debug: Echo SQL from the sync engine.
            use_jsonb: Store metadata as JSONB instead of JSON.

        Raises:
            ValueError: If hybrid search is requested without a text search config.
        """
        # Postgres folds unquoted identifiers to lower case; normalize up front.
        table_name = table_name.lower()
        schema_name = schema_name.lower()

        if hybrid_search and text_search_config is None:
            raise ValueError(
                "Sparse vector index creation requires "
                "a text search configuration specification."
            )

        from sqlalchemy.orm import declarative_base

        # sqlalchemy model
        self._base = declarative_base()
        self._table_class = get_data_model(
            self._base,
            table_name,
            schema_name,
            hybrid_search,
            text_search_config,
            cache_ok,
            embed_dim=embed_dim,
            use_jsonb=use_jsonb,
        )

        super().__init__(
            connection_string=connection_string,
            async_connection_string=async_connection_string,
            table_name=table_name,
            schema_name=schema_name,
            hybrid_search=hybrid_search,
            text_search_config=text_search_config,
            embed_dim=embed_dim,
            cache_ok=cache_ok,
            perform_setup=perform_setup,
            debug=debug,
            use_jsonb=use_jsonb,
        )

    async def close(self) -> None:
        """Close all sessions and dispose both engines (no-op if never connected)."""
        if not self._is_initialized:
            return

        self._session.close_all()
        self._engine.dispose()

        await self._async_engine.dispose()

    @classmethod
    def class_name(cls) -> str:
        return "PGVectorStore"

    @classmethod
    def from_params(
        cls,
        host: Optional[str] = None,
        port: Optional[str] = None,
        database: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        table_name: str = "llamaindex",
        schema_name: str = "public",
        connection_string: Optional[str] = None,
        async_connection_string: Optional[str] = None,
        hybrid_search: bool = False,
        text_search_config: str = "english",
        embed_dim: int = 1536,
        cache_ok: bool = False,
        perform_setup: bool = True,
        debug: bool = False,
        use_jsonb: bool = False,
    ) -> "PGVectorStore":
        """Return connection string from database parameters."""
        # Explicit connection strings win; otherwise assemble from parts.
        conn_str = (
            connection_string
            or f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
        )
        async_conn_str = async_connection_string or (
            f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{database}"
        )
        return cls(
            connection_string=conn_str,
            async_connection_string=async_conn_str,
            table_name=table_name,
            schema_name=schema_name,
            hybrid_search=hybrid_search,
            text_search_config=text_search_config,
            embed_dim=embed_dim,
            cache_ok=cache_ok,
            perform_setup=perform_setup,
            debug=debug,
            use_jsonb=use_jsonb,
        )

    @property
    def client(self) -> Any:
        """Return the sync engine, or None before first use."""
        if not self._is_initialized:
            return None
        return self._engine

    def _connect(self) -> Any:
        """Create sync and async engines plus their session factories."""
        from sqlalchemy import create_engine
        from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
        from sqlalchemy.orm import sessionmaker

        self._engine = create_engine(self.connection_string, echo=self.debug)
        self._session = sessionmaker(self._engine)

        self._async_engine = create_async_engine(self.async_connection_string)
        self._async_session = sessionmaker(self._async_engine, class_=AsyncSession)  # type: ignore

    def _create_schema_if_not_exists(self) -> None:
        """Create ``self.schema_name`` if it does not already exist."""
        with self._session() as session, session.begin():
            from sqlalchemy import text

            # Check if the specified schema exists with "CREATE" statement.
            # Bind the schema name instead of interpolating it into the SQL.
            check_schema_statement = text(
                "SELECT schema_name FROM information_schema.schemata "
                "WHERE schema_name = :schema_name"
            ).bindparams(schema_name=self.schema_name)
            result = session.execute(check_schema_statement).fetchone()

            # If the schema does not exist, then create it.
            # NOTE: identifiers cannot be bound parameters; schema_name is
            # lowercased in __init__ but is still interpolated here.
            if not result:
                create_schema_statement = text(
                    f"CREATE SCHEMA IF NOT EXISTS {self.schema_name}"
                )
                session.execute(create_schema_statement)

            session.commit()

    def _create_tables_if_not_exists(self) -> None:
        """Create the data table(s) declared on the declarative base."""
        with self._session() as session, session.begin():
            self._base.metadata.create_all(session.connection())

    def _create_extension(self) -> None:
        """Ensure the pgvector extension is installed."""
        import sqlalchemy

        with self._session() as session, session.begin():
            statement = sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector")
            session.execute(statement)
            session.commit()

    def _initialize(self) -> None:
        """Connect and (optionally) perform one-time database setup."""
        if not self._is_initialized:
            self._connect()
            if self.perform_setup:
                self._create_extension()
                self._create_schema_if_not_exists()
                self._create_tables_if_not_exists()
            self._is_initialized = True

    def _node_to_table_row(self, node: BaseNode) -> Any:
        """Convert a node into an ORM row for the data table."""
        return self._table_class(
            node_id=node.node_id,
            embedding=node.get_embedding(),
            text=node.get_content(metadata_mode=MetadataMode.NONE),
            metadata_=node_to_metadata_dict(
                node,
                remove_text=True,
                flat_metadata=self.flat_metadata,
            ),
        )

    def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
        """Insert nodes synchronously; returns the inserted node ids."""
        self._initialize()
        ids = []
        with self._session() as session, session.begin():
            for node in nodes:
                ids.append(node.node_id)
                item = self._node_to_table_row(node)
                session.add(item)
            session.commit()
        return ids

    async def async_add(self, nodes: List[BaseNode], **kwargs: Any) -> List[str]:
        """Insert nodes asynchronously; returns the inserted node ids."""
        self._initialize()
        ids = []
        async with self._async_session() as session, session.begin():
            for node in nodes:
                ids.append(node.node_id)
                item = self._node_to_table_row(node)
                session.add(item)
            await session.commit()
        return ids

    def _to_postgres_operator(self, operator: FilterOperator) -> str:
        """Map a FilterOperator to its Postgres SQL operator (default '=')."""
        operator_map = {
            FilterOperator.EQ: "=",
            FilterOperator.GT: ">",
            FilterOperator.LT: "<",
            FilterOperator.NE: "!=",
            FilterOperator.GTE: ">=",
            FilterOperator.LTE: "<=",
            FilterOperator.IN: "IN",
            FilterOperator.NIN: "NOT IN",
            FilterOperator.CONTAINS: "@>",
        }
        if operator not in operator_map:
            _logger.warning(f"Unknown operator: {operator}, fallback to '='")
            return "="
        return operator_map[operator]

    def _build_filter_clause(self, filter_: MetadataFilter) -> Any:
        """Build a single SQL filter clause against the metadata_ column."""
        from sqlalchemy import text

        pg_op = self._to_postgres_operator(filter_.operator)
        # Unique bind-parameter name per clause so that multiple filters
        # combined in one statement do not collide on the same name.
        param = f"param_{id(filter_)}"

        if filter_.operator in [FilterOperator.IN, FilterOperator.NIN]:
            # Expects a single value in the metadata, and a list to compare
            return text(
                f"metadata_->>'{filter_.key}' {pg_op} :{param}"
            ).bindparams(**{param: tuple(filter_.value)})
        elif filter_.operator == FilterOperator.CONTAINS:
            # Expects a list stored in the metadata, and a single value to compare.
            # NOTE(review): the value is interpolated into a jsonb literal here;
            # values containing quotes/backslashes are not escaped.
            return text(
                f"metadata_::jsonb->'{filter_.key}' "
                f"{pg_op} "
                f"'[\"{filter_.value}\"]'"
            )
        else:
            # ->> yields text, so compare against the stringified value,
            # bound as a parameter rather than interpolated (SQL injection).
            return text(
                f"metadata_->>'{filter_.key}' {pg_op} :{param}"
            ).bindparams(**{param: str(filter_.value)})

    def _recursively_apply_filters(self, filters: List[MetadataFilters]) -> Any:
        """
        Returns a sqlalchemy where clause.
        """
        import sqlalchemy

        sqlalchemy_conditions = {
            "or": sqlalchemy.sql.or_,
            "and": sqlalchemy.sql.and_,
        }

        if filters.condition not in sqlalchemy_conditions:
            raise ValueError(
                f"Invalid condition: {filters.condition}. "
                f"Must be one of {list(sqlalchemy_conditions.keys())}"
            )

        return sqlalchemy_conditions[filters.condition](
            *(
                (
                    self._build_filter_clause(filter_)
                    if not isinstance(filter_, MetadataFilters)
                    else self._recursively_apply_filters(filter_)
                )
                for filter_ in filters.filters
            )
        )

    def _apply_filters_and_limit(
        self,
        stmt: Select,
        limit: int,
        metadata_filters: Optional[MetadataFilters] = None,
    ) -> Any:
        """Attach the (optional) metadata WHERE clause and a LIMIT to stmt."""
        if metadata_filters:
            stmt = stmt.where(  # type: ignore
                self._recursively_apply_filters(metadata_filters)
            )
        return stmt.limit(limit)  # type: ignore

    def _build_query(
        self,
        embedding: Optional[List[float]],
        limit: int = 10,
        metadata_filters: Optional[MetadataFilters] = None,
    ) -> Any:
        """Build a dense (cosine-distance) similarity SELECT."""
        from sqlalchemy import select, text

        stmt = select(  # type: ignore
            self._table_class.id,
            self._table_class.node_id,
            self._table_class.text,
            self._table_class.metadata_,
            self._table_class.embedding.cosine_distance(embedding).label("distance"),
        ).order_by(text("distance asc"))

        return self._apply_filters_and_limit(stmt, limit, metadata_filters)

    def _query_with_score(
        self,
        embedding: Optional[List[float]],
        limit: int = 10,
        metadata_filters: Optional[MetadataFilters] = None,
        **kwargs: Any,
    ) -> List[DBEmbeddingRow]:
        """Run a dense query synchronously; similarity = 1 - cosine distance."""
        stmt = self._build_query(embedding, limit, metadata_filters)
        with self._session() as session, session.begin():
            from sqlalchemy import text

            # Optional per-query index tuning knobs.
            if kwargs.get("ivfflat_probes"):
                session.execute(
                    text(f"SET ivfflat.probes = {kwargs.get('ivfflat_probes')}")
                )
            if kwargs.get("hnsw_ef_search"):
                session.execute(
                    text(f"SET hnsw.ef_search = {kwargs.get('hnsw_ef_search')}")
                )

            res = session.execute(
                stmt,
            )
            return [
                DBEmbeddingRow(
                    node_id=item.node_id,
                    text=item.text,
                    metadata=item.metadata_,
                    similarity=(1 - item.distance) if item.distance is not None else 0,
                )
                for item in res.all()
            ]

    async def _aquery_with_score(
        self,
        embedding: Optional[List[float]],
        limit: int = 10,
        metadata_filters: Optional[MetadataFilters] = None,
        **kwargs: Any,
    ) -> List[DBEmbeddingRow]:
        """Run a dense query asynchronously; similarity = 1 - cosine distance."""
        stmt = self._build_query(embedding, limit, metadata_filters)
        async with self._async_session() as async_session, async_session.begin():
            from sqlalchemy import text

            # Optional per-query index tuning knobs.
            if kwargs.get("hnsw_ef_search"):
                await async_session.execute(
                    text(f"SET hnsw.ef_search = {kwargs.get('hnsw_ef_search')}")
                )
            if kwargs.get("ivfflat_probes"):
                await async_session.execute(
                    text(f"SET ivfflat.probes = {kwargs.get('ivfflat_probes')}")
                )

            res = await async_session.execute(stmt)
            return [
                DBEmbeddingRow(
                    node_id=item.node_id,
                    text=item.text,
                    metadata=item.metadata_,
                    similarity=(1 - item.distance) if item.distance is not None else 0,
                )
                for item in res.all()
            ]

    def _build_sparse_query(
        self,
        query_str: Optional[str],
        limit: int,
        metadata_filters: Optional[MetadataFilters] = None,
    ) -> Any:
        """Build a full-text (ts_rank) SELECT over the tsvector column.

        Raises:
            ValueError: If query_str is None.
        """
        from sqlalchemy import select, type_coerce
        from sqlalchemy.sql import func, text
        from sqlalchemy.types import UserDefinedType

        class REGCONFIG(UserDefinedType):
            """Map the text_search_config string to Postgres' regconfig type."""

            def get_col_spec(self, **kw: Any) -> str:
                return "regconfig"

        if query_str is None:
            raise ValueError("query_str must be specified for a sparse vector query.")

        # Replace '&' with '|' to perform an OR search for higher recall
        ts_query = func.to_tsquery(
            func.replace(
                func.text(
                    func.plainto_tsquery(
                        type_coerce(self.text_search_config, REGCONFIG), query_str
                    )
                ),
                "&",
                "|",
            )
        )
        stmt = (
            select(  # type: ignore
                self._table_class.id,
                self._table_class.node_id,
                self._table_class.text,
                self._table_class.metadata_,
                func.ts_rank(self._table_class.text_search_tsv, ts_query).label("rank"),
            )
            .where(self._table_class.text_search_tsv.op("@@")(ts_query))
            .order_by(text("rank desc"))
        )  # type: ignore

        return self._apply_filters_and_limit(stmt, limit, metadata_filters)

    async def _async_sparse_query_with_rank(
        self,
        query_str: Optional[str] = None,
        limit: int = 10,
        metadata_filters: Optional[MetadataFilters] = None,
    ) -> List[DBEmbeddingRow]:
        """Run a full-text query asynchronously; similarity = ts_rank."""
        stmt = self._build_sparse_query(query_str, limit, metadata_filters)
        async with self._async_session() as async_session, async_session.begin():
            res = await async_session.execute(stmt)
            return [
                DBEmbeddingRow(
                    node_id=item.node_id,
                    text=item.text,
                    metadata=item.metadata_,
                    similarity=item.rank,
                )
                for item in res.all()
            ]

    def _sparse_query_with_rank(
        self,
        query_str: Optional[str] = None,
        limit: int = 10,
        metadata_filters: Optional[MetadataFilters] = None,
    ) -> List[DBEmbeddingRow]:
        """Run a full-text query synchronously; similarity = ts_rank."""
        stmt = self._build_sparse_query(query_str, limit, metadata_filters)
        with self._session() as session, session.begin():
            res = session.execute(stmt)
            return [
                DBEmbeddingRow(
                    node_id=item.node_id,
                    text=item.text,
                    metadata=item.metadata_,
                    similarity=item.rank,
                )
                for item in res.all()
            ]

    async def _async_hybrid_query(
        self, query: VectorStoreQuery, **kwargs: Any
    ) -> List[DBEmbeddingRow]:
        """Run dense and sparse queries concurrently and merge deduped results."""
        import asyncio

        if query.alpha is not None:
            _logger.warning("postgres hybrid search does not support alpha parameter.")

        sparse_top_k = query.sparse_top_k or query.similarity_top_k

        results = await asyncio.gather(
            self._aquery_with_score(
                query.query_embedding,
                query.similarity_top_k,
                query.filters,
                **kwargs,
            ),
            self._async_sparse_query_with_rank(
                query.query_str, sparse_top_k, query.filters
            ),
        )

        dense_results, sparse_results = results
        all_results = dense_results + sparse_results
        return _dedup_results(all_results)

    def _hybrid_query(
        self, query: VectorStoreQuery, **kwargs: Any
    ) -> List[DBEmbeddingRow]:
        """Run dense and sparse queries sequentially and merge deduped results."""
        if query.alpha is not None:
            _logger.warning("postgres hybrid search does not support alpha parameter.")

        sparse_top_k = query.sparse_top_k or query.similarity_top_k

        dense_results = self._query_with_score(
            query.query_embedding,
            query.similarity_top_k,
            query.filters,
            **kwargs,
        )

        sparse_results = self._sparse_query_with_rank(
            query.query_str, sparse_top_k, query.filters
        )

        all_results = dense_results + sparse_results
        return _dedup_results(all_results)

    def _db_rows_to_query_result(
        self, rows: List[DBEmbeddingRow]
    ) -> VectorStoreQueryResult:
        """Convert DB rows into a VectorStoreQueryResult (nodes + scores + ids)."""
        nodes = []
        similarities = []
        ids = []
        for db_embedding_row in rows:
            try:
                node = metadata_dict_to_node(db_embedding_row.metadata)
                node.set_content(str(db_embedding_row.text))
            except Exception:
                # NOTE: deprecated legacy logic for backward compatibility
                node = TextNode(
                    id_=db_embedding_row.node_id,
                    text=db_embedding_row.text,
                    metadata=db_embedding_row.metadata,
                )
            similarities.append(db_embedding_row.similarity)
            ids.append(db_embedding_row.node_id)
            nodes.append(node)

        return VectorStoreQueryResult(
            nodes=nodes,
            similarities=similarities,
            ids=ids,
        )

    async def aquery(
        self, query: VectorStoreQuery, **kwargs: Any
    ) -> VectorStoreQueryResult:
        """Dispatch an async query by mode (hybrid / sparse / default).

        Raises:
            ValueError: On an unrecognized query mode.
        """
        self._initialize()
        if query.mode == VectorStoreQueryMode.HYBRID:
            results = await self._async_hybrid_query(query, **kwargs)
        elif query.mode in [
            VectorStoreQueryMode.SPARSE,
            VectorStoreQueryMode.TEXT_SEARCH,
        ]:
            sparse_top_k = query.sparse_top_k or query.similarity_top_k
            results = await self._async_sparse_query_with_rank(
                query.query_str, sparse_top_k, query.filters
            )
        elif query.mode == VectorStoreQueryMode.DEFAULT:
            results = await self._aquery_with_score(
                query.query_embedding,
                query.similarity_top_k,
                query.filters,
                **kwargs,
            )
        else:
            raise ValueError(f"Invalid query mode: {query.mode}")

        return self._db_rows_to_query_result(results)

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """Dispatch a sync query by mode (hybrid / sparse / default).

        Raises:
            ValueError: On an unrecognized query mode.
        """
        self._initialize()
        if query.mode == VectorStoreQueryMode.HYBRID:
            results = self._hybrid_query(query, **kwargs)
        elif query.mode in [
            VectorStoreQueryMode.SPARSE,
            VectorStoreQueryMode.TEXT_SEARCH,
        ]:
            sparse_top_k = query.sparse_top_k or query.similarity_top_k
            results = self._sparse_query_with_rank(
                query.query_str, sparse_top_k, query.filters
            )
        elif query.mode == VectorStoreQueryMode.DEFAULT:
            results = self._query_with_score(
                query.query_embedding,
                query.similarity_top_k,
                query.filters,
                **kwargs,
            )
        else:
            raise ValueError(f"Invalid query mode: {query.mode}")

        return self._db_rows_to_query_result(results)

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """Delete all rows whose metadata doc_id equals ref_doc_id.

        Args:
            ref_doc_id: Source document id stored in metadata_->>'doc_id'.
        """
        import sqlalchemy

        self._initialize()
        with self._session() as session, session.begin():
            # Identifiers (schema/table) cannot be bound parameters, but the
            # caller-supplied doc id is bound to prevent SQL injection.
            stmt = sqlalchemy.text(
                f"DELETE FROM {self.schema_name}.data_{self.table_name} where "
                f"(metadata_->>'doc_id')::text = :ref_doc_id "
            ).bindparams(ref_doc_id=ref_doc_id)

            session.execute(stmt)
            session.commit()