Building a Custom Agent#

In this cookbook we show you how to build a custom agent using LlamaIndex.

The easiest way to build a custom agent is to simply subclass CustomSimpleAgentWorker and implement a few required functions. You have complete flexibility in defining the agent step-wise logic.

This lets you add arbitrarily complex reasoning logic on top of your RAG pipeline.

We show you how to build a simple agent that adds a retry layer on top of a RouterQueryEngine, allowing it to retry queries until the task is complete. We build this on top of both a SQL tool and a vector index query tool. Even if the tool makes an error or only answers part of the question, the agent can continue retrying the question until the task is complete.

Setup the Custom Agent#

Here we setup the custom agent.

Refresher#

An agent in LlamaIndex consists of both an agent runner + agent worker. An agent runner is an orchestrator that stores state like memory, whereas an agent worker controls the step-wise execution of a Task. Agent runners include sequential, parallel execution. More details can be found in our lower level API guide.

Most core agent logic (e.g. ReAct, function calling loops), can be executed in the agent worker. Therefore we’ve made it easy to subclass an agent worker, letting you plug it into any agent runner.

Creating a Custom Agent Worker Subclass#

As mentioned above we subclass CustomSimpleAgentWorker. This is a class that already sets up some scaffolding for you. This includes being able to take in tools, callbacks, LLM, and also ensures that the state/steps are properly formatted. In the meantime you mostly have to implement the following functions:

  • _initialize_state

  • _run_step

  • _finalize_task

Some additional notes:

  • You can implement _arun_step as well if you want to support async chat in the agent.

  • You can choose to override __init__ as long as you pass all remaining args, kwargs to super()

  • CustomSimpleAgentWorker is implemented as a Pydantic BaseModel meaning that you can define your own custom properties as well.

Here are the full set of base properties on each CustomSimpleAgentWorker (that you need to/can pass in when constructing your custom agent):

  • tools: Sequence[BaseTool]

  • tool_retriever: Optional[ObjectRetriever[BaseTool]]

  • llm: LLM

  • callback_manager: CallbackManager

  • verbose: bool

Note that tools and tool_retriever are mutually exclusive, you can only pass in one or the either (e.g. define a static list of tools or define a callable function that returns relevant tools given a user message). You can call get_tools(message: str) to return relevant tools given a message.

All of these properties are accessible via self when defining your custom agent.

%pip install llama-index-readers-wikipedia
%pip install llama-index-llms-openai
from llama_index.core.agent import (
    CustomSimpleAgentWorker,
    Task,
    AgentChatResponse,
)
from typing import Dict, Any, List, Tuple
from llama_index.core.tools import BaseTool, QueryEngineTool
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.core.output_parsers import PydanticOutputParser
from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core import ChatPromptTemplate, PromptTemplate
from llama_index.core.selectors import PydanticSingleSelector
from llama_index.core.bridge.pydantic import Field, BaseModel

Here we define some helper variables and methods. E.g. the prompt template to use to detect errors as well as the response format in Pydantic.

from llama_index.core.llms import ChatMessage, MessageRole

DEFAULT_PROMPT_STR = """
Given previous question/response pairs, please determine if an error has occurred in the response, and suggest \
    a modified question that will not trigger the error.

Examples of modified questions:
- The question itself is modified to elicit a non-erroneous response
- The question is augmented with context that will help the downstream system better answer the question.
- The question is augmented with examples of negative responses, or other negative questions.

An error means that either an exception has triggered, or the response is completely irrelevant to the question.

Please return the evaluation of the response in the following JSON format.

"""


def get_chat_prompt_template(
    system_prompt: str, current_reasoning: Tuple[str, str]
) -> ChatPromptTemplate:
    system_msg = ChatMessage(role=MessageRole.SYSTEM, content=system_prompt)
    messages = [system_msg]
    for raw_msg in current_reasoning:
        if raw_msg[0] == "user":
            messages.append(
                ChatMessage(role=MessageRole.USER, content=raw_msg[1])
            )
        else:
            messages.append(
                ChatMessage(role=MessageRole.ASSISTANT, content=raw_msg[1])
            )
    return ChatPromptTemplate(message_templates=messages)


class ResponseEval(BaseModel):
    """Evaluation of whether the response has an error."""

    has_error: bool = Field(
        ..., description="Whether the response has an error."
    )
    new_question: str = Field(..., description="The suggested new question.")
    explanation: str = Field(
        ...,
        description=(
            "The explanation for the error as well as for the new question."
            "Can include the direct stack trace as well."
        ),
    )
from llama_index.core.bridge.pydantic import PrivateAttr


class RetryAgentWorker(CustomSimpleAgentWorker):
    """Agent worker that adds a retry layer on top of a router.

    Continues iterating until there's no errors / task is done.

    """

    prompt_str: str = Field(default=DEFAULT_PROMPT_STR)
    max_iterations: int = Field(default=10)

    _router_query_engine: RouterQueryEngine = PrivateAttr()

    def __init__(self, tools: List[BaseTool], **kwargs: Any) -> None:
        """Init params."""
        # validate that all tools are query engine tools
        for tool in tools:
            if not isinstance(tool, QueryEngineTool):
                raise ValueError(
                    f"Tool {tool.metadata.name} is not a query engine tool."
                )
        self._router_query_engine = RouterQueryEngine(
            selector=PydanticSingleSelector.from_defaults(),
            query_engine_tools=tools,
            verbose=kwargs.get("verbose", False),
        )
        super().__init__(
            tools=tools,
            **kwargs,
        )

    def _initialize_state(self, task: Task, **kwargs: Any) -> Dict[str, Any]:
        """Initialize state."""
        return {"count": 0, "current_reasoning": []}

    def _run_step(
        self, state: Dict[str, Any], task: Task
    ) -> Tuple[AgentChatResponse, bool]:
        """Run step.

        Returns:
            Tuple of (agent_response, is_done)

        """
        if "new_input" not in state:
            new_input = task.input
        else:
            new_input = state["new_input"]

        # first run router query engine
        response = self._router_query_engine.query(new_input)

        # append to current reasoning
        state["current_reasoning"].extend(
            [("user", new_input), ("assistant", str(response))]
        )

        # Then, check for errors
        # dynamically create pydantic program for structured output extraction based on template
        chat_prompt_tmpl = get_chat_prompt_template(
            self.prompt_str, state["current_reasoning"]
        )
        llm_program = LLMTextCompletionProgram.from_defaults(
            output_parser=PydanticOutputParser(output_cls=ResponseEval),
            prompt=chat_prompt_tmpl,
            llm=self.llm,
        )
        # run program, look at the result
        response_eval = llm_program(
            query_str=new_input, response_str=str(response)
        )
        if not response_eval.has_error:
            is_done = True
        else:
            is_done = False
        state["new_input"] = response_eval.new_question

        if self.verbose:
            print(f"> Question: {new_input}")
            print(f"> Response: {response}")
            print(f"> Response eval: {response_eval.dict()}")

        # return response
        return AgentChatResponse(response=str(response)), is_done

    def _finalize_task(self, state: Dict[str, Any], **kwargs) -> None:
        """Finalize task."""
        # nothing to finalize here
        # this is usually if you want to modify any sort of
        # internal state beyond what is set in `_initialize_state`
        pass

Setup Data and Tools#

We setup both a SQL Tool as well as vector index tools for each city.

from llama_index.core.tools import QueryEngineTool

Setup SQL DB + Tool#

from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    String,
    Integer,
    select,
    column,
)
from llama_index.core import SQLDatabase

engine = create_engine("sqlite:///:memory:", future=True)
metadata_obj = MetaData()
# create city SQL table
table_name = "city_stats"
city_stats_table = Table(
    table_name,
    metadata_obj,
    Column("city_name", String(16), primary_key=True),
    Column("population", Integer),
    Column("country", String(16), nullable=False),
)

metadata_obj.create_all(engine)
from sqlalchemy import insert

rows = [
    {"city_name": "Toronto", "population": 2930000, "country": "Canada"},
    {"city_name": "Tokyo", "population": 13960000, "country": "Japan"},
    {"city_name": "Berlin", "population": 3645000, "country": "Germany"},
]
for row in rows:
    stmt = insert(city_stats_table).values(**row)
    with engine.begin() as connection:
        cursor = connection.execute(stmt)
from llama_index.core.query_engine import NLSQLTableQueryEngine

sql_database = SQLDatabase(engine, include_tables=["city_stats"])
sql_query_engine = NLSQLTableQueryEngine(
    sql_database=sql_database, tables=["city_stats"], verbose=True
)
sql_tool = QueryEngineTool.from_defaults(
    query_engine=sql_query_engine,
    description=(
        "Useful for translating a natural language query into a SQL query over"
        " a table containing: city_stats, containing the population/country of"
        " each city"
    ),
)

Setup Vector Tools#

from llama_index.readers.wikipedia import WikipediaReader
from llama_index.core import VectorStoreIndex
cities = ["Toronto", "Berlin", "Tokyo"]
wiki_docs = WikipediaReader().load_data(pages=cities)
# build a separate vector index per city
# You could also choose to define a single vector index across all docs, and annotate each chunk by metadata
vector_tools = []
for city, wiki_doc in zip(cities, wiki_docs):
    vector_index = VectorStoreIndex.from_documents([wiki_doc])
    vector_query_engine = vector_index.as_query_engine()
    vector_tool = QueryEngineTool.from_defaults(
        query_engine=vector_query_engine,
        description=f"Useful for answering semantic questions about {city}",
    )
    vector_tools.append(vector_tool)

Build Custom Agent#

from llama_index.core.agent import AgentRunner
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4")
callback_manager = llm.callback_manager

query_engine_tools = [sql_tool] + vector_tools
agent_worker = RetryAgentWorker.from_tools(
    query_engine_tools,
    llm=llm,
    verbose=True,
    callback_manager=callback_manager,
)
agent = AgentRunner(agent_worker, callback_manager=callback_manager)

Try Out Some Queries#

response = agent.chat("Which countries are each city from?")
print(str(response))
Selecting query engine 0: The choice is about translating a natural language query into a SQL query over a table containing city_stats, which likely includes information about the country of each city..
> Table desc str: Table 'city_stats' has columns: city_name (VARCHAR(16)), population (INTEGER), country (VARCHAR(16)), and foreign keys: .
> Predicted SQL query: SELECT city_name, country FROM city_stats
> Question: Which countries are each city from?
> Response: The city of Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.
> Response eval: {'has_error': True, 'new_question': 'Which country is each of the following cities from: Toronto, Tokyo, Berlin?', 'explanation': 'The original question was too vague as it did not specify which cities the question was referring to. The new question provides specific cities for which the country of origin is being asked.'}
Selecting query engine 0: This choice is relevant because it mentions a table containing city_stats, which likely includes information about the country of each city..
> Table desc str: Table 'city_stats' has columns: city_name (VARCHAR(16)), population (INTEGER), country (VARCHAR(16)), and foreign keys: .
> Predicted SQL query: SELECT city_name, country
FROM city_stats
WHERE city_name IN ('Toronto', 'Tokyo', 'Berlin')
> Question: Which country is each of the following cities from: Toronto, Tokyo, Berlin?
> Response: Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.
> Response eval: {'has_error': False, 'new_question': '', 'explanation': ''}
Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.
response = agent.chat(
    "What are the top modes of transporation fo the city with the higehest population?"
)
print(str(response))
Selecting query engine 0: The question is asking about the top modes of transportation for the city with the highest population. Choice (1) is the most relevant because it mentions a table containing city_stats, which likely includes information about the population of each city..
> Table desc str: Table 'city_stats' has columns: city_name (VARCHAR(16)), population (INTEGER), country (VARCHAR(16)), and foreign keys: .
> Predicted SQL query: SELECT city_name, population, mode_of_transportation
FROM city_stats
WHERE population = (SELECT MAX(population) FROM city_stats)
ORDER BY mode_of_transportation ASC
LIMIT 5;
> Question: What are the top modes of transporation fo the city with the higehest population?
> Response: I'm sorry, but there was an error in retrieving the information. Please try again later.
> Response eval: {'has_error': True, 'new_question': 'What are the top modes of transportation for the city with the highest population?', 'explanation': 'The original question had spelling errors which might have caused the system to not understand the question correctly. The corrected question should now be clear and understandable for the system.'}
Selecting query engine 0: The first choice is the most relevant because it mentions translating a natural language query into a SQL query over a table containing city_stats, which likely includes information about the population of each city..
> Table desc str: Table 'city_stats' has columns: city_name (VARCHAR(16)), population (INTEGER), country (VARCHAR(16)), and foreign keys: .
> Predicted SQL query: SELECT city_name, population, country
FROM city_stats
WHERE population = (SELECT MAX(population) FROM city_stats)
> Question: What are the top modes of transportation for the city with the highest population?
> Response: The city with the highest population is Tokyo, Japan with a population of 13,960,000.
> Response eval: {'has_error': True, 'new_question': 'What are the top modes of transportation for Tokyo, Japan?', 'explanation': 'The assistant failed to answer the original question correctly. The response was about the city with the highest population, but it did not mention anything about the top modes of transportation in that city. The new question directly asks about the top modes of transportation in Tokyo, Japan, which is the city with the highest population.'}
Selecting query engine 3: The question specifically asks about Tokyo, and choice (4) is about answering semantic questions about Tokyo..
> Question: What are the top modes of transportation for Tokyo, Japan?
> Response: The top modes of transportation for Tokyo, Japan are trains and subways, which are considered clean and efficient. Tokyo has an extensive network of electric train lines and over 900 train stations. Buses, monorails, and trams also play a secondary role in the public transportation system. Additionally, expressways connect Tokyo to other points in the Greater Tokyo Area and beyond. Taxis and long-distance ferries are also available for transportation within the city and to the surrounding islands.
> Response eval: {'has_error': True, 'new_question': 'What are the top modes of transportation for Tokyo, Japan?', 'explanation': 'The original question was not answered correctly because the assistant did not provide information on the top modes of transportation for the city with the highest population. The new question directly asks for the top modes of transportation for Tokyo, Japan, which is the city with the highest population.'}
Selecting query engine 3: Tokyo is mentioned in choice 4.
> Question: What are the top modes of transportation for Tokyo, Japan?
> Response: The top modes of transportation for Tokyo, Japan are trains and subways, which are considered clean and efficient. Tokyo has an extensive network of electric train lines and over 900 train stations. Buses, monorails, and trams also play a secondary role in public transportation within the city. Additionally, Tokyo has two major airports, Narita International Airport and Haneda Airport, which offer domestic and international flights. Expressways and taxis are also available for transportation within the city.
> Response eval: {'has_error': True, 'new_question': 'What are the top modes of transportation for Tokyo, Japan?', 'explanation': 'The response is erroneous because it does not answer the question asked. The question asks for the top modes of transportation in the city with the highest population, but the response only provides the population of the city. The new question directly asks for the top modes of transportation in Tokyo, Japan, which is the city with the highest population.'}
Selecting query engine 3: The question specifically asks about Tokyo, and choice 4 is about answering semantic questions about Tokyo..
> Question: What are the top modes of transportation for Tokyo, Japan?
> Response: The top modes of transportation for Tokyo, Japan are trains and subways, which are considered clean and efficient. Tokyo has an extensive network of electric train lines and over 900 train stations. Buses, monorails, and trams also play a secondary role in public transportation within the city. Additionally, Tokyo has two major airports, Narita International Airport and Haneda Airport, which offer domestic and international flights. Expressways and taxis are also available for transportation within the city.
> Response eval: {'has_error': False, 'new_question': '', 'explanation': ''}
The top modes of transportation for Tokyo, Japan are trains and subways, which are considered clean and efficient. Tokyo has an extensive network of electric train lines and over 900 train stations. Buses, monorails, and trams also play a secondary role in public transportation within the city. Additionally, Tokyo has two major airports, Narita International Airport and Haneda Airport, which offer domestic and international flights. Expressways and taxis are also available for transportation within the city.
print(str(response))
The top modes of transportation for Tokyo, Japan are trains and subways, which are considered clean and efficient. Tokyo has an extensive network of electric train lines and over 900 train stations. Buses, monorails, and trams also play a secondary role in public transportation within the city. Additionally, Tokyo has two major airports, Narita International Airport and Haneda Airport, which offer domestic and international flights. Expressways and taxis are also available for transportation within the city.
response = agent.chat("What are the sports teams of each city in Asia?")
print(str(response))
Selecting query engine 3: The question is asking about sports teams in Asia, and Tokyo is located in Asia..
> Question: What are the sports teams of each city in Asia?
> Response: I'm sorry, but the context information does not provide a comprehensive list of sports teams in each city in Asia. It only mentions some sports teams in Tokyo, Japan. To get a complete list of sports teams in each city in Asia, you would need to consult a reliable source or conduct further research.
> Response eval: {'has_error': True, 'new_question': 'What are some popular sports teams in Tokyo, Japan?', 'explanation': 'The original question is too broad and requires extensive data that the system may not possess. The new question is more specific and focuses on a single city, making it more likely to receive a correct and comprehensive answer.'}
Selecting query engine 3: The question specifically asks about Tokyo, and choice 4 is about answering semantic questions about Tokyo..
> Question: What are some popular sports teams in Tokyo, Japan?
> Response: Some popular sports teams in Tokyo, Japan include the Yomiuri Giants and Tokyo Yakult Swallows in baseball, F.C. Tokyo and Tokyo Verdy 1969 in soccer, and Hitachi SunRockers, Toyota Alvark Tokyo, and Tokyo Excellence in basketball. Tokyo is also known for its sumo wrestling tournaments held at the Ryōgoku Kokugikan sumo arena.
> Response eval: {'has_error': False, 'new_question': '', 'explanation': ''}
Some popular sports teams in Tokyo, Japan include the Yomiuri Giants and Tokyo Yakult Swallows in baseball, F.C. Tokyo and Tokyo Verdy 1969 in soccer, and Hitachi SunRockers, Toyota Alvark Tokyo, and Tokyo Excellence in basketball. Tokyo is also known for its sumo wrestling tournaments held at the Ryōgoku Kokugikan sumo arena.