Workflow

AgentWorkflow #

Bases: Workflow, PromptMixin

A workflow for managing multiple agents with handoffs.
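
A minimal usage sketch: two function-calling agents where the root agent hands off to a writer. The agent names, stub tools, and the gpt-4o-mini model are illustrative assumptions, and the OpenAI class comes from the separate llama-index-llms-openai package.

import asyncio

from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent
from llama_index.llms.openai import OpenAI  # assumes llama-index-llms-openai


def search_web(query: str) -> str:
    """Stub search tool, for illustration only."""
    return f"Results for: {query}"


def write_report(notes: str) -> str:
    """Stub writing tool, for illustration only."""
    return f"Report based on: {notes}"


llm = OpenAI(model="gpt-4o-mini")  # assumed model name

research_agent = FunctionAgent(
    name="ResearchAgent",
    description="Searches the web and records notes on the topic.",
    system_prompt="Research the topic, then hand off to WriteAgent.",
    tools=[search_web],
    llm=llm,
    can_handoff_to=["WriteAgent"],
)
write_agent = FunctionAgent(
    name="WriteAgent",
    description="Writes a short report from research notes.",
    system_prompt="Write a short report from the provided notes.",
    tools=[write_report],
    llm=llm,
    can_handoff_to=[],  # terminal agent: exposes no handoff tool
)

workflow = AgentWorkflow(
    agents=[research_agent, write_agent],
    root_agent="ResearchAgent",
    initial_state={"notes": ""},
)


async def main() -> None:
    response = await workflow.run(user_msg="Write a brief report on solar power.")
    print(str(response))


asyncio.run(main())

Handoffs are driven by the auto-generated handoff tool described under get_tools below; setting can_handoff_to=[] suppresses it for a terminal agent.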

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
class AgentWorkflow(Workflow, PromptMixin, metaclass=AgentWorkflowMeta):
    """A workflow for managing multiple agents with handoffs."""

    def __init__(
        self,
        agents: List[BaseWorkflowAgent],
        initial_state: Optional[Dict] = None,
        root_agent: Optional[str] = None,
        handoff_prompt: Optional[Union[str, BasePromptTemplate]] = None,
        state_prompt: Optional[Union[str, BasePromptTemplate]] = None,
        timeout: Optional[float] = None,
        **workflow_kwargs: Any,
    ):
        super().__init__(timeout=timeout, **workflow_kwargs)
        if not agents:
            raise ValueError("At least one agent must be provided")

        self.agents = {cfg.name: cfg for cfg in agents}
        if len(agents) == 1:
            root_agent = agents[0].name
        elif root_agent is None:
            raise ValueError("Exactly one root agent must be provided")
        else:
            root_agent = root_agent

        if root_agent not in self.agents:
            raise ValueError(f"Root agent {root_agent} not found in provided agents")

        self.root_agent = root_agent
        self.initial_state = initial_state or {}

        handoff_prompt = handoff_prompt or DEFAULT_HANDOFF_PROMPT
        if isinstance(handoff_prompt, str):
            handoff_prompt = PromptTemplate(handoff_prompt)
            if "{agent_info}" not in handoff_prompt.get_template():
                raise ValueError("Handoff prompt must contain {agent_info}")
        self.handoff_prompt = handoff_prompt

        state_prompt = state_prompt or DEFAULT_STATE_PROMPT
        if isinstance(state_prompt, str):
            state_prompt = PromptTemplate(state_prompt)
            if (
                "{state}" not in state_prompt.get_template()
                or "{msg}" not in state_prompt.get_template()
            ):
                raise ValueError("State prompt must contain {state} and {msg}")
        self.state_prompt = state_prompt

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {
            "handoff_prompt": self.handoff_prompt,
            "state_prompt": self.state_prompt,
        }

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {agent.name: agent for agent in self.agents.values()}

    def _update_prompts(self, prompts_dict: PromptDictType) -> None:
        """Update prompts."""
        if "handoff_prompt" in prompts_dict:
            self.handoff_prompt = prompts_dict["handoff_prompt"]
        if "state_prompt" in prompts_dict:
            self.state_prompt = prompts_dict["state_prompt"]

    def _ensure_tools_are_async(
        self, tools: Sequence[BaseTool]
    ) -> Sequence[AsyncBaseTool]:
        """Ensure all tools are async."""
        return [adapt_to_async_tool(tool) for tool in tools]

    def _get_handoff_tool(
        self, current_agent: BaseWorkflowAgent
    ) -> Optional[AsyncBaseTool]:
        """Creates a handoff tool for the given agent."""
        agent_info = {cfg.name: cfg.description for cfg in self.agents.values()}

        # Filter out agents that the current agent cannot handoff to
        configs_to_remove = []
        for name in agent_info:
            if name == current_agent.name:
                configs_to_remove.append(name)
            elif (
                current_agent.can_handoff_to is not None
                and name not in current_agent.can_handoff_to
            ):
                configs_to_remove.append(name)

        for name in configs_to_remove:
            agent_info.pop(name)

        if not agent_info:
            return None

        fn_tool_prompt = self.handoff_prompt.format(agent_info=str(agent_info))
        return FunctionTool.from_defaults(
            async_fn=handoff, description=fn_tool_prompt, return_direct=True
        )

    async def get_tools(
        self, agent_name: str, input_str: Optional[str] = None
    ) -> Sequence[AsyncBaseTool]:
        """Get tools for the given agent."""
        agent_tools = self.agents[agent_name].tools or []
        tools = [*agent_tools]
        retriever = self.agents[agent_name].tool_retriever
        if retriever is not None:
            retrieved_tools = await retriever.aretrieve(input_str or "")
            tools.extend(retrieved_tools)

        if (
            self.agents[agent_name].can_handoff_to
            or self.agents[agent_name].can_handoff_to is None
        ):
            handoff_tool = self._get_handoff_tool(self.agents[agent_name])
            if handoff_tool:
                tools.append(handoff_tool)

        return self._ensure_tools_are_async(tools)

    async def _init_context(self, ctx: Context, ev: StartEvent) -> None:
        """Initialize the context once, if needed."""
        if not await ctx.get("memory", default=None):
            default_memory = ev.get("memory", default=None)
            default_memory = default_memory or ChatMemoryBuffer.from_defaults(
                llm=self.agents[self.root_agent].llm or Settings.llm
            )
            await ctx.set("memory", default_memory)
        if not await ctx.get("agents", default=None):
            await ctx.set("agents", list(self.agents.keys()))
        if not await ctx.get("state", default=None):
            await ctx.set("state", self.initial_state)
        if not await ctx.get("current_agent_name", default=None):
            await ctx.set("current_agent_name", self.root_agent)

    async def _call_tool(
        self,
        ctx: Context,
        tool: AsyncBaseTool,
        tool_input: dict,
    ) -> ToolOutput:
        """Call the given tool with the given input."""
        try:
            if isinstance(tool, FunctionTool) and tool.requires_context:
                tool_output = await tool.acall(ctx=ctx, **tool_input)
            else:
                tool_output = await tool.acall(**tool_input)
        except Exception as e:
            tool_output = ToolOutput(
                content=str(e),
                tool_name=tool.metadata.name,
                raw_input=tool_input,
                raw_output=str(e),
                is_error=True,
            )

        return tool_output

    @step
    async def init_run(self, ctx: Context, ev: StartEvent) -> AgentInput:
        """Sets up the workflow and validates inputs."""
        await self._init_context(ctx, ev)

        user_msg = ev.get("user_msg")
        chat_history = ev.get("chat_history")
        if user_msg and chat_history:
            raise ValueError("Cannot provide both user_msg and chat_history")

        if isinstance(user_msg, str):
            user_msg = ChatMessage(role="user", content=user_msg)

        await ctx.set("user_msg_str", user_msg.content)

        # Add messages to memory
        memory: BaseMemory = await ctx.get("memory")
        if user_msg:
            # Add the state to the user message if it exists and if requested
            current_state = await ctx.get("state")
            if current_state:
                user_msg.content = self.state_prompt.format(
                    state=current_state, msg=user_msg.content
                )

            await memory.aput(user_msg)
            input_messages = memory.get(input=user_msg.content)
        else:
            memory.set(chat_history)
            input_messages = memory.get()

        # send to the current agent
        current_agent_name: str = await ctx.get("current_agent_name")
        return AgentInput(input=input_messages, current_agent_name=current_agent_name)

    @step
    async def setup_agent(self, ctx: Context, ev: AgentInput) -> AgentSetup:
        """Main agent handling logic."""
        current_agent_name = ev.current_agent_name
        agent = self.agents[current_agent_name]
        llm_input = ev.input

        if agent.system_prompt:
            llm_input = [
                ChatMessage(role="system", content=agent.system_prompt),
                *llm_input,
            ]

        return AgentSetup(
            input=llm_input,
            current_agent_name=ev.current_agent_name,
        )

    @step
    async def run_agent_step(self, ctx: Context, ev: AgentSetup) -> AgentOutput:
        """Run the agent."""
        memory: BaseMemory = await ctx.get("memory")
        agent = self.agents[ev.current_agent_name]
        tools = await self.get_tools(ev.current_agent_name, ev.input[-1].content or "")

        agent_output = await agent.take_step(
            ctx,
            ev.input,
            tools,
            memory,
        )

        ctx.write_event_to_stream(agent_output)
        return agent_output

    @step
    async def parse_agent_output(
        self, ctx: Context, ev: AgentOutput
    ) -> Union[StopEvent, ToolCall, None]:
        if not ev.tool_calls:
            agent = self.agents[ev.current_agent_name]
            memory: BaseMemory = await ctx.get("memory")
            output = await agent.finalize(ctx, ev, memory)

            return StopEvent(result=output)

        await ctx.set("num_tool_calls", len(ev.tool_calls))

        for tool_call in ev.tool_calls:
            ctx.send_event(
                ToolCall(
                    tool_name=tool_call.tool_name,
                    tool_kwargs=tool_call.tool_kwargs,
                    tool_id=tool_call.tool_id,
                )
            )

        return None

    @step
    async def call_tool(self, ctx: Context, ev: ToolCall) -> ToolCallResult:
        """Calls the tool and handles the result."""
        ctx.write_event_to_stream(
            ToolCall(
                tool_name=ev.tool_name,
                tool_kwargs=ev.tool_kwargs,
                tool_id=ev.tool_id,
            )
        )

        current_agent_name = await ctx.get("current_agent_name")
        tools = await self.get_tools(current_agent_name, ev.tool_name)
        tools_by_name = {tool.metadata.name: tool for tool in tools}
        if ev.tool_name not in tools_by_name:
            tool = None
            result = ToolOutput(
                content=f"Tool {ev.tool_name} not found. Please select a tool that is available.",
                tool_name=ev.tool_name,
                raw_input=ev.tool_kwargs,
                raw_output=None,
                is_error=True,
            )
        else:
            tool = tools_by_name[ev.tool_name]
            result = await self._call_tool(ctx, tool, ev.tool_kwargs)

        result_ev = ToolCallResult(
            tool_name=ev.tool_name,
            tool_kwargs=ev.tool_kwargs,
            tool_id=ev.tool_id,
            tool_output=result,
            return_direct=tool.metadata.return_direct if tool else False,
        )

        ctx.write_event_to_stream(result_ev)
        return result_ev

    @step
    async def aggregate_tool_results(
        self, ctx: Context, ev: ToolCallResult
    ) -> Union[AgentInput, StopEvent, None]:
        """Aggregate tool results and return the next agent input."""
        num_tool_calls = await ctx.get("num_tool_calls", default=0)
        if num_tool_calls == 0:
            raise ValueError("No tool calls found, cannot aggregate results.")

        tool_call_results: list[ToolCallResult] = ctx.collect_events(  # type: ignore
            ev, expected=[ToolCallResult] * num_tool_calls
        )
        if not tool_call_results:
            return None

        memory: BaseMemory = await ctx.get("memory")
        agent_name: str = await ctx.get("current_agent_name")
        agent: BaseWorkflowAgent = self.agents[agent_name]

        await agent.handle_tool_call_results(ctx, tool_call_results, memory)

        # set the next agent, if needed
        # the handoff tool sets this
        next_agent_name = await ctx.get("next_agent", default=None)
        if next_agent_name:
            await ctx.set("current_agent_name", next_agent_name)

        if any(
            tool_call_result.return_direct for tool_call_result in tool_call_results
        ):
            # if any tool calls return directly, take the first one
            return_direct_tool = next(
                tool_call_result
                for tool_call_result in tool_call_results
                if tool_call_result.return_direct
            )

            # always finalize the agent, even if we're just handing off
            result = AgentOutput(
                response=ChatMessage(
                    role="assistant",
                    content=return_direct_tool.tool_output.content or "",
                ),
                tool_calls=[
                    ToolSelection(
                        tool_id=t.tool_id,
                        tool_name=t.tool_name,
                        tool_kwargs=t.tool_kwargs,
                    )
                    for t in tool_call_results
                ],
                raw=str(return_direct_tool.tool_output.raw_output),
                current_agent_name=agent.name,
            )
            result = await agent.finalize(ctx, result, memory)

            # we don't want to stop the system if we're just handing off
            if return_direct_tool.tool_name != "handoff":
                return StopEvent(result=result)

        user_msg_str = await ctx.get("user_msg_str")
        input_messages = memory.get(input=user_msg_str)

        # get this again, in case it changed
        agent_name = await ctx.get("current_agent_name")
        agent = self.agents[agent_name]

        return AgentInput(input=input_messages, current_agent_name=agent.name)

    def run(
        self,
        user_msg: Optional[Union[str, ChatMessage]] = None,
        chat_history: Optional[List[ChatMessage]] = None,
        memory: Optional[BaseMemory] = None,
        ctx: Optional[Context] = None,
        stepwise: bool = False,
        checkpoint_callback: Optional[CheckpointCallback] = None,
        **kwargs: Any,
    ) -> WorkflowHandler:
        return super().run(
            user_msg=user_msg,
            chat_history=chat_history,
            memory=memory,
            ctx=ctx,
            stepwise=stepwise,
            checkpoint_callback=checkpoint_callback,
            **kwargs,
        )

    @classmethod
    def from_tools_or_functions(
        cls,
        tools_or_functions: List[Union[BaseTool, Callable]],
        llm: Optional[LLM] = None,
        system_prompt: Optional[str] = None,
        state_prompt: Optional[Union[str, BasePromptTemplate]] = None,
        initial_state: Optional[dict] = None,
        timeout: Optional[float] = None,
    ) -> "AgentWorkflow":
        """Initializes an AgentWorkflow from a list of tools or functions.

        The workflow will be initialized with a single agent that uses the provided tools or functions.

        If the LLM is a function calling model, the workflow will use the FunctionAgent.
        Otherwise, it will use the ReActAgent.
        """
        llm = llm or Settings.llm
        agent_cls = (
            FunctionAgent if llm.metadata.is_function_calling_model else ReActAgent
        )

        tools = [
            FunctionTool.from_defaults(fn=tool)
            if not isinstance(tool, FunctionTool)
            else tool
            for tool in tools_or_functions
        ]
        return cls(
            agents=[
                agent_cls(
                    name="Agent",
                    description="A single agent that uses the provided tools or functions.",
                    tools=tools,
                    llm=llm,
                    system_prompt=system_prompt,
                )
            ],
            state_prompt=state_prompt,
            initial_state=initial_state,
            timeout=timeout,
        )

get_tools async #

get_tools(agent_name: str, input_str: Optional[str] = None) -> Sequence[AsyncBaseTool]

Get tools for the given agent.
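
A short sketch, reusing the illustrative two-agent workflow from the class description above, inside an async context. Because ResearchAgent can hand off to WriteAgent, an auto-generated handoff tool is appended to its own tools:

tools = await workflow.get_tools("ResearchAgent")
print([tool.metadata.name for tool in tools])  # e.g. ['search_web', 'handoff']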

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
async def get_tools(
    self, agent_name: str, input_str: Optional[str] = None
) -> Sequence[AsyncBaseTool]:
    """Get tools for the given agent."""
    agent_tools = self.agents[agent_name].tools or []
    tools = [*agent_tools]
    retriever = self.agents[agent_name].tool_retriever
    if retriever is not None:
        retrieved_tools = await retriever.aretrieve(input_str or "")
        tools.extend(retrieved_tools)

    if (
        self.agents[agent_name].can_handoff_to
        or self.agents[agent_name].can_handoff_to is None
    ):
        handoff_tool = self._get_handoff_tool(self.agents[agent_name])
        if handoff_tool:
            tools.append(handoff_tool)

    return self._ensure_tools_are_async(tools)

init_run async #

init_run(ctx: Context, ev: StartEvent) -> AgentInput

Sets up the workflow and validates inputs.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def init_run(self, ctx: Context, ev: StartEvent) -> AgentInput:
    """Sets up the workflow and validates inputs."""
    await self._init_context(ctx, ev)

    user_msg = ev.get("user_msg")
    chat_history = ev.get("chat_history")
    if user_msg and chat_history:
        raise ValueError("Cannot provide both user_msg and chat_history")

    if isinstance(user_msg, str):
        user_msg = ChatMessage(role="user", content=user_msg)

    await ctx.set("user_msg_str", user_msg.content)

    # Add messages to memory
    memory: BaseMemory = await ctx.get("memory")
    if user_msg:
        # Add the state to the user message if it exists and if requested
        current_state = await ctx.get("state")
        if current_state:
            user_msg.content = self.state_prompt.format(
                state=current_state, msg=user_msg.content
            )

        await memory.aput(user_msg)
        input_messages = memory.get(input=user_msg.content)
    else:
        memory.set(chat_history)
        input_messages = memory.get()

    # send to the current agent
    current_agent_name: str = await ctx.get("current_agent_name")
    return AgentInput(input=input_messages, current_agent_name=current_agent_name)

setup_agent async #

setup_agent(ctx: Context, ev: AgentInput) -> AgentSetup

Main agent handling logic.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def setup_agent(self, ctx: Context, ev: AgentInput) -> AgentSetup:
    """Main agent handling logic."""
    current_agent_name = ev.current_agent_name
    agent = self.agents[current_agent_name]
    llm_input = ev.input

    if agent.system_prompt:
        llm_input = [
            ChatMessage(role="system", content=agent.system_prompt),
            *llm_input,
        ]

    return AgentSetup(
        input=llm_input,
        current_agent_name=ev.current_agent_name,
    )

run_agent_step async #

run_agent_step(ctx: Context, ev: AgentSetup) -> AgentOutput

Run the agent.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def run_agent_step(self, ctx: Context, ev: AgentSetup) -> AgentOutput:
    """Run the agent."""
    memory: BaseMemory = await ctx.get("memory")
    agent = self.agents[ev.current_agent_name]
    tools = await self.get_tools(ev.current_agent_name, ev.input[-1].content or "")

    agent_output = await agent.take_step(
        ctx,
        ev.input,
        tools,
        memory,
    )

    ctx.write_event_to_stream(agent_output)
    return agent_output

call_tool async #

call_tool(ctx: Context, ev: ToolCall) -> ToolCallResult

Calls the tool and handles the result.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def call_tool(self, ctx: Context, ev: ToolCall) -> ToolCallResult:
    """Calls the tool and handles the result."""
    ctx.write_event_to_stream(
        ToolCall(
            tool_name=ev.tool_name,
            tool_kwargs=ev.tool_kwargs,
            tool_id=ev.tool_id,
        )
    )

    current_agent_name = await ctx.get("current_agent_name")
    tools = await self.get_tools(current_agent_name, ev.tool_name)
    tools_by_name = {tool.metadata.name: tool for tool in tools}
    if ev.tool_name not in tools_by_name:
        tool = None
        result = ToolOutput(
            content=f"Tool {ev.tool_name} not found. Please select a tool that is available.",
            tool_name=ev.tool_name,
            raw_input=ev.tool_kwargs,
            raw_output=None,
            is_error=True,
        )
    else:
        tool = tools_by_name[ev.tool_name]
        result = await self._call_tool(ctx, tool, ev.tool_kwargs)

    result_ev = ToolCallResult(
        tool_name=ev.tool_name,
        tool_kwargs=ev.tool_kwargs,
        tool_id=ev.tool_id,
        tool_output=result,
        return_direct=tool.metadata.return_direct if tool else False,
    )

    ctx.write_event_to_stream(result_ev)
    return result_ev

aggregate_tool_results async #

aggregate_tool_results(ctx: Context, ev: ToolCallResult) -> Union[AgentInput, StopEvent, None]

Aggregate tool results and return the next agent input.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def aggregate_tool_results(
    self, ctx: Context, ev: ToolCallResult
) -> Union[AgentInput, StopEvent, None]:
    """Aggregate tool results and return the next agent input."""
    num_tool_calls = await ctx.get("num_tool_calls", default=0)
    if num_tool_calls == 0:
        raise ValueError("No tool calls found, cannot aggregate results.")

    tool_call_results: list[ToolCallResult] = ctx.collect_events(  # type: ignore
        ev, expected=[ToolCallResult] * num_tool_calls
    )
    if not tool_call_results:
        return None

    memory: BaseMemory = await ctx.get("memory")
    agent_name: str = await ctx.get("current_agent_name")
    agent: BaseWorkflowAgent = self.agents[agent_name]

    await agent.handle_tool_call_results(ctx, tool_call_results, memory)

    # set the next agent, if needed
    # the handoff tool sets this
    next_agent_name = await ctx.get("next_agent", default=None)
    if next_agent_name:
        await ctx.set("current_agent_name", next_agent_name)

    if any(
        tool_call_result.return_direct for tool_call_result in tool_call_results
    ):
        # if any tool calls return directly, take the first one
        return_direct_tool = next(
            tool_call_result
            for tool_call_result in tool_call_results
            if tool_call_result.return_direct
        )

        # always finalize the agent, even if we're just handing off
        result = AgentOutput(
            response=ChatMessage(
                role="assistant",
                content=return_direct_tool.tool_output.content or "",
            ),
            tool_calls=[
                ToolSelection(
                    tool_id=t.tool_id,
                    tool_name=t.tool_name,
                    tool_kwargs=t.tool_kwargs,
                )
                for t in tool_call_results
            ],
            raw=str(return_direct_tool.tool_output.raw_output),
            current_agent_name=agent.name,
        )
        result = await agent.finalize(ctx, result, memory)

        # we don't want to stop the system if we're just handing off
        if return_direct_tool.tool_name != "handoff":
            return StopEvent(result=result)

    user_msg_str = await ctx.get("user_msg_str")
    input_messages = memory.get(input=user_msg_str)

    # get this again, in case it changed
    agent_name = await ctx.get("current_agent_name")
    agent = self.agents[agent_name]

    return AgentInput(input=input_messages, current_agent_name=agent.name)

from_tools_or_functions classmethod #

from_tools_or_functions(tools_or_functions: List[Union[BaseTool, Callable]], llm: Optional[LLM] = None, system_prompt: Optional[str] = None, state_prompt: Optional[Union[str, BasePromptTemplate]] = None, initial_state: Optional[dict] = None, timeout: Optional[float] = None) -> AgentWorkflow

Initializes an AgentWorkflow from a list of tools or functions.

The workflow will be initialized with a single agent that uses the provided tools or functions.

If the LLM is a function calling model, the workflow will use the FunctionAgent. Otherwise, it will use the ReActAgent.
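
A hedged sketch, assuming the llama-index-llms-openai package and an illustrative model name. FunctionAgent is selected here because the OpenAI LLM reports is_function_calling_model=True:

import asyncio

from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.llms.openai import OpenAI  # assumes llama-index-llms-openai


def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b


async def main() -> None:
    # Builds a single-agent workflow; bare functions become FunctionTools.
    workflow = AgentWorkflow.from_tools_or_functions(
        [multiply],
        llm=OpenAI(model="gpt-4o-mini"),  # assumed model name
        system_prompt="You are a helpful calculator.",
    )
    response = await workflow.run(user_msg="What is 12 times 9?")
    print(str(response))


asyncio.run(main())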

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@classmethod
def from_tools_or_functions(
    cls,
    tools_or_functions: List[Union[BaseTool, Callable]],
    llm: Optional[LLM] = None,
    system_prompt: Optional[str] = None,
    state_prompt: Optional[Union[str, BasePromptTemplate]] = None,
    initial_state: Optional[dict] = None,
    timeout: Optional[float] = None,
) -> "AgentWorkflow":
    """Initializes an AgentWorkflow from a list of tools or functions.

    The workflow will be initialized with a single agent that uses the provided tools or functions.

    If the LLM is a function calling model, the workflow will use the FunctionAgent.
    Otherwise, it will use the ReActAgent.
    """
    llm = llm or Settings.llm
    agent_cls = (
        FunctionAgent if llm.metadata.is_function_calling_model else ReActAgent
    )

    tools = [
        FunctionTool.from_defaults(fn=tool)
        if not isinstance(tool, FunctionTool)
        else tool
        for tool in tools_or_functions
    ]
    return cls(
        agents=[
            agent_cls(
                name="Agent",
                description="A single agent that uses the provided tools or functions.",
                tools=tools,
                llm=llm,
                system_prompt=system_prompt,
            )
        ],
        state_prompt=state_prompt,
        initial_state=initial_state,
        timeout=timeout,
    )

BaseWorkflowAgent #

Bases: BaseModel, PromptMixin, ABC

Base class for all agents, combining config and logic.
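
Concrete agents implement the three abstract methods below. A minimal sketch, assuming import paths that match the source locations shown on this page; EchoAgent is a hypothetical example, not a library class:

from typing import List, Sequence

from llama_index.core.agent.workflow.base_agent import BaseWorkflowAgent
from llama_index.core.agent.workflow.workflow_events import (
    AgentOutput,
    ToolCallResult,
)
from llama_index.core.llms import ChatMessage
from llama_index.core.memory import BaseMemory
from llama_index.core.tools.types import AsyncBaseTool
from llama_index.core.workflow import Context


class EchoAgent(BaseWorkflowAgent):
    """Hypothetical agent that replies with the last message and uses no tools."""

    async def take_step(
        self,
        ctx: Context,
        llm_input: List[ChatMessage],
        tools: Sequence[AsyncBaseTool],
        memory: BaseMemory,
    ) -> AgentOutput:
        # An empty tool_calls list tells the workflow this agent is done.
        reply = ChatMessage(role="assistant", content=str(llm_input[-1].content))
        return AgentOutput(
            response=reply, tool_calls=[], raw=None, current_agent_name=self.name
        )

    async def handle_tool_call_results(
        self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
    ) -> None:
        # This agent never issues tool calls, so there is nothing to record.
        return None

    async def finalize(
        self, ctx: Context, output: AgentOutput, memory: BaseMemory
    ) -> AgentOutput:
        # Persist the final reply so later runs see it in the chat history.
        await memory.aput(output.response)
        return output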

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
class BaseWorkflowAgent(BaseModel, PromptMixin, ABC):
    """Base class for all agents, combining config and logic."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str = Field(description="The name of the agent")
    description: str = Field(
        description="The description of what the agent does and is responsible for"
    )
    system_prompt: Optional[str] = Field(
        default=None, description="The system prompt for the agent"
    )
    tools: Optional[List[BaseTool]] = Field(
        default=None, description="The tools that the agent can use"
    )
    tool_retriever: Optional[ObjectRetriever] = Field(
        default=None,
        description="The tool retriever for the agent, can be provided instead of tools",
    )
    can_handoff_to: Optional[List[str]] = Field(
        default=None, description="The agent names that this agent can hand off to"
    )
    llm: LLM = Field(
        default_factory=get_default_llm, description="The LLM that the agent uses"
    )

    @field_validator("tools", mode="before")
    def validate_tools(
        cls, v: Optional[Sequence[Union[BaseTool, Callable]]]
    ) -> Optional[Sequence[BaseTool]]:
        """Validate tools.

        If tools are not of type BaseTool, they will be converted to FunctionTools.
        This assumes the inputs are tools or callable functions.
        """
        if v is None:
            return None

        validated_tools: List[BaseTool] = []
        for tool in v:
            if not isinstance(tool, BaseTool):
                validated_tools.append(FunctionTool.from_defaults(tool))
            else:
                validated_tools.append(tool)
        return validated_tools  # type: ignore[return-value]

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {}

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {}

    def _update_prompts(self, prompts_dict: PromptDictType) -> None:
        """Update prompts."""

    @abstractmethod
    async def take_step(
        self,
        ctx: Context,
        llm_input: List[ChatMessage],
        tools: Sequence[AsyncBaseTool],
        memory: BaseMemory,
    ) -> AgentOutput:
        """Take a single step with the agent."""

    @abstractmethod
    async def handle_tool_call_results(
        self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
    ) -> None:
        """Handle tool call results."""

    @abstractmethod
    async def finalize(
        self, ctx: Context, output: AgentOutput, memory: BaseMemory
    ) -> AgentOutput:
        """Finalize the agent's execution."""

validate_tools #

validate_tools(v: Optional[Sequence[Union[BaseTool, Callable]]]) -> Optional[Sequence[BaseTool]]

Validate tools.

If tools are not of type BaseTool, they will be converted to FunctionTools. This assumes the inputs are tools or callable functions.
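
A small illustration of the validator in action, using MockLLM as a stand-in so no API key is needed (names are illustrative): passing a bare function to an agent's tools wraps it in a FunctionTool.

from llama_index.core.agent.workflow import FunctionAgent
from llama_index.core.llms.mock import MockLLM  # stand-in LLM, illustration only


def greet(name: str) -> str:
    """Return a greeting."""
    return f"Hello, {name}!"


agent = FunctionAgent(
    name="Greeter", description="Greets people.", tools=[greet], llm=MockLLM()
)
print(type(agent.tools[0]).__name__)  # FunctionTool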

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@field_validator("tools", mode="before")
def validate_tools(
    cls, v: Optional[Sequence[Union[BaseTool, Callable]]]
) -> Optional[Sequence[BaseTool]]:
    """Validate tools.

    If tools are not of type BaseTool, they will be converted to FunctionTools.
    This assumes the inputs are tools or callable functions.
    """
    if v is None:
        return None

    validated_tools: List[BaseTool] = []
    for tool in v:
        if not isinstance(tool, BaseTool):
            validated_tools.append(FunctionTool.from_defaults(tool))
        else:
            validated_tools.append(tool)
    return validated_tools  # type: ignore[return-value]

take_step abstractmethod async #

take_step(ctx: Context, llm_input: List[ChatMessage], tools: Sequence[AsyncBaseTool], memory: BaseMemory) -> AgentOutput

Take a single step with the agent.

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@abstractmethod
async def take_step(
    self,
    ctx: Context,
    llm_input: List[ChatMessage],
    tools: Sequence[AsyncBaseTool],
    memory: BaseMemory,
) -> AgentOutput:
    """Take a single step with the agent."""

handle_tool_call_results abstractmethod async #

handle_tool_call_results(ctx: Context, results: List[ToolCallResult], memory: BaseMemory) -> None

Handle tool call results.

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@abstractmethod
async def handle_tool_call_results(
    self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
    """Handle tool call results."""

finalize abstractmethod async #

finalize(ctx: Context, output: AgentOutput, memory: BaseMemory) -> AgentOutput

Finalize the agent's execution.

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@abstractmethod
async def finalize(
    self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
    """Finalize the agent's execution."""

FunctionAgent #

Bases: BaseWorkflowAgent

Function calling agent implementation.
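
A hedged single-agent sketch; the model name is an assumption, and the LLM must be a function-calling model or take_step raises a ValueError, as the source below shows:

import asyncio

from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent
from llama_index.llms.openai import OpenAI  # assumes llama-index-llms-openai


def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b


async def main() -> None:
    agent = FunctionAgent(
        name="Calculator",
        description="Performs basic arithmetic.",
        system_prompt="You are a calculator. Use the tools for any math.",
        tools=[multiply],  # bare callables are wrapped into FunctionTools
        llm=OpenAI(model="gpt-4o-mini"),  # must be a function-calling model
    )
    response = await AgentWorkflow(agents=[agent]).run(user_msg="What is 6 times 7?")
    print(str(response))


asyncio.run(main())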

Source code in llama-index-core/llama_index/core/agent/workflow/function_agent.py
class FunctionAgent(BaseWorkflowAgent):
    """Function calling agent implementation."""

    scratchpad_key: str = "scratchpad"

    async def take_step(
        self,
        ctx: Context,
        llm_input: List[ChatMessage],
        tools: Sequence[AsyncBaseTool],
        memory: BaseMemory,
    ) -> AgentOutput:
        """Take a single step with the function calling agent."""
        if not self.llm.metadata.is_function_calling_model:
            raise ValueError("LLM must be a FunctionCallingLLM")

        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
        current_llm_input = [*llm_input, *scratchpad]

        ctx.write_event_to_stream(
            AgentInput(input=current_llm_input, current_agent_name=self.name)
        )

        response = await self.llm.astream_chat_with_tools(  # type: ignore
            tools, chat_history=current_llm_input, allow_parallel_tool_calls=True
        )
        async for r in response:
            tool_calls = self.llm.get_tool_calls_from_response(  # type: ignore
                r, error_on_no_tool_call=False
            )
            raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
            ctx.write_event_to_stream(
                AgentStream(
                    delta=r.delta or "",
                    response=r.message.content or "",
                    tool_calls=tool_calls or [],
                    raw=raw,
                    current_agent_name=self.name,
                )
            )

        tool_calls = self.llm.get_tool_calls_from_response(  # type: ignore
            r, error_on_no_tool_call=False
        )

        # record the assistant message in the scratchpad
        scratchpad.append(r.message)
        await ctx.set(self.scratchpad_key, scratchpad)

        raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
        return AgentOutput(
            response=r.message,
            tool_calls=tool_calls or [],
            raw=raw,
            current_agent_name=self.name,
        )

    async def handle_tool_call_results(
        self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
    ) -> None:
        """Handle tool call results for function calling agent."""
        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])

        for tool_call_result in results:
            scratchpad.append(
                ChatMessage(
                    role="tool",
                    content=str(tool_call_result.tool_output.content),
                    additional_kwargs={"tool_call_id": tool_call_result.tool_id},
                )
            )

            if tool_call_result.return_direct:
                scratchpad.append(
                    ChatMessage(
                        role="assistant",
                        content=str(tool_call_result.tool_output.content),
                        additional_kwargs={"tool_call_id": tool_call_result.tool_id},
                    )
                )
                break

        await ctx.set(self.scratchpad_key, scratchpad)

    async def finalize(
        self, ctx: Context, output: AgentOutput, memory: BaseMemory
    ) -> AgentOutput:
        """Finalize the function calling agent.

        Adds all in-progress messages to memory.
        """
        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
        for msg in scratchpad:
            await memory.aput(msg)

        # reset scratchpad
        await ctx.set(self.scratchpad_key, [])

        return output

take_step async #

take_step(ctx: Context, llm_input: List[ChatMessage], tools: Sequence[AsyncBaseTool], memory: BaseMemory) -> AgentOutput

Take a single step with the function calling agent.

Source code in llama-index-core/llama_index/core/agent/workflow/function_agent.py
async def take_step(
    self,
    ctx: Context,
    llm_input: List[ChatMessage],
    tools: Sequence[AsyncBaseTool],
    memory: BaseMemory,
) -> AgentOutput:
    """Take a single step with the function calling agent."""
    if not self.llm.metadata.is_function_calling_model:
        raise ValueError("LLM must be a FunctionCallingLLM")

    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
    current_llm_input = [*llm_input, *scratchpad]

    ctx.write_event_to_stream(
        AgentInput(input=current_llm_input, current_agent_name=self.name)
    )

    response = await self.llm.astream_chat_with_tools(  # type: ignore
        tools, chat_history=current_llm_input, allow_parallel_tool_calls=True
    )
    async for r in response:
        tool_calls = self.llm.get_tool_calls_from_response(  # type: ignore
            r, error_on_no_tool_call=False
        )
        raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
        ctx.write_event_to_stream(
            AgentStream(
                delta=r.delta or "",
                response=r.message.content or "",
                tool_calls=tool_calls or [],
                raw=raw,
                current_agent_name=self.name,
            )
        )

    tool_calls = self.llm.get_tool_calls_from_response(  # type: ignore
        r, error_on_no_tool_call=False
    )

    # record the assistant message in the scratchpad
    scratchpad.append(r.message)
    await ctx.set(self.scratchpad_key, scratchpad)

    raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
    return AgentOutput(
        response=r.message,
        tool_calls=tool_calls or [],
        raw=raw,
        current_agent_name=self.name,
    )

handle_tool_call_results async #

handle_tool_call_results(ctx: Context, results: List[ToolCallResult], memory: BaseMemory) -> None

Handle tool call results for function calling agent.

Source code in llama-index-core/llama_index/core/agent/workflow/function_agent.py
async def handle_tool_call_results(
    self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
    """Handle tool call results for function calling agent."""
    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])

    for tool_call_result in results:
        scratchpad.append(
            ChatMessage(
                role="tool",
                content=str(tool_call_result.tool_output.content),
                additional_kwargs={"tool_call_id": tool_call_result.tool_id},
            )
        )

        if tool_call_result.return_direct:
            scratchpad.append(
                ChatMessage(
                    role="assistant",
                    content=str(tool_call_result.tool_output.content),
                    additional_kwargs={"tool_call_id": tool_call_result.tool_id},
                )
            )
            break

    await ctx.set(self.scratchpad_key, scratchpad)

finalize async #

finalize(ctx: Context, output: AgentOutput, memory: BaseMemory) -> AgentOutput

Finalize the function calling agent.

Adds all in-progress messages to memory.

Source code in llama-index-core/llama_index/core/agent/workflow/function_agent.py
async def finalize(
    self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
    """Finalize the function calling agent.

    Adds all in-progress messages to memory.
    """
    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
    for msg in scratchpad:
        await memory.aput(msg)

    # reset scratchpad
    await ctx.set(self.scratchpad_key, [])

    return output

ReActAgent #

Bases: BaseWorkflowAgent

React agent implementation.
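
A hedged sketch of the same single-agent setup with ReActAgent, which works with any LLM because tool use is driven by the thought/action/observation text format rather than native function calling (model name assumed):

import asyncio

from llama_index.core.agent.workflow import AgentWorkflow, ReActAgent
from llama_index.llms.openai import OpenAI  # assumes llama-index-llms-openai


def add(a: float, b: float) -> float:
    """Add two numbers."""
    return a + b


async def main() -> None:
    agent = ReActAgent(
        name="Adder",
        description="Adds numbers via the ReAct loop.",
        tools=[add],
        llm=OpenAI(model="gpt-4o-mini"),
    )
    response = await AgentWorkflow(agents=[agent]).run(user_msg="What is 2 + 40?")
    print(str(response))


asyncio.run(main())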

Source code in llama-index-core/llama_index/core/agent/workflow/react_agent.py
class ReActAgent(BaseWorkflowAgent):
    """React agent implementation."""

    reasoning_key: str = "current_reasoning"
    output_parser: ReActOutputParser = Field(
        default_factory=ReActOutputParser, description="The react output parser"
    )
    formatter: ReActChatFormatter = Field(
        default_factory=default_formatter,
        description="The react chat formatter to format the reasoning steps and chat history into an llm input.",
    )

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        # TODO: the ReAct formatter does not explicitly specify PromptTemplate
        # objects, but wrap it in this to obey the interface
        react_header = self.formatter.system_header
        return {"react_header": PromptTemplate(react_header)}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""
        if "system_prompt" in prompts:
            react_header = cast(PromptTemplate, prompts["react_header"])
            self.formatter.system_header = react_header.template

    async def take_step(
        self,
        ctx: Context,
        llm_input: List[ChatMessage],
        tools: Sequence[AsyncBaseTool],
        memory: BaseMemory,
    ) -> AgentOutput:
        """Take a single step with the React agent."""
        # remove system prompt, since the react prompt will be combined with it
        if llm_input[0].role == "system":
            system_prompt = llm_input[0].content or ""
            llm_input = llm_input[1:]
        else:
            system_prompt = ""

        output_parser = self.output_parser
        react_chat_formatter = self.formatter
        react_chat_formatter.context = system_prompt

        # Format initial chat input
        current_reasoning: list[BaseReasoningStep] = await ctx.get(
            self.reasoning_key, default=[]
        )
        input_chat = react_chat_formatter.format(
            tools,
            chat_history=llm_input,
            current_reasoning=current_reasoning,
        )
        ctx.write_event_to_stream(
            AgentInput(input=input_chat, current_agent_name=self.name)
        )

        # Initial LLM call
        response = await self.llm.astream_chat(input_chat)
        async for r in response:
            raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
            ctx.write_event_to_stream(
                AgentStream(
                    delta=r.delta or "",
                    response=r.message.content or "",
                    tool_calls=[],
                    raw=raw,
                    current_agent_name=self.name,
                )
            )

        # Parse reasoning step and check if done
        message_content = r.message.content
        if not message_content:
            raise ValueError("Got empty message")

        try:
            reasoning_step = output_parser.parse(message_content, is_streaming=False)
        except ValueError as e:
            error_msg = f"Error: Could not parse output. Please follow the thought-action-input format. Try again. Details: {e!s}"
            await memory.aput(r.message)
            await memory.aput(ChatMessage(role="user", content=error_msg))

            raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
            return AgentOutput(
                response=r.message,
                tool_calls=[],
                raw=raw,
                current_agent_name=self.name,
            )

        # record the parsed reasoning step
        current_reasoning.append(reasoning_step)
        await ctx.set(self.reasoning_key, current_reasoning)

        # If response step, we're done
        raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
        if reasoning_step.is_done:
            return AgentOutput(
                response=r.message,
                tool_calls=[],
                raw=raw,
                current_agent_name=self.name,
            )

        reasoning_step = cast(ActionReasoningStep, reasoning_step)
        if not isinstance(reasoning_step, ActionReasoningStep):
            raise ValueError(f"Expected ActionReasoningStep, got {reasoning_step}")

        # Create tool call
        tool_calls = [
            ToolSelection(
                tool_id=str(uuid.uuid4()),
                tool_name=reasoning_step.action,
                tool_kwargs=reasoning_step.action_input,
            )
        ]

        raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
        return AgentOutput(
            response=r.message,
            tool_calls=tool_calls,
            raw=raw,
            current_agent_name=self.name,
        )

    async def handle_tool_call_results(
        self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
    ) -> None:
        """Handle tool call results for React agent."""
        current_reasoning: list[BaseReasoningStep] = await ctx.get(
            self.reasoning_key, default=[]
        )
        for tool_call_result in results:
            obs_step = ObservationReasoningStep(
                observation=str(tool_call_result.tool_output.content),
                return_direct=tool_call_result.return_direct,
            )
            current_reasoning.append(obs_step)

            if tool_call_result.return_direct:
                current_reasoning.append(
                    ResponseReasoningStep(
                        thought=obs_step.observation,
                        response=obs_step.observation,
                        is_streaming=False,
                    )
                )
                break

        await ctx.set(self.reasoning_key, current_reasoning)

    async def finalize(
        self, ctx: Context, output: AgentOutput, memory: BaseMemory
    ) -> AgentOutput:
        """Finalize the React agent."""
        current_reasoning: list[BaseReasoningStep] = await ctx.get(
            self.reasoning_key, default=[]
        )

        reasoning_str = "\n".join([x.get_content() for x in current_reasoning])

        if reasoning_str:
            reasoning_msg = ChatMessage(role="assistant", content=reasoning_str)
            await memory.aput(reasoning_msg)
            await ctx.set(self.reasoning_key, [])

        # remove "Answer:" from the response
        if output.response.content and "Answer:" in output.response.content:
            start_idx = output.response.content.find("Answer:")
            if start_idx != -1:
                output.response.content = output.response.content[
                    start_idx + len("Answer:") :
                ].strip()

        # clear scratchpad
        await ctx.set(self.reasoning_key, [])

        return output

take_step async #

take_step(ctx: Context, llm_input: List[ChatMessage], tools: Sequence[AsyncBaseTool], memory: BaseMemory) -> AgentOutput

Take a single step with the React agent.

Source code in llama-index-core/llama_index/core/agent/workflow/react_agent.py
async def take_step(
    self,
    ctx: Context,
    llm_input: List[ChatMessage],
    tools: Sequence[AsyncBaseTool],
    memory: BaseMemory,
) -> AgentOutput:
    """Take a single step with the React agent."""
    # remove system prompt, since the react prompt will be combined with it
    if llm_input[0].role == "system":
        system_prompt = llm_input[0].content or ""
        llm_input = llm_input[1:]
    else:
        system_prompt = ""

    output_parser = self.output_parser
    react_chat_formatter = self.formatter
    react_chat_formatter.context = system_prompt

    # Format initial chat input
    current_reasoning: list[BaseReasoningStep] = await ctx.get(
        self.reasoning_key, default=[]
    )
    input_chat = react_chat_formatter.format(
        tools,
        chat_history=llm_input,
        current_reasoning=current_reasoning,
    )
    ctx.write_event_to_stream(
        AgentInput(input=input_chat, current_agent_name=self.name)
    )

    # Initial LLM call
    response = await self.llm.astream_chat(input_chat)
    async for r in response:
        raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
        ctx.write_event_to_stream(
            AgentStream(
                delta=r.delta or "",
                response=r.message.content or "",
                tool_calls=[],
                raw=raw,
                current_agent_name=self.name,
            )
        )

    # Parse reasoning step and check if done
    message_content = r.message.content
    if not message_content:
        raise ValueError("Got empty message")

    try:
        reasoning_step = output_parser.parse(message_content, is_streaming=False)
    except ValueError as e:
        error_msg = f"Error: Could not parse output. Please follow the thought-action-input format. Try again. Details: {e!s}"
        await memory.aput(r.message)
        await memory.aput(ChatMessage(role="user", content=error_msg))

        raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
        return AgentOutput(
            response=r.message,
            tool_calls=[],
            raw=raw,
            current_agent_name=self.name,
        )

    # record the parsed reasoning step
    current_reasoning.append(reasoning_step)
    await ctx.set(self.reasoning_key, current_reasoning)

    # If response step, we're done
    raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
    if reasoning_step.is_done:
        return AgentOutput(
            response=r.message,
            tool_calls=[],
            raw=raw,
            current_agent_name=self.name,
        )

    reasoning_step = cast(ActionReasoningStep, reasoning_step)
    if not isinstance(reasoning_step, ActionReasoningStep):
        raise ValueError(f"Expected ActionReasoningStep, got {reasoning_step}")

    # Create tool call
    tool_calls = [
        ToolSelection(
            tool_id=str(uuid.uuid4()),
            tool_name=reasoning_step.action,
            tool_kwargs=reasoning_step.action_input,
        )
    ]

    raw = r.raw.model_dump() if isinstance(r.raw, BaseModel) else r.raw
    return AgentOutput(
        response=r.message,
        tool_calls=tool_calls,
        raw=raw,
        current_agent_name=self.name,
    )

handle_tool_call_results async #

handle_tool_call_results(ctx: Context, results: List[ToolCallResult], memory: BaseMemory) -> None

Handle tool call results for React agent.

Source code in llama-index-core/llama_index/core/agent/workflow/react_agent.py
async def handle_tool_call_results(
    self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
    """Handle tool call results for React agent."""
    current_reasoning: list[BaseReasoningStep] = await ctx.get(
        self.reasoning_key, default=[]
    )
    for tool_call_result in results:
        obs_step = ObservationReasoningStep(
            observation=str(tool_call_result.tool_output.content),
            return_direct=tool_call_result.return_direct,
        )
        current_reasoning.append(obs_step)

        if tool_call_result.return_direct:
            current_reasoning.append(
                ResponseReasoningStep(
                    thought=obs_step.observation,
                    response=obs_step.observation,
                    is_streaming=False,
                )
            )
            break

    await ctx.set(self.reasoning_key, current_reasoning)

finalize async #

finalize(ctx: Context, output: AgentOutput, memory: BaseMemory) -> AgentOutput

Finalize the React agent.

Source code in llama-index-core/llama_index/core/agent/workflow/react_agent.py
async def finalize(
    self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
    """Finalize the React agent."""
    current_reasoning: list[BaseReasoningStep] = await ctx.get(
        self.reasoning_key, default=[]
    )

    reasoning_str = "\n".join([x.get_content() for x in current_reasoning])

    if reasoning_str:
        reasoning_msg = ChatMessage(role="assistant", content=reasoning_str)
        await memory.aput(reasoning_msg)
        await ctx.set(self.reasoning_key, [])

    # remove "Answer:" from the response
    if output.response.content and "Answer:" in output.response.content:
        start_idx = output.response.content.find("Answer:")
        if start_idx != -1:
            output.response.content = output.response.content[
                start_idx + len("Answer:") :
            ].strip()

    # clear scratchpad
    await ctx.set(self.reasoning_key, [])

    return output

AgentInput #

Bases: Event

LLM input.

Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py
class AgentInput(Event):
    """LLM input."""

    input: list[ChatMessage]
    current_agent_name: str

AgentStream #

Bases: Event

Agent stream.

Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py
class AgentStream(Event):
    """Agent stream."""

    delta: str
    response: str
    current_agent_name: str
    tool_calls: list[ToolSelection]
    raw: Any

AgentOutput #

Bases: Event

LLM output.

Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py
class AgentOutput(Event):
    """LLM output."""

    response: ChatMessage
    tool_calls: list[ToolSelection]
    raw: Any
    current_agent_name: str

    def __str__(self) -> str:
        return self.response.content or ""

ToolCall #

Bases: Event

All tool calls are surfaced.

Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py
class ToolCall(Event):
    """All tool calls are surfaced."""

    tool_name: str
    tool_kwargs: dict
    tool_id: str

ToolCallResult #

Bases: ToolCall

Tool call result.

Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py
class ToolCallResult(ToolCall):
    """Tool call result."""

    tool_output: ToolOutput
    return_direct: bool
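
The event classes above are exactly what the workflow steps write to the stream via ctx.write_event_to_stream, so they can be consumed from the handler returned by run(). A hedged, self-contained sketch (model name assumed, llama-index-llms-openai required):

import asyncio

from llama_index.core.agent.workflow import (
    AgentStream,
    AgentWorkflow,
    ToolCall,
    ToolCallResult,
)
from llama_index.llms.openai import OpenAI  # assumes llama-index-llms-openai


def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b


async def main() -> None:
    workflow = AgentWorkflow.from_tools_or_functions(
        [multiply], llm=OpenAI(model="gpt-4o-mini")
    )
    handler = workflow.run(user_msg="What is 8 times 7?")

    async for event in handler.stream_events():
        if isinstance(event, AgentStream):
            print(event.delta, end="", flush=True)  # incremental LLM tokens
        elif isinstance(event, ToolCallResult):  # check before ToolCall: subclass
            print(f"\n[{event.tool_name} -> {event.tool_output.content}]")
        elif isinstance(event, ToolCall):
            print(f"\n[calling {event.tool_name} with {event.tool_kwargs}]")

    response = await handler  # the final AgentOutput
    print("\n" + str(response))


asyncio.run(main())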