Workflows#
A `Workflow` in LlamaIndex is an event-driven abstraction used to chain together several events. Workflows are made up of steps, with each step responsible for handling certain event types and emitting new events.
`Workflow`s in LlamaIndex work by decorating functions with a `@step` decorator. This is used to infer the input and output types of each step for validation, and ensures each step only runs when an accepted event is ready.
You can create a `Workflow` to do anything! Build an agent, a RAG flow, an extraction flow, or anything else you want.
Workflows are also automatically instrumented, so you get observability into each step using tools like Arize Phoenix. (NOTE: Observability works for integrations that take advantage of the newer instrumentation system. Usage may vary.)
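For example, routing traces to a locally running Phoenix instance might look like this (a minimal sketch; it assumes the `arize-phoenix` and `llama-index-callbacks-arize-phoenix` packages are installed, and package names may vary by version):

```python
# pip install arize-phoenix llama-index-callbacks-arize-phoenix  (assumed package names)
import phoenix as px
import llama_index.core

# launch a local Phoenix app to collect traces
px.launch_app()

# route LlamaIndex instrumentation to Phoenix
llama_index.core.set_global_handler("arize_phoenix")
```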
Tip

Workflows make async a first-class citizen, and this page assumes you are running in an async environment. In practice, this means setting up your async code properly. If you are already running in a server like FastAPI, or in a notebook, you can freely use `await` already!

If you are running your own Python scripts, it's best practice to have a single async entry point:
```python
async def main():
    w = MyWorkflow(...)
    result = await w.run(...)
    print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```
Getting Started#
As an illustrative example, let's consider a naive workflow where a joke is generated and then critiqued.
```python
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

# `pip install llama-index-llms-openai` if you don't already have it
from llama_index.llms.openai import OpenAI


class JokeEvent(Event):
    joke: str


class JokeFlow(Workflow):
    llm = OpenAI(model="gpt-4o-mini")

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic

        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))


w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))
```
There are a few moving pieces here, so let's go through it piece by piece.
Defining Workflow Events#
```python
class JokeEvent(Event):
    joke: str
```
Events are user-defined pydantic objects. You control the attributes and any other auxiliary methods. In this case, our workflow relies on a single user-defined event, the `JokeEvent`.
Setting up the Workflow Class#
```python
class JokeFlow(Workflow):
    llm = OpenAI(model="gpt-4o-mini")

    ...
```
Our workflow is implemented by subclassing the `Workflow` class. For simplicity, we attached a static `OpenAI` LLM instance.
Workflow Entry Points#
```python
class JokeFlow(Workflow):
    ...

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic

        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    ...
```
Here, we come to the entry point of our workflow. While events are user-defined, there are two special-case events, the `StartEvent` and the `StopEvent`. Here, the `StartEvent` signifies where to send the initial workflow input.

The `StartEvent` is a bit of a special object since it can hold arbitrary attributes. Here, we accessed the topic with `ev.topic`, which would raise an error if it wasn't there. You could also do `ev.get("topic")` to handle the case where the attribute might not be there without raising an error.
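Putting both access patterns side by side (a sketch, not a complete step):

```python
@step
async def generate_joke(self, ev: StartEvent) -> JokeEvent:
    # attribute access: raises an error if "topic" was not passed to run()
    topic = ev.topic

    # dict-style access: returns None if "topic" is missing, instead of raising
    topic = ev.get("topic")
    ...
```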
At this point, you may have noticed that we haven't explicitly told the workflow what events are handled by which steps. Instead, the `@step` decorator is used to infer the input and output types of each step. Furthermore, these inferred input and output types are also used to verify for you that the workflow is valid before running!
Workflow Exit Points#
```python
class JokeFlow(Workflow):
    ...

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))

    ...
```
Here, we have our second, and last, step in the workflow. We know it's the last step because the special `StopEvent` is returned. When the workflow encounters a returned `StopEvent`, it immediately stops the workflow and returns whatever the result was.

In this case, the result is a string, but it could be a dictionary, a list, or any other object.
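For instance, a hypothetical variation of `critique_joke` could bundle several values into a dictionary result:

```python
@step
async def critique_joke(self, ev: JokeEvent) -> StopEvent:
    joke = ev.joke

    prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
    response = await self.llm.acomplete(prompt)
    # the result can be any object, not just a string
    return StopEvent(result={"joke": joke, "critique": str(response)})
```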
Running the Workflow#
```python
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))
```
Lastly, we create and run the workflow. There are some settings like timeouts (in seconds) and verbosity to help with debugging.

The `.run()` method is async, so we use `await` here to wait for the result.
Drawing the Workflow#
Workflows can be visualized using the power of type annotations in your step definitions. You can either draw all possible paths through the workflow, or the most recent execution, to help with debugging.

First, install:
```bash
pip install llama-index-utils-workflow
```
Then import and use:
```python
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

# Draw all possible paths
draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")

# Draw the most recent execution
w = JokeFlow()
await w.run(topic="Pirates")
draw_most_recent_execution(w, filename="joke_flow_recent.html")
```
Working with Global Context/State#
Optionally, you can choose to use global context between steps. For example, maybe multiple steps need access to the original `query` input from the user. You can store this in global context so that every step has access.
```python
from llama_index.core.workflow import Context


@step
async def query(self, ctx: Context, ev: MyEvent) -> StopEvent:
    # retrieve from context
    query = await ctx.get("query")

    # do something with context and event
    val = ...
    result = ...

    # store in context
    await ctx.set("key", val)

    return StopEvent(result=result)
```
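For `ctx.get("query")` to succeed, an earlier step must have stored the value. A minimal sketch of such a step, assuming a `query` attribute on the `StartEvent` and the same user-defined `MyEvent` as above:

```python
@step
async def setup(self, ctx: Context, ev: StartEvent) -> MyEvent:
    # store the original user input so any later step can read it
    await ctx.set("query", ev.query)
    return MyEvent()
```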
Waiting for Multiple Events#
The context does more than just hold data; it also provides utilities to buffer and wait for multiple events.

For example, you might have a step that waits for a query and retrieved nodes before synthesizing a response:
```python
from llama_index.core import get_response_synthesizer


@step
async def synthesize(
    self, ctx: Context, ev: QueryEvent | RetrieveEvent
) -> StopEvent | None:
    data = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])

    # check if we can run
    if data is None:
        return None

    # unpack -- data is returned in order
    query_event, retrieve_event = data

    # run response synthesis
    synthesizer = get_response_synthesizer()
    response = synthesizer.synthesize(
        query_event.query, nodes=retrieve_event.nodes
    )

    return StopEvent(result=response)
```
Using `ctx.collect_events()` we can buffer and wait for ALL expected events to arrive. This function will only return data (in the requested order) once all events have arrived.
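For reference, the `QueryEvent` and `RetrieveEvent` used above might be defined like this (a sketch; the field types are assumptions based on how the step consumes them):

```python
from llama_index.core.schema import NodeWithScore
from llama_index.core.workflow import Event


class QueryEvent(Event):
    query: str


class RetrieveEvent(Event):
    nodes: list[NodeWithScore]
```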
Manually Triggering Events#
Normally, events are triggered by returning another event during a step. However, events can also be manually dispatched using the `ctx.send_event(event)` method within a workflow.
Here is a short toy example showing how this would be used:
```python
from llama_index.core.workflow import (
    step,
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
)


class MyEvent(Event):
    pass


class MyEventResult(Event):
    result: str


class GatherEvent(Event):
    pass


class MyWorkflow(Workflow):
    @step
    async def dispatch_step(
        self, ctx: Context, ev: StartEvent
    ) -> MyEvent | GatherEvent:
        # manually dispatch two events, then return a third
        ctx.send_event(MyEvent())
        ctx.send_event(MyEvent())

        return GatherEvent()

    @step
    async def handle_my_event(self, ev: MyEvent) -> MyEventResult:
        return MyEventResult(result="result")

    @step
    async def gather(
        self, ctx: Context, ev: GatherEvent | MyEventResult
    ) -> StopEvent | None:
        # wait for both MyEventResult events to finish
        events = ctx.collect_events(ev, [MyEventResult, MyEventResult])
        if not events:
            return None

        return StopEvent(result=events)
```
Stepwise Execution#
Workflows have built-in utilities for stepwise execution, allowing you to control execution and debug state as things progress.
```python
w = JokeFlow(...)

# Kick off the workflow
await w.run_step(topic="Pirates")

# Iterate until done
while not w.is_done():
    await w.run_step()

# Get the final result
result = w.get_result()
```
Decorating non-class Functions#
You can also decorate and attach steps to a workflow without subclassing it.
Below is the `JokeFlow` from earlier, but defined without subclassing.
```python
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.llms.openai import OpenAI


class JokeEvent(Event):
    joke: str


joke_flow = Workflow(timeout=60, verbose=True)


@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:
    topic = ev.topic

    prompt = f"Write your best joke about {topic}."
    llm = OpenAI()
    response = await llm.acomplete(prompt)
    return JokeEvent(joke=str(response))


@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:
    joke = ev.joke

    prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
    llm = OpenAI()
    response = await llm.acomplete(prompt)
    return StopEvent(result=str(response))
```
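Running this version works the same as before:

```python
result = await joke_flow.run(topic="pirates")
print(str(result))
```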
Examples#
You can find many useful examples of using workflows in the notebooks below:
- Advanced Text-to-SQL
- Citation Query Engine
- Common Workflow Patterns
- Corrective RAG
- Function Calling Agent
- JSON Query Engine
- Long RAG
- Multi-Step Query Engine
- Multi-Strategy Workflow
- RAG + Reranking
- ReAct Agent
- Reliable Structured Generation
- Router Query Engine
- Self Discover Workflow
- Sub-Question Query Engine
- Utilizing Concurrency