AgentWorkflow Basic Introduction¶
The AgentWorkflow is an orchestrator for running a system of one or more agents. In this example, we'll create a simple workflow with a single agent, and use that to cover the basic functionality of the AgentWorkflow class.
%pip install llama-index
Setup¶
In this example, we will use OpenAI as our LLM. For all LLMs, check out the examples documentation or LlamaHub for a list of all supported LLMs and how to install/use them.
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4o-mini", api_key="sk-...")
To make our agent more useful, we can give it tools/actions to use. In this case, we'll use Tavily to implement a tool that can search the web for information. You can get a free API key from Tavily.
%pip install tavily-python
When creating a tool, it's very important to:
- give the tool a proper name and docstring/description. The LLM uses this to understand what the tool does.
- annotate the types. This helps the LLM understand the expected input and output types.
- use async when possible, since this will make the workflow more efficient.
from tavily import AsyncTavilyClient


async def search_web(query: str) -> str:
    """Useful for using the web to answer questions."""
    client = AsyncTavilyClient(api_key="tvly-...")
    return str(await client.search(query))
With the tool and LLM defined, we can create an AgentWorkflow that uses the tool.
from llama_index.core.agent.workflow import AgentWorkflow
workflow = AgentWorkflow.from_tools_or_functions(
    [search_web],
    llm=llm,
    system_prompt="You are a helpful assistant that can search the web for information.",
)
Running the Agent¶
Now that our agent is created, we can run it!
response = await workflow.run(user_msg="What is the weather in San Francisco?")
print(str(response))
The current weather in San Francisco is as follows:
- **Temperature**: 13.3°C (55.9°F)
- **Condition**: Sunny
- **Wind**: 4.0 mph (6.5 kph) from the NNE
- **Humidity**: 57%
- **Pressure**: 1021 mb (30.16 in)
- **Visibility**: 16 km (9 miles)

For more details, you can check the weather [here](https://www.weatherapi.com/).
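The result of the run is more than just the final string. As a sketch, and assuming the awaited result carries the same fields as the AgentOutput events covered in the Streaming section below (response, tool_calls, raw), you can inspect what the agent did:

print(response.response)  # the final chat message
print(response.tool_calls)  # the tool calls made during the run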
Maintaining State¶
By default, the AgentWorkflow is stateless between runs. This means that the agent will not have any memory of previous runs.
To maintain state, we need to keep track of the previous state. Since the AgentWorkflow is a Workflow, the state is stored in the Context. This can be passed between runs to maintain state and history.
from llama_index.core.workflow import Context
ctx = Context(workflow)
response = await workflow.run(
    user_msg="My name is Logan, nice to meet you!", ctx=ctx
)
print(str(response))
Nice to meet you, Logan! How can I assist you today?
response = await workflow.run(user_msg="What is my name?", ctx=ctx)
print(str(response))
Your name is Logan.
The context is serializable, so it can be saved to a database, file, etc. and loaded back in later.
The JsonSerializer is a simple serializer that uses json.dumps and json.loads to serialize and deserialize the context.
The JsonPickleSerializer is a serializer that uses pickle to serialize and deserialize the context. If you have objects in your context that are not serializable, you can use this serializer.
from llama_index.core.workflow import JsonPickleSerializer, JsonSerializer
ctx_dict = ctx.to_dict(serializer=JsonSerializer())
restored_ctx = Context.from_dict(
    workflow, ctx_dict, serializer=JsonSerializer()
)
response = await workflow.run(
    user_msg="Do you still remember my name?", ctx=restored_ctx
)
print(str(response))
Yes, I remember your name is Logan.
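Because ctx_dict is plain JSON-compatible data, persisting it is straightforward. A minimal sketch of saving it to a local file and loading it back later (the file name is an assumption; a database row or cache entry would work the same way):

import json

with open("agent_ctx.json", "w") as f:
    json.dump(ctx_dict, f)  # save the serialized context

with open("agent_ctx.json") as f:
    loaded_ctx_dict = json.load(f)  # load it back in a later session

restored_ctx = Context.from_dict(
    workflow, loaded_ctx_dict, serializer=JsonSerializer()
)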
Streaming¶
The AgentWorkflow also supports streaming. Since the AgentWorkflow is a Workflow, it can be streamed like any other Workflow. This works by using the handler that is returned from the workflow. There are a few key events that are streamed, feel free to explore below.
If you only want to stream the LLM output, you can use the AgentStream events.
from llama_index.core.agent.workflow import (
    AgentInput,
    AgentOutput,
    ToolCall,
    ToolCallResult,
    AgentStream,
)
handler = workflow.run(user_msg="What is the weather in Saskatoon?")
async for event in handler.stream_events():
    if isinstance(event, AgentStream):
        print(event.delta, end="", flush=True)
        # print(event.response)  # the current full response
        # print(event.raw)  # the raw llm api response
        # print(event.current_agent_name)  # the current agent name
    # elif isinstance(event, AgentInput):
    #     print(event.input)  # the current input messages
    #     print(event.current_agent_name)  # the current agent name
    # elif isinstance(event, AgentOutput):
    #     print(event.response)  # the current full response
    #     print(event.tool_calls)  # the selected tool calls, if any
    #     print(event.raw)  # the raw llm api response
    # elif isinstance(event, ToolCallResult):
    #     print(event.tool_name)  # the tool name
    #     print(event.tool_kwargs)  # the tool kwargs
    #     print(event.tool_output)  # the tool output
    # elif isinstance(event, ToolCall):
    #     print(event.tool_name)  # the tool name
    #     print(event.tool_kwargs)  # the tool kwargs
The current weather in Saskatoon is as follows:
- **Temperature**: 0.1°C (32.2°F)
- **Condition**: Partly cloudy
- **Wind**: 13.2 mph (21.2 kph) from the SSE
- **Humidity**: 80%
- **Feels Like**: -5.3°C (22.5°F)
- **Visibility**: 24 km (14 miles)

For more details, you can check the full weather report [here](https://www.weatherapi.com/).
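As a small variant on the loop above, the same event stream can surface tool activity in between streamed tokens. A sketch using the same events and fields shown above (the Tokyo query is just an example prompt):

handler = workflow.run(user_msg="What is the weather in Tokyo?")
async for event in handler.stream_events():
    if isinstance(event, AgentStream):
        print(event.delta, end="", flush=True)
    elif isinstance(event, ToolCallResult):
        print(f"\n[tool] {event.tool_name} called with {event.tool_kwargs}")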
Tools and State¶
Tools can also be defined that have access to the workflow context. This means you can set and retrieve variables from the context and use them in the tool or between tools.
Note: The Context parameter should be the first parameter of the tool.
from llama_index.core.workflow import Context
async def set_name(ctx: Context, name: str) -> str:
    await ctx.set("name", name)
    return f"Name set to {name}"
workflow = AgentWorkflow.from_tools_or_functions(
    [set_name],
    llm=llm,
    system_prompt="You are a helpful assistant that can set a name.",
    initial_state={"name": "unset"},
)
ctx = Context(workflow)
response = await workflow.run(user_msg="My name is Logan", ctx=ctx)
print(str(response))
name = await ctx.get("name")
print(name)
Your name has been set to Logan.
Logan
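Since tools can also read from the context, a natural companion is a tool that retrieves the stored value. A minimal sketch (get_name is a hypothetical helper, not part of the example above):

async def get_name(ctx: Context) -> str:
    """Useful for getting the previously stored name."""
    return await ctx.get("name")

Passing both set_name and get_name to from_tools_or_functions would let the agent store the name in one run and recall it in a later run that shares the same Context.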
Human in the Loop¶
Tools can also be defined that involve a human in the loop. This is useful for tasks that require human input, such as confirming a tool call or providing feedback.
Using workflow events, we can emit events that require a response from the user. Here, we use the built-in InputRequiredEvent and HumanResponseEvent to handle the human in the loop, but you can also define your own events.
from llama_index.core.workflow import (
    Context,
    InputRequiredEvent,
    HumanResponseEvent,
)
async def dangerous_task(ctx: Context) -> str:
    """A dangerous task that requires human confirmation."""
    ctx.write_event_to_stream(
        InputRequiredEvent(
            prefix="Are you sure you want to proceed?",
            user_name="Logan",
        )
    )
    response = await ctx.wait_for_event(
        HumanResponseEvent, requirements={"user_name": "Logan"}
    )
    if response.response == "yes":
        return "Dangerous task completed successfully."
    else:
        return "Dangerous task aborted."
workflow = AgentWorkflow.from_tools_or_functions(
    [dangerous_task],
    llm=llm,
    system_prompt="You are a helpful assistant that can perform dangerous tasks.",
)
handler = workflow.run(user_msg="I want to proceed with the dangerous task.")
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        response = input(event.prefix).strip().lower()
        handler.ctx.send_event(
            HumanResponseEvent(
                response=response,
                user_name=event.user_name,
            )
        )
response = await handler
print(str(response))
The dangerous task has been completed successfully. If you need anything else, feel free to ask!
In production scenarios, you might handle human-in-the-loop over a websocket or multiple API requests.
As mentioned before, the Context object is serializable, and this means we can also save the workflow mid-run and restore it later.
NOTE: Any functions/steps that were in-progress will start from the beginning when the workflow is restored.
from llama_index.core.workflow import JsonSerializer
handler = workflow.run(user_msg="I want to proceed with the dangerous task.")
input_ev = None
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        input_ev = event
        break
# save the context somewhere for later
ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
# get the response from the user
response_str = input(input_ev.prefix).strip().lower()
# restore the workflow
restored_ctx = Context.from_dict(
    workflow, ctx_dict, serializer=JsonSerializer()
)
handler = workflow.run(ctx=restored_ctx)
handler.ctx.send_event(
    HumanResponseEvent(
        response=response_str,
        user_name=input_ev.user_name,
    )
)
response = await handler
print(str(response))
The dangerous task has been initiated. Please confirm if you would like to proceed with it.