Workflow for a ReAct Agent¶
This notebook walks through setting up a Workflow
to construct a ReAct agent from (mostly) scratch.
ReAct agents work by prompting an LLM to either invoke tools/functions or return a final response.
Our workflow will be stateful with memory, and will be able to call the LLM to select tools and process incoming user messages.
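For intuition, here is roughly what a ReAct-style LLM completion looks like when it decides to call a tool (the exact wording is prompt-dependent; this is an illustrative sketch, not output from this notebook):

Thought: I need to use a tool to help answer the question.
Action: add
Action Input: {"x": 2123, "y": 2321}

And when it can answer directly:

Thought: I can answer without using any more tools.
Answer: The sum is 4444.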
!pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-proj--..."
[Optional] Set up observability with LlamaTrace¶
Set up tracing to visualize each step in the workflow.
!pip install "llama-index-core>=0.10.43" "openinference-instrumentation-llama-index>=2" "opentelemetry-proto>=1.12.0" opentelemetry-exporter-otlp opentelemetry-sdk
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HTTPSpanExporter,
)
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor

# Add Phoenix API Key for tracing
PHOENIX_API_KEY = "<YOUR-PHOENIX-API-KEY>"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={PHOENIX_API_KEY}"

# Set up a span processor that exports to Phoenix
span_phoenix_processor = SimpleSpanProcessor(
    HTTPSpanExporter(endpoint="https://app.phoenix.arize.com/v1/traces")
)

# Add the span processor to the tracer provider
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(span_processor=span_phoenix_processor)

# Instrument the application
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
Since workflows are async-first, this all runs fine in a notebook. If you were running this in your own code, you would want to use asyncio.run() to start an async event loop if one isn't already running.
async def main():
    <async code>


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
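As a side note: in environments like this notebook, an event loop is already running, so we can await the workflow directly. If you ever need asyncio.run() inside an already-running loop, one common workaround (not required for this notebook) is the nest_asyncio package:

import nest_asyncio

nest_asyncio.apply()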
Designing the Workflow¶
An agent consists of several steps:
- Handling the latest incoming user message, including adding to memory and preparing the chat history
- Using the chat history and tools to construct a ReAct prompt
- Calling the LLM with the ReAct prompt, and parsing out function/tool calls
- If there are no tool calls, we can return
- If there are tool calls, we need to execute them, and then loop back for a fresh ReAct prompt using the latest tool outputs (the full loop is sketched below)
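Putting these together, the event flow we are about to implement looks roughly like this (a sketch of the loop, not library code):

# StartEvent (new user message)
#   -> PrepEvent (reasoning reset, message added to memory)
#   -> InputEvent (ReAct-formatted chat history)
#   -> LLM call, then parse:
#        final answer -> StopEvent (response + sources + reasoning)
#        tool call    -> ToolCallEvent -> execute tools -> PrepEvent (loop)
#        parse error  -> PrepEvent (loop)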
The Workflow Events¶
To handle these steps, we need to define a few events:
- An event to handle new messages and prepare the chat history
- An event to prompt the LLM with the ReAct prompt
- An event to trigger tool calls, if any
- An event to handle the results of tool calls, if any
The other steps will use the built-in StartEvent
and StopEvent
events.
In addition to events, we will also use the global context to store the current ReAct reasoning!
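Concretely, the steps below read and write that reasoning list through the Context object, along these lines:

# at the start of a run, reset the reasoning
await ctx.set("current_reasoning", [])

# later, read it back (with a default if it was never set)
current_reasoning = await ctx.get("current_reasoning", default=[])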
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event
class PrepEvent(Event):
    pass


class InputEvent(Event):
    input: list[ChatMessage]


class ToolCallEvent(Event):
    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    output: ToolOutput
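These events are simple Pydantic-style containers: you construct them with keyword arguments and read their fields back as attributes. For example (purely illustrative):

ev = InputEvent(input=[ChatMessage(role="user", content="Hello!")])
print(ev.input[0].content)  # "Hello!"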
The Workflow Itself¶
With our events defined, we can construct our workflow and steps.
Note that the workflow automatically validates itself using type annotations: each step's event parameter declares what it consumes, and its return annotation declares what it can emit, so accurate annotations on our steps are essential!
from typing import Any
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
ActionReasoningStep,
ObservationReasoningStep,
)
from llama_index.core.llms.llm import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.llms.openai import OpenAI
class ReActAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: LLM | None = None,
        tools: list[BaseTool] | None = None,
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.tools = tools or []
        self.llm = llm or OpenAI()
        self.memory = ChatMemoryBuffer.from_defaults(llm=llm)
        self.formatter = ReActChatFormatter(context=extra_context or "")
        self.output_parser = ReActOutputParser()
        self.sources = []
    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        # clear sources
        self.sources = []

        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        self.memory.put(user_msg)

        # clear current reasoning
        await ctx.set("current_reasoning", [])

        return PrepEvent()
    @step
    async def prepare_chat_history(
        self, ctx: Context, ev: PrepEvent
    ) -> InputEvent:
        # get chat history
        chat_history = self.memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        return InputEvent(input=llm_input)
    @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    ) -> ToolCallEvent | StopEvent | PrepEvent:
        chat_history = ev.input

        response = await self.llm.achat(chat_history)

        try:
            reasoning_step = self.output_parser.parse(response.message.content)
            (await ctx.get("current_reasoning", default=[])).append(
                reasoning_step
            )

            if reasoning_step.is_done:
                self.memory.put(
                    ChatMessage(
                        role="assistant", content=reasoning_step.response
                    )
                )
                return StopEvent(
                    result={
                        "response": reasoning_step.response,
                        "sources": [*self.sources],
                        "reasoning": await ctx.get(
                            "current_reasoning", default=[]
                        ),
                    }
                )
            elif isinstance(reasoning_step, ActionReasoningStep):
                tool_name = reasoning_step.action
                tool_args = reasoning_step.action_input
                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id="fake",
                            tool_name=tool_name,
                            tool_kwargs=tool_args,
                        )
                    ]
                )
        except Exception as e:
            (await ctx.get("current_reasoning", default=[])).append(
                ObservationReasoningStep(
                    observation=f"There was an error in parsing my reasoning: {e}"
                )
            )

        # if no tool calls or final response, iterate again
        return PrepEvent()
    @step
    async def handle_tool_calls(
        self, ctx: Context, ev: ToolCallEvent
    ) -> PrepEvent:
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        # call tools -- safely!
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Tool {tool_call.tool_name} does not exist"
                    )
                )
                continue

            try:
                tool_output = tool(**tool_call.tool_kwargs)
                self.sources.append(tool_output)
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(observation=tool_output.content)
                )
            except Exception as e:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
                )

        # prep the next iteration
        return PrepEvent()
And that's it! Let's explore the workflow we wrote a bit.
new_user_msg(): Adds the user message to memory, and resets the current reasoning in the global context so that each run starts fresh.

prepare_chat_history(): Prepares the ReAct prompt, using the chat history, tools, and current reasoning (if any).

handle_llm_input(): Prompts the LLM with our ReAct prompt, and uses some utility functions to parse the output. If the reasoning step is final, we stop and emit a StopEvent with the response. If there is a tool call, we emit a ToolCallEvent to handle it. Otherwise (no tool call and no final response, e.g. after a parsing error), we simply loop again.

handle_tool_calls(): Safely calls tools with error handling, adding the tool outputs to the current reasoning. Then, by emitting a PrepEvent, we loop around for another round of ReAct prompting and parsing.
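One subtlety worth noting: the steps above mutate the reasoning list with (await ctx.get("current_reasoning", default=[])).append(...). This relies on ctx.get returning the stored list object itself, so the append modifies the shared state in place. An equivalent, more explicit pattern would be to get the list, append to it, and ctx.set it back.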
Run the Workflow!¶
NOTE: With loops, we need to be mindful of runtime. Here, we set a timeout of 120s.
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
def add(x: int, y: int) -> int:
    """Useful function to add two numbers."""
    return x + y


def multiply(x: int, y: int) -> int:
    """Useful function to multiply two numbers."""
    return x * y


tools = [
    FunctionTool.from_defaults(add),
    FunctionTool.from_defaults(multiply),
]
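Note that FunctionTool.from_defaults infers each tool's name and description from the function's name, signature, and docstring, which is how the LLM learns what add and multiply do.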
agent = ReActAgent(
    llm=OpenAI(model="gpt-4o-mini"), tools=tools, timeout=120, verbose=True
)
ret = await agent.run(input="Hello!")
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
print(ret["response"])
Hello! How can I assist you today?
ret = await agent.run(input="What is (2123 + 2321) * 312?")
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
print(ret["response"])
The result of (2123 + 2321) * 312 is 1,386,528.
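As a sanity check, the arithmetic holds: 2123 + 2321 = 4444, and 4444 * 312 = 1,386,528. You can confirm with plain Python:

print((2123 + 2321) * 312)  # 1386528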