Skip to content

Core Agent Classes#

Base Types#

Base agent types.

BaseAgent #

Bases: BaseChatEngine, BaseQueryEngine

Base Agent.

Source code in llama-index-core/llama_index/core/base/agent/types.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class BaseAgent(BaseChatEngine, BaseQueryEngine):
    """Base class for agents.

    Combines the chat-engine and query-engine interfaces. Queries are served
    by forwarding the query string to the chat methods with an empty chat
    history and wrapping the chat result in a ``Response``.
    """

    def _get_prompts(self) -> PromptDictType:
        """Return the prompts exposed by this agent (currently none)."""
        # TODO: the ReAct agent does not explicitly specify prompts, would need a
        # refactor to expose those prompts
        return {}

    def _get_prompt_modules(self) -> PromptMixinType:
        """Return the nested prompt modules (currently none)."""
        return {}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Accept prompt updates; the base implementation ignores them."""

    # ===== Query Engine Interface =====
    @trace_method("query")
    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query by running a single chat turn with no prior history."""
        chat_result = self.chat(query_bundle.query_str, chat_history=[])
        answer_text = str(chat_result)
        return Response(response=answer_text, source_nodes=chat_result.source_nodes)

    @trace_method("query")
    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Async counterpart of ``_query``; awaits ``achat`` instead of ``chat``."""
        chat_result = await self.achat(query_bundle.query_str, chat_history=[])
        answer_text = str(chat_result)
        return Response(response=answer_text, source_nodes=chat_result.source_nodes)

    def stream_chat(
        self, message: str, chat_history: Optional[List[ChatMessage]] = None
    ) -> StreamingAgentChatResponse:
        """Streaming chat is not provided by the base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("stream_chat not implemented")

    async def astream_chat(
        self, message: str, chat_history: Optional[List[ChatMessage]] = None
    ) -> StreamingAgentChatResponse:
        """Async streaming chat is not provided by the base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("astream_chat not implemented")

BaseAgentWorker #

Bases: PromptMixin, DispatcherSpanMixin

Base agent worker.

Source code in llama-index-core/llama_index/core/base/agent/types.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class BaseAgentWorker(PromptMixin, DispatcherSpanMixin):
    """Base agent worker.

    Declares the step-level interface an agent worker must provide:
    creating the first step of a task, running a step (sync, async, and the
    streaming variants of both), and finalizing a task. ``as_agent`` wraps
    the worker in an ``AgentRunner``.
    """

    def _get_prompts(self) -> PromptDictType:
        """Get prompts (the base implementation exposes none)."""
        # TODO: the ReAct agent does not explicitly specify prompts, would need a
        # refactor to expose those prompts
        return {}

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt modules (the base implementation exposes none)."""
        return {}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts (no-op in the base implementation)."""

    @abstractmethod
    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task.

        Args:
            task: Task to build a step for.
            **kwargs: Implementation-specific options.

        Returns:
            The step created for ``task``.
        """

    @abstractmethod
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step.

        Args:
            step: Step to execute.
            task: Task the step belongs to.
            **kwargs: Implementation-specific options.

        Returns:
            The output produced by executing ``step``.
        """

    @abstractmethod
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async).

        Same parameters as ``run_step``. The base body raises
        ``NotImplementedError``.
        """
        raise NotImplementedError

    @abstractmethod
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream).

        Same parameters as ``run_step``. The base body raises
        ``NotImplementedError``.
        """
        # TODO: figure out if we need a different type for TaskStepOutput
        raise NotImplementedError

    @abstractmethod
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream).

        Same parameters as ``run_step``. The base body raises
        ``NotImplementedError``.
        """
        raise NotImplementedError

    @abstractmethod
    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed.

        Args:
            task: Task to finalize.
            **kwargs: Implementation-specific options.
        """

    async def afinalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task (async), after all the steps are completed.

        The default implementation calls the synchronous ``finalize_task``
        with the same arguments.
        """
        self.finalize_task(task, **kwargs)

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager.

        The base implementation does nothing with ``callback_manager``.
        """
        # TODO: make this abstractmethod (right now will break some agent impls)

    def as_agent(self, **kwargs: Any) -> "AgentRunner":
        """Return as an agent runner.

        Args:
            **kwargs: Forwarded to the ``AgentRunner`` constructor.

        Returns:
            An ``AgentRunner`` that uses this worker as its step executor.
        """
        # Imported inside the method rather than at module level
        # (presumably to avoid a circular import -- confirm).
        from llama_index.core.agent.runner.base import AgentRunner

        return AgentRunner(self, **kwargs)

initialize_step abstractmethod #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-core/llama_index/core/base/agent/types.py
208
209
210
@abstractmethod
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize step from task.

    Args:
        task: Task to build a step for.
        **kwargs: Implementation-specific options.

    Returns:
        The step created for ``task``.
    """

run_step abstractmethod #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/base/agent/types.py
212
213
214
@abstractmethod
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step.

    Args:
        step: Step to execute.
        task: Task the step belongs to.
        **kwargs: Implementation-specific options.

    Returns:
        The output produced by executing ``step``.
    """

arun_step abstractmethod async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/base/agent/types.py
216
217
218
219
220
221
@abstractmethod
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async).

    Args:
        step: Step to execute.
        task: Task the step belongs to.
        **kwargs: Implementation-specific options.

    Raises:
        NotImplementedError: always, in this base body.
    """
    raise NotImplementedError

stream_step abstractmethod #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/base/agent/types.py
223
224
225
226
227
@abstractmethod
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (stream).

    Args:
        step: Step to execute.
        task: Task the step belongs to.
        **kwargs: Implementation-specific options.

    Raises:
        NotImplementedError: always, in this base body.
    """
    # TODO: figure out if we need a different type for TaskStepOutput
    raise NotImplementedError

astream_step abstractmethod async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/base/agent/types.py
229
230
231
232
233
234
@abstractmethod
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async stream).

    Args:
        step: Step to execute.
        task: Task the step belongs to.
        **kwargs: Implementation-specific options.

    Raises:
        NotImplementedError: always, in this base body.
    """
    raise NotImplementedError

finalize_task abstractmethod #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/base/agent/types.py
236
237
238
@abstractmethod
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Finalize task, after all the steps are completed.

    Args:
        task: Task to finalize.
        **kwargs: Implementation-specific options.
    """

afinalize_task async #

afinalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/base/agent/types.py
240
241
242
async def afinalize_task(self, task: Task, **kwargs: Any) -> None:
    """Finalize task (async), after all the steps are completed.

    The default implementation calls the synchronous ``finalize_task`` with
    the same arguments.
    """
    self.finalize_task(task, **kwargs)

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/base/agent/types.py
244
245
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Set callback manager.

    This implementation does nothing with ``callback_manager``.
    """

as_agent #

as_agent(**kwargs: Any) -> AgentRunner

Return as an agent runner.

Source code in llama-index-core/llama_index/core/base/agent/types.py
248
249
250
251
252
def as_agent(self, **kwargs: Any) -> "AgentRunner":
    """Return as an agent runner.

    Args:
        **kwargs: Forwarded to the ``AgentRunner`` constructor.

    Returns:
        An ``AgentRunner`` that uses this worker as its step executor.
    """
    # Imported inside the method rather than at module level
    # (presumably to avoid a circular import -- confirm).
    from llama_index.core.agent.runner.base import AgentRunner

    return AgentRunner(self, **kwargs)

Task #

Bases: BaseModel

Agent Task.

Represents a "run" of an agent given a user input.

Parameters:

Name Type Description Default
task_id str

Task ID

<generated: a new UUID4 string, str(uuid.uuid4()), for each Task>
input str

User input

required
memory BaseMemory

Conversational Memory. Maintains state before execution of this task.

required
callback_manager CallbackManager

Callback manager for the task.

<generated: a new CallbackManager([]) for each Task>
Source code in llama-index-core/llama_index/core/base/agent/types.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class Task(BaseModel):
    """Agent Task.

    Represents a "run" of an agent given a user input.

    Besides the input, a task holds the conversational memory, a callback
    manager, and a free-form ``extra_state`` dict.

    """

    # Permit field types that pydantic cannot validate natively.
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # Unique identifier; a fresh UUID4 string is generated when none is given.
    task_id: str = Field(
        default_factory=lambda: str(uuid.uuid4()), description="Task ID"
    )
    # Required: the user's input for this run.
    input: str = Field(..., description="User input")

    # NOTE: this is state that may be modified throughout the course of execution of the task
    memory: SerializeAsAny[BaseMemory] = Field(
        ...,
        description=(
            "Conversational Memory. Maintains state before execution of this task."
        ),
    )

    # Excluded from serialization (exclude=True); defaults to a new, empty
    # CallbackManager per task.
    callback_manager: CallbackManager = Field(
        default_factory=lambda: CallbackManager([]),
        exclude=True,
        description="Callback manager for the task.",
    )

    # Free-form per-task state; defaults to a new empty dict.
    extra_state: Dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Additional user-specified state for a given task. "
            "Can be modified throughout the execution of a task."
        ),
    )

TaskStep #

Bases: BaseModel

Agent task step.

Represents a single input step within the execution run ("Task") of an agent given a user input.

The output is returned as a TaskStepOutput.

Parameters:

Name Type Description Default
task_id str

Task ID

required
step_id str

Step ID

required
input str | None

User input

None
is_ready bool

Is this step ready to be executed?

True
Source code in llama-index-core/llama_index/core/base/agent/types.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class TaskStep(BaseModel):
    """Agent task step.

    Represents a single input step within the execution run ("Task") of an agent
    given a user input.

    The output is returned as a `TaskStepOutput`.

    Steps reference each other through ``next_steps`` / ``prev_steps``, both
    keyed by step ID.

    """

    task_id: str = Field(..., description="Task ID")
    step_id: str = Field(..., description="Step ID")
    input: Optional[str] = Field(default=None, description="User input")
    # NOTE: a per-step memory field is currently disabled:
    # memory: BaseMemory = Field(
    #     ..., description="Conversational Memory"
    # )
    step_state: Dict[str, Any] = Field(
        default_factory=dict, description="Additional state for a given step."
    )

    # NOTE: the state below may change throughout the course of execution
    # this tracks the relationships to other steps
    next_steps: Dict[str, "TaskStep"] = Field(
        default_factory=dict, description="Next steps to be executed."
    )
    prev_steps: Dict[str, "TaskStep"] = Field(
        default_factory=dict,
        description="Previous steps that were dependencies for this step.",
    )
    is_ready: bool = Field(
        default=True, description="Is this step ready to be executed?"
    )

    def get_next_step(
        self,
        step_id: str,
        input: Optional[str] = None,
        step_state: Optional[Dict[str, Any]] = None,
    ) -> "TaskStep":
        """Convenience function to get next step.

        The new step keeps this step's ``task_id``. It uses ``step_state``
        when that argument is truthy; otherwise (``None`` or an empty dict)
        it reuses this step's ``step_state`` object. The new step is not
        linked to this one; use ``link_step`` for that.

        Args:
            step_id: ID for the new step.
            input: Input for the new step.
            step_state: State for the new step.

        Returns:
            The newly constructed step.

        """
        return TaskStep(
            task_id=self.task_id,
            step_id=step_id,
            input=input,
            # memory=self.memory,
            step_state=step_state or self.step_state,
        )

    def link_step(
        self,
        next_step: "TaskStep",
    ) -> None:
        """Link to next step.

        Add link from this step to next, and from next step to current.
        Both links are stored under the respective step's ``step_id``, so an
        existing entry with the same ID is overwritten.

        """
        self.next_steps[next_step.step_id] = next_step
        next_step.prev_steps[self.step_id] = self

get_next_step #

get_next_step(step_id: str, input: Optional[str] = None, step_state: Optional[Dict[str, Any]] = None) -> TaskStep

Convenience function to get next step.

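Preserves task_id, and reuses this step's step_state unless a non-empty step_state is passed. (Memory is not carried over; the memory field is currently commented out.)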

Source code in llama-index-core/llama_index/core/base/agent/types.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def get_next_step(
    self,
    step_id: str,
    input: Optional[str] = None,
    step_state: Optional[Dict[str, Any]] = None,
) -> "TaskStep":
    """Build a follow-up step that belongs to the same task.

    The new step keeps this step's ``task_id``. ``step_state`` is used when
    it is truthy; otherwise this step's own ``step_state`` is reused.

    """
    # A falsy argument (None or an empty dict) falls back to the current state.
    carried_state = step_state or self.step_state
    return TaskStep(
        task_id=self.task_id,
        step_id=step_id,
        input=input,
        # memory=self.memory,
        step_state=carried_state,
    )
link_step #

link_step(next_step: TaskStep) -> None

Link to next step.

Add link from this step to next, and from next step to current.

Source code in llama-index-core/llama_index/core/base/agent/types.py
130
131
132
133
134
135
136
137
138
139
140
def link_step(
    self,
    next_step: "TaskStep",
) -> None:
    """Connect this step and ``next_step`` in both directions.

    ``next_step`` is registered in this step's ``next_steps`` under its own
    ID, and this step is registered in ``next_step.prev_steps`` under this
    step's ID.

    """
    successor_id = next_step.step_id
    predecessor_id = self.step_id
    # forward edge: current -> next
    self.next_steps[successor_id] = next_step
    # backward edge: next -> current
    next_step.prev_steps[predecessor_id] = self

TaskStepOutput #

Bases: BaseModel

Agent task step output.

Parameters:

Name Type Description Default
output Any

Task step output

required
task_step TaskStep

Task step input

required
next_steps List[TaskStep]

Next steps to be executed.

required
is_last bool

Is this the last step?

False
Source code in llama-index-core/llama_index/core/base/agent/types.py
143
144
145
146
147
148
149
150
151
152
153
class TaskStepOutput(BaseModel):
    """Agent task step output.

    Bundles the value produced by a step with the step itself, the steps to
    run next, and a flag marking the final step of a task.
    """

    # All fields except ``is_last`` are required.
    output: Any = Field(..., description="Task step output")
    task_step: TaskStep = Field(..., description="Task step input")
    next_steps: List[TaskStep] = Field(..., description="Next steps to be executed.")
    is_last: bool = Field(default=False, description="Is this the last step?")

    def __str__(self) -> str:
        """Return ``str(self.output)``."""
        return str(self.output)

Runners#

AgentRunner #

Bases: BaseAgentRunner

Agent runner.

Top-level agent orchestrator that can create tasks, run each step in a task, or run a task e2e. Stores state and keeps track of tasks.

Parameters:

Name Type Description Default
agent_worker BaseAgentWorker

step executor

required
chat_history Optional[List[ChatMessage]]

chat history. Defaults to None.

None
state Optional[AgentState]

agent state. Defaults to None.

None
memory Optional[BaseMemory]

memory. Defaults to None.

None
llm Optional[LLM]

LLM. Defaults to None.

None
callback_manager Optional[CallbackManager]

callback manager. Defaults to None.

None
init_task_state_kwargs Optional[dict]

init task state kwargs. Defaults to None.

None
Source code in llama-index-core/llama_index/core/agent/runner/base.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
class AgentRunner(BaseAgentRunner):
    """Agent runner.

    Top-level agent orchestrator that can create tasks, run each step in a task,
    or run a task e2e. Stores state and keeps track of tasks.

    Args:
        agent_worker (BaseAgentWorker): step executor
        chat_history (Optional[List[ChatMessage]], optional): chat history. Defaults to None.
        state (Optional[AgentState], optional): agent state. Defaults to None.
        memory (Optional[BaseMemory], optional): memory. Defaults to None.
        llm (Optional[LLM], optional): LLM. Defaults to None.
        callback_manager (Optional[CallbackManager], optional): callback manager. Defaults to None.
        init_task_state_kwargs (Optional[dict], optional): init task state kwargs. Defaults to None.

    """

    # # TODO: implement this in Pydantic

    def __init__(
        self,
        agent_worker: BaseAgentWorker,
        chat_history: Optional[List[ChatMessage]] = None,
        state: Optional[AgentState] = None,
        memory: Optional[BaseMemory] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        init_task_state_kwargs: Optional[dict] = None,
        delete_task_on_finish: bool = False,
        default_tool_choice: str = "auto",
        verbose: bool = False,
    ) -> None:
        """Set up the runner around ``agent_worker``.

        A falsy ``state`` is replaced by a new ``AgentState``; a falsy
        ``memory`` is replaced by a ``ChatMemoryBuffer`` built from
        ``chat_history`` and ``llm``. An explicit ``callback_manager`` is also
        pushed to the worker; otherwise the worker's own manager (if any) is
        adopted, falling back to a new ``CallbackManager``.
        """
        self.agent_worker = agent_worker
        self.state = state if state else AgentState()
        self.memory = (
            memory if memory else ChatMemoryBuffer.from_defaults(chat_history, llm=llm)
        )

        # get and set callback manager
        if callback_manager is None:
            # TODO: This is *temporary*
            # Stopgap before having a callback on the BaseAgentWorker interface.
            # Doing that requires a bit more refactoring to make sure existing code
            # doesn't break.
            worker_manager = getattr(self.agent_worker, "callback_manager", None)
            self.callback_manager = worker_manager or CallbackManager()
        else:
            self.agent_worker.set_callback_manager(callback_manager)
            self.callback_manager = callback_manager

        self.init_task_state_kwargs = (
            init_task_state_kwargs if init_task_state_kwargs else {}
        )
        self.delete_task_on_finish = delete_task_on_finish
        self.default_tool_choice = default_tool_choice
        self.verbose = verbose

    @staticmethod
    def from_llm(
        tools: Optional[List[BaseTool]] = None,
        llm: Optional[LLM] = None,
        **kwargs: Any,
    ) -> "AgentRunner":
        """Build an agent suited to ``llm``.

        Returns an ``OpenAIAgent`` when ``llm`` is an OpenAI LLM whose model
        supports function calling, and a ``ReActAgent`` otherwise. When the
        ``IS_TESTING`` environment variable is set, a ``ReActAgent`` is
        returned without importing the OpenAI integration.

        Args:
            tools: Tools made available to the agent.
            llm: LLM that drives the agent.
            **kwargs: Forwarded to the selected agent's ``from_tools``.

        Raises:
            ImportError: if ``llama-index-llms-openai`` is not installed
                (outside of testing). The original import failure is attached
                as ``__cause__``.
        """
        from llama_index.core.agent import ReActAgent

        if os.getenv("IS_TESTING"):
            return ReActAgent.from_tools(
                tools=tools,
                llm=llm,
                **kwargs,
            )

        try:
            from llama_index.llms.openai import OpenAI  # pants: no-infer-dep
            from llama_index.llms.openai.utils import (
                is_function_calling_model,
            )  # pants: no-infer-dep
        except ImportError as err:
            # Chain the underlying failure so the real import problem stays
            # visible in the traceback.
            raise ImportError(
                "`llama-index-llms-openai` package not found. Please "
                "install by running `pip install llama-index-llms-openai`."
            ) from err

        if isinstance(llm, OpenAI) and is_function_calling_model(llm.model):
            from llama_index.agent.openai import OpenAIAgent  # pants: no-infer-dep

            return OpenAIAgent.from_tools(
                tools=tools,
                llm=llm,
                **kwargs,
            )

        return ReActAgent.from_tools(
            tools=tools,
            llm=llm,
            **kwargs,
        )

    @property
    def chat_history(self) -> List[ChatMessage]:
        """Return every message held by the runner's memory (``memory.get_all()``)."""
        return self.memory.get_all()

    def reset(self) -> None:
        """Reset the runner's memory and then its agent state."""
        self.memory.reset()
        self.state.reset()

    def create_task(self, input: str, **kwargs: Any) -> Task:
        """Create a task for ``input`` and register it with the runner state.

        The task's ``extra_state`` comes from ``init_task_state_kwargs`` when
        that is non-empty, otherwise from the ``extra_state`` keyword
        (default ``{}``). A ``callback_manager`` keyword overrides the
        runner's manager for this task. Remaining keywords go to ``Task``.

        Raises:
            ValueError: if ``extra_state`` is passed while
                ``init_task_state_kwargs`` is non-empty.
        """
        if self.init_task_state_kwargs:
            if "extra_state" in kwargs:
                raise ValueError(
                    "Cannot specify both `extra_state` and `init_task_state_kwargs`"
                )
            extra_state = self.init_task_state_kwargs
        else:
            extra_state = kwargs.pop("extra_state", {})

        task_callback_manager = kwargs.pop("callback_manager", self.callback_manager)
        task = Task(
            input=input,
            memory=self.memory,
            extra_state=extra_state,
            callback_manager=task_callback_manager,
            **kwargs,
        )
        # # put input into memory
        # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

        # the worker supplies the first step, which seeds the task's step queue
        first_step = self.agent_worker.initialize_step(task)
        self.state.task_dict[task.task_id] = TaskState(
            task=task,
            step_queue=deque([first_step]),
        )

        return task

    def delete_task(
        self,
        task_id: str,
    ) -> None:
        """Delete task.

        Removes the entry for ``task_id`` from ``self.state.task_dict``.
        ``pop`` is called without a default, so an unknown ``task_id``
        presumably raises ``KeyError`` (assuming ``task_dict`` is a plain
        dict -- confirm).

        NOTE: this will not delete any previous executions from memory.

        """
        self.state.task_dict.pop(task_id)

    def list_tasks(self, **kwargs: Any) -> List[Task]:
        """List tasks.

        Returns the ``task`` of every entry in ``self.state.task_dict``.
        ``kwargs`` is accepted but not used.
        """
        return [task_state.task for task_state in self.state.task_dict.values()]

    def get_task(self, task_id: str, **kwargs: Any) -> Task:
        """Get task.

        Delegates to ``self.state.get_task``. ``kwargs`` is accepted but not
        used.
        """
        return self.state.get_task(task_id)

    def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
        """Get upcoming steps.

        Returns a new list built from the task's step queue, so changing the
        returned list does not change the queue. ``kwargs`` is accepted but
        not used.
        """
        return list(self.state.get_step_queue(task_id))

    def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
        """Get completed steps.

        Returns whatever ``self.state.get_completed_steps`` returns, without
        copying it. ``kwargs`` is accepted but not used.
        """
        return self.state.get_completed_steps(task_id)

    def get_task_output(self, task_id: str, **kwargs: Any) -> TaskStepOutput:
        """Return the most recently completed step output of a task.

        Raises:
            ValueError: if the task has no completed steps.
        """
        finished = self.get_completed_steps(task_id)
        if not finished:
            raise ValueError(f"No completed steps for task_id: {task_id}")
        *_, latest = finished
        return latest

    def get_completed_tasks(self, **kwargs: Any) -> List[Task]:
        """Return the tasks whose latest completed step is flagged ``is_last``."""

        def _has_finished(task: Task) -> bool:
            # A task counts as completed once its newest step output is the last one.
            outputs = self.get_completed_steps(task.task_id)
            return len(outputs) > 0 and outputs[-1].is_last

        # Snapshot the states first, as the loop-based version did.
        snapshot = list(self.state.task_dict.values())
        return [entry.task for entry in snapshot if _has_finished(entry.task)]

    @dispatcher.span
    def _run_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        input: Optional[str] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Execute one step of a task through the agent worker.

        When ``step`` is not given, the next step is taken from the left of
        the task's step queue. A non-``None`` ``input`` overwrites the step's
        input. The output's ``next_steps`` are appended to the queue and the
        output is recorded among the completed steps.

        Raises:
            ValueError: if ``mode`` is neither ``WAIT`` nor ``STREAM``.
        """
        task = self.state.get_task(task_id)
        pending_steps = self.state.get_step_queue(task_id)
        if not step:
            step = pending_steps.popleft()
        if input is not None:
            step.input = input

        dispatcher.event(
            AgentRunStepStartEvent(task_id=task_id, step=step, input=input)
        )

        if self.verbose:
            print(f"> Running step {step.step_id}. Step input: {step.input}")

        # TODO: figure out if you can dynamically swap in different step executors
        # not clear when you would do that by theoretically possible

        # choose the worker entry point that matches the response mode
        if mode == ChatResponseMode.WAIT:
            execute = self.agent_worker.run_step
        elif mode == ChatResponseMode.STREAM:
            execute = self.agent_worker.stream_step
        else:
            raise ValueError(f"Invalid mode: {mode}")
        step_output = execute(step, task, **kwargs)

        # schedule follow-up steps and record this one as completed
        pending_steps.extend(step_output.next_steps)
        self.state.get_completed_steps(task_id).append(step_output)

        dispatcher.event(AgentRunStepEndEvent(step_output=step_output))
        return step_output

    @dispatcher.span
    async def _arun_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        input: Optional[str] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Execute one step of a task through the agent worker (async).

        When ``step`` is not given, the next step is taken from the left of
        the task's step queue. A non-``None`` ``input`` overwrites the step's
        input. The output's ``next_steps`` are appended to the queue and the
        output is recorded among the completed steps.

        Raises:
            ValueError: if ``mode`` is neither ``WAIT`` nor ``STREAM``.
        """
        task = self.state.get_task(task_id)
        step_queue = self.state.get_step_queue(task_id)
        step = step or step_queue.popleft()
        if input is not None:
            step.input = input

        # Emit the start event only after the step has been resolved, as the
        # synchronous `_run_step` does; otherwise the event carries
        # `step=None` whenever the step comes from the queue.
        dispatcher.event(
            AgentRunStepStartEvent(task_id=task_id, step=step, input=input)
        )

        if self.verbose:
            print(f"> Running step {step.step_id}. Step input: {step.input}")

        # TODO: figure out if you can dynamically swap in different step executors
        # not clear when you would do that by theoretically possible
        if mode == ChatResponseMode.WAIT:
            cur_step_output = await self.agent_worker.arun_step(step, task, **kwargs)
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = await self.agent_worker.astream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")
        # append cur_step_output next steps to queue
        next_steps = cur_step_output.next_steps
        step_queue.extend(next_steps)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        dispatcher.event(AgentRunStepEndEvent(step_output=cur_step_output))
        return cur_step_output

    @dispatcher.span
    def run_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step.

        Passes the arguments through ``validate_step_from_args`` (defined
        elsewhere; it returns the step to use) and then executes the step in
        ``WAIT`` mode via ``_run_step``.

        Args:
            task_id: ID of the task to advance.
            input: Optional input for the step.
            step: Optional explicit step to run.
            **kwargs: Forwarded to validation and to the worker.
        """
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return self._run_step(
            task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
        )

    @dispatcher.span
    async def arun_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (async).

        Passes the arguments through ``validate_step_from_args`` (defined
        elsewhere; it returns the step to use) and then awaits ``_arun_step``
        in ``WAIT`` mode.

        Args:
            task_id: ID of the task to advance.
            input: Optional input for the step.
            step: Optional explicit step to run.
            **kwargs: Forwarded to validation and to the worker.
        """
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return await self._arun_step(
            task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
        )

    @dispatcher.span
    def stream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (stream).

        Passes the arguments through ``validate_step_from_args`` (defined
        elsewhere; it returns the step to use) and then executes the step in
        ``STREAM`` mode via ``_run_step``.

        Args:
            task_id: ID of the task to advance.
            input: Optional input for the step.
            step: Optional explicit step to run.
            **kwargs: Forwarded to validation and to the worker.
        """
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return self._run_step(
            task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
        )

    @dispatcher.span
    async def astream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (async stream).

        Passes the arguments through ``validate_step_from_args`` (defined
        elsewhere; it returns the step to use) and then awaits ``_arun_step``
        in ``STREAM`` mode.

        Args:
            task_id: ID of the task to advance.
            input: Optional input for the step.
            step: Optional explicit step to run.
            **kwargs: Forwarded to validation and to the worker.
        """
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return await self._arun_step(
            task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
        )

    @dispatcher.span
    def finalize_response(
        self,
        task_id: str,
        step_output: Optional[TaskStepOutput] = None,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Finalize response.

        Validates the final step output, lets the worker finalize the task,
        attaches the task's accumulated ``"sources"`` to the response, and
        optionally removes the task from the runner state.

        Args:
            task_id: ID of the task being finalized.
            step_output: Final step output. Defaults to the task's most
                recently completed step.

        Returns:
            The chat response carried by the final step output.

        Raises:
            ValueError: if the step output is not flagged ``is_last`` or its
                ``output`` is not an agent chat response.
        """
        if step_output is None:
            step_output = self.state.get_completed_steps(task_id)[-1]
        if not step_output.is_last:
            raise ValueError(
                "finalize_response can only be called on the last step output"
            )

        if not isinstance(
            step_output.output,
            (AgentChatResponse, StreamingAgentChatResponse),
        ):
            raise ValueError(
                "When `is_last` is True, cur_step_output.output must be "
                f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
            )

        # finalize task
        task = self.state.get_task(task_id)
        self.agent_worker.finalize_task(task)

        # Collect the sources BEFORE the optional delete: `delete_task` pops
        # the task from the runner state, so looking the task up afterwards
        # fails when `delete_task_on_finish` is enabled.
        sources = task.extra_state.get("sources", [])

        if self.delete_task_on_finish:
            self.delete_task(task_id)

        # Attach all sources generated across all steps
        step_output.output.sources = sources
        step_output.output.set_source_nodes()

        return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

    @dispatcher.span
    async def afinalize_response(
        self,
        task_id: str,
        step_output: Optional[TaskStepOutput] = None,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Finalize response (async).

        Validates the final step output, awaits the worker's
        ``afinalize_task``, attaches the task's accumulated ``"sources"`` to
        the response, and optionally removes the task from the runner state.

        Args:
            task_id: ID of the task being finalized.
            step_output: Final step output. Defaults to the task's most
                recently completed step.

        Returns:
            The chat response carried by the final step output.

        Raises:
            ValueError: if the step output is not flagged ``is_last`` or its
                ``output`` is not an agent chat response.
        """
        if step_output is None:
            step_output = self.state.get_completed_steps(task_id)[-1]
        if not step_output.is_last:
            raise ValueError(
                "finalize_response can only be called on the last step output"
            )

        if not isinstance(
            step_output.output,
            (AgentChatResponse, StreamingAgentChatResponse),
        ):
            raise ValueError(
                "When `is_last` is True, cur_step_output.output must be "
                f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
            )

        # finalize task
        task = self.state.get_task(task_id)
        await self.agent_worker.afinalize_task(task)

        # Collect the sources BEFORE the optional delete: `delete_task` pops
        # the task from the runner state, so looking the task up afterwards
        # fails when `delete_task_on_finish` is enabled.
        sources = task.extra_state.get("sources", [])

        if self.delete_task_on_finish:
            self.delete_task(task_id)

        # Attach all sources generated across all steps
        step_output.output.sources = sources
        step_output.output.set_source_nodes()

        return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

    @dispatcher.span
    def _chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Run a full chat turn: create a task and step it until the last step.

        A non-``None`` ``chat_history`` replaces the memory contents first.
        ``tool_choice`` applies to the first step only; every later step uses
        ``"auto"``.
        """
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        dispatcher.event(AgentChatWithStepStartEvent(user_msg=message))

        # pass step queue in as argument, assume step executor is stateless
        step_output = self._run_step(
            task.task_id, mode=mode, tool_choice=tool_choice
        )
        while not step_output.is_last:
            # ensure tool_choice does not cause endless loops
            step_output = self._run_step(
                task.task_id, mode=mode, tool_choice="auto"
            )

        response = self.finalize_response(task.task_id, step_output)
        dispatcher.event(AgentChatWithStepEndEvent(response=response))
        return response

    @dispatcher.span
    async def _achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Async variant: step a fresh task until its last step, then finalize.

        If ``chat_history`` is given it replaces the current memory contents.
        A start event is dispatched before the first step and an end event
        after the response has been finalized.
        """
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        dispatcher.event(AgentChatWithStepStartEvent(user_msg=message))

        # The caller's tool_choice is honored for the first step only.
        final_step = await self._arun_step(
            task.task_id, mode=mode, tool_choice=tool_choice
        )
        while not final_step.is_last:
            # Fall back to "auto" so tool_choice does not cause endless loops.
            final_step = await self._arun_step(
                task.task_id, mode=mode, tool_choice="auto"
            )

        response = await self.afinalize_response(task.task_id, final_step)
        dispatcher.event(AgentChatWithStepEndEvent(response=response))
        return response

    @dispatcher.span
    @trace_method("chat")
    def chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> AgentChatResponse:
        """Chat with the agent and wait for the complete response.

        ``tool_choice`` falls back to ``self.default_tool_choice`` when it is
        not supplied. The whole turn is wrapped in an ``AGENT_STEP`` callback
        event.
        """
        effective_tool_choice = (
            self.default_tool_choice if tool_choice is None else tool_choice
        )
        start_payload = {EventPayload.MESSAGES: [message]}
        with self.callback_manager.event(
            CBEventType.AGENT_STEP, payload=start_payload
        ) as event:
            response = self._chat(
                message=message,
                chat_history=chat_history,
                tool_choice=effective_tool_choice,
                mode=ChatResponseMode.WAIT,
            )
            assert isinstance(response, AgentChatResponse)
            event.on_end(payload={EventPayload.RESPONSE: response})
        return response

    @dispatcher.span
    @trace_method("chat")
    async def achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> AgentChatResponse:
        """Chat with the agent asynchronously and await the complete response.

        ``tool_choice`` falls back to ``self.default_tool_choice`` when it is
        not supplied. The whole turn is wrapped in an ``AGENT_STEP`` callback
        event.
        """
        effective_tool_choice = (
            self.default_tool_choice if tool_choice is None else tool_choice
        )
        start_payload = {EventPayload.MESSAGES: [message]}
        with self.callback_manager.event(
            CBEventType.AGENT_STEP, payload=start_payload
        ) as event:
            response = await self._achat(
                message=message,
                chat_history=chat_history,
                tool_choice=effective_tool_choice,
                mode=ChatResponseMode.WAIT,
            )
            assert isinstance(response, AgentChatResponse)
            event.on_end(payload={EventPayload.RESPONSE: response})
        return response

    @dispatcher.span
    @trace_method("chat")
    def stream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> StreamingAgentChatResponse:
        """Chat with the agent in streaming mode.

        ``tool_choice`` falls back to ``self.default_tool_choice`` when it is
        not supplied. The result must be a streaming response, or a regular
        response flagged ``is_dummy_stream``.
        """
        effective_tool_choice = (
            self.default_tool_choice if tool_choice is None else tool_choice
        )
        start_payload = {EventPayload.MESSAGES: [message]}
        with self.callback_manager.event(
            CBEventType.AGENT_STEP, payload=start_payload
        ) as event:
            response = self._chat(
                message,
                chat_history,
                effective_tool_choice,
                mode=ChatResponseMode.STREAM,
            )
            assert isinstance(response, StreamingAgentChatResponse) or (
                isinstance(response, AgentChatResponse)
                and response.is_dummy_stream
            )
            event.on_end(payload={EventPayload.RESPONSE: response})

        return response  # type: ignore

    @dispatcher.span
    @trace_method("chat")
    async def astream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> StreamingAgentChatResponse:
        """Chat with the agent asynchronously in streaming mode.

        ``tool_choice`` falls back to ``self.default_tool_choice`` when it is
        not supplied. The result must be a streaming response, or a regular
        response flagged ``is_dummy_stream``.
        """
        effective_tool_choice = (
            self.default_tool_choice if tool_choice is None else tool_choice
        )
        start_payload = {EventPayload.MESSAGES: [message]}
        with self.callback_manager.event(
            CBEventType.AGENT_STEP, payload=start_payload
        ) as event:
            response = await self._achat(
                message,
                chat_history,
                effective_tool_choice,
                mode=ChatResponseMode.STREAM,
            )
            assert isinstance(response, StreamingAgentChatResponse) or (
                isinstance(response, AgentChatResponse)
                and response.is_dummy_stream
            )
            event.on_end(payload={EventPayload.RESPONSE: response})

        return response  # type: ignore

    def undo_step(self, task_id: str) -> None:
        """Undo the previous step of a task.

        Not supported by this runner: always raises ``NotImplementedError``.
        """
        reason = "undo_step not implemented"
        raise NotImplementedError(reason)

create_task #

create_task(input: str, **kwargs: Any) -> Task

Create task.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def create_task(self, input: str, **kwargs: Any) -> Task:
    """Create a task, register it in runner state, and queue its first step.

    ``extra_state`` may be passed via ``kwargs`` only when the runner has no
    ``init_task_state_kwargs``; supplying both raises ``ValueError``. A
    ``callback_manager`` kwarg overrides the runner's own callback manager.
    Remaining kwargs are forwarded to ``Task``.
    """
    if self.init_task_state_kwargs:
        if "extra_state" in kwargs:
            raise ValueError(
                "Cannot specify both `extra_state` and `init_task_state_kwargs`"
            )
        extra_state = self.init_task_state_kwargs
    else:
        extra_state = kwargs.pop("extra_state", {})

    callback_manager = kwargs.pop("callback_manager", self.callback_manager)
    new_task = Task(
        input=input,
        memory=self.memory,
        extra_state=extra_state,
        callback_manager=callback_manager,
        **kwargs,
    )
    # NOTE: the input is not written to memory here.

    # The worker supplies the first step, which seeds the task's step queue.
    first_step = self.agent_worker.initialize_step(new_task)
    self.state.task_dict[new_task.task_id] = TaskState(
        task=new_task,
        step_queue=deque([first_step]),
    )

    return new_task

delete_task #

delete_task(task_id: str) -> None

Delete task.

NOTE: this will not delete any previous executions from memory.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
348
349
350
351
352
353
354
355
356
357
def delete_task(
    self,
    task_id: str,
) -> None:
    """Remove a task from the runner's state.

    NOTE: previous executions stored in memory are left untouched.

    """
    registry = self.state.task_dict
    registry.pop(task_id)

list_tasks #

list_tasks(**kwargs: Any) -> List[Task]

List tasks.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
359
360
361
def list_tasks(self, **kwargs: Any) -> List[Task]:
    """Return the task of every task state currently held by the runner."""
    tasks = []
    for entry in self.state.task_dict.values():
        tasks.append(entry.task)
    return tasks

get_task #

get_task(task_id: str, **kwargs: Any) -> Task

Get task.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
363
364
365
def get_task(self, task_id: str, **kwargs: Any) -> Task:
    """Look up a task by id in the runner's state."""
    task = self.state.get_task(task_id)
    return task

get_upcoming_steps #

get_upcoming_steps(task_id: str, **kwargs: Any) -> List[TaskStep]

Get upcoming steps.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
367
368
369
def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
    """Return a list copy of the steps still queued for the task."""
    pending = self.state.get_step_queue(task_id)
    return list(pending)

get_completed_steps #

get_completed_steps(task_id: str, **kwargs: Any) -> List[TaskStepOutput]

Get completed steps.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
371
372
373
def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
    """Return the completed step outputs that state holds for the task."""
    finished_steps = self.state.get_completed_steps(task_id)
    return finished_steps

get_task_output #

get_task_output(task_id: str, **kwargs: Any) -> TaskStepOutput

Get task output.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
375
376
377
378
379
380
def get_task_output(self, task_id: str, **kwargs: Any) -> TaskStepOutput:
    """Return the most recently completed step output of a task.

    Raises ``ValueError`` when the task has no completed steps yet.
    """
    finished_steps = self.get_completed_steps(task_id)
    if not finished_steps:
        raise ValueError(f"No completed steps for task_id: {task_id}")
    latest = finished_steps[-1]
    return latest

get_completed_tasks #

get_completed_tasks(**kwargs: Any) -> List[Task]

Get completed tasks.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
382
383
384
385
386
387
388
389
390
391
def get_completed_tasks(self, **kwargs: Any) -> List[Task]:
    """Return every task whose latest completed step is flagged ``is_last``."""
    finished = []
    # Iterate over a snapshot of the task states, as the original did.
    for entry in list(self.state.task_dict.values()):
        steps = self.get_completed_steps(entry.task.task_id)
        if not steps:
            continue
        if steps[-1].is_last:
            finished.append(entry.task)

    return finished

run_step #

run_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
477
478
479
480
481
482
483
484
485
486
487
488
489
@dispatcher.span
def run_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Validate the step arguments, then run one step in WAIT mode."""
    validated_step = validate_step_from_args(task_id, input, step, **kwargs)
    step_output = self._run_step(
        task_id,
        validated_step,
        input=input,
        mode=ChatResponseMode.WAIT,
        **kwargs,
    )
    return step_output

arun_step async #

arun_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/runner/base.py
491
492
493
494
495
496
497
498
499
500
501
502
503
@dispatcher.span
async def arun_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Validate the step arguments, then await one step in WAIT mode."""
    validated_step = validate_step_from_args(task_id, input, step, **kwargs)
    step_output = await self._arun_step(
        task_id,
        validated_step,
        input=input,
        mode=ChatResponseMode.WAIT,
        **kwargs,
    )
    return step_output

stream_step #

stream_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/runner/base.py
505
506
507
508
509
510
511
512
513
514
515
516
517
@dispatcher.span
def stream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Validate the step arguments, then run one step in STREAM mode."""
    validated_step = validate_step_from_args(task_id, input, step, **kwargs)
    step_output = self._run_step(
        task_id,
        validated_step,
        input=input,
        mode=ChatResponseMode.STREAM,
        **kwargs,
    )
    return step_output

astream_step async #

astream_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/runner/base.py
519
520
521
522
523
524
525
526
527
528
529
530
531
@dispatcher.span
async def astream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Validate the step arguments, then await one step in STREAM mode."""
    validated_step = validate_step_from_args(task_id, input, step, **kwargs)
    step_output = await self._arun_step(
        task_id,
        validated_step,
        input=input,
        mode=ChatResponseMode.STREAM,
        **kwargs,
    )
    return step_output

finalize_response #

finalize_response(task_id: str, step_output: Optional[TaskStepOutput] = None) -> AGENT_CHAT_RESPONSE_TYPE

Finalize response.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
@dispatcher.span
def finalize_response(
    self,
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE:
    """Finalize response.

    Validates the terminal step output, lets the agent worker finalize the
    task, optionally removes the task from state, and attaches the sources
    stored under ``extra_state["sources"]`` to the response.

    Args:
        task_id: Id of the task to finalize.
        step_output: Terminal step output. When omitted, the most recently
            completed step of the task is used.

    Returns:
        The chat response carried by the terminal step output.

    Raises:
        ValueError: If the step output is not flagged ``is_last`` or its
            ``output`` is not an agent chat response.

    """
    if step_output is None:
        step_output = self.state.get_completed_steps(task_id)[-1]
    if not step_output.is_last:
        raise ValueError(
            "finalize_response can only be called on the last step output"
        )

    if not isinstance(
        step_output.output,
        (AgentChatResponse, StreamingAgentChatResponse),
    ):
        raise ValueError(
            "When `is_last` is True, cur_step_output.output must be "
            f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
        )

    # Resolve the task once, up front. Looking it up again after
    # `delete_task` would fail because the task is no longer in state.
    task = self.state.get_task(task_id)

    # finalize task
    self.agent_worker.finalize_task(task)

    if self.delete_task_on_finish:
        self.delete_task(task_id)

    # Attach all sources generated across all steps
    step_output.output.sources = task.extra_state.get("sources", [])
    step_output.output.set_source_nodes()

    return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

afinalize_response async #

afinalize_response(task_id: str, step_output: Optional[TaskStepOutput] = None) -> AGENT_CHAT_RESPONSE_TYPE

Finalize response.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
@dispatcher.span
async def afinalize_response(
    self,
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE:
    """Finalize response (async).

    Validates the terminal step output, lets the agent worker finalize the
    task, optionally removes the task from state, and attaches the sources
    stored under ``extra_state["sources"]`` to the response.

    Args:
        task_id: Id of the task to finalize.
        step_output: Terminal step output. When omitted, the most recently
            completed step of the task is used.

    Returns:
        The chat response carried by the terminal step output.

    Raises:
        ValueError: If the step output is not flagged ``is_last`` or its
            ``output`` is not an agent chat response.

    """
    if step_output is None:
        step_output = self.state.get_completed_steps(task_id)[-1]
    if not step_output.is_last:
        raise ValueError(
            "finalize_response can only be called on the last step output"
        )

    if not isinstance(
        step_output.output,
        (AgentChatResponse, StreamingAgentChatResponse),
    ):
        raise ValueError(
            "When `is_last` is True, cur_step_output.output must be "
            f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
        )

    # Resolve the task once, up front. Looking it up again after
    # `delete_task` would fail because the task is no longer in state.
    task = self.state.get_task(task_id)

    # finalize task
    await self.agent_worker.afinalize_task(task)

    if self.delete_task_on_finish:
        self.delete_task(task_id)

    # Attach all sources generated across all steps
    step_output.output.sources = task.extra_state.get("sources", [])
    step_output.output.set_source_nodes()

    return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

undo_step #

undo_step(task_id: str) -> None

Undo previous step.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
779
780
781
def undo_step(self, task_id: str) -> None:
    """Undo the previous step of a task.

    Not supported by this runner: always raises ``NotImplementedError``.
    """
    reason = "undo_step not implemented"
    raise NotImplementedError(reason)

ParallelAgentRunner #

Bases: BaseAgentRunner

Parallel agent runner.

Executes steps in queue in parallel. Requires async support.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
class ParallelAgentRunner(BaseAgentRunner):
    """Parallel agent runner.

    Executes steps in queue in parallel. Requires async support.

    A chat turn creates a task and then repeatedly drains the task's step
    queue, running every queued step concurrently, until a batch yields a
    step output flagged ``is_last``.

    """

    def __init__(
        self,
        agent_worker: BaseAgentWorker,
        chat_history: Optional[List[ChatMessage]] = None,
        state: Optional[DAGAgentState] = None,
        memory: Optional[BaseMemory] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        init_task_state_kwargs: Optional[dict] = None,
        delete_task_on_finish: bool = False,
    ) -> None:
        """Initialize.

        Args:
            agent_worker: Worker that initializes and executes steps.
            chat_history: Messages used to seed the default memory buffer;
                only used when ``memory`` is not given.
            state: Existing runner state; a new ``DAGAgentState`` otherwise.
            memory: Memory to use instead of the default chat memory buffer.
            llm: Passed to ``ChatMemoryBuffer.from_defaults`` when the default
                memory is built.
            callback_manager: Callback manager; an empty one otherwise.
            init_task_state_kwargs: Passed as ``extra_state`` to every task
                created by this runner.
            delete_task_on_finish: Whether ``finalize_response`` removes the
                task from state.

        """
        self.memory = memory or ChatMemoryBuffer.from_defaults(chat_history, llm=llm)
        self.state = state or DAGAgentState()
        self.callback_manager = callback_manager or CallbackManager([])
        self.init_task_state_kwargs = init_task_state_kwargs or {}
        self.agent_worker = agent_worker
        self.delete_task_on_finish = delete_task_on_finish

    @property
    def chat_history(self) -> List[ChatMessage]:
        """All messages currently held in memory."""
        return self.memory.get_all()

    def reset(self) -> None:
        """Reset the memory."""
        self.memory.reset()

    def create_task(self, input: str, **kwargs: Any) -> Task:
        """Create a task, register it in state, and queue its first step.

        Extra ``kwargs`` are forwarded to ``Task``.
        """
        task = Task(
            input=input,
            memory=self.memory,
            extra_state=self.init_task_state_kwargs,
            **kwargs,
        )
        # NOTE: the input is not written to memory here.

        # get initial step from task, and put it in the step queue
        initial_step = self.agent_worker.initialize_step(task)
        task_state = DAGTaskState(
            task=task,
            root_step=initial_step,
            step_queue=deque([initial_step]),
        )

        # add it to state
        self.state.task_dict[task.task_id] = task_state

        return task

    def delete_task(
        self,
        task_id: str,
    ) -> None:
        """Delete task.

        NOTE: this will not delete any previous executions from memory.

        """
        self.state.task_dict.pop(task_id)

    def get_completed_tasks(self, **kwargs: Any) -> List[Task]:
        """Get tasks whose latest completed step is flagged ``is_last``."""
        task_states = list(self.state.task_dict.values())
        return [
            task_state.task
            for task_state in task_states
            if len(task_state.completed_steps) > 0
            and task_state.completed_steps[-1].is_last
        ]

    def get_task_output(self, task_id: str, **kwargs: Any) -> TaskStepOutput:
        """Get the most recently completed step output of a task.

        Raises:
            ValueError: If the task has no completed steps.

        """
        task_state = self.state.task_dict[task_id]
        if len(task_state.completed_steps) == 0:
            raise ValueError(f"No completed steps for task_id: {task_id}")
        return task_state.completed_steps[-1]

    def list_tasks(self, **kwargs: Any) -> List[Task]:
        """List tasks."""
        task_states = list(self.state.task_dict.values())
        return [task_state.task for task_state in task_states]

    def get_task(self, task_id: str, **kwargs: Any) -> Task:
        """Get task."""
        return self.state.get_task(task_id)

    def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
        """Get upcoming steps (a list copy of the step queue)."""
        return list(self.state.get_step_queue(task_id))

    def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
        """Get completed steps."""
        return self.state.get_completed_steps(task_id)

    def run_steps_in_queue(
        self,
        task_id: str,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> List[TaskStepOutput]:
        """Execute steps in queue.

        Run all steps in queue, clearing it out.

        Assume that all steps can be run in parallel.

        Synchronous wrapper around ``arun_steps_in_queue``.

        """
        return asyncio_run(self.arun_steps_in_queue(task_id, mode=mode, **kwargs))

    async def arun_steps_in_queue(
        self,
        task_id: str,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> List[TaskStepOutput]:
        """Execute all steps in queue.

        All steps in queue are assumed to be ready.

        Returns an empty list when the queue is empty.

        """
        # first pop all steps from step_queue
        steps: List[TaskStep] = []
        while len(self.state.get_step_queue(task_id)) > 0:
            steps.append(self.state.get_step_queue(task_id).popleft())

        # take every item in the queue, and run it
        tasks = [
            self._arun_step(task_id, step=step, mode=mode, **kwargs)
            for step in steps
        ]

        return await asyncio.gather(*tasks)

    def _run_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Execute step.

        Runs ``step`` (or the next queued step when omitted), queues its ready
        follow-up steps, and records the output as completed.

        Raises:
            ValueError: If the step is not ready or ``mode`` is invalid.

        """
        task = self.state.get_task(task_id)
        task_queue = self.state.get_step_queue(task_id)
        step = step or task_queue.popleft()

        if not step.is_ready:
            raise ValueError(f"Step {step.step_id} is not ready")

        if mode == ChatResponseMode.WAIT:
            cur_step_output: TaskStepOutput = self.agent_worker.run_step(
                step, task, **kwargs
            )
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = self.agent_worker.stream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")

        # only follow-up steps that are already ready get queued
        for next_step in cur_step_output.next_steps:
            if next_step.is_ready:
                task_queue.append(next_step)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        return cur_step_output

    async def _arun_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Execute step (async).

        Runs ``step`` (or the next queued step when omitted), queues its ready
        follow-up steps, and records the output as completed.

        Raises:
            ValueError: If the step is not ready or ``mode`` is invalid.

        """
        task = self.state.get_task(task_id)
        task_queue = self.state.get_step_queue(task_id)
        step = step or task_queue.popleft()

        if not step.is_ready:
            raise ValueError(f"Step {step.step_id} is not ready")

        if mode == ChatResponseMode.WAIT:
            cur_step_output = await self.agent_worker.arun_step(step, task, **kwargs)
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = await self.agent_worker.astream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")

        # only follow-up steps that are already ready get queued
        for next_step in cur_step_output.next_steps:
            if next_step.is_ready:
                task_queue.append(next_step)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        return cur_step_output

    def run_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step.

        NOTE: ``input`` is accepted but not used by this runner.
        """
        return self._run_step(task_id, step, mode=ChatResponseMode.WAIT, **kwargs)

    async def arun_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (async).

        NOTE: ``input`` is accepted but not used by this runner.
        """
        return await self._arun_step(
            task_id, step, mode=ChatResponseMode.WAIT, **kwargs
        )

    def stream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (stream).

        NOTE: ``input`` is accepted but not used by this runner.
        """
        return self._run_step(task_id, step, mode=ChatResponseMode.STREAM, **kwargs)

    async def astream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (async stream).

        NOTE: ``input`` is accepted but not used by this runner.
        """
        return await self._arun_step(
            task_id, step, mode=ChatResponseMode.STREAM, **kwargs
        )

    def finalize_response(
        self,
        task_id: str,
        step_output: Optional[TaskStepOutput] = None,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Finalize response.

        Validates the terminal step output, lets the agent worker finalize
        the task, and optionally removes the task from state.

        Args:
            task_id: Id of the task to finalize.
            step_output: Terminal step output. When omitted, the most recently
                completed step of the task is used.

        Raises:
            ValueError: If the step output is not flagged ``is_last`` or its
                ``output`` is not an agent chat response.

        """
        if step_output is None:
            step_output = self.state.get_completed_steps(task_id)[-1]
        if not step_output.is_last:
            raise ValueError(
                "finalize_response can only be called on the last step output"
            )

        if not isinstance(
            step_output.output,
            (AgentChatResponse, StreamingAgentChatResponse),
        ):
            raise ValueError(
                "When `is_last` is True, cur_step_output.output must be "
                f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
            )

        # finalize task
        self.agent_worker.finalize_task(self.state.get_task(task_id))

        if self.delete_task_on_finish:
            self.delete_task(task_id)

        return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

    @staticmethod
    def _select_final_output(
        task_id: str, step_outputs: List[TaskStepOutput]
    ) -> Optional[TaskStepOutput]:
        """Return the terminal output of a batch, or None if none is last.

        Raises:
            ValueError: If the batch is empty (nothing was queued, so the chat
                loop could never make progress), or if a batch containing a
                last output holds more than one output.

        """
        if not step_outputs:
            # An empty batch means the queue drained without a final step;
            # looping again would spin forever on an empty queue.
            raise ValueError(
                f"No steps left to run for task_id: {task_id}, "
                "but no step output was marked as last."
            )
        if not any(step_output.is_last for step_output in step_outputs):
            return None
        if len(step_outputs) > 1:
            raise ValueError("More than one step output returned in final step.")
        return step_outputs[0]

    def _chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Chat with step executor.

        NOTE: ``tool_choice`` is accepted but not forwarded to the steps.

        Raises:
            ValueError: If the step queue drains before any step output is
                flagged ``is_last``, or if the final batch holds more than
                one output.

        """
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        while result_output is None:
            # run every queued step; ready follow-ups are re-queued by the steps
            cur_step_outputs = self.run_steps_in_queue(task.task_id, mode=mode)
            result_output = self._select_final_output(
                task.task_id, cur_step_outputs
            )

        return self.finalize_response(
            task.task_id,
            result_output,
        )

    async def _achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Chat with step executor (async).

        NOTE: ``tool_choice`` is accepted but not forwarded to the steps.

        Raises:
            ValueError: If the step queue drains before any step output is
                flagged ``is_last``, or if the final batch holds more than
                one output.

        """
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        while result_output is None:
            # run every queued step; ready follow-ups are re-queued by the steps
            cur_step_outputs = await self.arun_steps_in_queue(task.task_id, mode=mode)
            result_output = self._select_final_output(
                task.task_id, cur_step_outputs
            )

        # finalization is synchronous here, as in the original code
        return self.finalize_response(
            task.task_id,
            result_output,
        )

    @trace_method("chat")
    def chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> AgentChatResponse:
        """Chat and wait for the complete response."""
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message, chat_history, tool_choice, mode=ChatResponseMode.WAIT
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    async def achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> AgentChatResponse:
        """Chat asynchronously and await the complete response."""
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message, chat_history, tool_choice, mode=ChatResponseMode.WAIT
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    def stream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> StreamingAgentChatResponse:
        """Chat in streaming mode."""
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            e.on_end(payload={EventPayload.RESPONSE: chat_response})

        return chat_response  # type: ignore

    @trace_method("chat")
    async def astream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> StreamingAgentChatResponse:
        """Chat asynchronously in streaming mode."""
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )

            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response  # type: ignore

    def undo_step(self, task_id: str) -> None:
        """Undo previous step."""
        raise NotImplementedError("undo_step not implemented")

create_task #

create_task(input: str, **kwargs: Any) -> Task

Create task.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def create_task(self, input: str, **kwargs: Any) -> Task:
    """Create a task for ``input`` and register its state on the runner.

    Args:
        input: Input string for the new task.
        **kwargs: Extra keyword arguments forwarded to the ``Task`` constructor.

    Returns:
        The newly created task.
    """
    new_task = Task(
        input=input,
        memory=self.memory,
        extra_state=self.init_task_state_kwargs,
        **kwargs,
    )
    # NOTE: the input is not written to memory here (that step is disabled).

    # Ask the worker for the first step; it seeds both the root and the queue.
    first_step = self.agent_worker.initialize_step(new_task)
    self.state.task_dict[new_task.task_id] = DAGTaskState(
        task=new_task,
        root_step=first_step,
        step_queue=deque([first_step]),
    )

    return new_task

delete_task #

delete_task(task_id: str) -> None

Delete task.

NOTE: this will not delete any previous executions from memory.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
129
130
131
132
133
134
135
136
137
138
def delete_task(
    self,
    task_id: str,
) -> None:
    """Remove a task's state from the runner.

    NOTE: this will not delete any previous executions from memory.

    Raises:
        KeyError: If ``task_id`` is not present in the task dict.

    """
    registered_tasks = self.state.task_dict
    registered_tasks.pop(task_id)

get_completed_tasks #

get_completed_tasks(**kwargs: Any) -> List[Task]

Get completed tasks.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
140
141
142
143
144
145
146
147
148
def get_completed_tasks(self, **kwargs: Any) -> List[Task]:
    """Return the tasks whose most recent completed step is marked as last.

    Tasks with no completed steps are not included.
    """
    finished = []
    for state in list(self.state.task_dict.values()):
        steps = state.completed_steps
        if steps and steps[-1].is_last:
            finished.append(state.task)
    return finished

get_task_output #

get_task_output(task_id: str, **kwargs: Any) -> TaskStepOutput

Get task output.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
150
151
152
153
154
155
def get_task_output(self, task_id: str, **kwargs: Any) -> TaskStepOutput:
    """Return the most recent completed step output of a task.

    Raises:
        KeyError: If ``task_id`` is not in the task dict.
        ValueError: If the task has no completed steps yet.
    """
    completed = self.state.task_dict[task_id].completed_steps
    if not completed:
        raise ValueError(f"No completed steps for task_id: {task_id}")
    return completed[-1]

list_tasks #

list_tasks(**kwargs: Any) -> List[Task]

List tasks.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
157
158
159
160
def list_tasks(self, **kwargs: Any) -> List[Task]:
    """Return every task currently registered in the runner state."""
    registered = []
    for state in list(self.state.task_dict.values()):
        registered.append(state.task)
    return registered

get_task #

get_task(task_id: str, **kwargs: Any) -> Task

Get task.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
162
163
164
def get_task(self, task_id: str, **kwargs: Any) -> Task:
    """Look up a task by id via the runner state."""
    found = self.state.get_task(task_id)
    return found

get_upcoming_steps #

get_upcoming_steps(task_id: str, **kwargs: Any) -> List[TaskStep]

Get upcoming steps.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
166
167
168
def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
    """Return a list copy of the steps currently queued for a task."""
    queued = self.state.get_step_queue(task_id)
    return list(queued)

get_completed_steps #

get_completed_steps(task_id: str, **kwargs: Any) -> List[TaskStepOutput]

Get completed steps.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
170
171
172
def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
    """Return the completed step outputs of a task, as held by the runner state."""
    completed = self.state.get_completed_steps(task_id)
    return completed

run_steps_in_queue #

run_steps_in_queue(task_id: str, mode: ChatResponseMode = WAIT, **kwargs: Any) -> List[TaskStepOutput]

Execute steps in queue.

Run all steps in queue, clearing it out.

Assume that all steps can be run in parallel.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def run_steps_in_queue(
    self,
    task_id: str,
    mode: ChatResponseMode = ChatResponseMode.WAIT,
    **kwargs: Any,
) -> List[TaskStepOutput]:
    """Execute steps in queue (synchronous entry point).

    Runs all steps in the queue, clearing it out, by driving
    ``arun_steps_in_queue`` to completion with ``asyncio_run``.

    Assume that all steps can be run in parallel.

    """
    pending = self.arun_steps_in_queue(task_id, mode=mode, **kwargs)
    return asyncio_run(pending)

arun_steps_in_queue async #

arun_steps_in_queue(task_id: str, mode: ChatResponseMode = WAIT, **kwargs: Any) -> List[TaskStepOutput]

Execute all steps in queue.

All steps in queue are assumed to be ready.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
async def arun_steps_in_queue(
    self,
    task_id: str,
    mode: ChatResponseMode = ChatResponseMode.WAIT,
    **kwargs: Any,
) -> List[TaskStepOutput]:
    """Execute all steps in queue.

    All steps in queue are assumed to be ready. The queue is drained first,
    then every drained step is run concurrently via ``asyncio.gather``.

    Args:
        task_id: Identifier of the task whose step queue is drained.
        mode: Chat response mode forwarded to ``_arun_step``.
        **kwargs: Extra keyword arguments forwarded to ``_arun_step``.

    Returns:
        The step outputs, in the order the steps were popped from the queue.

    """
    # Look the queue up once; it is drained in place (front to back).
    step_queue = self.state.get_step_queue(task_id)
    steps: List[TaskStep] = []
    while step_queue:
        steps.append(step_queue.popleft())

    # Build one coroutine per step and await them together.
    pending = [
        self._arun_step(task_id, step=step, mode=mode, **kwargs) for step in steps
    ]
    return await asyncio.gather(*pending)

run_step #

run_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
278
279
280
281
282
283
284
285
286
def run_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run a single step in WAIT mode.

    Args:
        task_id: Identifier of the task the step belongs to.
        input: Not used by this implementation.
        step: Step to run, passed through to ``_run_step``.
        **kwargs: Extra keyword arguments forwarded to ``_run_step``.
    """
    wait_mode = ChatResponseMode.WAIT
    return self._run_step(task_id, step, mode=wait_mode, **kwargs)

arun_step async #

arun_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
288
289
290
291
292
293
294
295
296
297
298
async def arun_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run a single step in WAIT mode (async).

    Args:
        task_id: Identifier of the task the step belongs to.
        input: Not used by this implementation.
        step: Step to run, passed through to ``_arun_step``.
        **kwargs: Extra keyword arguments forwarded to ``_arun_step``.
    """
    wait_mode = ChatResponseMode.WAIT
    step_output = await self._arun_step(task_id, step, mode=wait_mode, **kwargs)
    return step_output

stream_step #

stream_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
300
301
302
303
304
305
306
307
308
def stream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run a single step in STREAM mode.

    Args:
        task_id: Identifier of the task the step belongs to.
        input: Not used by this implementation.
        step: Step to run, passed through to ``_run_step``.
        **kwargs: Extra keyword arguments forwarded to ``_run_step``.
    """
    stream_mode = ChatResponseMode.STREAM
    return self._run_step(task_id, step, mode=stream_mode, **kwargs)

astream_step async #

astream_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
310
311
312
313
314
315
316
317
318
319
320
async def astream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run a single step in STREAM mode (async).

    Args:
        task_id: Identifier of the task the step belongs to.
        input: Not used by this implementation.
        step: Step to run, passed through to ``_arun_step``.
        **kwargs: Extra keyword arguments forwarded to ``_arun_step``.
    """
    stream_mode = ChatResponseMode.STREAM
    step_output = await self._arun_step(task_id, step, mode=stream_mode, **kwargs)
    return step_output

finalize_response #

finalize_response(task_id: str, step_output: Optional[TaskStepOutput] = None) -> AGENT_CHAT_RESPONSE_TYPE

Finalize response.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def finalize_response(
    self,
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE:
    """Finalize a task and return the response carried by its last step.

    Args:
        task_id: Identifier of the task to finalize.
        step_output: Step output to finalize on. When omitted, the most
            recently completed step of the task is used.

    Returns:
        The ``output`` of the step output, cast to AGENT_CHAT_RESPONSE_TYPE.

    Raises:
        ValueError: If the step output is not marked as last, or its output
            is neither an AgentChatResponse nor a StreamingAgentChatResponse.
    """
    final_output = step_output
    if final_output is None:
        # Fall back to the latest completed step recorded for this task.
        final_output = self.state.get_completed_steps(task_id)[-1]

    if not final_output.is_last:
        raise ValueError(
            "finalize_response can only be called on the last step output"
        )

    accepted_types = (AgentChatResponse, StreamingAgentChatResponse)
    if not isinstance(final_output.output, accepted_types):
        raise ValueError(
            "When `is_last` is True, cur_step_output.output must be "
            f"AGENT_CHAT_RESPONSE_TYPE: {final_output.output}"
        )

    # Let the worker finalize the task before it is (optionally) deleted.
    finished_task = self.state.get_task(task_id)
    self.agent_worker.finalize_task(finished_task)

    if self.delete_task_on_finish:
        self.delete_task(task_id)

    return cast(AGENT_CHAT_RESPONSE_TYPE, final_output.output)

undo_step #

undo_step(task_id: str) -> None

Undo previous step.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
494
495
496
def undo_step(self, task_id: str) -> None:
    """Undo the previous step of a task.

    Not supported here: always raises ``NotImplementedError``.
    """
    reason = "undo_step not implemented"
    raise NotImplementedError(reason)

Workers#

CustomSimpleAgentWorker #

Bases: BaseModel, BaseAgentWorker

Custom simple agent worker.

This is "simple" in the sense that some of the scaffolding is set up already.

Assumptions:

- assumes that the agent has tools, llm, callback manager, and tool retriever
- has a from_tools convenience function
- assumes that the agent is sequential, and doesn't take in any additional intermediate inputs.

Parameters:

Name Type Description Default
tools Sequence[BaseTool]

Tools to use for reasoning

required
llm LLM

LLM to use

required
callback_manager CallbackManager

Callback manager

CallbackManager([]) (a new, empty callback manager created per instance)
tool_retriever Optional[ObjectRetriever[BaseTool]]

Tool retriever

None
verbose bool

Whether to print out reasoning steps

False
Source code in llama-index-core/llama_index/core/agent/custom/simple.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class CustomSimpleAgentWorker(BaseModel, BaseAgentWorker):
    """Custom simple agent worker.

    This is "simple" in the sense that some of the scaffolding is set up already.
    Assumptions:
    - assumes that the agent has tools, llm, callback manager, and tool retriever
    - has a `from_tools` convenience function
    - assumes that the agent is sequential, and doesn't take in any additional
    intermediate inputs.

    Subclasses must implement `_initialize_state`, `_run_step` and
    `_finalize_task`. `_arun_step` is optional and raises by default.

    Args:
        tools (Sequence[BaseTool]): Tools to use for reasoning
        llm (LLM): LLM to use
        callback_manager (CallbackManager): Callback manager
        tool_retriever (Optional[ObjectRetriever[BaseTool]]): Tool retriever
        verbose (bool): Whether to print out reasoning steps

    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    tools: Sequence[BaseTool] = Field(..., description="Tools to use for reasoning")
    llm: LLM = Field(..., description="LLM to use")
    callback_manager: CallbackManager = Field(
        default_factory=lambda: CallbackManager([]), exclude=True
    )
    tool_retriever: Optional[ObjectRetriever[BaseTool]] = Field(
        default=None, description="Tool retriever"
    )
    verbose: bool = Field(False, description="Whether to print out reasoning steps")

    # Resolves an input message to the tools available for it; set in __init__.
    _get_tools: Callable[[str], Sequence[BaseTool]] = PrivateAttr()

    def __init__(
        self,
        tools: Sequence[BaseTool],
        llm: LLM,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the worker and pick the tool source.

        Raises:
            ValueError: If both a non-empty ``tools`` and a ``tool_retriever``
                are given.
        """
        # Resolve the default once; the resolved value is what gets stored.
        callback_manager = callback_manager or CallbackManager([])
        super().__init__(
            tools=tools,
            llm=llm,
            callback_manager=callback_manager,
            tool_retriever=tool_retriever,
            verbose=verbose,
            **kwargs,
        )

        if len(tools) > 0 and tool_retriever is not None:
            raise ValueError("Cannot specify both tools and tool_retriever")
        elif len(tools) > 0:
            # Fixed tool list: the input message is ignored.
            self._get_tools = lambda _: tools
        elif tool_retriever is not None:
            tool_retriever_c = cast(ObjectRetriever[BaseTool], tool_retriever)
            # Dynamic tools: retrieve per input message.
            self._get_tools = lambda message: tool_retriever_c.retrieve(message)
        else:
            self._get_tools = lambda _: []

    @classmethod
    def from_tools(
        cls,
        tools: Optional[Sequence[BaseTool]] = None,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "CustomSimpleAgentWorker":
        """Convenience constructor method from set of BaseTools (Optional).

        Falls back to ``Settings.llm`` when no llm is given. When a callback
        manager is given it is also assigned onto the llm.
        """
        llm = llm or Settings.llm
        if callback_manager is not None:
            llm.callback_manager = callback_manager
        return cls(
            tools=tools or [],
            tool_retriever=tool_retriever,
            llm=llm,
            callback_manager=callback_manager or CallbackManager([]),
            verbose=verbose,
            **kwargs,
        )

    @abstractmethod
    def _initialize_state(self, task: Task, **kwargs: Any) -> Dict[str, Any]:
        """Initialize state."""

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task.

        Merges the subclass state from ``_initialize_state`` with the reserved
        keys ``"sources"`` and ``"memory"``.

        Raises:
            ValueError: If the subclass state uses a reserved key.
        """
        sources: List[ToolOutput] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # initialize initial state
        initial_state = {
            "sources": sources,
            "memory": new_memory,
        }

        step_state = self._initialize_state(task, **kwargs)
        # if intersecting keys, error
        if set(step_state.keys()).intersection(set(initial_state.keys())):
            raise ValueError(
                f"Step state keys {step_state.keys()} and initial state keys {initial_state.keys()} intersect. "
                f"*NOTE*: initial state keys {initial_state.keys()} are reserved."
            )
        step_state.update(initial_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state=step_state,
        )

    def get_tools(self, input: str) -> List[AsyncBaseTool]:
        """Get tools for ``input``, each adapted with ``adapt_to_async_tool``."""
        return [adapt_to_async_tool(t) for t in self._get_tools(input)]

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """Get task step response.

        When the step is not done, exactly one follow-up step is scheduled.
        """
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    @abstractmethod
    def _run_step(
        self, state: Dict[str, Any], task: Task, input: Optional[str] = None
    ) -> Tuple[AgentChatResponse, bool]:
        """Run step.

        Returns:
            Tuple of (agent_response, is_done)

        """

    async def _arun_step(
        self, state: Dict[str, Any], task: Task, input: Optional[str] = None
    ) -> Tuple[AgentChatResponse, bool]:
        """Run step (async).

        Can override this method if you want to run the step asynchronously.

        Returns:
            Tuple of (agent_response, is_done)

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.

        """
        raise NotImplementedError(
            "This agent does not support async. Please implement _arun_step."
        )

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step."""
        agent_response, is_done = self._run_step(
            step.step_state, task, input=step.input
        )
        response = self._get_task_step_response(agent_response, step, is_done)
        # sync step state with task state
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async)."""
        agent_response, is_done = await self._arun_step(
            step.step_state, task, input=step.input
        )
        response = self._get_task_step_response(agent_response, step, is_done)
        # sync step state with task state
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream). Not supported: always raises."""
        raise NotImplementedError("This agent does not support streaming.")

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream). Not supported: always raises."""
        raise NotImplementedError("This agent does not support streaming.")

    @abstractmethod
    def _finalize_task(self, state: Dict[str, Any], **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed.

        State is all the step states.

        """

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed.

        Appends the messages held in the temporary ``"memory"`` state onto the
        task memory, resets the temporary memory, then calls ``_finalize_task``.
        """
        # add new messages to memory
        task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
        # reset new memory
        task.extra_state["memory"].reset()
        self._finalize_task(task.extra_state, **kwargs)

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager."""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager

from_tools classmethod #

from_tools(tools: Optional[Sequence[BaseTool]] = None, tool_retriever: Optional[ObjectRetriever[BaseTool]] = None, llm: Optional[LLM] = None, callback_manager: Optional[CallbackManager] = None, verbose: bool = False, **kwargs: Any) -> CustomSimpleAgentWorker

Convenience constructor method from set of BaseTools (Optional).

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@classmethod
def from_tools(
    cls,
    tools: Optional[Sequence[BaseTool]] = None,
    tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
    llm: Optional[LLM] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "CustomSimpleAgentWorker":
    """Build a worker from an optional set of tools and/or a tool retriever.

    Uses ``Settings.llm`` when ``llm`` is not provided. A provided callback
    manager is also assigned onto the llm before the worker is constructed.
    """
    resolved_llm = llm if llm else Settings.llm
    if callback_manager is not None:
        resolved_llm.callback_manager = callback_manager

    resolved_tools = tools or []
    resolved_manager = callback_manager or CallbackManager([])
    return cls(
        tools=resolved_tools,
        tool_retriever=tool_retriever,
        llm=resolved_llm,
        callback_manager=resolved_manager,
        verbose=verbose,
        **kwargs,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize step from task.

    Merges the state returned by ``_initialize_state`` with the reserved keys
    ``"sources"`` (an empty list) and ``"memory"`` (a fresh chat memory buffer).

    Args:
        task: Task to create the first step for.
        **kwargs: Extra keyword arguments forwarded to ``_initialize_state``.

    Returns:
        A new ``TaskStep`` with a random step id, the task's input and the
        merged step state.

    Raises:
        ValueError: If ``_initialize_state`` returns a reserved key.
    """
    sources: List[ToolOutput] = []
    # temporary memory for new messages
    new_memory = ChatMemoryBuffer.from_defaults()

    # initialize initial state
    initial_state = {
        "sources": sources,
        "memory": new_memory,
    }

    step_state = self._initialize_state(task, **kwargs)
    # if intersecting keys, error
    if set(step_state.keys()).intersection(set(initial_state.keys())):
        # Trailing space keeps the two concatenated sentences apart.
        raise ValueError(
            f"Step state keys {step_state.keys()} and initial state keys {initial_state.keys()} intersect. "
            f"*NOTE*: initial state keys {initial_state.keys()} are reserved."
        )
    step_state.update(initial_state)

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state=step_state,
    )

get_tools #

get_tools(input: str) -> List[AsyncBaseTool]

Get tools.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
155
156
157
def get_tools(self, input: str) -> List[AsyncBaseTool]:
    """Return the tools for ``input``, each wrapped by ``adapt_to_async_tool``."""
    async_tools = []
    for tool in self._get_tools(input):
        async_tools.append(adapt_to_async_tool(tool))
    return async_tools

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
207
208
209
210
211
212
213
214
215
216
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run one step and wrap the result as a ``TaskStepOutput``.

    After the step runs, the step state is copied into ``task.extra_state``.
    """
    result = self._run_step(step.step_state, task, input=step.input)
    agent_response, is_done = result
    step_output = self._get_task_step_response(agent_response, step, is_done)
    # Keep the task-level state in sync with what the step produced.
    task.extra_state.update(step.step_state)
    return step_output

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
218
219
220
221
222
223
224
225
226
227
228
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run one step asynchronously and wrap the result as a ``TaskStepOutput``.

    After the step runs, the step state is copied into ``task.extra_state``.
    """
    result = await self._arun_step(step.step_state, task, input=step.input)
    agent_response, is_done = result
    step_output = self._get_task_step_response(agent_response, step, is_done)
    # Keep the task-level state in sync with what the step produced.
    task.extra_state.update(step.step_state)
    return step_output

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
230
231
232
233
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (stream).

    Streaming is not supported: always raises ``NotImplementedError``.
    """
    reason = "This agent does not support streaming."
    raise NotImplementedError(reason)

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
235
236
237
238
239
240
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async stream).

    Streaming is not supported: always raises ``NotImplementedError``.
    """
    reason = "This agent does not support streaming."
    raise NotImplementedError(reason)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
250
251
252
253
254
255
256
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Merge the temporary step memory into the task memory and finalize.

    The messages held under ``task.extra_state["memory"]`` are appended to
    what ``task.memory.get()`` returns and written back with ``set``. The
    temporary memory is then reset and ``_finalize_task`` is called with the
    task's extra state.
    """
    scratch_memory = task.extra_state["memory"]
    # NOTE(review): existing history is read with get(), new messages with
    # get_all() - confirm get() returns the full history for the memory in use.
    existing_messages = task.memory.get()
    task.memory.set(existing_messages + scratch_memory.get_all())
    # The temporary buffer has been merged; clear it.
    scratch_memory.reset()
    self._finalize_task(task.extra_state, **kwargs)

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
258
259
260
261
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Replace the worker's callback manager with ``callback_manager``."""
    # TODO: make this abstractmethod (right now will break some agent impls)
    setattr(self, "callback_manager", callback_manager)

MultimodalReActAgentWorker #

Bases: BaseAgentWorker

Multimodal ReAct Agent worker.

NOTE: This is a BETA feature.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
class MultimodalReActAgentWorker(BaseAgentWorker):
    """Multimodal ReAct Agent worker.

    Each step formats the chat history and the reasoning collected so far
    into a prompt, sends it to the multi-modal LLM, parses the reply into a
    reasoning step and, when that step is an action, calls the matching tool
    and records its output as an observation.

    **NOTE**: This is a BETA feature.

    """

    def __init__(
        self,
        tools: Sequence[BaseTool],
        multi_modal_llm: MultiModalLLM,
        max_iterations: int = 10,
        react_chat_formatter: Optional[ReActChatFormatter] = None,
        output_parser: Optional[ReActOutputParser] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        generate_chat_message_fn: Optional[ChatMessageCallable] = None,
    ) -> None:
        """Initialize the worker.

        Args:
            tools: Fixed tools offered on every step. Mutually exclusive with
                ``tool_retriever``.
            multi_modal_llm: Model used for the chat completions.
            max_iterations: Reasoning-step count checked in ``_get_response``.
            react_chat_formatter: Prompt formatter. Defaults to a
                ``ReActChatFormatter`` built with ``REACT_MM_CHAT_SYSTEM_HEADER``.
            output_parser: Parser for model replies. Defaults to
                ``ReActOutputParser()``.
            callback_manager: Callback manager. Defaults to an empty
                ``CallbackManager``.
            verbose: Whether to print reasoning steps as they are produced.
            tool_retriever: Retriever queried with the task input to obtain
                tools. Mutually exclusive with ``tools``.
            generate_chat_message_fn: Callable handed to
                ``add_user_step_to_reasoning``. Defaults to the OpenAI helper
                from ``llama-index-multi-modal-llms-openai``.

        Raises:
            ImportError: If ``generate_chat_message_fn`` is omitted and the
                OpenAI multi-modal package is not installed.
            ValueError: If both ``tools`` and ``tool_retriever`` are given.
        """
        self._multi_modal_llm = multi_modal_llm
        self.callback_manager = callback_manager or CallbackManager([])
        self._max_iterations = max_iterations
        self._react_chat_formatter = react_chat_formatter or ReActChatFormatter(
            system_header=REACT_MM_CHAT_SYSTEM_HEADER
        )
        self._output_parser = output_parser or ReActOutputParser()
        self._verbose = verbose

        if generate_chat_message_fn is None:
            # Import lazily so the OpenAI integration stays an optional dependency.
            try:
                from llama_index.multi_modal_llms.openai.utils import (
                    generate_openai_multi_modal_chat_message,
                )  # pants: no-infer-dep
            except ImportError as exc:
                raise ImportError(
                    "`llama-index-multi-modal-llms-openai` package cannot be found. "
                    "Please install it by using "
                    "`pip install llama-index-multi-modal-llms-openai`"
                ) from exc
            generate_chat_message_fn = generate_openai_multi_modal_chat_message
        self._generate_chat_message_fn = generate_chat_message_fn

        self._add_user_step_to_reasoning = partial(
            add_user_step_to_reasoning,
            generate_chat_message_fn=self._generate_chat_message_fn,  # type: ignore
        )

        if len(tools) > 0 and tool_retriever is not None:
            raise ValueError("Cannot specify both tools and tool_retriever")
        elif len(tools) > 0:
            self._get_tools = lambda _: tools
        elif tool_retriever is not None:
            tool_retriever_c = cast(ObjectRetriever[BaseTool], tool_retriever)
            self._get_tools = lambda message: tool_retriever_c.retrieve(message)
        else:
            self._get_tools = lambda _: []

    @classmethod
    def from_tools(
        cls,
        tools: Optional[Sequence[BaseTool]] = None,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        multi_modal_llm: Optional[MultiModalLLM] = None,
        max_iterations: int = 10,
        react_chat_formatter: Optional[ReActChatFormatter] = None,
        output_parser: Optional[ReActOutputParser] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "MultimodalReActAgentWorker":
        """Convenience constructor method from set of BaseTools (Optional).

        When ``multi_modal_llm`` is omitted, an ``OpenAIMultiModal`` model
        (``gpt-4-vision-preview``, ``max_new_tokens=1000``) is created.

        NOTE: kwargs should have been exhausted by this point. In other words
        the various upstream components such as BaseSynthesizer (response synthesizer)
        or BaseRetriever should have picked up off their respective kwargs in their
        constructions. They are accepted here but not forwarded.

        Returns:
            MultimodalReActAgentWorker

        Raises:
            ImportError: If ``multi_modal_llm`` is omitted and the OpenAI
                multi-modal package is not installed.
        """
        if multi_modal_llm is None:
            try:
                from llama_index.multi_modal_llms.openai import (
                    OpenAIMultiModal,
                )  # pants: no-infer-dep
            except ImportError as exc:
                raise ImportError(
                    "`llama-index-multi-modal-llms-openai` package cannot be found. "
                    "Please install it by using "
                    "`pip install llama-index-multi-modal-llms-openai`"
                ) from exc
            multi_modal_llm = OpenAIMultiModal(
                model="gpt-4-vision-preview", max_new_tokens=1000
            )
        return cls(
            tools=tools or [],
            tool_retriever=tool_retriever,
            multi_modal_llm=multi_modal_llm,
            max_iterations=max_iterations,
            react_chat_formatter=react_chat_formatter,
            output_parser=output_parser,
            callback_manager=callback_manager,
            verbose=verbose,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task.

        Seeds ``task.extra_state`` with empty ``sources``,
        ``current_reasoning`` and ``new_memory`` entries and returns the first
        step, carrying the task input and the task's image documents.

        Raises:
            ValueError: If ``task.extra_state`` has no ``"image_docs"`` entry.
        """
        sources: List[ToolOutput] = []
        current_reasoning: List[BaseReasoningStep] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # validation
        if "image_docs" not in task.extra_state:
            raise ValueError("Image docs not found in task extra state.")

        # initialize task state
        task_state = {
            "sources": sources,
            "current_reasoning": current_reasoning,
            "new_memory": new_memory,
        }
        task.extra_state.update(task_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state={"is_first": True, "image_docs": task.extra_state["image_docs"]},
        )

    def get_tools(self, input: str) -> List[AsyncBaseTool]:
        """Get tools for ``input``, each wrapped by ``adapt_to_async_tool``."""
        return [adapt_to_async_tool(t) for t in self._get_tools(input)]

    def _extract_reasoning_step(
        self, output: ChatResponse, is_streaming: bool = False
    ) -> Tuple[str, List[BaseReasoningStep], bool]:
        """
        Extracts the reasoning step from the given output.

        Returns:
            A tuple ``(message_content, [reasoning_step], is_done)`` where
            ``is_done`` mirrors ``reasoning_step.is_done``.

        Raises:
            ValueError: If the message has no content, cannot be parsed, or
                parses to an unfinished step that is not an
                ``ActionReasoningStep``.
        """
        if output.message.content is None:
            raise ValueError("Got empty message.")
        message_content = output.message.content

        try:
            reasoning_step = self._output_parser.parse(message_content, is_streaming)
        except Exception as exc:
            # Only wrap ordinary errors; KeyboardInterrupt/SystemExit must
            # propagate untouched instead of being reported as parse failures.
            raise ValueError(f"Could not parse output: {message_content}") from exc
        if self._verbose:
            print_text(f"{reasoning_step.get_content()}\n", color="pink")
        current_reasoning: List[BaseReasoningStep] = [reasoning_step]

        if reasoning_step.is_done:
            return message_content, current_reasoning, True

        if not isinstance(reasoning_step, ActionReasoningStep):
            raise ValueError(f"Expected ActionReasoningStep, got {reasoning_step}")

        return message_content, current_reasoning, False

    def _record_observation(
        self,
        task: Task,
        tool: AsyncBaseTool,
        tool_output: ToolOutput,
        current_reasoning: List[BaseReasoningStep],
    ) -> bool:
        """Store a tool output as a source and as an observation step.

        Returns the tool's ``return_direct`` flag, which callers use as the
        "step is done" signal.
        """
        task.extra_state["sources"].append(tool_output)

        observation_step = ObservationReasoningStep(
            observation=str(tool_output), return_direct=tool.metadata.return_direct
        )
        current_reasoning.append(observation_step)
        if self._verbose:
            print_text(f"{observation_step.get_content()}\n", color="blue")
        return tool.metadata.return_direct

    def _process_actions(
        self,
        task: Task,
        tools: Sequence[AsyncBaseTool],
        output: ChatResponse,
        is_streaming: bool = False,
    ) -> Tuple[List[BaseReasoningStep], bool]:
        """Parse the model output and, if it is an action, call the tool.

        Returns:
            The reasoning steps produced by this call and whether the task is
            done (final response, or a tool flagged ``return_direct``).
        """
        tools_dict: Dict[str, AsyncBaseTool] = {
            tool.metadata.get_name(): tool for tool in tools
        }
        _, current_reasoning, is_done = self._extract_reasoning_step(
            output, is_streaming
        )

        if is_done:
            return current_reasoning, True

        # call tool with input
        reasoning_step = cast(ActionReasoningStep, current_reasoning[-1])
        # NOTE: an action naming a tool that is not in tools_dict raises KeyError.
        tool = tools_dict[reasoning_step.action]
        with self.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={
                EventPayload.FUNCTION_CALL: reasoning_step.action_input,
                EventPayload.TOOL: tool.metadata,
            },
        ) as event:
            tool_output = tool.call(**reasoning_step.action_input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(tool_output)})

        is_done = self._record_observation(task, tool, tool_output, current_reasoning)
        return current_reasoning, is_done

    async def _aprocess_actions(
        self,
        task: Task,
        tools: Sequence[AsyncBaseTool],
        output: ChatResponse,
        is_streaming: bool = False,
    ) -> Tuple[List[BaseReasoningStep], bool]:
        """Async counterpart of ``_process_actions`` (awaits ``tool.acall``)."""
        # Key by get_name() so sync and async paths resolve tools identically.
        tools_dict: Dict[str, AsyncBaseTool] = {
            tool.metadata.get_name(): tool for tool in tools
        }
        _, current_reasoning, is_done = self._extract_reasoning_step(
            output, is_streaming
        )

        if is_done:
            return current_reasoning, True

        # call tool with input
        reasoning_step = cast(ActionReasoningStep, current_reasoning[-1])
        # NOTE: an action naming a tool that is not in tools_dict raises KeyError.
        tool = tools_dict[reasoning_step.action]
        with self.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={
                EventPayload.FUNCTION_CALL: reasoning_step.action_input,
                EventPayload.TOOL: tool.metadata,
            },
        ) as event:
            tool_output = await tool.acall(**reasoning_step.action_input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(tool_output)})

        is_done = self._record_observation(task, tool, tool_output, current_reasoning)
        return current_reasoning, is_done

    def _get_response(
        self,
        current_reasoning: List[BaseReasoningStep],
        sources: List[ToolOutput],
    ) -> AgentChatResponse:
        """Get response from reasoning steps.

        The response text comes from the last reasoning step: its ``response``
        for a ``ResponseReasoningStep``, its ``observation`` for a
        ``return_direct`` observation, otherwise its ``get_content()``.

        Raises:
            ValueError: If there are no reasoning steps, or their count equals
                ``max_iterations``.
        """
        if len(current_reasoning) == 0:
            raise ValueError("No reasoning steps were taken.")
        elif len(current_reasoning) == self._max_iterations:
            # NOTE(review): a tool-calling step appends two entries (action +
            # observation), so the count can jump past the limit without ever
            # being equal to it. Confirm whether `>=` is the intended check.
            raise ValueError("Reached max iterations.")

        last_step = current_reasoning[-1]
        if isinstance(last_step, ResponseReasoningStep):
            response_str = last_step.response
        elif isinstance(last_step, ObservationReasoningStep) and last_step.return_direct:
            response_str = last_step.observation
        else:
            response_str = last_step.get_content()

        # TODO: add sources from reasoning steps
        return AgentChatResponse(response=response_str, sources=sources)

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """Get task step response.

        When the task is not done, a single follow-up step (with no input) is
        queued in ``next_steps``.
        """
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    def _prepare_step(
        self, step: TaskStep, task: Task
    ) -> Tuple[List[AsyncBaseTool], List[ChatMessage]]:
        """Record the user input (if any) and build the prompt for this step."""
        # This is either not None on the first step or if the user specifies
        # an intermediate step in the middle
        if step.input is not None:
            self._add_user_step_to_reasoning(
                step=step,
                memory=task.extra_state["new_memory"],
                current_reasoning=task.extra_state["current_reasoning"],
                verbose=self._verbose,
            )
        # TODO: see if we want to do step-based inputs
        tools = self.get_tools(task.input)

        input_chat = self._react_chat_formatter.format(
            tools,
            chat_history=task.memory.get_all()
            + task.extra_state["new_memory"].get_all(),
            current_reasoning=task.extra_state["current_reasoning"],
        )
        return tools, input_chat

    def _complete_step(
        self,
        step: TaskStep,
        task: Task,
        reasoning_steps: List[BaseReasoningStep],
        is_done: bool,
    ) -> TaskStepOutput:
        """Fold new reasoning into the task state and build the step output."""
        task.extra_state["current_reasoning"].extend(reasoning_steps)
        agent_response = self._get_response(
            task.extra_state["current_reasoning"], task.extra_state["sources"]
        )
        if is_done:
            # Only the final answer is written to the temporary chat memory.
            task.extra_state["new_memory"].put(
                ChatMessage(content=agent_response.response, role=MessageRole.ASSISTANT)
            )

        return self._get_task_step_response(agent_response, step, is_done)

    def _run_step(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """Run step."""
        tools, input_chat = self._prepare_step(step, task)

        # send prompt
        chat_response = self._multi_modal_llm.chat(input_chat)
        # given react prompt outputs, call tools or return response
        reasoning_steps, is_done = self._process_actions(
            task, tools, output=chat_response
        )
        return self._complete_step(step, task, reasoning_steps, is_done)

    async def _arun_step(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """Run step (async)."""
        tools, input_chat = self._prepare_step(step, task)

        # send prompt
        chat_response = await self._multi_modal_llm.achat(input_chat)
        # given react prompt outputs, call tools or return response
        reasoning_steps, is_done = await self._aprocess_actions(
            task, tools, output=chat_response
        )
        return self._complete_step(step, task, reasoning_steps, is_done)

    def _run_step_stream(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """Run step (stream). Not implemented; always raises."""
        raise NotImplementedError("Stream step not implemented yet.")

    async def _arun_step_stream(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """Run step (async stream). Not implemented; always raises."""
        raise NotImplementedError("Stream step not implemented yet.")

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step."""
        return self._run_step(step, task)

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async)."""
        return await self._arun_step(step, task)

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream)."""
        # TODO: figure out if we need a different type for TaskStepOutput
        return self._run_step_stream(step, task)

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        return await self._arun_step_stream(step, task)

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed."""
        # add new messages to memory
        task.memory.set(
            task.memory.get_all() + task.extra_state["new_memory"].get_all()
        )
        # reset new memory
        task.extra_state["new_memory"].reset()

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager."""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager

from_tools classmethod #

from_tools(tools: Optional[Sequence[BaseTool]] = None, tool_retriever: Optional[ObjectRetriever[BaseTool]] = None, multi_modal_llm: Optional[MultiModalLLM] = None, max_iterations: int = 10, react_chat_formatter: Optional[ReActChatFormatter] = None, output_parser: Optional[ReActOutputParser] = None, callback_manager: Optional[CallbackManager] = None, verbose: bool = False, **kwargs: Any) -> MultimodalReActAgentWorker

Convenience constructor method from set of BaseTools (Optional).

NOTE: kwargs should have been exhausted by this point. In other words the various upstream components such as BaseSynthesizer (response synthesizer) or BaseRetriever should have picked up off their respective kwargs in their constructions.

Returns:

Type Description
MultimodalReActAgentWorker

A MultimodalReActAgentWorker built from the given tools, multi-modal LLM and settings.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@classmethod
def from_tools(
    cls,
    tools: Optional[Sequence[BaseTool]] = None,
    tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
    multi_modal_llm: Optional[MultiModalLLM] = None,
    max_iterations: int = 10,
    react_chat_formatter: Optional[ReActChatFormatter] = None,
    output_parser: Optional[ReActOutputParser] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "MultimodalReActAgentWorker":
    """Convenience constructor method from set of BaseTools (Optional).

    When ``multi_modal_llm`` is omitted, an ``OpenAIMultiModal`` model
    (``gpt-4-vision-preview``, ``max_new_tokens=1000``) is created.

    NOTE: kwargs should have been exhausted by this point. In other words
    the various upstream components such as BaseSynthesizer (response synthesizer)
    or BaseRetriever should have picked up off their respective kwargs in their
    constructions. They are accepted here but not forwarded.

    Returns:
        MultimodalReActAgentWorker

    Raises:
        ImportError: If ``multi_modal_llm`` is omitted and the OpenAI
            multi-modal package is not installed.
    """
    if multi_modal_llm is None:
        # Import lazily so the OpenAI integration stays an optional dependency.
        try:
            from llama_index.multi_modal_llms.openai import (
                OpenAIMultiModal,
            )  # pants: no-infer-dep
        except ImportError as exc:
            raise ImportError(
                "`llama-index-multi-modal-llms-openai` package cannot be found. "
                "Please install it by using "
                "`pip install llama-index-multi-modal-llms-openai`"
            ) from exc
        multi_modal_llm = OpenAIMultiModal(
            model="gpt-4-vision-preview", max_new_tokens=1000
        )
    return cls(
        tools=tools or [],
        tool_retriever=tool_retriever,
        multi_modal_llm=multi_modal_llm,
        max_iterations=max_iterations,
        react_chat_formatter=react_chat_formatter,
        output_parser=output_parser,
        callback_manager=callback_manager,
        verbose=verbose,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Create the first step of a task and seed the task's working state.

    Stores empty ``sources``, ``current_reasoning`` and ``new_memory``
    entries in ``task.extra_state`` and returns a step that carries the task
    input together with the task's image documents.

    Raises:
        ValueError: If ``task.extra_state`` has no ``"image_docs"`` entry.
    """
    tool_sources: List[ToolOutput] = []
    reasoning_log: List[BaseReasoningStep] = []
    # scratch buffer holding messages produced while the task runs
    scratch_memory = ChatMemoryBuffer.from_defaults()

    # the image documents must have been supplied by the caller
    if "image_docs" not in task.extra_state:
        raise ValueError("Image docs not found in task extra state.")

    task.extra_state.update(
        {
            "sources": tool_sources,
            "current_reasoning": reasoning_log,
            "new_memory": scratch_memory,
        }
    )

    first_step_state = {
        "is_first": True,
        "image_docs": task.extra_state["image_docs"],
    }
    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state=first_step_state,
    )

get_tools #

get_tools(input: str) -> List[AsyncBaseTool]

Get tools.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
235
236
237
def get_tools(self, input: str) -> List[AsyncBaseTool]:
    """Return the tools for ``input``, each wrapped by ``adapt_to_async_tool``."""
    candidate_tools = self._get_tools(input)
    return [adapt_to_async_tool(tool) for tool in candidate_tools]

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
494
495
496
497
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Execute one synchronous step by delegating to ``_run_step``."""
    step_output = self._run_step(step, task)
    return step_output

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
499
500
501
502
503
504
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Execute one step asynchronously by delegating to ``_arun_step``."""
    step_output = await self._arun_step(step, task)
    return step_output

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
506
507
508
509
510
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Execute one streaming step by delegating to ``_run_step_stream``."""
    # TODO: figure out if we need a different type for TaskStepOutput
    step_output = self._run_step_stream(step, task)
    return step_output

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
512
513
514
515
516
517
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Execute one async streaming step by delegating to ``_arun_step_stream``."""
    step_output = await self._arun_step_stream(step, task)
    return step_output

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
519
520
521
522
523
524
525
526
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Merge the task's temporary messages into its memory, then clear them.

    The messages held under ``task.extra_state["new_memory"]`` are appended
    to everything already in ``task.memory``; the temporary buffer is reset
    afterwards.
    """
    persisted_messages = task.memory.get_all()
    pending_memory = task.extra_state["new_memory"]
    # write back the combined history
    task.memory.set(persisted_messages + pending_memory.get_all())
    # the temporary buffer is no longer needed
    pending_memory.reset()

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
528
529
530
531
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Replace the callback manager stored on this worker."""
    # TODO: make this abstractmethod (right now will break some agent impls)
    self.callback_manager = callback_manager

QueryPipelineAgentWorker #

Bases: BaseModel, BaseAgentWorker

Query Pipeline agent worker.

NOTE: This is now deprecated. Use FnAgentWorker instead to build a stateful agent.

Barebones agent worker that takes in a query pipeline.

Default Workflow: The default workflow assumes that you compose a query pipeline with StatefulFnComponent objects. This allows you to store, update and retrieve state throughout the executions of the query pipeline by the agent.

The task and step state of the agent are stored in this state variable via a special key. Of course you can choose to store other variables in this state as well.

Deprecated Workflow: The deprecated workflow assumes that the first component in the query pipeline is an AgentInputComponent and last is AgentFnComponent.

Parameters:

Name Type Description Default
pipeline QueryPipeline

Query pipeline

required
callback_manager CallbackManager
required
task_key str

Key to store task in state

'task'
step_state_key str

Key to store step in state

'step_state'
Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
@deprecated("Use `FnAgentWorker` instead to build a stateful agent.")
class QueryPipelineAgentWorker(BaseModel, BaseAgentWorker):
    """Query Pipeline agent worker.

    NOTE: This is now deprecated. Use `FnAgentWorker` instead to build a stateful agent.

    Barebones agent worker that takes in a query pipeline.

    **Default Workflow**: The default workflow assumes that you compose
    a query pipeline with `StatefulFnComponent` objects. This allows you to store, update
    and retrieve state throughout the executions of the query pipeline by the agent.

    The task and step state of the agent are stored in this `state` variable via a special key.
    Of course you can choose to store other variables in this state as well.

    **Deprecated Workflow**: The deprecated workflow assumes that the first component in the
    query pipeline is an `AgentInputComponent` and last is `AgentFnComponent`.

    Args:
        pipeline (QueryPipeline): Query pipeline
        callback_manager (Optional[CallbackManager]): Callback manager. When given,
            it is also set on the pipeline; otherwise the pipeline's own is used.
        task_key (str): Key to store task in state
        step_state_key (str): Key to store step in state

    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    pipeline: QueryPipeline = Field(..., description="Query pipeline")
    callback_manager: CallbackManager = Field(..., exclude=True)
    task_key: str = Field("task", description="Key to store task in state")
    step_state_key: str = Field("step_state", description="Key to store step in state")

    def __init__(
        self,
        pipeline: QueryPipeline,
        callback_manager: Optional[CallbackManager] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize."""
        if callback_manager is not None:
            # set query pipeline callback
            pipeline.set_callback_manager(callback_manager)
        else:
            callback_manager = pipeline.callback_manager
        super().__init__(
            pipeline=pipeline,
            callback_manager=callback_manager,
            **kwargs,
        )
        # validate query pipeline
        # self.agent_input_component
        # Evaluated for its side effect only: any error raised while
        # collecting the agent components surfaces at construction time.
        self.agent_components

    @property
    def agent_input_component(self) -> AgentInputComponent:
        """Get agent input component.

        NOTE: This is deprecated and will be removed in the future.

        Raises:
            ValueError: If the pipeline's first root module is not an
                `AgentInputComponent`.

        """
        root_key = self.pipeline.get_root_keys()[0]
        if not isinstance(self.pipeline.module_dict[root_key], AgentInputComponent):
            raise ValueError(
                "Query pipeline first component must be AgentInputComponent, got "
                f"{self.pipeline.module_dict[root_key]}"
            )

        return cast(AgentInputComponent, self.pipeline.module_dict[root_key])

    @property
    def agent_components(self) -> Sequence[BaseAgentComponent]:
        """Get the agent components found in the query pipeline."""
        return _get_agent_components(self.pipeline)

    def preprocess(self, task: Task, step: TaskStep) -> None:
        """Preprocessing flow.

        This runs preprocessing to propagate the task and step as variables
        to relevant components in the query pipeline.

        Contains deprecated flow of updating agent components.
        But also contains main flow of updating StatefulFnComponent components.

        """
        # NOTE: this is deprecated
        # partial agent output component with task and step
        for agent_fn_component in self.agent_components:
            agent_fn_component.partial(task=task, state=step.step_state)

        # update stateful components
        self.pipeline.update_state(
            {self.task_key: task, self.step_state_key: step.step_state}
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task.

        The returned step starts with an empty ``sources`` list and a fresh
        ``memory`` buffer in its step state.
        """
        sources: List[ToolOutput] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # initialize initial state
        initial_state = {
            "sources": sources,
            "memory": new_memory,
        }

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state=initial_state,
        )

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """Get task step response.

        When the task is not done, a single follow-up step (with no input) is
        queued in ``next_steps``.
        """
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    def _uses_deprecated_flow(self) -> bool:
        """Return whether the pipeline starts with an `AgentInputComponent`."""
        # HACK: do a try/except for now. Fine since old agent components are deprecated
        try:
            self.agent_input_component
        except ValueError:
            return False
        return True

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step.

        The pipeline is expected to return an ``(agent_response, is_done)``
        pair. The deprecated flow passes ``state`` and ``task`` as pipeline
        inputs; the default flow runs the pipeline without arguments.
        """
        self.preprocess(task, step)

        if self._uses_deprecated_flow():
            agent_response, is_done = self.pipeline.run(
                state=step.step_state, task=task
            )
        else:
            agent_response, is_done = self.pipeline.run()
        response = self._get_task_step_response(agent_response, step, is_done)
        # sync step state with task state
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async).

        Async counterpart of ``run_step``; awaits ``pipeline.arun``.
        """
        self.preprocess(task, step)

        if self._uses_deprecated_flow():
            agent_response, is_done = await self.pipeline.arun(
                state=step.step_state, task=task
            )
        else:
            agent_response, is_done = await self.pipeline.arun()
        response = self._get_task_step_response(agent_response, step, is_done)
        # sync step state with task state
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream). Not supported; always raises."""
        raise NotImplementedError("This agent does not support streaming.")

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream). Not supported; always raises."""
        raise NotImplementedError("This agent does not support streaming.")

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed."""
        # add new messages to memory
        # NOTE(review): this reads `task.memory.get()` rather than `get_all()`;
        # if `get()` returns a trimmed view, older messages would be dropped
        # when written back — confirm this is intended.
        task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
        # reset new memory
        task.extra_state["memory"].reset()

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager on this worker and on its pipeline."""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager
        self.pipeline.set_callback_manager(callback_manager)

agent_input_component property #

agent_input_component: AgentInputComponent

Get agent input component.

NOTE: This is deprecated and will be removed in the future.

agent_components property #

agent_components: Sequence[BaseAgentComponent]

Get the agent components found in the query pipeline.

preprocess #

preprocess(task: Task, step: TaskStep) -> None

Preprocessing flow.

This runs preprocessing to propagate the task and step as variables to relevant components in the query pipeline.

Contains deprecated flow of updating agent components. But also contains main flow of updating StatefulFnComponent components.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def preprocess(self, task: Task, step: TaskStep) -> None:
    """Propagate the task and step state into the query pipeline.

    Two flows run here:

    - deprecated: every agent component is partially applied with the
      task and the step state;
    - main: the pipeline state is updated with the task and the step
      state, stored under ``self.task_key`` and ``self.step_state_key``.

    """
    # NOTE: this is deprecated
    # partial agent output component with task and step
    components = self.agent_components
    current_state = step.step_state
    for component in components:
        component.partial(task=task, state=current_state)

    # update stateful components
    state_update = {self.task_key: task, self.step_state_key: current_state}
    self.pipeline.update_state(state_update)

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Create the first step of a task with a fresh step state.

    The step state starts with an empty ``"sources"`` list and a new
    ``ChatMemoryBuffer`` under ``"memory"``; the step receives a random
    UUID as its id and takes its input from the task.
    """
    # initialize initial state
    step_state = {
        "sources": [],
        # temporary memory for new messages
        "memory": ChatMemoryBuffer.from_defaults(),
    }
    new_step_id = str(uuid.uuid4())

    return TaskStep(
        task_id=task.task_id,
        step_id=new_step_id,
        input=task.input,
        step_state=step_state,
    )

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run one step through the pipeline and return its output.

    After the run, the step state is copied into ``task.extra_state``.
    """
    self.preprocess(task, step)

    # HACK: do a try/except for now. Fine since old agent components are deprecated
    # A ValueError from the property lookup is treated as "deprecated
    # components are not in use".
    uses_deprecated = True
    try:
        self.agent_input_component
    except ValueError:
        uses_deprecated = False

    # the deprecated flow passes state and task as explicit pipeline inputs
    run_kwargs = {"state": step.step_state, "task": task} if uses_deprecated else {}
    agent_response, is_done = self.pipeline.run(**run_kwargs)

    step_output = self._get_task_step_response(agent_response, step, is_done)
    # sync step state with task state
    task.extra_state.update(step.step_state)
    return step_output

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run one step through the pipeline asynchronously and return its output.

    After the run, the step state is copied into ``task.extra_state``.
    """
    self.preprocess(task, step)

    # HACK: do a try/except for now. Fine since old agent components are deprecated
    # A ValueError from the property lookup is treated as "deprecated
    # components are not in use".
    uses_deprecated = True
    try:
        self.agent_input_component
    except ValueError:
        uses_deprecated = False

    # the deprecated flow passes state and task as explicit pipeline inputs
    run_kwargs = {"state": step.step_state, "task": task} if uses_deprecated else {}
    agent_response, is_done = await self.pipeline.arun(**run_kwargs)

    step_output = self._get_task_step_response(agent_response, step, is_done)
    # sync step state with task state
    task.extra_state.update(step.step_state)
    return step_output

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
233
234
235
236
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run a step in streaming mode (unsupported by this worker).

    Raises:
        NotImplementedError: Always.
    """
    unsupported_msg = "This agent does not support streaming."
    raise NotImplementedError(unsupported_msg)

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
238
239
240
241
242
243
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run a step in async streaming mode (unsupported by this worker).

    Raises:
        NotImplementedError: Always.
    """
    unsupported_msg = "This agent does not support streaming."
    raise NotImplementedError(unsupported_msg)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
245
246
247
248
249
250
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Fold the task's temporary messages into its memory, then clear them.

    Called once all steps of the task are completed.
    """
    # messages already held by the task memory
    existing_messages = task.memory.get()
    # temporary buffer kept under the "memory" key of the extra state
    new_memory = task.extra_state["memory"]
    # add new messages to memory
    task.memory.set(existing_messages + new_memory.get_all())
    # reset new memory
    new_memory.reset()

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
252
253
254
255
256
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Store the callback manager on this worker and pass it to the pipeline."""
    # TODO: make this abstractmethod (right now will break some agent impls)
    self.callback_manager = callback_manager
    pipeline = self.pipeline
    pipeline.set_callback_manager(callback_manager)