Skip to content

Core Agent Classes#

Base Types#

Base agent type.

BaseAgent #

Bases: BaseChatEngine, BaseQueryEngine

Base Agent.

Source code in llama-index-core/llama_index/core/agent/types.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class BaseAgent(BaseChatEngine, BaseQueryEngine):
    """Base Agent."""

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        # TODO: the ReAct agent does not explicitly specify prompts, would need a
        # refactor to expose those prompts
        return {}

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt modules."""
        return {}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""

    # ===== Query Engine Interface =====
    @trace_method("query")
    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        agent_response = self.chat(
            query_bundle.query_str,
            chat_history=[],
        )
        return Response(
            response=str(agent_response), source_nodes=agent_response.source_nodes
        )

    @trace_method("query")
    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        agent_response = await self.achat(
            query_bundle.query_str,
            chat_history=[],
        )
        return Response(
            response=str(agent_response), source_nodes=agent_response.source_nodes
        )

    def stream_chat(
        self, message: str, chat_history: Optional[List[ChatMessage]] = None
    ) -> StreamingAgentChatResponse:
        raise NotImplementedError("stream_chat not implemented")

    async def astream_chat(
        self, message: str, chat_history: Optional[List[ChatMessage]] = None
    ) -> StreamingAgentChatResponse:
        raise NotImplementedError("astream_chat not implemented")

BaseAgentWorker #

Bases: PromptMixin

Base agent worker.

Source code in llama-index-core/llama_index/core/agent/types.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
class BaseAgentWorker(PromptMixin):
    """Base agent worker."""

    class Config:
        arbitrary_types_allowed = True

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        # TODO: the ReAct agent does not explicitly specify prompts, would need a
        # refactor to expose those prompts
        return {}

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt modules."""
        return {}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""

    @abstractmethod
    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task."""

    @abstractmethod
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step."""

    @abstractmethod
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async)."""
        raise NotImplementedError

    @abstractmethod
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream)."""
        # TODO: figure out if we need a different type for TaskStepOutput
        raise NotImplementedError

    @abstractmethod
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        raise NotImplementedError

    @abstractmethod
    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed."""

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager."""

initialize_step abstractmethod #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-core/llama_index/core/agent/types.py
205
206
207
@abstractmethod
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize step from task."""

run_step abstractmethod #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/types.py
209
210
211
@abstractmethod
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step."""

arun_step abstractmethod async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/types.py
213
214
215
216
217
218
@abstractmethod
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async)."""
    raise NotImplementedError

stream_step abstractmethod #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/types.py
220
221
222
223
224
@abstractmethod
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (stream)."""
    # TODO: figure out if we need a different type for TaskStepOutput
    raise NotImplementedError

astream_step abstractmethod async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/types.py
226
227
228
229
230
231
@abstractmethod
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async stream)."""
    raise NotImplementedError

finalize_task abstractmethod #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/agent/types.py
233
234
235
@abstractmethod
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Finalize task, after all the steps are completed."""

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/agent/types.py
237
238
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Set callback manager."""

Task #

Bases: BaseModel

Agent Task.

Represents a "run" of an agent given a user input.

Source code in llama-index-core/llama_index/core/agent/types.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class Task(BaseModel):
    """Agent Task.

    Represents a "run" of an agent given a user input.

    """

    class Config:
        arbitrary_types_allowed = True

    task_id: str = Field(
        default_factory=lambda: str(uuid.uuid4()), type=str, description="Task ID"
    )
    input: str = Field(..., type=str, description="User input")

    # NOTE: this is state that may be modified throughout the course of execution of the task
    memory: BaseMemory = Field(
        ...,
        type=BaseMemory,
        description=(
            "Conversational Memory. Maintains state before execution of this task."
        ),
    )

    callback_manager: CallbackManager = Field(
        default_factory=CallbackManager,
        exclude=True,
        description="Callback manager for the task.",
    )

    extra_state: Dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Additional user-specified state for a given task. "
            "Can be modified throughout the execution of a task."
        ),
    )

TaskStep #

Bases: BaseModel

Agent task step.

Represents a single input step within the execution run ("Task") of an agent given a user input.

The output is returned as a TaskStepOutput.

Source code in llama-index-core/llama_index/core/agent/types.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class TaskStep(BaseModel):
    """Agent task step.

    Represents a single input step within the execution run ("Task") of an agent
    given a user input.

    The output is returned as a `TaskStepOutput`.

    """

    task_id: str = Field(..., diescription="Task ID")
    step_id: str = Field(..., description="Step ID")
    input: Optional[str] = Field(default=None, description="User input")
    # memory: BaseMemory = Field(
    #     ..., type=BaseMemory, description="Conversational Memory"
    # )
    step_state: Dict[str, Any] = Field(
        default_factory=dict, description="Additional state for a given step."
    )

    # NOTE: the state below may change throughout the course of execution
    # this tracks the relationships to other steps
    next_steps: Dict[str, "TaskStep"] = Field(
        default_factory=dict, description="Next steps to be executed."
    )
    prev_steps: Dict[str, "TaskStep"] = Field(
        default_factory=dict,
        description="Previous steps that were dependencies for this step.",
    )
    is_ready: bool = Field(
        default=True, description="Is this step ready to be executed?"
    )

    def get_next_step(
        self,
        step_id: str,
        input: Optional[str] = None,
        step_state: Optional[Dict[str, Any]] = None,
    ) -> "TaskStep":
        """Convenience function to get next step.

        Preserve task_id, memory, step_state.

        """
        return TaskStep(
            task_id=self.task_id,
            step_id=step_id,
            input=input,
            # memory=self.memory,
            step_state=step_state or self.step_state,
        )

    def link_step(
        self,
        next_step: "TaskStep",
    ) -> None:
        """Link to next step.

        Add link from this step to next, and from next step to current.

        """
        self.next_steps[next_step.step_id] = next_step
        next_step.prev_steps[self.step_id] = self

get_next_step #

get_next_step(step_id: str, input: Optional[str] = None, step_state: Optional[Dict[str, Any]] = None) -> TaskStep

Convenience function to get next step.

Preserve task_id, memory, step_state.

Source code in llama-index-core/llama_index/core/agent/types.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def get_next_step(
    self,
    step_id: str,
    input: Optional[str] = None,
    step_state: Optional[Dict[str, Any]] = None,
) -> "TaskStep":
    """Convenience function to get next step.

    Preserve task_id, memory, step_state.

    """
    return TaskStep(
        task_id=self.task_id,
        step_id=step_id,
        input=input,
        # memory=self.memory,
        step_state=step_state or self.step_state,
    )
link_step(next_step: TaskStep) -> None

Link to next step.

Add link from this step to next, and from next step to current.

Source code in llama-index-core/llama_index/core/agent/types.py
121
122
123
124
125
126
127
128
129
130
131
def link_step(
    self,
    next_step: "TaskStep",
) -> None:
    """Link to next step.

    Add link from this step to next, and from next step to current.

    """
    self.next_steps[next_step.step_id] = next_step
    next_step.prev_steps[self.step_id] = self

TaskStepOutput #

Bases: BaseModel

Agent task step output.

Source code in llama-index-core/llama_index/core/agent/types.py
134
135
136
137
138
139
140
141
142
143
144
class TaskStepOutput(BaseModel):
    """Agent task step output."""

    output: Any = Field(..., description="Task step output")
    task_step: TaskStep = Field(..., description="Task step input")
    next_steps: List[TaskStep] = Field(..., description="Next steps to be executed.")
    is_last: bool = Field(default=False, description="Is this the last step?")

    def __str__(self) -> str:
        """String representation."""
        return str(self.output)

Runners#

AgentRunner #

Bases: BaseAgentRunner

Agent runner.

Top-level agent orchestrator that can create tasks, run each step in a task, or run a task e2e. Stores state and keeps track of tasks.

Parameters:

Name Type Description Default
agent_worker BaseAgentWorker

step executor

required
chat_history Optional[List[ChatMessage]]

chat history. Defaults to None.

None
state Optional[AgentState]

agent state. Defaults to None.

None
memory Optional[BaseMemory]

memory. Defaults to None.

None
llm Optional[LLM]

LLM. Defaults to None.

None
callback_manager Optional[CallbackManager]

callback manager. Defaults to None.

None
init_task_state_kwargs Optional[dict]

init task state kwargs. Defaults to None.

None
Source code in llama-index-core/llama_index/core/agent/runner/base.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
class AgentRunner(BaseAgentRunner):
    """Agent runner.

    Top-level agent orchestrator that can create tasks, run each step in a task,
    or run a task e2e. Stores state and keeps track of tasks.

    Args:
        agent_worker (BaseAgentWorker): step executor
        chat_history (Optional[List[ChatMessage]], optional): chat history. Defaults to None.
        state (Optional[AgentState], optional): agent state. Defaults to None.
        memory (Optional[BaseMemory], optional): memory. Defaults to None.
        llm (Optional[LLM], optional): LLM. Defaults to None.
        callback_manager (Optional[CallbackManager], optional): callback manager. Defaults to None.
        init_task_state_kwargs (Optional[dict], optional): init task state kwargs. Defaults to None.

    """

    # # TODO: implement this in Pydantic

    def __init__(
        self,
        agent_worker: BaseAgentWorker,
        chat_history: Optional[List[ChatMessage]] = None,
        state: Optional[AgentState] = None,
        memory: Optional[BaseMemory] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        init_task_state_kwargs: Optional[dict] = None,
        delete_task_on_finish: bool = False,
        default_tool_choice: str = "auto",
        verbose: bool = False,
    ) -> None:
        """Initialize."""
        self.agent_worker = agent_worker
        self.state = state or AgentState()
        self.memory = memory or ChatMemoryBuffer.from_defaults(chat_history, llm=llm)

        # get and set callback manager
        if callback_manager is not None:
            self.agent_worker.set_callback_manager(callback_manager)
            self.callback_manager = callback_manager
        else:
            # TODO: This is *temporary*
            # Stopgap before having a callback on the BaseAgentWorker interface.
            # Doing that requires a bit more refactoring to make sure existing code
            # doesn't break.
            if hasattr(self.agent_worker, "callback_manager"):
                self.callback_manager = (
                    self.agent_worker.callback_manager or CallbackManager()
                )
            else:
                self.callback_manager = CallbackManager()
        self.init_task_state_kwargs = init_task_state_kwargs or {}
        self.delete_task_on_finish = delete_task_on_finish
        self.default_tool_choice = default_tool_choice
        self.verbose = verbose

    @staticmethod
    def from_llm(
        tools: Optional[List[BaseTool]] = None,
        llm: Optional[LLM] = None,
        **kwargs: Any,
    ) -> "AgentRunner":
        from llama_index.core.agent import ReActAgent

        if os.getenv("IS_TESTING"):
            return ReActAgent.from_tools(
                tools=tools,
                llm=llm,
                **kwargs,
            )

        try:
            from llama_index.llms.openai import OpenAI  # pants: no-infer-dep
            from llama_index.llms.openai.utils import (
                is_function_calling_model,
            )  # pants: no-infer-dep
        except ImportError:
            raise ImportError(
                "`llama-index-llms-openai` package not found. Please "
                "install by running `pip install llama-index-llms-openai`."
            )

        if isinstance(llm, OpenAI) and is_function_calling_model(llm.model):
            from llama_index.agent.openai import OpenAIAgent  # pants: no-infer-dep

            return OpenAIAgent.from_tools(
                tools=tools,
                llm=llm,
                **kwargs,
            )
        else:
            return ReActAgent.from_tools(
                tools=tools,
                llm=llm,
                **kwargs,
            )

    @property
    def chat_history(self) -> List[ChatMessage]:
        return self.memory.get_all()

    def reset(self) -> None:
        self.memory.reset()
        self.state.reset()

    def create_task(self, input: str, **kwargs: Any) -> Task:
        """Create task."""
        if not self.init_task_state_kwargs:
            extra_state = kwargs.pop("extra_state", {})
        else:
            if "extra_state" in kwargs:
                raise ValueError(
                    "Cannot specify both `extra_state` and `init_task_state_kwargs`"
                )
            else:
                extra_state = self.init_task_state_kwargs

        callback_manager = kwargs.pop("callback_manager", self.callback_manager)
        task = Task(
            input=input,
            memory=self.memory,
            extra_state=extra_state,
            callback_manager=callback_manager,
            **kwargs,
        )
        # # put input into memory
        # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

        # get initial step from task, and put it in the step queue
        initial_step = self.agent_worker.initialize_step(task)
        task_state = TaskState(
            task=task,
            step_queue=deque([initial_step]),
        )
        # add it to state
        self.state.task_dict[task.task_id] = task_state

        return task

    def delete_task(
        self,
        task_id: str,
    ) -> None:
        """Delete task.

        NOTE: this will not delete any previous executions from memory.

        """
        self.state.task_dict.pop(task_id)

    def list_tasks(self, **kwargs: Any) -> List[Task]:
        """List tasks."""
        return list(self.state.task_dict.values())

    def get_task(self, task_id: str, **kwargs: Any) -> Task:
        """Get task."""
        return self.state.get_task(task_id)

    def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
        """Get upcoming steps."""
        return list(self.state.get_step_queue(task_id))

    def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
        """Get completed steps."""
        return self.state.get_completed_steps(task_id)

    @dispatcher.span
    def _run_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        input: Optional[str] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Execute step."""
        dispatcher.event(AgentRunStepStartEvent())
        task = self.state.get_task(task_id)
        step_queue = self.state.get_step_queue(task_id)
        step = step or step_queue.popleft()
        if input is not None:
            step.input = input

        if self.verbose:
            print(f"> Running step {step.step_id}. Step input: {step.input}")

        # TODO: figure out if you can dynamically swap in different step executors
        # not clear when you would do that by theoretically possible

        if mode == ChatResponseMode.WAIT:
            cur_step_output = self.agent_worker.run_step(step, task, **kwargs)
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = self.agent_worker.stream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")
        # append cur_step_output next steps to queue
        next_steps = cur_step_output.next_steps
        step_queue.extend(next_steps)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        dispatcher.event(AgentRunStepEndEvent())
        return cur_step_output

    async def _arun_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        input: Optional[str] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Execute step."""
        task = self.state.get_task(task_id)
        step_queue = self.state.get_step_queue(task_id)
        step = step or step_queue.popleft()
        if input is not None:
            step.input = input

        if self.verbose:
            print(f"> Running step {step.step_id}. Step input: {step.input}")

        # TODO: figure out if you can dynamically swap in different step executors
        # not clear when you would do that by theoretically possible
        if mode == ChatResponseMode.WAIT:
            cur_step_output = await self.agent_worker.arun_step(step, task, **kwargs)
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = await self.agent_worker.astream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")
        # append cur_step_output next steps to queue
        next_steps = cur_step_output.next_steps
        step_queue.extend(next_steps)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        return cur_step_output

    def run_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step."""
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return self._run_step(
            task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
        )

    async def arun_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (async)."""
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return await self._arun_step(
            task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
        )

    def stream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (stream)."""
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return self._run_step(
            task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
        )

    async def astream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return await self._arun_step(
            task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
        )

    def finalize_response(
        self,
        task_id: str,
        step_output: Optional[TaskStepOutput] = None,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Finalize response."""
        if step_output is None:
            step_output = self.state.get_completed_steps(task_id)[-1]
        if not step_output.is_last:
            raise ValueError(
                "finalize_response can only be called on the last step output"
            )

        if not isinstance(
            step_output.output,
            (AgentChatResponse, StreamingAgentChatResponse),
        ):
            raise ValueError(
                "When `is_last` is True, cur_step_output.output must be "
                f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
            )

        # finalize task
        self.agent_worker.finalize_task(self.state.get_task(task_id))

        if self.delete_task_on_finish:
            self.delete_task(task_id)

        return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

    @dispatcher.span
    def _chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Chat with step executor."""
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        dispatcher.event(AgentChatWithStepStartEvent())
        while True:
            # pass step queue in as argument, assume step executor is stateless
            cur_step_output = self._run_step(
                task.task_id, mode=mode, tool_choice=tool_choice
            )

            if cur_step_output.is_last:
                result_output = cur_step_output
                break

            # ensure tool_choice does not cause endless loops
            tool_choice = "auto"

        result = self.finalize_response(
            task.task_id,
            result_output,
        )
        dispatcher.event(AgentChatWithStepEndEvent())
        return result

    async def _achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Chat with step executor."""
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        while True:
            # pass step queue in as argument, assume step executor is stateless
            cur_step_output = await self._arun_step(
                task.task_id, mode=mode, tool_choice=tool_choice
            )

            if cur_step_output.is_last:
                result_output = cur_step_output
                break

            # ensure tool_choice does not cause endless loops
            tool_choice = "auto"

        return self.finalize_response(
            task.task_id,
            result_output,
        )

    @trace_method("chat")
    def chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> AgentChatResponse:
        # override tool choice is provided as input.
        if tool_choice is None:
            tool_choice = self.default_tool_choice
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message=message,
                chat_history=chat_history,
                tool_choice=tool_choice,
                mode=ChatResponseMode.WAIT,
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    async def achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> AgentChatResponse:
        # override tool choice is provided as input.
        if tool_choice is None:
            tool_choice = self.default_tool_choice
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message=message,
                chat_history=chat_history,
                tool_choice=tool_choice,
                mode=ChatResponseMode.WAIT,
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @dispatcher.span
    @trace_method("chat")
    def stream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> StreamingAgentChatResponse:
        # override tool choice is provided as input.
        if tool_choice is None:
            tool_choice = self.default_tool_choice
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            assert isinstance(chat_response, StreamingAgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    async def astream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> StreamingAgentChatResponse:
        # override tool choice is provided as input.
        if tool_choice is None:
            tool_choice = self.default_tool_choice
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            assert isinstance(chat_response, StreamingAgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    def undo_step(self, task_id: str) -> None:
        """Undo previous step."""
        raise NotImplementedError("undo_step not implemented")

create_task #

create_task(input: str, **kwargs: Any) -> Task

Create task.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def create_task(self, input: str, **kwargs: Any) -> Task:
    """Create task."""
    if not self.init_task_state_kwargs:
        extra_state = kwargs.pop("extra_state", {})
    else:
        if "extra_state" in kwargs:
            raise ValueError(
                "Cannot specify both `extra_state` and `init_task_state_kwargs`"
            )
        else:
            extra_state = self.init_task_state_kwargs

    callback_manager = kwargs.pop("callback_manager", self.callback_manager)
    task = Task(
        input=input,
        memory=self.memory,
        extra_state=extra_state,
        callback_manager=callback_manager,
        **kwargs,
    )
    # # put input into memory
    # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

    # get initial step from task, and put it in the step queue
    initial_step = self.agent_worker.initialize_step(task)
    task_state = TaskState(
        task=task,
        step_queue=deque([initial_step]),
    )
    # add it to state
    self.state.task_dict[task.task_id] = task_state

    return task

delete_task #

delete_task(task_id: str) -> None

Delete task.

NOTE: this will not delete any previous executions from memory.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
331
332
333
334
335
336
337
338
339
340
def delete_task(
    self,
    task_id: str,
) -> None:
    """Delete task.

    NOTE: this will not delete any previous executions from memory.

    """
    self.state.task_dict.pop(task_id)

list_tasks #

list_tasks(**kwargs: Any) -> List[Task]

List tasks.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
342
343
344
def list_tasks(self, **kwargs: Any) -> List[Task]:
    """List tasks."""
    return list(self.state.task_dict.values())

get_task #

get_task(task_id: str, **kwargs: Any) -> Task

Get task.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
346
347
348
def get_task(self, task_id: str, **kwargs: Any) -> Task:
    """Get task."""
    return self.state.get_task(task_id)

get_upcoming_steps #

get_upcoming_steps(task_id: str, **kwargs: Any) -> List[TaskStep]

Get upcoming steps.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
350
351
352
def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
    """Get upcoming steps."""
    return list(self.state.get_step_queue(task_id))

get_completed_steps #

get_completed_steps(task_id: str, **kwargs: Any) -> List[TaskStepOutput]

Get completed steps.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
354
355
356
def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
    """Get completed steps."""
    return self.state.get_completed_steps(task_id)

run_step #

run_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
434
435
436
437
438
439
440
441
442
443
444
445
def run_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run step."""
    step = validate_step_from_args(task_id, input, step, **kwargs)
    return self._run_step(
        task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
    )

arun_step async #

arun_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/runner/base.py
447
448
449
450
451
452
453
454
455
456
457
458
async def arun_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run step (async)."""
    step = validate_step_from_args(task_id, input, step, **kwargs)
    return await self._arun_step(
        task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
    )

stream_step #

stream_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/runner/base.py
460
461
462
463
464
465
466
467
468
469
470
471
def stream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run step (stream)."""
    step = validate_step_from_args(task_id, input, step, **kwargs)
    return self._run_step(
        task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
    )

astream_step async #

astream_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/runner/base.py
473
474
475
476
477
478
479
480
481
482
483
484
async def astream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run step (async stream)."""
    step = validate_step_from_args(task_id, input, step, **kwargs)
    return await self._arun_step(
        task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
    )

finalize_response #

finalize_response(task_id: str, step_output: Optional[TaskStepOutput] = None) -> AGENT_CHAT_RESPONSE_TYPE

Finalize response.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
def finalize_response(
    self,
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE:
    """Finalize response."""
    if step_output is None:
        step_output = self.state.get_completed_steps(task_id)[-1]
    if not step_output.is_last:
        raise ValueError(
            "finalize_response can only be called on the last step output"
        )

    if not isinstance(
        step_output.output,
        (AgentChatResponse, StreamingAgentChatResponse),
    ):
        raise ValueError(
            "When `is_last` is True, cur_step_output.output must be "
            f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
        )

    # finalize task
    self.agent_worker.finalize_task(self.state.get_task(task_id))

    if self.delete_task_on_finish:
        self.delete_task(task_id)

    return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

undo_step #

undo_step(task_id: str) -> None

Undo previous step.

Source code in llama-index-core/llama_index/core/agent/runner/base.py
673
674
675
def undo_step(self, task_id: str) -> None:
    """Undo previous step."""
    raise NotImplementedError("undo_step not implemented")

ParallelAgentRunner #

Bases: BaseAgentRunner

Parallel agent runner.

Executes steps in queue in parallel. Requires async support.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
class ParallelAgentRunner(BaseAgentRunner):
    """Parallel agent runner.

    Executes steps in queue in parallel. Requires async support.

    """

    def __init__(
        self,
        agent_worker: BaseAgentWorker,
        chat_history: Optional[List[ChatMessage]] = None,
        state: Optional[DAGAgentState] = None,
        memory: Optional[BaseMemory] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        init_task_state_kwargs: Optional[dict] = None,
        delete_task_on_finish: bool = False,
    ) -> None:
        """Initialize."""
        self.memory = memory or ChatMemoryBuffer.from_defaults(chat_history, llm=llm)
        self.state = state or DAGAgentState()
        self.callback_manager = callback_manager or CallbackManager([])
        self.init_task_state_kwargs = init_task_state_kwargs or {}
        self.agent_worker = agent_worker
        self.delete_task_on_finish = delete_task_on_finish

    @property
    def chat_history(self) -> List[ChatMessage]:
        return self.memory.get_all()

    def reset(self) -> None:
        self.memory.reset()

    def create_task(self, input: str, **kwargs: Any) -> Task:
        """Create task."""
        task = Task(
            input=input,
            memory=self.memory,
            extra_state=self.init_task_state_kwargs,
            **kwargs,
        )
        # # put input into memory
        # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

        # add it to state
        # get initial step from task, and put it in the step queue
        initial_step = self.agent_worker.initialize_step(task)
        task_state = DAGTaskState(
            task=task,
            root_step=initial_step,
            step_queue=deque([initial_step]),
        )

        self.state.task_dict[task.task_id] = task_state

        return task

    def delete_task(
        self,
        task_id: str,
    ) -> None:
        """Delete task.

        NOTE: this will not delete any previous executions from memory.

        """
        self.state.task_dict.pop(task_id)

    def list_tasks(self, **kwargs: Any) -> List[Task]:
        """List tasks."""
        task_states = list(self.state.task_dict.values())
        return [task_state.task for task_state in task_states]

    def get_task(self, task_id: str, **kwargs: Any) -> Task:
        """Get task."""
        return self.state.get_task(task_id)

    def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
        """Get upcoming steps."""
        return list(self.state.get_step_queue(task_id))

    def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
        """Get completed steps."""
        return self.state.get_completed_steps(task_id)

    def run_steps_in_queue(
        self,
        task_id: str,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> List[TaskStepOutput]:
        """Execute steps in queue.

        Run all steps in queue, clearing it out.

        Assume that all steps can be run in parallel.

        """
        return asyncio.run(self.arun_steps_in_queue(task_id, mode=mode, **kwargs))

    async def arun_steps_in_queue(
        self,
        task_id: str,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> List[TaskStepOutput]:
        """Execute all steps in queue.

        All steps in queue are assumed to be ready.

        """
        # first pop all steps from step_queue
        steps: List[TaskStep] = []
        while len(self.state.get_step_queue(task_id)) > 0:
            steps.append(self.state.get_step_queue(task_id).popleft())

        # take every item in the queue, and run it
        tasks = []
        for step in steps:
            tasks.append(self._arun_step(task_id, step=step, mode=mode, **kwargs))

        return await asyncio.gather(*tasks)

    def _run_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Execute step."""
        task = self.state.get_task(task_id)
        task_queue = self.state.get_step_queue(task_id)
        step = step or task_queue.popleft()

        if not step.is_ready:
            raise ValueError(f"Step {step.step_id} is not ready")

        if mode == ChatResponseMode.WAIT:
            cur_step_output: TaskStepOutput = self.agent_worker.run_step(
                step, task, **kwargs
            )
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = self.agent_worker.stream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")

        for next_step in cur_step_output.next_steps:
            if next_step.is_ready:
                task_queue.append(next_step)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        return cur_step_output

    async def _arun_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Execute step."""
        task = self.state.get_task(task_id)
        task_queue = self.state.get_step_queue(task_id)
        step = step or task_queue.popleft()

        if not step.is_ready:
            raise ValueError(f"Step {step.step_id} is not ready")

        if mode == ChatResponseMode.WAIT:
            cur_step_output = await self.agent_worker.arun_step(step, task, **kwargs)
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = await self.agent_worker.astream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")

        for next_step in cur_step_output.next_steps:
            if next_step.is_ready:
                task_queue.append(next_step)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        return cur_step_output

    def run_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step."""
        return self._run_step(task_id, step, mode=ChatResponseMode.WAIT, **kwargs)

    async def arun_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (async)."""
        return await self._arun_step(
            task_id, step, mode=ChatResponseMode.WAIT, **kwargs
        )

    def stream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (stream)."""
        return self._run_step(task_id, step, mode=ChatResponseMode.STREAM, **kwargs)

    async def astream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        return await self._arun_step(
            task_id, step, mode=ChatResponseMode.STREAM, **kwargs
        )

    def finalize_response(
        self,
        task_id: str,
        step_output: Optional[TaskStepOutput] = None,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Finalize response."""
        if step_output is None:
            step_output = self.state.get_completed_steps(task_id)[-1]
        if not step_output.is_last:
            raise ValueError(
                "finalize_response can only be called on the last step output"
            )

        if not isinstance(
            step_output.output,
            (AgentChatResponse, StreamingAgentChatResponse),
        ):
            raise ValueError(
                "When `is_last` is True, cur_step_output.output must be "
                f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
            )

        # finalize task
        self.agent_worker.finalize_task(self.state.get_task(task_id))

        if self.delete_task_on_finish:
            self.delete_task(task_id)

        return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

    def _chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Chat with step executor."""
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        while True:
            # pass step queue in as argument, assume step executor is stateless
            cur_step_outputs = self.run_steps_in_queue(task.task_id, mode=mode)

            # check if a step output is_last
            is_last = any(
                cur_step_output.is_last for cur_step_output in cur_step_outputs
            )
            if is_last:
                if len(cur_step_outputs) > 1:
                    raise ValueError(
                        "More than one step output returned in final step."
                    )
                cur_step_output = cur_step_outputs[0]
                result_output = cur_step_output
                break

        return self.finalize_response(
            task.task_id,
            result_output,
        )

    async def _achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """Chat with step executor."""
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        while True:
            # pass step queue in as argument, assume step executor is stateless
            cur_step_outputs = await self.arun_steps_in_queue(task.task_id, mode=mode)

            # check if a step output is_last
            is_last = any(
                cur_step_output.is_last for cur_step_output in cur_step_outputs
            )
            if is_last:
                if len(cur_step_outputs) > 1:
                    raise ValueError(
                        "More than one step output returned in final step."
                    )
                cur_step_output = cur_step_outputs[0]
                result_output = cur_step_output
                break

        return self.finalize_response(
            task.task_id,
            result_output,
        )

    @trace_method("chat")
    def chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> AgentChatResponse:
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message, chat_history, tool_choice, mode=ChatResponseMode.WAIT
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    async def achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> AgentChatResponse:
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message, chat_history, tool_choice, mode=ChatResponseMode.WAIT
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    def stream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> StreamingAgentChatResponse:
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            assert isinstance(chat_response, StreamingAgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    async def astream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> StreamingAgentChatResponse:
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            assert isinstance(chat_response, StreamingAgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    def undo_step(self, task_id: str) -> None:
        """Undo previous step."""
        raise NotImplementedError("undo_step not implemented")

create_task #

create_task(input: str, **kwargs: Any) -> Task

Create task.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def create_task(self, input: str, **kwargs: Any) -> Task:
    """Create task."""
    task = Task(
        input=input,
        memory=self.memory,
        extra_state=self.init_task_state_kwargs,
        **kwargs,
    )
    # # put input into memory
    # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

    # add it to state
    # get initial step from task, and put it in the step queue
    initial_step = self.agent_worker.initialize_step(task)
    task_state = DAGTaskState(
        task=task,
        root_step=initial_step,
        step_queue=deque([initial_step]),
    )

    self.state.task_dict[task.task_id] = task_state

    return task

delete_task #

delete_task(task_id: str) -> None

Delete task.

NOTE: this will not delete any previous executions from memory.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
128
129
130
131
132
133
134
135
136
137
def delete_task(
    self,
    task_id: str,
) -> None:
    """Delete task.

    NOTE: this will not delete any previous executions from memory.

    """
    self.state.task_dict.pop(task_id)

list_tasks #

list_tasks(**kwargs: Any) -> List[Task]

List tasks.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
139
140
141
142
def list_tasks(self, **kwargs: Any) -> List[Task]:
    """List tasks."""
    task_states = list(self.state.task_dict.values())
    return [task_state.task for task_state in task_states]

get_task #

get_task(task_id: str, **kwargs: Any) -> Task

Get task.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
144
145
146
def get_task(self, task_id: str, **kwargs: Any) -> Task:
    """Get task."""
    return self.state.get_task(task_id)

get_upcoming_steps #

get_upcoming_steps(task_id: str, **kwargs: Any) -> List[TaskStep]

Get upcoming steps.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
148
149
150
def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
    """Get upcoming steps."""
    return list(self.state.get_step_queue(task_id))

get_completed_steps #

get_completed_steps(task_id: str, **kwargs: Any) -> List[TaskStepOutput]

Get completed steps.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
152
153
154
def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
    """Get completed steps."""
    return self.state.get_completed_steps(task_id)

run_steps_in_queue #

run_steps_in_queue(task_id: str, mode: ChatResponseMode = ChatResponseMode.WAIT, **kwargs: Any) -> List[TaskStepOutput]

Execute steps in queue.

Run all steps in queue, clearing it out.

Assume that all steps can be run in parallel.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def run_steps_in_queue(
    self,
    task_id: str,
    mode: ChatResponseMode = ChatResponseMode.WAIT,
    **kwargs: Any,
) -> List[TaskStepOutput]:
    """Execute steps in queue.

    Run all steps in queue, clearing it out.

    Assume that all steps can be run in parallel.

    """
    return asyncio.run(self.arun_steps_in_queue(task_id, mode=mode, **kwargs))

arun_steps_in_queue async #

arun_steps_in_queue(task_id: str, mode: ChatResponseMode = ChatResponseMode.WAIT, **kwargs: Any) -> List[TaskStepOutput]

Execute all steps in queue.

All steps in queue are assumed to be ready.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
async def arun_steps_in_queue(
    self,
    task_id: str,
    mode: ChatResponseMode = ChatResponseMode.WAIT,
    **kwargs: Any,
) -> List[TaskStepOutput]:
    """Execute all steps in queue.

    All steps in queue are assumed to be ready.

    """
    # first pop all steps from step_queue
    steps: List[TaskStep] = []
    while len(self.state.get_step_queue(task_id)) > 0:
        steps.append(self.state.get_step_queue(task_id).popleft())

    # take every item in the queue, and run it
    tasks = []
    for step in steps:
        tasks.append(self._arun_step(task_id, step=step, mode=mode, **kwargs))

    return await asyncio.gather(*tasks)

run_step #

run_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
260
261
262
263
264
265
266
267
268
def run_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run step."""
    return self._run_step(task_id, step, mode=ChatResponseMode.WAIT, **kwargs)

arun_step async #

arun_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
270
271
272
273
274
275
276
277
278
279
280
async def arun_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run step (async)."""
    return await self._arun_step(
        task_id, step, mode=ChatResponseMode.WAIT, **kwargs
    )

stream_step #

stream_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
282
283
284
285
286
287
288
289
290
def stream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run step (stream)."""
    return self._run_step(task_id, step, mode=ChatResponseMode.STREAM, **kwargs)

astream_step async #

astream_step(task_id: str, input: Optional[str] = None, step: Optional[TaskStep] = None, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
292
293
294
295
296
297
298
299
300
301
302
async def astream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """Run step (async stream)."""
    return await self._arun_step(
        task_id, step, mode=ChatResponseMode.STREAM, **kwargs
    )

finalize_response #

finalize_response(task_id: str, step_output: Optional[TaskStepOutput] = None) -> AGENT_CHAT_RESPONSE_TYPE

Finalize response.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def finalize_response(
    self,
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE:
    """Finalize response."""
    if step_output is None:
        step_output = self.state.get_completed_steps(task_id)[-1]
    if not step_output.is_last:
        raise ValueError(
            "finalize_response can only be called on the last step output"
        )

    if not isinstance(
        step_output.output,
        (AgentChatResponse, StreamingAgentChatResponse),
    ):
        raise ValueError(
            "When `is_last` is True, cur_step_output.output must be "
            f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
        )

    # finalize task
    self.agent_worker.finalize_task(self.state.get_task(task_id))

    if self.delete_task_on_finish:
        self.delete_task(task_id)

    return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

undo_step #

undo_step(task_id: str) -> None

Undo previous step.

Source code in llama-index-core/llama_index/core/agent/runner/parallel.py
476
477
478
def undo_step(self, task_id: str) -> None:
    """Undo previous step."""
    raise NotImplementedError("undo_step not implemented")

Workers#

CustomSimpleAgentWorker #

Bases: BaseModel, BaseAgentWorker

Custom simple agent worker.

This is "simple" in the sense that some of the scaffolding is setup already. Assumptions: - assumes that the agent has tools, llm, callback manager, and tool retriever - has a from_tools convenience function - assumes that the agent is sequential, and doesn't take in any additional intermediate inputs.

Parameters:

Name Type Description Default
tools Sequence[BaseTool]

Tools to use for reasoning

required
llm LLM

LLM to use

required
callback_manager CallbackManager

Callback manager

None
tool_retriever Optional[ObjectRetriever[BaseTool]]

Tool retriever

None
verbose bool

Whether to print out reasoning steps

False
Source code in llama-index-core/llama_index/core/agent/custom/simple.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
class CustomSimpleAgentWorker(BaseModel, BaseAgentWorker):
    """Custom simple agent worker.

    This is "simple" in the sense that some of the scaffolding is setup already.
    Assumptions:
    - assumes that the agent has tools, llm, callback manager, and tool retriever
    - has a `from_tools` convenience function
    - assumes that the agent is sequential, and doesn't take in any additional
    intermediate inputs.

    Args:
        tools (Sequence[BaseTool]): Tools to use for reasoning
        llm (LLM): LLM to use
        callback_manager (CallbackManager): Callback manager
        tool_retriever (Optional[ObjectRetriever[BaseTool]]): Tool retriever
        verbose (bool): Whether to print out reasoning steps

    """

    tools: Sequence[BaseTool] = Field(..., description="Tools to use for reasoning")
    llm: LLM = Field(..., description="LLM to use")
    callback_manager: CallbackManager = Field(
        default_factory=lambda: CallbackManager([]), exclude=True
    )
    tool_retriever: Optional[ObjectRetriever[BaseTool]] = Field(
        default=None, description="Tool retriever"
    )
    verbose: bool = Field(False, description="Whether to print out reasoning steps")

    _get_tools: Callable[[str], Sequence[BaseTool]] = PrivateAttr()

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        tools: Sequence[BaseTool],
        llm: LLM,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
    ) -> None:
        if len(tools) > 0 and tool_retriever is not None:
            raise ValueError("Cannot specify both tools and tool_retriever")
        elif len(tools) > 0:
            self._get_tools = lambda _: tools
        elif tool_retriever is not None:
            tool_retriever_c = cast(ObjectRetriever[BaseTool], tool_retriever)
            self._get_tools = lambda message: tool_retriever_c.retrieve(message)
        else:
            self._get_tools = lambda _: []

        super().__init__(
            tools=tools,
            llm=llm,
            callback_manager=callback_manager,
            tool_retriever=tool_retriever,
            verbose=verbose,
        )

    @classmethod
    def from_tools(
        cls,
        tools: Optional[Sequence[BaseTool]] = None,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "CustomSimpleAgentWorker":
        """Convenience constructor method from set of of BaseTools (Optional)."""
        llm = llm or Settings.llm
        if callback_manager is not None:
            llm.callback_manager = callback_manager
        return cls(
            tools=tools or [],
            tool_retriever=tool_retriever,
            llm=llm,
            callback_manager=callback_manager,
            verbose=verbose,
        )

    @abstractmethod
    def _initialize_state(self, task: Task, **kwargs: Any) -> Dict[str, Any]:
        """Initialize state."""

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task."""
        sources: List[ToolOutput] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # initialize initial state
        initial_state = {
            "sources": sources,
            "memory": new_memory,
        }

        step_state = self._initialize_state(task, **kwargs)
        # if intersecting keys, error
        if set(step_state.keys()).intersection(set(initial_state.keys())):
            raise ValueError(
                f"Step state keys {step_state.keys()} and initial state keys {initial_state.keys()} intersect."
                f"*NOTE*: initial state keys {initial_state.keys()} are reserved."
            )
        step_state.update(initial_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state=step_state,
        )

    def get_tools(self, input: str) -> List[AsyncBaseTool]:
        """Get tools."""
        return [adapt_to_async_tool(t) for t in self._get_tools(input)]

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """Get task step response."""
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    @abstractmethod
    def _run_step(
        self, state: Dict[str, Any], task: Task, input: Optional[str] = None
    ) -> Tuple[AgentChatResponse, bool]:
        """Run step.

        Returns:
            Tuple of (agent_response, is_done)

        """

    async def _arun_step(
        self, state: Dict[str, Any], task: Task, input: Optional[str] = None
    ) -> Tuple[AgentChatResponse, bool]:
        """Run step (async).

        Can override this method if you want to run the step asynchronously.

        Returns:
            Tuple of (agent_response, is_done)

        """
        raise NotImplementedError(
            "This agent does not support async." "Please implement _arun_step."
        )

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step."""
        agent_response, is_done = self._run_step(
            step.step_state, task, input=step.input
        )
        response = self._get_task_step_response(agent_response, step, is_done)
        # sync step state with task state
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async)."""
        agent_response, is_done = await self._arun_step(
            step.step_state, task, input=step.input
        )
        response = self._get_task_step_response(agent_response, step, is_done)
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream)."""
        raise NotImplementedError("This agent does not support streaming.")

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        raise NotImplementedError("This agent does not support streaming.")

    @abstractmethod
    def _finalize_task(self, state: Dict[str, Any], **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed.

        State is all the step states.

        """

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed."""
        # add new messages to memory
        task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
        # reset new memory
        task.extra_state["memory"].reset()
        self._finalize_task(task.extra_state, **kwargs)

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager."""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager

from_tools classmethod #

from_tools(tools: Optional[Sequence[BaseTool]] = None, tool_retriever: Optional[ObjectRetriever[BaseTool]] = None, llm: Optional[LLM] = None, callback_manager: Optional[CallbackManager] = None, verbose: bool = False, **kwargs: Any) -> CustomSimpleAgentWorker

Convenience constructor method from set of of BaseTools (Optional).

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@classmethod
def from_tools(
    cls,
    tools: Optional[Sequence[BaseTool]] = None,
    tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
    llm: Optional[LLM] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "CustomSimpleAgentWorker":
    """Convenience constructor method from set of of BaseTools (Optional)."""
    llm = llm or Settings.llm
    if callback_manager is not None:
        llm.callback_manager = callback_manager
    return cls(
        tools=tools or [],
        tool_retriever=tool_retriever,
        llm=llm,
        callback_manager=callback_manager,
        verbose=verbose,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize step from task."""
    sources: List[ToolOutput] = []
    # temporary memory for new messages
    new_memory = ChatMemoryBuffer.from_defaults()

    # initialize initial state
    initial_state = {
        "sources": sources,
        "memory": new_memory,
    }

    step_state = self._initialize_state(task, **kwargs)
    # if intersecting keys, error
    if set(step_state.keys()).intersection(set(initial_state.keys())):
        raise ValueError(
            f"Step state keys {step_state.keys()} and initial state keys {initial_state.keys()} intersect."
            f"*NOTE*: initial state keys {initial_state.keys()} are reserved."
        )
    step_state.update(initial_state)

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state=step_state,
    )

get_tools #

get_tools(input: str) -> List[AsyncBaseTool]

Get tools.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
153
154
155
def get_tools(self, input: str) -> List[AsyncBaseTool]:
    """Get tools."""
    return [adapt_to_async_tool(t) for t in self._get_tools(input)]

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
205
206
207
208
209
210
211
212
213
214
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step."""
    agent_response, is_done = self._run_step(
        step.step_state, task, input=step.input
    )
    response = self._get_task_step_response(agent_response, step, is_done)
    # sync step state with task state
    task.extra_state.update(step.step_state)
    return response

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
216
217
218
219
220
221
222
223
224
225
226
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async)."""
    agent_response, is_done = await self._arun_step(
        step.step_state, task, input=step.input
    )
    response = self._get_task_step_response(agent_response, step, is_done)
    task.extra_state.update(step.step_state)
    return response

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
228
229
230
231
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (stream)."""
    raise NotImplementedError("This agent does not support streaming.")

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
233
234
235
236
237
238
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async stream)."""
    raise NotImplementedError("This agent does not support streaming.")

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
248
249
250
251
252
253
254
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Finalize task, after all the steps are completed."""
    # add new messages to memory
    task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
    # reset new memory
    task.extra_state["memory"].reset()
    self._finalize_task(task.extra_state, **kwargs)

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/agent/custom/simple.py
256
257
258
259
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Set callback manager."""
    # TODO: make this abstractmethod (right now will break some agent impls)
    self.callback_manager = callback_manager

MultimodalReActAgentWorker #

Bases: BaseAgentWorker

Multimodal ReAct Agent worker.

NOTE: This is a BETA feature.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
class MultimodalReActAgentWorker(BaseAgentWorker):
    """Multimodal ReAct Agent worker.

    **NOTE**: This is a BETA feature.

    """

    def __init__(
        self,
        tools: Sequence[BaseTool],
        multi_modal_llm: MultiModalLLM,
        max_iterations: int = 10,
        react_chat_formatter: Optional[ReActChatFormatter] = None,
        output_parser: Optional[ReActOutputParser] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
    ) -> None:
        self._multi_modal_llm = multi_modal_llm
        self.callback_manager = callback_manager or CallbackManager([])
        self._max_iterations = max_iterations
        self._react_chat_formatter = react_chat_formatter or ReActChatFormatter(
            system_header=REACT_MM_CHAT_SYSTEM_HEADER
        )
        self._output_parser = output_parser or ReActOutputParser()
        self._verbose = verbose

        try:
            from llama_index.multi_modal_llms.openai.utils import (
                generate_openai_multi_modal_chat_message,
            )  # pants: no-infer-dep

            self._add_user_step_to_reasoning = partial(
                add_user_step_to_reasoning,
                generate_chat_message_fn=generate_openai_multi_modal_chat_message,
            )
        except ImportError:
            raise ImportError(
                "`llama-index-multi-modal-llms-openai` package cannot be found. "
                "Please install it by using `pip install `llama-index-multi-modal-llms-openai`"
            )

        if len(tools) > 0 and tool_retriever is not None:
            raise ValueError("Cannot specify both tools and tool_retriever")
        elif len(tools) > 0:
            self._get_tools = lambda _: tools
        elif tool_retriever is not None:
            tool_retriever_c = cast(ObjectRetriever[BaseTool], tool_retriever)
            self._get_tools = lambda message: tool_retriever_c.retrieve(message)
        else:
            self._get_tools = lambda _: []

    @classmethod
    def from_tools(
        cls,
        tools: Optional[Sequence[BaseTool]] = None,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        multi_modal_llm: Optional[MultiModalLLM] = None,
        max_iterations: int = 10,
        react_chat_formatter: Optional[ReActChatFormatter] = None,
        output_parser: Optional[ReActOutputParser] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "MultimodalReActAgentWorker":
        """Convenience constructor method from set of of BaseTools (Optional).

        NOTE: kwargs should have been exhausted by this point. In other words
        the various upstream components such as BaseSynthesizer (response synthesizer)
        or BaseRetriever should have picked up off their respective kwargs in their
        constructions.

        Returns:
            ReActAgent
        """
        if multi_modal_llm is None:
            try:
                from llama_index.multi_modal_llms.openai import (
                    OpenAIMultiModal,
                )  # pants: no-infer-dep

                multi_modal_llm = multi_modal_llm or OpenAIMultiModal(
                    model="gpt-4-vision-preview", max_new_tokens=1000
                )
            except ImportError:
                raise ImportError(
                    "`llama-index-multi-modal-llms-openai` package cannot be found. "
                    "Please install it by using `pip install `llama-index-multi-modal-llms-openai`"
                )
        return cls(
            tools=tools or [],
            tool_retriever=tool_retriever,
            multi_modal_llm=multi_modal_llm,
            max_iterations=max_iterations,
            react_chat_formatter=react_chat_formatter,
            output_parser=output_parser,
            callback_manager=callback_manager,
            verbose=verbose,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task."""
        sources: List[ToolOutput] = []
        current_reasoning: List[BaseReasoningStep] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # validation
        if "image_docs" not in task.extra_state:
            raise ValueError("Image docs not found in task extra state.")

        # initialize task state
        task_state = {
            "sources": sources,
            "current_reasoning": current_reasoning,
            "new_memory": new_memory,
        }
        task.extra_state.update(task_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state={"is_first": True, "image_docs": task.extra_state["image_docs"]},
        )

    def get_tools(self, input: str) -> List[AsyncBaseTool]:
        """Get tools."""
        return [adapt_to_async_tool(t) for t in self._get_tools(input)]

    def _extract_reasoning_step(
        self, output: ChatResponse, is_streaming: bool = False
    ) -> Tuple[str, List[BaseReasoningStep], bool]:
        """
        Extracts the reasoning step from the given output.

        This method parses the message content from the output,
        extracts the reasoning step, and determines whether the processing is
        complete. It also performs validation checks on the output and
        handles possible errors.
        """
        if output.message.content is None:
            raise ValueError("Got empty message.")
        message_content = output.message.content
        current_reasoning = []
        try:
            reasoning_step = self._output_parser.parse(message_content, is_streaming)
        except BaseException as exc:
            raise ValueError(f"Could not parse output: {message_content}") from exc
        if self._verbose:
            print_text(f"{reasoning_step.get_content()}\n", color="pink")
        current_reasoning.append(reasoning_step)

        if reasoning_step.is_done:
            return message_content, current_reasoning, True

        reasoning_step = cast(ActionReasoningStep, reasoning_step)
        if not isinstance(reasoning_step, ActionReasoningStep):
            raise ValueError(f"Expected ActionReasoningStep, got {reasoning_step}")

        return message_content, current_reasoning, False

    def _process_actions(
        self,
        task: Task,
        tools: Sequence[AsyncBaseTool],
        output: ChatResponse,
        is_streaming: bool = False,
    ) -> Tuple[List[BaseReasoningStep], bool]:
        tools_dict: Dict[str, AsyncBaseTool] = {
            tool.metadata.get_name(): tool for tool in tools
        }
        _, current_reasoning, is_done = self._extract_reasoning_step(
            output, is_streaming
        )

        if is_done:
            return current_reasoning, True

        # call tool with input
        reasoning_step = cast(ActionReasoningStep, current_reasoning[-1])
        tool = tools_dict[reasoning_step.action]
        with self.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={
                EventPayload.FUNCTION_CALL: reasoning_step.action_input,
                EventPayload.TOOL: tool.metadata,
            },
        ) as event:
            tool_output = tool.call(**reasoning_step.action_input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(tool_output)})

        task.extra_state["sources"].append(tool_output)

        observation_step = ObservationReasoningStep(observation=str(tool_output))
        current_reasoning.append(observation_step)
        if self._verbose:
            print_text(f"{observation_step.get_content()}\n", color="blue")
        return current_reasoning, False

    async def _aprocess_actions(
        self,
        task: Task,
        tools: Sequence[AsyncBaseTool],
        output: ChatResponse,
        is_streaming: bool = False,
    ) -> Tuple[List[BaseReasoningStep], bool]:
        tools_dict = {tool.metadata.name: tool for tool in tools}
        _, current_reasoning, is_done = self._extract_reasoning_step(
            output, is_streaming
        )

        if is_done:
            return current_reasoning, True

        # call tool with input
        reasoning_step = cast(ActionReasoningStep, current_reasoning[-1])
        tool = tools_dict[reasoning_step.action]
        with self.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={
                EventPayload.FUNCTION_CALL: reasoning_step.action_input,
                EventPayload.TOOL: tool.metadata,
            },
        ) as event:
            tool_output = await tool.acall(**reasoning_step.action_input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(tool_output)})

        task.extra_state["sources"].append(tool_output)

        observation_step = ObservationReasoningStep(observation=str(tool_output))
        current_reasoning.append(observation_step)
        if self._verbose:
            print_text(f"{observation_step.get_content()}\n", color="blue")
        return current_reasoning, False

    def _get_response(
        self,
        current_reasoning: List[BaseReasoningStep],
        sources: List[ToolOutput],
    ) -> AgentChatResponse:
        """Get response from reasoning steps."""
        if len(current_reasoning) == 0:
            raise ValueError("No reasoning steps were taken.")
        elif len(current_reasoning) == self._max_iterations:
            raise ValueError("Reached max iterations.")

        if isinstance(current_reasoning[-1], ResponseReasoningStep):
            response_step = cast(ResponseReasoningStep, current_reasoning[-1])
            response_str = response_step.response
        else:
            response_str = current_reasoning[-1].get_content()

        # TODO: add sources from reasoning steps
        return AgentChatResponse(response=response_str, sources=sources)

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """Get task step response."""
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    def _run_step(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """Run step."""
        # This is either not None on the first step or if the user specifies
        # an intermediate step in the middle
        if step.input is not None:
            self._add_user_step_to_reasoning(
                step,
                task.extra_state["new_memory"],
                task.extra_state["current_reasoning"],
                verbose=self._verbose,
            )
        # TODO: see if we want to do step-based inputs
        tools = self.get_tools(task.input)

        input_chat = self._react_chat_formatter.format(
            tools,
            chat_history=task.memory.get_all()
            + task.extra_state["new_memory"].get_all(),
            current_reasoning=task.extra_state["current_reasoning"],
        )

        # send prompt
        chat_response = self._multi_modal_llm.chat(input_chat)
        # given react prompt outputs, call tools or return response
        reasoning_steps, is_done = self._process_actions(
            task, tools, output=chat_response
        )
        task.extra_state["current_reasoning"].extend(reasoning_steps)
        agent_response = self._get_response(
            task.extra_state["current_reasoning"], task.extra_state["sources"]
        )
        if is_done:
            task.extra_state["new_memory"].put(
                ChatMessage(content=agent_response.response, role=MessageRole.ASSISTANT)
            )

        return self._get_task_step_response(agent_response, step, is_done)

    async def _arun_step(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """Run step."""
        if step.input is not None:
            self._add_user_step_to_reasoning(
                step,
                task.extra_state["new_memory"],
                task.extra_state["current_reasoning"],
                verbose=self._verbose,
            )
        # TODO: see if we want to do step-based inputs
        tools = self.get_tools(task.input)

        input_chat = self._react_chat_formatter.format(
            tools,
            chat_history=task.memory.get_all()
            + task.extra_state["new_memory"].get_all(),
            current_reasoning=task.extra_state["current_reasoning"],
        )
        # send prompt
        chat_response = await self._multi_modal_llm.achat(input_chat)
        # given react prompt outputs, call tools or return response
        reasoning_steps, is_done = await self._aprocess_actions(
            task, tools, output=chat_response
        )
        task.extra_state["current_reasoning"].extend(reasoning_steps)
        agent_response = self._get_response(
            task.extra_state["current_reasoning"], task.extra_state["sources"]
        )
        if is_done:
            task.extra_state["new_memory"].put(
                ChatMessage(content=agent_response.response, role=MessageRole.ASSISTANT)
            )

        return self._get_task_step_response(agent_response, step, is_done)

    def _run_step_stream(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """Run step."""
        raise NotImplementedError("Stream step not implemented yet.")

    async def _arun_step_stream(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """Run step."""
        raise NotImplementedError("Stream step not implemented yet.")

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step."""
        return self._run_step(step, task)

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async)."""
        return await self._arun_step(step, task)

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream)."""
        # TODO: figure out if we need a different type for TaskStepOutput
        return self._run_step_stream(step, task)

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        return await self._arun_step_stream(step, task)

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed."""
        # add new messages to memory
        task.memory.set(
            task.memory.get_all() + task.extra_state["new_memory"].get_all()
        )
        # reset new memory
        task.extra_state["new_memory"].reset()

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager."""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager

from_tools classmethod #

from_tools(tools: Optional[Sequence[BaseTool]] = None, tool_retriever: Optional[ObjectRetriever[BaseTool]] = None, multi_modal_llm: Optional[MultiModalLLM] = None, max_iterations: int = 10, react_chat_formatter: Optional[ReActChatFormatter] = None, output_parser: Optional[ReActOutputParser] = None, callback_manager: Optional[CallbackManager] = None, verbose: bool = False, **kwargs: Any) -> MultimodalReActAgentWorker

Convenience constructor method from set of of BaseTools (Optional).

NOTE: kwargs should have been exhausted by this point. In other words the various upstream components such as BaseSynthesizer (response synthesizer) or BaseRetriever should have picked up off their respective kwargs in their constructions.

Returns:

Type Description
MultimodalReActAgentWorker

ReActAgent

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@classmethod
def from_tools(
    cls,
    tools: Optional[Sequence[BaseTool]] = None,
    tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
    multi_modal_llm: Optional[MultiModalLLM] = None,
    max_iterations: int = 10,
    react_chat_formatter: Optional[ReActChatFormatter] = None,
    output_parser: Optional[ReActOutputParser] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "MultimodalReActAgentWorker":
    """Convenience constructor method from set of of BaseTools (Optional).

    NOTE: kwargs should have been exhausted by this point. In other words
    the various upstream components such as BaseSynthesizer (response synthesizer)
    or BaseRetriever should have picked up off their respective kwargs in their
    constructions.

    Returns:
        ReActAgent
    """
    if multi_modal_llm is None:
        try:
            from llama_index.multi_modal_llms.openai import (
                OpenAIMultiModal,
            )  # pants: no-infer-dep

            multi_modal_llm = multi_modal_llm or OpenAIMultiModal(
                model="gpt-4-vision-preview", max_new_tokens=1000
            )
        except ImportError:
            raise ImportError(
                "`llama-index-multi-modal-llms-openai` package cannot be found. "
                "Please install it by using `pip install `llama-index-multi-modal-llms-openai`"
            )
    return cls(
        tools=tools or [],
        tool_retriever=tool_retriever,
        multi_modal_llm=multi_modal_llm,
        max_iterations=max_iterations,
        react_chat_formatter=react_chat_formatter,
        output_parser=output_parser,
        callback_manager=callback_manager,
        verbose=verbose,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize step from task."""
    sources: List[ToolOutput] = []
    current_reasoning: List[BaseReasoningStep] = []
    # temporary memory for new messages
    new_memory = ChatMemoryBuffer.from_defaults()

    # validation
    if "image_docs" not in task.extra_state:
        raise ValueError("Image docs not found in task extra state.")

    # initialize task state
    task_state = {
        "sources": sources,
        "current_reasoning": current_reasoning,
        "new_memory": new_memory,
    }
    task.extra_state.update(task_state)

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state={"is_first": True, "image_docs": task.extra_state["image_docs"]},
    )

get_tools #

get_tools(input: str) -> List[AsyncBaseTool]

Get tools.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
229
230
231
def get_tools(self, input: str) -> List[AsyncBaseTool]:
    """Get tools."""
    return [adapt_to_async_tool(t) for t in self._get_tools(input)]

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
478
479
480
481
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step."""
    return self._run_step(step, task)

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
483
484
485
486
487
488
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async)."""
    return await self._arun_step(step, task)

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
490
491
492
493
494
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (stream)."""
    # TODO: figure out if we need a different type for TaskStepOutput
    return self._run_step_stream(step, task)

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
496
497
498
499
500
501
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async stream)."""
    return await self._arun_step_stream(step, task)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
503
504
505
506
507
508
509
510
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Finalize task, after all the steps are completed."""
    # add new messages to memory
    task.memory.set(
        task.memory.get_all() + task.extra_state["new_memory"].get_all()
    )
    # reset new memory
    task.extra_state["new_memory"].reset()

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/agent/react_multimodal/step.py
512
513
514
515
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Set callback manager."""
    # TODO: make this abstractmethod (right now will break some agent impls)
    self.callback_manager = callback_manager

QueryPipelineAgentWorker #

Bases: BaseModel, BaseAgentWorker

Query Pipeline agent worker.

Barebones agent worker that takes in a query pipeline.

Assumes that the first component in the query pipeline is an AgentInputComponent and last is AgentFnComponent.

Parameters:

Name Type Description Default
pipeline QueryPipeline

Query pipeline

required
Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class QueryPipelineAgentWorker(BaseModel, BaseAgentWorker):
    """Query Pipeline agent worker.

    Barebones agent worker that takes in a query pipeline.

    Assumes that the first component in the query pipeline is an
    `AgentInputComponent` and last is `AgentFnComponent`.

    Args:
        pipeline (QueryPipeline): Query pipeline

    """

    pipeline: QueryPipeline = Field(..., description="Query pipeline")
    callback_manager: CallbackManager = Field(..., exclude=True)

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        pipeline: QueryPipeline,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        """Initialize."""
        if callback_manager is not None:
            # set query pipeline callback
            pipeline.set_callback_manager(callback_manager)
        else:
            callback_manager = pipeline.callback_manager
        super().__init__(
            pipeline=pipeline,
            callback_manager=callback_manager,
        )
        # validate query pipeline
        self.agent_input_component
        self.agent_components

    @property
    def agent_input_component(self) -> AgentInputComponent:
        """Get agent input component."""
        root_key = self.pipeline.get_root_keys()[0]
        if not isinstance(self.pipeline.module_dict[root_key], AgentInputComponent):
            raise ValueError(
                "Query pipeline first component must be AgentInputComponent, got "
                f"{self.pipeline.module_dict[root_key]}"
            )

        return cast(AgentInputComponent, self.pipeline.module_dict[root_key])

    @property
    def agent_components(self) -> List[AgentFnComponent]:
        """Get agent output component."""
        return _get_agent_components(self.pipeline)

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task."""
        sources: List[ToolOutput] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # initialize initial state
        initial_state = {
            "sources": sources,
            "memory": new_memory,
        }

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state=initial_state,
        )

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """Get task step response."""
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step."""
        # partial agent output component with task and step
        for agent_fn_component in self.agent_components:
            agent_fn_component.partial(task=task, state=step.step_state)

        agent_response, is_done = self.pipeline.run(state=step.step_state, task=task)
        response = self._get_task_step_response(agent_response, step, is_done)
        # sync step state with task state
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async)."""
        # partial agent output component with task and step
        for agent_fn_component in self.agent_components:
            agent_fn_component.partial(task=task, state=step.step_state)

        agent_response, is_done = await self.pipeline.arun(
            state=step.step_state, task=task
        )
        response = self._get_task_step_response(agent_response, step, is_done)
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream)."""
        raise NotImplementedError("This agent does not support streaming.")

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        raise NotImplementedError("This agent does not support streaming.")

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed."""
        # add new messages to memory
        task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
        # reset new memory
        task.extra_state["memory"].reset()

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager."""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager
        self.pipeline.set_callback_manager(callback_manager)

agent_input_component property #

agent_input_component: AgentInputComponent

Get agent input component.

agent_components property #

agent_components: List[AgentFnComponent]

Get agent output component.

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize step from task."""
    sources: List[ToolOutput] = []
    # temporary memory for new messages
    new_memory = ChatMemoryBuffer.from_defaults()

    # initialize initial state
    initial_state = {
        "sources": sources,
        "memory": new_memory,
    }

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state=initial_state,
    )

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
147
148
149
150
151
152
153
154
155
156
157
158
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step."""
    # partial agent output component with task and step
    for agent_fn_component in self.agent_components:
        agent_fn_component.partial(task=task, state=step.step_state)

    agent_response, is_done = self.pipeline.run(state=step.step_state, task=task)
    response = self._get_task_step_response(agent_response, step, is_done)
    # sync step state with task state
    task.extra_state.update(step.step_state)
    return response

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async)."""
    # partial agent output component with task and step
    for agent_fn_component in self.agent_components:
        agent_fn_component.partial(task=task, state=step.step_state)

    agent_response, is_done = await self.pipeline.arun(
        state=step.step_state, task=task
    )
    response = self._get_task_step_response(agent_response, step, is_done)
    task.extra_state.update(step.step_state)
    return response

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
176
177
178
179
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (stream)."""
    raise NotImplementedError("This agent does not support streaming.")

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
181
182
183
184
185
186
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async stream)."""
    raise NotImplementedError("This agent does not support streaming.")

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
188
189
190
191
192
193
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Finalize task, after all the steps are completed."""
    # add new messages to memory
    task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
    # reset new memory
    task.extra_state["memory"].reset()

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/agent/custom/pipeline_worker.py
195
196
197
198
199
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Set callback manager."""
    # TODO: make this abstractmethod (right now will break some agent impls)
    self.callback_manager = callback_manager
    self.pipeline.set_callback_manager(callback_manager)