Agentic RAG using Vertex AI
Build Agentic RAG with LlamaIndex for Vertex AI¶
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Author(s) | Dave Wang |
Install Libraries¶
!pip install --upgrade google-cloud-aiplatform==1.53 llama-index-vector-stores-vertexaivectorsearch
Install testing libraries¶
- llama-index-core
- llama-index-llms-vertex
As of 6/11/2024, the current official release (0.1.8) of llama-index-llms-vertex (https://pypi.org/project/llama-index-llms-vertex/) does not support LlamaIndex function calling or agents for Vertex AI.
The testing repository installed below adds support for these features.
# uninstall existing llama_index
!pip uninstall llama-index -y
!git clone https://github.com/wadave/llama_index.git
%%bash
cd llama_index
pip install -e llama-index-core
pip install -e llama-index-integrations/llms/llama-index-llms-vertex
pip install -e .
Restart current runtime¶
To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which will restart the current kernel.
# Colab only
# Automatically restart the kernel after the installs so that the environment can import the new packages.
import IPython
# Handle to the running IPython application, which owns the kernel.
app = IPython.Application.instance()
# Shut the kernel down; the True argument presumably requests a restart rather than a plain stop
# (the notebook text above says this cell restarts the kernel) -- confirm against the IPython docs.
app.kernel.do_shutdown(True)
Authenticate your notebook environment (Colab only)¶
If you are running this notebook on Google Colab, you will need to authenticate your environment. To do this, run the new cell below. This step is not required if you are using Vertex AI Workbench.
# Colab only: authenticate the user when the notebook is running inside Google Colab.
import sys

# The google.colab module is only present in sys.modules when running on Colab.
if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

# If you're using JupyterLab instance, uncomment and run the below code.
#!gcloud auth login
Define Google Cloud project information and initialize Vertex AI¶
Initialize the Vertex AI SDK for Python for your project:
# TODO : Set these values for your own environment.

# Project and storage constants
PROJECT_ID = "<your project>"
REGION = "<your region>"
# The bucket shares its name with the project.
GCS_BUCKET_NAME = PROJECT_ID
GCS_BUCKET_URI = f"gs://{GCS_BUCKET_NAME}"

# textembedding-gecko@003 produces 768-dimensional embeddings.
# If a different embedding model is used, this value probably needs to change.
VS_DIMENSIONS = 768

# Vertex AI Vector Search index configuration. Parameter descriptions:
# https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.MatchingEngineIndex#google_cloud_aiplatform_MatchingEngineIndex_create_tree_ah_index
VS_INDEX_NAME = "vertex_vector_search_index" # @param {type:"string"}
VS_INDEX_ENDPOINT_NAME = "vector_search_endpoint" # @param {type:"string"}

from google.cloud import aiplatform

# Point the Vertex AI SDK at the project and region configured above.
aiplatform.init(project=PROJECT_ID, location=REGION)
Download Sample Documents for Testing¶
# Source PDFs on OpenReview. This list is paired by position with `papers` below
# (the two lists are zipped together when downloading), so keep them in the same order.
urls = [
"https://openreview.net/pdf?id=VtmBAGCN7o",
"https://openreview.net/pdf?id=6PmJoRfdaK",
"https://openreview.net/pdf?id=LzPWWPAdY4",
"https://openreview.net/pdf?id=VTF8yNQM66",
"https://openreview.net/pdf?id=hSyW5go0v8",
"https://openreview.net/pdf?id=9WD9KwssyT",
"https://openreview.net/pdf?id=yV6fD7LYkF",
"https://openreview.net/pdf?id=hnrB5YHoYu",
"https://openreview.net/pdf?id=WbWtOYIzIK",
"https://openreview.net/pdf?id=c5pwL0Soay",
"https://openreview.net/pdf?id=TpD2aG1h0D",
]
# Local file names for the downloads; entry i is the destination for urls[i].
papers = [
"metagpt.pdf",
"longlora.pdf",
"loftq.pdf",
"swebench.pdf",
"selfrag.pdf",
"zipformer.pdf",
"values.pdf",
"finetune_fair_diffusion.pdf",
"knowledge_card.pdf",
"metra.pdf",
"vr_mcl.pdf",
]
import requests
def download_file(url, file_path, timeout=60):
    """Download a file from a URL and save it to the given local path.

    Args:
        url: The URL of the file to download.
        file_path: The path to save the downloaded file.
        timeout: Seconds to wait for the server to connect or send data
            before giving up, so a stalled server cannot hang the notebook.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status code.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    # With stream=True the connection stays open until the body is consumed or
    # the response is closed; the context manager releases it on errors too.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()  # Raise an exception for 4xx/5xx status codes
        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:  # Filter out keep-alive new chunks
                    f.write(chunk)
    print(f"Downloaded file from {url} to {file_path}")
# Fetch every paper; each URL is paired with the local file name at the same position.
for url, paper in zip(urls, papers):
    download_file(url, paper)
Enable async for the notebook¶
import nest_asyncio
nest_asyncio.apply()
Set Up Vector Store¶
Here are two options for using Vector Search:
- Option 1: Create a new Vertex AI Vector Search
- Option 2: If you have an existing Vector Search store, you can use the existing one.
Option 1: Create a new Vertex AI Vector Search¶
Create an empty index
# Look up any existing index with the configured display name.
index_names = [
    index.resource_name
    for index in aiplatform.MatchingEngineIndex.list(
        filter=f"display_name={VS_INDEX_NAME}"
    )
]

if index_names:
    # Reuse the first match instead of creating a duplicate.
    vs_index = aiplatform.MatchingEngineIndex(index_name=index_names[0])
    print(
        f"Vector Search index {vs_index.display_name} exists with resource name {vs_index.resource_name}"
    )
else:
    # Nothing found: create an empty tree-AH index sized for the embedding model.
    print(f"Creating Vector Search index {VS_INDEX_NAME} ...")
    vs_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name=VS_INDEX_NAME,
        dimensions=VS_DIMENSIONS,
        distance_measure_type="DOT_PRODUCT_DISTANCE",
        approximate_neighbors_count=150,
        shard_size="SHARD_SIZE_SMALL",
        index_update_method="STREAM_UPDATE",  # allowed values BATCH_UPDATE , STREAM_UPDATE
    )
    print(
        f"Vector Search index {vs_index.display_name} created with resource name {vs_index.resource_name}"
    )
Create an endpoint
# Look up any existing index endpoint with the configured display name.
endpoint_names = [
    endpoint.resource_name
    for endpoint in aiplatform.MatchingEngineIndexEndpoint.list(
        filter=f"display_name={VS_INDEX_ENDPOINT_NAME}"
    )
]

if endpoint_names:
    # Reuse the first match instead of creating a duplicate.
    vs_endpoint = aiplatform.MatchingEngineIndexEndpoint(
        index_endpoint_name=endpoint_names[0]
    )
    print(
        f"Vector Search index endpoint {vs_endpoint.display_name} exists with resource name {vs_endpoint.resource_name}"
    )
else:
    # Nothing found: create an endpoint with the public endpoint enabled.
    print(
        f"Creating Vector Search index endpoint {VS_INDEX_ENDPOINT_NAME} ..."
    )
    vs_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
        display_name=VS_INDEX_ENDPOINT_NAME, public_endpoint_enabled=True
    )
    print(
        f"Vector Search index endpoint {vs_endpoint.display_name} created with resource name {vs_endpoint.resource_name}"
    )
Deploy index to endpoint
# Check whether the index is already deployed somewhere.
# A fresh deployment takes about 30 mins to finish.
index_endpoints = [
    (deployed_index.index_endpoint, deployed_index.deployed_index_id)
    for deployed_index in vs_index.deployed_indexes
]

if index_endpoints:
    # Already deployed: open the endpoint recorded on the first deployment.
    # NOTE(review): this endpoint may differ from vs_endpoint if the index was
    # deployed elsewhere earlier; later cells use vs_endpoint -- confirm they match.
    vs_deployed_index = aiplatform.MatchingEngineIndexEndpoint(
        index_endpoint_name=index_endpoints[0][0]
    )
    print(
        f"Vector Search index {vs_index.display_name} is already deployed at endpoint {vs_deployed_index.display_name}"
    )
else:
    # Not deployed yet: deploy onto the endpoint prepared above with a single replica.
    print(
        f"Deploying Vector Search index {vs_index.display_name} at endpoint {vs_endpoint.display_name} ..."
    )
    vs_deployed_index = vs_endpoint.deploy_index(
        index=vs_index,
        deployed_index_id=VS_INDEX_NAME,
        display_name=VS_INDEX_NAME,
        machine_type="e2-standard-16",
        min_replica_count=1,
        max_replica_count=1,
    )
    print(
        f"Vector Search index {vs_index.display_name} is deployed at endpoint {vs_deployed_index.display_name}"
    )
Option 2: Use an existing Vertex AI Vector Search¶
# Running this cell rebinds vs_index and vs_endpoint, replacing any values set in Option 1.
# TODO : replace "<your index id>" with your actual index ID (for example 1234567890123456789)
vs_index = aiplatform.MatchingEngineIndex(index_name="<your index id>")
# TODO : replace "<your endpoint id>" with your actual endpoint ID (for example 1234567890123456789)
vs_endpoint = aiplatform.MatchingEngineIndexEndpoint(
index_endpoint_name="<your endpoint id>"
)
Import libraries¶
# import modules needed
from llama_index.core import (
StorageContext,
Settings,
VectorStoreIndex,
SummaryIndex,
SimpleDirectoryReader,
)
from llama_index.core.schema import TextNode
from llama_index.core.vector_stores.types import (
MetadataFilters,
MetadataFilter,
FilterOperator,
)
from llama_index.llms.vertex import Vertex
from llama_index.embeddings.vertex import VertexTextEmbedding
from llama_index.vector_stores.vertexaivectorsearch import VertexAIVectorStore
from typing import List, Optional
from llama_index.core.vector_stores import FilterCondition
from llama_index.core.tools import FunctionTool
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.tools import QueryEngineTool
from llama_index.core.vector_stores import MetadataFilters
from pathlib import Path
from llama_index.core.agent import FunctionCallingAgentWorker
from llama_index.core.agent import AgentRunner
Set up Vector Search Store¶
# Vector store backed by the Vector Search index and endpoint selected earlier.
vector_store = VertexAIVectorStore(
    project_id=PROJECT_ID,
    region=REGION,
    index_id=vs_index.name,
    endpoint_id=vs_endpoint.name,
    gcs_bucket_name=GCS_BUCKET_NAME,
)

# Storage context that routes index writes to the vector store above.
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# Embedding model; its output size must match VS_DIMENSIONS.
embed_model = VertexTextEmbedding(
    model_name="textembedding-gecko@003",
    project=PROJECT_ID,
    location=REGION,
)

# Gemini LLM used for completion; temperature=0 is set explicitly.
vertex_gemini = Vertex(
    model="gemini-1.5-pro-preview-0514", temperature=0, additional_kwargs={}
)

# Register both models as the defaults for indexing and querying.
Settings.embed_model = embed_model
Settings.llm = vertex_gemini
Task 1: Router query engine¶
Create vector index¶
# Read the MetaGPT paper into document objects.
documents = SimpleDirectoryReader(input_files=["metagpt.pdf"]).load_data()

# Build the vector index on top of the Vertex AI vector store.
vector_index = VectorStoreIndex.from_documents(
    documents, storage_context=storage_context
)

# Split the same documents into nodes (chunk_size=1024) for the summary index.
splitter = SentenceSplitter(chunk_size=1024)
nodes = splitter.get_nodes_from_documents(documents)
Create summary index¶
# Summary index built from the nodes produced by the sentence splitter above.
summary_index = SummaryIndex(nodes)
Create query engine from vector store¶
# Summary engine: tree-summarize response mode, with async enabled.
summary_query_engine = summary_index.as_query_engine(
    response_mode="tree_summarize",
    use_async=True,
)

# Vector engine with default settings.
vector_query_engine = vector_index.as_query_engine()

# Smoke-test the summary engine; as the last expression its result is shown by the notebook.
summary_query_engine.query("what's the summary of the document?")
Create tools from query engines¶
# Wrap each query engine as a tool; the description is what a selector gets to choose between them.
summary_tool = QueryEngineTool.from_defaults(
    query_engine=summary_query_engine,
    description="Useful for summarization questions related to MetaGPT",
)
vector_tool = QueryEngineTool.from_defaults(
    query_engine=vector_query_engine,
    description="Useful for retrieving specific context from the MetaGPT paper.",
)
from llama_index.core.query_engine.router_query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector
# Router that asks the LLM to pick exactly one of the two tools per query.
query_engine = RouterQueryEngine(
    selector=LLMSingleSelector.from_defaults(),
    query_engine_tools=[summary_tool, vector_tool],
    verbose=True,
)

# A summarization-style question; also report how many source nodes were used.
response = query_engine.query("What is the summary of the document?")
print(str(response))
print(len(response.source_nodes))

# A specific-context question.
response = query_engine.query("How do agents share information with other agents?")
print(str(response))
Task 2: Tool calling¶
Create auto-retrieval tools with parameters¶
# Query engine restricted to nodes whose page_label metadata equals "2".
query_engine = vector_index.as_query_engine(
    similarity_top_k=2,
    filters=MetadataFilters.from_dicts([{"key": "page_label", "value": "2"}]),
)
response = query_engine.query("What are some high-level results of MetaGPT?")

# Re-create the summary engine and its tool (same configuration as in Task 1).
summary_query_engine = summary_index.as_query_engine(
    response_mode="tree_summarize",
    use_async=True,
)
summary_tool = QueryEngineTool.from_defaults(
    query_engine=summary_query_engine,
    description="Useful for summarization questions related to MetaGPT",
)

# Show the filtered answer and the metadata of each node it was built from.
print(str(response))
for n in response.source_nodes:
    print(n.metadata)
Define auto-retrieval tools for function calling¶
# NOTE(review): FunctionTool.from_defaults presumably derives the tool description shown to
# the LLM from this function's signature and docstring -- confirm before rewording either.
def vector_query(query: str, page_numbers: List[str]) -> str:
    """Perform a vector search over an index.
    query (str): the string query to be embedded.
    page_numbers (List[str]): Filter by set of pages. Leave BLANK if we want to perform a vector search
    over all pages. Otherwise, filter by the set of specified pages.
    """
    # One page_label filter per requested page, OR-ed so that any listed page matches.
    page_filters = MetadataFilters.from_dicts(
        [{"key": "page_label", "value": p} for p in page_numbers],
        condition=FilterCondition.OR,
    )
    engine = vector_index.as_query_engine(similarity_top_k=2, filters=page_filters)
    return engine.query(query)


# Expose the function as a tool the LLM can call.
vector_query_tool = FunctionTool.from_defaults(
    fn=vector_query,
    # name='vector_query'
)
# NOTE(review): FunctionTool.from_defaults presumably derives the tool description shown to
# the LLM from this function's signature and docstring -- confirm before rewording either.
def summary_query(
    query: str,
) -> str:
    """Perform a summary of document
    query (str): the string query to be embedded.
    """
    # A fresh tree-summarize engine over the summary index is built on every call.
    engine = summary_index.as_query_engine(
        response_mode="tree_summarize",
        use_async=True,
    )
    return engine.query(query)


# Expose the function as a tool the LLM can call.
summary_tool = FunctionTool.from_defaults(
    fn=summary_query,
    # name='summary_query'
)
# Let the LLM choose a tool and its arguments for a page-specific question.
response = vertex_gemini.predict_and_call(
    [vector_query_tool, summary_tool],
    "What are the MetaGPT comparisons with ChatDev described on page 8?",
    verbose=True,
)

# Inspect the metadata of the nodes that backed the answer.
for n in response.source_nodes:
    print(n.metadata)

# A summarization question, with the same two tools offered in the opposite order.
response = vertex_gemini.predict_and_call(
    [summary_tool, vector_query_tool],
    "What is a summary of the paper?",
    verbose=True,
)
Task 3: Building an Agent Reasoning Loop¶
# Wrap the index and tool construction from Task 2 in a function that takes a PDF file name
def get_doc_tools(
    file_path: str,
    name: str,
) -> "tuple[FunctionTool, FunctionTool]":
    """Get vector query and summary query tools from a document.

    Args:
        file_path: Path of the document to load.
        name: Short identifier for the document; it is appended to the tool
            names (``vector_tool_<name>`` and ``summary_tool_<name>``) so that
            tools built for different documents can be told apart.

    Returns:
        A ``(vector_query_tool, summary_tool)`` pair of ``FunctionTool`` objects.
    """
    # load documents
    documents = SimpleDirectoryReader(input_files=[file_path]).load_data()
    splitter = SentenceSplitter(chunk_size=1024)
    nodes = splitter.get_nodes_from_documents(documents)
    # NOTE(review): storage_context is the notebook-level object shared by every
    # call, so all documents appear to be written to the same vector store --
    # confirm whether per-document retrieval needs an extra file-level filter.
    vector_index = VectorStoreIndex.from_documents(
        documents, storage_context=storage_context
    )
    summary_index = SummaryIndex(nodes)

    # The docstrings below are kept document-agnostic because this factory is
    # called for many papers, not only MetaGPT.
    def vector_query(
        query: str, page_numbers: Optional[List[str]] = None
    ) -> str:
        """Use to answer questions over a given paper.

        Useful if you have specific questions over the paper.
        Always leave page_numbers as None UNLESS there is a specific page you want to search for.

        Args:
            query (str): the string query to be embedded.
            page_numbers (Optional[List[str]]): Filter by set of pages. Leave as NONE
                if we want to perform a vector search
                over all pages. Otherwise, filter by the set of specified pages.
        """
        page_numbers = page_numbers or []
        # One page_label filter per requested page, OR-ed together; an empty
        # list yields no page filters at all.
        metadata_dicts = [
            {"key": "page_label", "value": p} for p in page_numbers
        ]
        query_engine = vector_index.as_query_engine(
            similarity_top_k=2,
            filters=MetadataFilters.from_dicts(
                metadata_dicts, condition=FilterCondition.OR
            ),
        )
        response = query_engine.query(query)
        return response

    vector_query_tool = FunctionTool.from_defaults(
        name=f"vector_tool_{name}", fn=vector_query
    )

    def summary_query(
        query: str,
    ) -> str:
        """Perform a summary of document
        query (str): the string query to be embedded.
        """
        summary_engine = summary_index.as_query_engine(
            response_mode="tree_summarize",
            use_async=True,
        )
        response = summary_engine.query(query)
        return response

    summary_tool = FunctionTool.from_defaults(
        fn=summary_query, name=f"summary_tool_{name}"
    )
    return vector_query_tool, summary_tool
# Build the two tools for the MetaGPT paper.
vector_query_tool, summary_tool = get_doc_tools("metagpt.pdf", "metagpt")

# Create Vertex AI client
vertex_gemini = Vertex(model="gemini-1.5-pro-preview-0514")

# Create Agent Runner: a function-calling worker over both tools, driven by an AgentRunner.
agent_worker = FunctionCallingAgentWorker.from_tools(
    [vector_query_tool, summary_tool],
    llm=vertex_gemini,
    verbose=True,
)
agent = AgentRunner(agent_worker)

# A two-part question that needs more than one reasoning step.
response = agent.query(
    "what are agent roles in MetaGPT, and then how they communicate with each other."
)
Task 4: Multi-document agent¶
# Papers handled by the multi-document agent.
papers = [
    "metagpt.pdf",
    "longlora.pdf",
    "loftq.pdf",
    "swebench.pdf",
    "selfrag.pdf",
    "zipformer.pdf",
    "values.pdf",
    "finetune_fair_diffusion.pdf",
    "knowledge_card.pdf",
    "metra.pdf",
]

# Map each paper to its [vector tool, summary tool] pair; the file stem names the tools.
paper_to_tools_dict = {}
for paper in papers:
    print(f"Getting tools for paper: {paper}")
    vector_tool, summary_tool = get_doc_tools(paper, Path(paper).stem)
    paper_to_tools_dict[paper] = [vector_tool, summary_tool]

# Flatten the per-paper pairs into one list, keeping the order of `papers`.
all_tools = []
for paper in papers:
    all_tools.extend(paper_to_tools_dict[paper])
# define an "object" index and retriever over these tools
from llama_index.core import VectorStoreIndex
from llama_index.core.objects import ObjectIndex
# Index the tools themselves so the relevant ones can be retrieved per query.
obj_index = ObjectIndex.from_objects(
    all_tools,
    index_cls=VectorStoreIndex,
)
# Hand the agent only the three best-matching tools for each query.
obj_retriever = obj_index.as_retriever(similarity_top_k=3)

# Function-calling worker that fetches its tools through the retriever.
agent_worker = FunctionCallingAgentWorker.from_tools(
    tool_retriever=obj_retriever,
    llm=vertex_gemini,
    system_prompt=""" \
You are an agent designed to answer queries over a set of given papers.
Please use the tools provided to answer a question as possible. Do not rely on prior knowledge. Summarize your answer\
""",
    verbose=True,
)
agent = AgentRunner(agent_worker)

# Cross-paper question spanning MetaGPT and SWE-Bench.
response = agent.query(
    "What is the evaluation dataset used in MetaGPT? Compare it against SWE-Bench"
)
print(str(response))

# Cross-paper comparison of the two LoRA papers.
response = agent.query(
    "Compare and contrast the LoRA papers (LongLoRA, LoftQ). Analyze the approach in each paper first. "
)