class NeptuneAnalyticsVectorStore(BasePydanticVectorStore):
    """Vector store backed by an Amazon Neptune Analytics graph.

    Nodes are written with openCypher ``MERGE`` statements and their
    embeddings are attached through ``neptune.algo.vectors.upsert``.
    Similarity search uses ``neptune.algo.vectors.topKByEmbedding``.
    """

    stores_text: bool = True
    flat_metadata: bool = True

    node_label: str
    graph_identifier: str
    embedding_dimension: int
    text_node_property: str
    hybrid_search: bool
    retrieval_query: Optional[str]

    _client: Any = PrivateAttr()

    def __init__(
        self,
        graph_identifier: str,
        embedding_dimension: int,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        hybrid_search: bool = False,
        node_label: str = "Chunk",
        text_node_property: str = "text",
        retrieval_query: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Create a new Neptune Analytics graph wrapper instance.

        Args:
            graph_identifier: Identifier of the Neptune Analytics graph.
            embedding_dimension: Expected dimension of the graph's vector
                search index; checked against the graph on construction.
            client: Pre-built ``neptune-graph`` client. When omitted, one is
                created with boto3.
            credentials_profile_name: Profile name passed to
                ``boto3.Session``; default credentials are used when omitted.
            region_name: Region passed to ``session.client`` when truthy.
            hybrid_search: Stored on the instance; hybrid search is not
                implemented and raises ``NotImplementedError`` at query time.
            node_label: Label applied to the nodes that are written.
            text_node_property: Node property that holds the text content.
            retrieval_query: Optional ``RETURN`` clause that replaces the
                default one used by ``query``.
            **kwargs: Accepted for compatibility; not used here.

        Raises:
            ModuleNotFoundError: boto3 is missing, or too old to know the
                ``neptune-graph`` service.
            ValueError: The client could not be created, or the graph's
                vector search index is missing or has another dimension.
        """
        super().__init__(
            graph_identifier=graph_identifier,
            embedding_dimension=embedding_dimension,
            node_label=node_label,
            text_node_property=text_node_property,
            hybrid_search=hybrid_search,
            retrieval_query=retrieval_query,
        )

        try:
            if client is not None:
                self._client = client
            else:
                import boto3

                if credentials_profile_name is not None:
                    session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    session = boto3.Session()

                if region_name:
                    self._client = session.client(
                        "neptune-graph", region_name=region_name
                    )
                else:
                    self._client = session.client("neptune-graph")
        except ImportError as e:
            raise ModuleNotFoundError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # Matched by class name so botocore does not have to be imported
            # just to reference the exception type.
            if type(e).__name__ == "UnknownServiceError":
                raise ModuleNotFoundError(
                    "NeptuneGraph requires a boto3 version 1.34.40 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            raise ValueError(
                "Could not load credentials to authenticate with AWS client. "
                "Please check that credentials in the specified "
                "profile name are valid."
            ) from e

        # Verify that the analytics graph has a vector search index and that
        # the dimensions match.
        self._verify_vectorIndex()

    def _verify_vectorIndex(self) -> None:
        """Check that the graph has vector search enabled with the expected dimension.

        Raises:
            ValueError: The ``get_graph`` response has no
                ``vectorSearchConfiguration``, or its ``dimension`` differs
                from ``self.embedding_dimension``.
        """
        resp = self._client.get_graph(graphIdentifier=self.graph_identifier)

        if "vectorSearchConfiguration" not in resp:
            raise ValueError(
                "Vector search index does not exist for the Neptune Analytics graph."
            )

        if resp["vectorSearchConfiguration"]["dimension"] != self.embedding_dimension:
            raise ValueError(
                "Vector search index dimension for Neptune Analytics graph does not match the provided value."
            )

    @classmethod
    def class_name(cls) -> str:
        """Return the name of this vector store class."""
        return "NeptuneAnalyticsVectorStore"

    @property
    def client(self) -> Any:
        """Return the underlying ``neptune-graph`` client."""
        return self._client

    def database_query(
        self, query: str, params: Optional[dict] = None
    ) -> List[Dict[str, Any]]:
        """Send a query to the Neptune Analytics graph and return its results.

        Args:
            query: The openCypher query to execute.
            params: Dictionary of query parameters. Defaults to ``{}``.

        Returns:
            The ``results`` list decoded from the JSON response payload.

        Raises:
            NeptuneVectorQueryException: Any failure while logging, executing
                the query, or decoding the response.
        """
        # Send an empty mapping rather than None so the documented default
        # is what actually reaches the client.
        params = {} if params is None else params
        try:
            logger.debug(
                "query() query: %s parameters: %s", query, json.dumps(params)
            )
            resp = self._client.execute_query(
                graphIdentifier=self.graph_identifier,
                queryString=query,
                parameters=params,
                language="OPEN_CYPHER",
            )
            return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
        except Exception as e:
            raise NeptuneVectorQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                }
            ) from e

    def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
        """Upsert the given nodes and their embeddings into the graph.

        Args:
            nodes: Nodes to write; each node's ``embedding`` is rendered
                directly into the query text.
            **add_kwargs: Accepted for interface compatibility; not used.

        Returns:
            The ``node_id`` of every input node, in input order.
        """
        ids = [node.node_id for node in nodes]

        for node in nodes:
            # The embedding is inlined into the query text, not bound as a
            # parameter.
            import_query = (
                f"MERGE (c:`{self.node_label}` {{`~id`: $id}}) "
                "SET c += $data "
                "WITH c "
                f"CALL neptune.algo.vectors.upsert(c, {node.embedding}) "
                "YIELD node "
                "RETURN id(node) as id"
            )
            self.database_query(import_query, params=self.__clean_params(node))

        logger.debug("Nodes added")
        return ids

    def _get_search_index_query(self, hybrid: bool, k: int = 10) -> str:
        """Build the openCypher prefix that runs the top-k vector search.

        Args:
            hybrid: Whether hybrid search was requested.
            k: Number of nearest neighbours to request.

        Returns:
            A query fragment yielding ``embedding``, ``node`` and ``score``.

        Raises:
            NotImplementedError: If ``hybrid`` is truthy.
        """
        if hybrid:
            raise NotImplementedError

        return (
            "WITH $embedding as emb "
            f"CALL neptune.algo.vectors.topKByEmbedding(emb, {{topK: {k}}}) "
            "YIELD embedding, node, score "
        )

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """Run a similarity search for ``query.query_embedding``.

        Args:
            query: Supplies ``query_embedding`` and ``similarity_top_k``.
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            The matching nodes with their scores and ids, in the order the
            graph returned them.
        """
        default_retrieval = (
            f"RETURN node.`{self.text_node_property}` AS text, score, "
            "id(node) AS id, "
            "node AS metadata"
        )
        retrieval_query = self.retrieval_query or default_retrieval
        read_query = (
            self._get_search_index_query(self.hybrid_search, query.similarity_top_k)
            + retrieval_query
        )

        results = self.database_query(
            read_query, params={"embedding": query.query_embedding}
        )

        nodes = []
        similarities = []
        ids = []
        for record in results:
            # Node properties sit under "~properties" in the returned node map.
            node = metadata_dict_to_node(record["metadata"]["~properties"])
            node.set_content(str(record["text"]))
            nodes.append(node)
            similarities.append(record["score"])
            ids.append(record["id"])

        return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """Detach-delete every node whose ``ref_doc_id`` property matches.

        Args:
            ref_doc_id: Value compared against each node's ``ref_doc_id``.
            **delete_kwargs: Accepted for interface compatibility; not used.
        """
        self.database_query(
            f"MATCH (n:`{self.node_label}`) WHERE n.ref_doc_id = $id DETACH DELETE n",
            params={"id": ref_doc_id},
        )

    def __clean_params(self, record: BaseNode) -> Dict[str, Any]:
        """Convert a BaseNode into the parameter dict used by the import query.

        Args:
            record: Node to convert.

        Returns:
            ``{"id": ..., "data": {...}}`` where ``data`` holds the node text
            under ``self.text_node_property``, the node id, and the node
            metadata.
        """
        text = record.get_content(metadata_mode=MetadataMode.NONE)
        node_id = record.node_id
        metadata = node_to_metadata_dict(record, remove_text=True, flat_metadata=False)

        # Remove redundant metadata information; tolerate keys that are
        # already absent instead of raising KeyError.
        for key in ("document_id", "doc_id"):
            metadata.pop(key, None)

        return {
            "id": node_id,
            "data": {self.text_node_property: text, "id": node_id, **metadata},
        }
def database_query(
    self, query: str, params: Optional[dict] = None
) -> List[Dict[str, Any]]:
    """Send a query to the Neptune Analytics graph and return its results.

    Args:
        query: The openCypher query to execute.
        params: Dictionary of query parameters. Defaults to ``{}``.

    Returns:
        The ``results`` list decoded from the JSON response payload.

    Raises:
        NeptuneVectorQueryException: Any failure while logging, executing
            the query, or decoding the response.
    """
    # Send an empty mapping rather than None so the documented default is
    # what actually reaches the client.
    params = {} if params is None else params
    try:
        logger.debug("query() query: %s parameters: %s", query, json.dumps(params))
        resp = self._client.execute_query(
            graphIdentifier=self.graph_identifier,
            queryString=query,
            parameters=params,
            language="OPEN_CYPHER",
        )
        return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
    except Exception as e:
        raise NeptuneVectorQueryException(
            {
                "message": "An error occurred while executing the query.",
                "details": str(e),
            }
        ) from e