Skip to content

Introspective

IntrospectiveAgentWorker #

Bases: BaseAgentWorker

Introspective Agent Worker.

This agent worker implements the Reflection AI agentic pattern. It does so by merely delegating the work to two other agents in a purely deterministic fashion.

The task this agent performs (again via delegation) is to generate a response to a query and perform reflection and correction on the response. This agent delegates the task to (optionally) first a main_agent_worker that generates the initial response to the query. This initial response is then passed to the reflective_agent_worker to perform the reflection and correction of the initial response. Optionally, the main_agent_worker can be skipped if none is provided. In this case, the user's input query will be assumed to contain the original response that needs to go through reflection and correction.

Attributes:

Name Type Description
reflective_agent_worker BaseAgentWorker

Reflective agent responsible for performing reflection and correction of the initial response.

main_agent_worker Optional[BaseAgentWorker]

Main agent responsible for generating an initial response to the user query. Defaults to None. If None, the user input is assumed as the initial response.

verbose bool

Whether execution should be verbose. Defaults to False.

callback_manager Optional[CallbackManager]

Callback manager. Defaults to None.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/step.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class IntrospectiveAgentWorker(BaseAgentWorker):
    """Introspective Agent Worker.

    This agent worker implements the Reflection AI agentic pattern. It does
    so by merely delegating the work to two other agents in a purely
    deterministic fashion.

    The task this agent performs (again via delegation) is to generate a response
    to a query and perform reflection and correction on the response. This
    agent delegates the task to (optionally) first a `main_agent_worker` that
    generates the initial response to the query. This initial response is then
    passed to the `reflective_agent_worker` to perform the reflection and
    correction of the initial response. Optionally, the `main_agent_worker`
    can be skipped if none is provided. In this case, the user's input query
    will be assumed to contain the original response that needs to go through
    reflection and correction.

    Attributes:
        reflective_agent_worker (BaseAgentWorker): Reflective agent responsible for
            performing reflection and correction of the initial response.
        main_agent_worker (Optional[BaseAgentWorker], optional): Main agent responsible
            for generating an initial response to the user query. Defaults to None.
            If None, the user input is assumed as the initial response.
        verbose (bool, optional): Whether execution should be verbose. Defaults to False.
        callback_manager (Optional[CallbackManager], optional): Callback manager. Defaults to None.
    """

    def __init__(
        self,
        reflective_agent_worker: BaseAgentWorker,
        main_agent_worker: Optional[BaseAgentWorker] = None,
        verbose: bool = False,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        """Init params."""
        self._verbose = verbose
        self._main_agent_worker = main_agent_worker
        self._reflective_agent_worker = reflective_agent_worker
        self.callback_manager = callback_manager or CallbackManager([])

    @classmethod
    def from_defaults(
        cls,
        reflective_agent_worker: BaseAgentWorker,
        main_agent_worker: Optional[BaseAgentWorker] = None,
        verbose: bool = False,
        callback_manager: Optional[CallbackManager] = None,
        **kwargs: Any,
    ) -> "IntrospectiveAgentWorker":
        """Create an IntrospectiveAgentWorker from args.

        Unlike `from_defaults` on many other classes, this method does not
        infer any defaults (such as an LLM): the agent workers must be
        supplied by the caller, and every argument, including ``**kwargs``,
        is forwarded unchanged to the constructor. Because ``__init__``
        accepts no extra keyword arguments, passing any ``**kwargs`` raises
        ``TypeError``.
        """
        return cls(
            main_agent_worker=main_agent_worker,
            reflective_agent_worker=reflective_agent_worker,
            verbose=verbose,
            callback_manager=callback_manager,
            **kwargs,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task.

        Creates per-task working memories. The main memory is seeded with the
        task's existing chat history, and the reflective memory starts empty.
        Both are stored in ``task.extra_state`` together with empty source
        lists.
        """
        # temporary memory for new messages
        main_memory = ChatMemoryBuffer.from_defaults()
        reflective_memory = ChatMemoryBuffer.from_defaults()

        # put current history in new memory
        messages = task.memory.get()
        for message in messages:
            main_memory.put(message)

        # initialize task state
        task_state = {
            "main": {
                "memory": main_memory,
                "sources": [],
            },
            "reflection": {"memory": reflective_memory, "sources": []},
        }
        task.extra_state.update(task_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
        )

    def get_all_messages(self, task: Task) -> List[ChatMessage]:
        """Return the task history followed by both agents' working memories.

        NOTE(review): ``initialize_step`` seeds the main memory with the
        messages of ``task.memory``, so those messages may appear twice in
        the result. Confirm whether callers expect that.
        """
        return (
            task.memory.get()
            + task.extra_state["main"]["memory"].get_all()
            + task.extra_state["reflection"]["memory"].get_all()
        )

    def _use_input_as_original_response(self, step: TaskStep, task: Task) -> str:
        """Treat the user's input as the initial response when no main agent exists.

        Adds the user step to the main memory, then appends the same text as
        a fictitious assistant message to get the reflection/correction cycle
        started.

        Returns:
            The step input, which is then handed to the reflective agent.
        """
        main_memory = task.extra_state["main"]["memory"]
        add_user_step_to_memory(step, main_memory, verbose=self._verbose)
        original_response = step.input
        # fictitious agent's initial response (to get reflection/correction cycle started)
        main_memory.put(ChatMessage(content=original_response, role="assistant"))
        return original_response

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step.

        Obtains the initial response from the main agent, or from the user
        input when no main agent is configured. Then runs the reflective
        agent on that response. The task always finishes in this single step.
        """
        # run main agent if one is supplied otherwise assume user input
        # is the original response to be reflected on and subsequently corrected
        if self._main_agent_worker is not None:
            main_agent_messages = task.extra_state["main"]["memory"].get()
            main_agent = self._main_agent_worker.as_agent(
                chat_history=main_agent_messages, verbose=self._verbose
            )
            main_agent_response = main_agent.chat(task.input)
            original_response = main_agent_response.response
            task.extra_state["main"]["sources"] = main_agent_response.sources
            task.extra_state["main"]["memory"] = main_agent.memory
        else:
            # `original_response` must be bound on this path too; it is used below.
            original_response = self._use_input_as_original_response(step, task)

        # run reflective agent
        reflective_agent_messages = task.extra_state["main"]["memory"].get()
        reflective_agent = self._reflective_agent_worker.as_agent(
            chat_history=reflective_agent_messages, verbose=self._verbose
        )
        # NOTE: atm you *need* to pass an input string to `chat`, even if the memory is already
        # preloaded. Input will be concatenated on top of chat history from memory
        # which will be used to generate the response.
        # TODO: make agent interface more flexible
        reflective_agent_response = reflective_agent.chat(original_response)
        task.extra_state["reflection"]["sources"] = reflective_agent_response.sources
        task.extra_state["reflection"]["memory"] = reflective_agent.memory

        agent_response = AgentChatResponse(
            response=str(reflective_agent_response.response),
            sources=task.extra_state["main"]["sources"]
            + task.extra_state["reflection"]["sources"],
        )

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=True,
            next_steps=[],
        )

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async).

        Async counterpart of ``run_step``. The task always finishes in this
        single step.
        """
        # run main agent if one is supplied otherwise assume user input
        # is the original response to be reflected on and subsequently corrected
        if self._main_agent_worker is not None:
            main_agent_messages = task.extra_state["main"]["memory"].get()
            main_agent = self._main_agent_worker.as_agent(
                chat_history=main_agent_messages, verbose=self._verbose
            )
            main_agent_response = await main_agent.achat(task.input)
            original_response = main_agent_response.response
            task.extra_state["main"]["sources"] = main_agent_response.sources
            task.extra_state["main"]["memory"] = main_agent.memory
        else:
            original_response = self._use_input_as_original_response(step, task)

        # run reflective agent
        reflective_agent_messages = task.extra_state["main"]["memory"].get()
        reflective_agent = self._reflective_agent_worker.as_agent(
            chat_history=reflective_agent_messages, verbose=self._verbose
        )
        reflective_agent_response = await reflective_agent.achat(original_response)
        task.extra_state["reflection"]["sources"] = reflective_agent_response.sources
        task.extra_state["reflection"]["memory"] = reflective_agent.memory

        agent_response = AgentChatResponse(
            response=str(reflective_agent_response.response),
            sources=task.extra_state["main"]["sources"]
            + task.extra_state["reflection"]["sources"],
        )

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=True,
            next_steps=[],
        )

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream).

        Raises:
            NotImplementedError: Always; streaming is not supported.
        """
        raise NotImplementedError("Stream not supported for introspective agent")

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream).

        Raises:
            NotImplementedError: Always; streaming is not supported.
        """
        raise NotImplementedError("Stream not supported for introspective agent")

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed.

        Writes the main memory back to ``task.memory``, replacing its final
        message (the initial response) with the last message of the
        reflective agent's memory (the corrected response).
        """
        # add new messages to memory
        main_memory = task.extra_state["main"][
            "memory"
        ].get_all()  # contains initial response as final message
        final_corrected_message = task.extra_state["reflection"]["memory"].get_all()[-1]
        # swap main workers response with the reflected/corrected one
        finalized_task_memory = main_memory[:-1] + [final_corrected_message]
        task.memory.set(finalized_task_memory)

from_defaults classmethod #

from_defaults(reflective_agent_worker: BaseAgentWorker, main_agent_worker: Optional[BaseAgentWorker] = None, verbose: bool = False, callback_manager: Optional[CallbackManager] = None, **kwargs: Any) -> IntrospectiveAgentWorker

Create an IntrospectiveAgentWorker from args.

Unlike from_defaults in many other classes, this method does not infer any defaults (such as an LLM): all arguments, including any **kwargs, are forwarded unchanged to the constructor.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/step.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@classmethod
def from_defaults(
    cls,
    reflective_agent_worker: BaseAgentWorker,
    main_agent_worker: Optional[BaseAgentWorker] = None,
    verbose: bool = False,
    callback_manager: Optional[CallbackManager] = None,
    **kwargs: Any,
) -> "IntrospectiveAgentWorker":
    """Create an IntrospectiveAgentWorker from args.

    Unlike `from_defaults` on many other classes, this method does not
    infer any defaults (such as an LLM): the agent workers must be supplied
    by the caller, and every argument, including ``**kwargs``, is forwarded
    unchanged to the constructor. Any keyword the constructor does not
    accept raises ``TypeError``.
    """
    return cls(
        main_agent_worker=main_agent_worker,
        reflective_agent_worker=reflective_agent_worker,
        verbose=verbose,
        callback_manager=callback_manager,
        **kwargs,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/step.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Build the initial step and the per-task working memories.

    The main agent's buffer receives a copy of the task's current chat
    history, while the reflective agent's buffer starts out empty. Both are
    recorded in ``task.extra_state`` with empty source lists.
    """
    main_buffer = ChatMemoryBuffer.from_defaults()
    reflection_buffer = ChatMemoryBuffer.from_defaults()

    for prior_message in task.memory.get():
        main_buffer.put(prior_message)

    task.extra_state.update(
        {
            "main": {"memory": main_buffer, "sources": []},
            "reflection": {"memory": reflection_buffer, "sources": []},
        }
    )

    new_step_id = str(uuid.uuid4())
    return TaskStep(task_id=task.task_id, step_id=new_step_id, input=task.input)

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/step.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step.

    Obtains the initial response from the main agent, or from the user input
    when no main agent is configured. Then runs the reflective agent on that
    response. The task always finishes in this single step.
    """
    # run main agent if one is supplied otherwise assume user input
    # is the original response to be reflected on and subsequently corrected
    if self._main_agent_worker is not None:
        main_agent_messages = task.extra_state["main"]["memory"].get()
        main_agent = self._main_agent_worker.as_agent(
            chat_history=main_agent_messages, verbose=self._verbose
        )
        main_agent_response = main_agent.chat(task.input)
        original_response = main_agent_response.response
        task.extra_state["main"]["sources"] = main_agent_response.sources
        task.extra_state["main"]["memory"] = main_agent.memory
    else:
        # `original_response` must be bound on this path too; it is used below.
        add_user_step_to_memory(
            step, task.extra_state["main"]["memory"], verbose=self._verbose
        )
        original_response = step.input
        # fictitious agent's initial response (to get reflection/correction cycle started)
        task.extra_state["main"]["memory"].put(
            ChatMessage(content=original_response, role="assistant")
        )

    # run reflective agent
    reflective_agent_messages = task.extra_state["main"]["memory"].get()
    reflective_agent = self._reflective_agent_worker.as_agent(
        chat_history=reflective_agent_messages, verbose=self._verbose
    )
    # NOTE: atm you *need* to pass an input string to `chat`, even if the memory is already
    # preloaded. Input will be concatenated on top of chat history from memory
    # which will be used to generate the response.
    # TODO: make agent interface more flexible
    reflective_agent_response = reflective_agent.chat(original_response)
    task.extra_state["reflection"]["sources"] = reflective_agent_response.sources
    task.extra_state["reflection"]["memory"] = reflective_agent.memory

    agent_response = AgentChatResponse(
        response=str(reflective_agent_response.response),
        sources=task.extra_state["main"]["sources"]
        + task.extra_state["reflection"]["sources"],
    )

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        is_last=True,
        next_steps=[],
    )

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/step.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Execute the single introspective step asynchronously.

    First obtains an initial response. It comes from the main agent if one
    is configured; otherwise the user's input itself serves as that
    response. That response is then handed to the reflective agent for
    critique and correction. Sources from both phases are merged into the
    returned output, which always marks the task as finished.
    """
    main_state = task.extra_state["main"]
    reflection_state = task.extra_state["reflection"]

    if self._main_agent_worker is None:
        # No main agent: the user input is the response to be reflected on.
        add_user_step_to_memory(step, main_state["memory"], verbose=self._verbose)
        original_response = step.input
        # fictitious agent's initial response (to get reflection/correction cycle started)
        main_state["memory"].put(
            ChatMessage(content=original_response, role="assistant")
        )
    else:
        main_agent = self._main_agent_worker.as_agent(
            chat_history=main_state["memory"].get(), verbose=self._verbose
        )
        main_reply = await main_agent.achat(task.input)
        original_response = main_reply.response
        main_state["sources"] = main_reply.sources
        main_state["memory"] = main_agent.memory

    reflective_agent = self._reflective_agent_worker.as_agent(
        chat_history=main_state["memory"].get(), verbose=self._verbose
    )
    reflective_reply = await reflective_agent.achat(original_response)
    reflection_state["sources"] = reflective_reply.sources
    reflection_state["memory"] = reflective_agent.memory

    final_text = str(reflective_reply.response)
    merged_sources = main_state["sources"] + reflection_state["sources"]
    return TaskStepOutput(
        output=AgentChatResponse(response=final_text, sources=merged_sources),
        task_step=step,
        is_last=True,
        next_steps=[],
    )

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/step.py
214
215
216
217
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Streaming variant of ``run_step``; not supported by this worker.

    Raises:
        NotImplementedError: Unconditionally.
    """
    unsupported_msg = "Stream not supported for introspective agent"
    raise NotImplementedError(unsupported_msg)

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/step.py
219
220
221
222
223
224
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Async streaming variant of ``run_step``; not supported by this worker.

    Raises:
        NotImplementedError: Unconditionally.
    """
    unsupported_msg = "Stream not supported for introspective agent"
    raise NotImplementedError(unsupported_msg)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/step.py
226
227
228
229
230
231
232
233
234
235
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Commit the corrected conversation back to the task's memory.

    The main memory ends with the initial response. That final message is
    dropped and replaced by the last message in the reflective agent's
    memory (the corrected response) before ``task.memory`` is overwritten.
    """
    initial_history = task.extra_state["main"]["memory"].get_all()
    corrected_reply = task.extra_state["reflection"]["memory"].get_all()[-1]
    task.memory.set(initial_history[:-1] + [corrected_reply])

SelfReflectionAgentWorker #

Bases: BaseModel, BaseAgentWorker

Self Reflection Agent Worker.

This agent performs a reflection without any tools on a given response and subsequently performs correction. It should be noted that this reflection implementation has been inspired by two works:

  1. Reflexion: Language Agents with Verbal Reinforcement Learning, by Shinn et al. (2023) (https://arxiv.org/pdf/2303.11366.pdf)
  2. CRITIC: Large Language Models Can Self-Correct with Tool-Interactive Critiquing, by Gou et al. (2024) (https://arxiv.org/pdf/2305.11738.pdf)

This agent performs cycles of reflection and correction on an initial response until a satisfactory correction has been generated or a maximum number of cycles has been reached. To perform reflection, this agent uses a user-specified LLM along with a PydanticProgram (through structured_predict) to generate a structured output that contains an LLM-generated reflection on the current response. After reflection, the same user-specified LLM is used again, this time with another PydanticProgram, to generate a structured output that contains an LLM-generated corrected version of the current response based on the previously generated reflection.

Attributes:

- max_iterations (int, optional): The maximum number of reflection & correction cycles. Defaults to DEFAULT_MAX_ITERATIONS.
- callback_manager (Optional[CallbackManager], optional): Callback manager. Defaults to None.
- llm (Optional[LLM], optional): The LLM used to perform reflection and correction. Must be an OpenAI LLM at this time. Defaults to None.
- verbose (bool, optional): Whether execution should be verbose. Defaults to False.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/self_reflection.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
class SelfReflectionAgentWorker(BaseModel, BaseAgentWorker):
    """Self Reflection Agent Worker.

    This agent performs a reflection without any tools on a given response
    and subsequently performs correction. It should be noted that this reflection
    implementation has been inspired by two works:

    1. Reflexion: Language Agents with Verbal Reinforcement Learning, by Shinn et al. (2023)
        (https://arxiv.org/pdf/2303.11366.pdf)
    2. CRITIC: Large Language Models Can Self-Correct with Tool-Interactive Critiquing, by Gou et al. (2024)
       (https://arxiv.org/pdf/2305.11738.pdf)

    This agent performs cycles of reflection and correction on an initial response
    until a satisfactory correction has been generated or a max number of cycles
    has been reached. To perform reflection, this agent utilizes a user-specified
    LLM along with a PydanticProgram (thru structured_predict) to generate a structured
    output that contains an LLM generated reflection of the current response. After reflection,
    the same user-specified LLM is used again but this time with another PydanticProgram
    to generate a structured output that contains an LLM generated corrected
    version of the current response against the priorly generated reflection.

    Attr:
        max_iterations (int, optional): The max number of reflection & correction.
            Defaults to DEFAULT_MAX_ITERATIONS.
        callback_manager (Optional[CallbackManager], optional): Callback manager.
            Defaults to None.
        llm (Optional[LLM], optional): The LLM used to perform reflection and correction.
            Must be an OpenAI LLM at this time. Defaults to None.
        verbose (bool, optional): Whether execution should be verbose. Defaults to False.
    """

    callback_manager: CallbackManager = Field(default=CallbackManager([]))
    max_iterations: int = Field(default=DEFAULT_MAX_ITERATIONS)
    _llm: LLM = PrivateAttr()
    _verbose: bool = PrivateAttr()

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        callback_manager: Optional[CallbackManager] = None,
        llm: Optional[LLM] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """Construct the worker.

        Public fields (``callback_manager``, ``max_iterations`` and any extra
        ``kwargs``) go through the pydantic base initializer. A fresh empty
        ``CallbackManager`` is used when none (or a falsy one) is given. The
        LLM and verbosity flag are then stored as private attributes.
        """
        effective_callback_manager = (
            callback_manager if callback_manager else CallbackManager([])
        )
        super().__init__(
            callback_manager=effective_callback_manager,
            max_iterations=max_iterations,
            **kwargs,
        )
        self._llm = llm
        self._verbose = verbose

    @classmethod
    def from_defaults(
        cls,
        llm: Optional[LLM] = None,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "SelfReflectionAgentWorker":
        """Convenience constructor.

        If ``llm`` is omitted, an OpenAI ``gpt-4-turbo-preview`` model with
        ``temperature=0`` is used. This requires the optional
        ``llama-index-llms-openai`` package.

        Raises:
            ImportError: If ``llm`` is omitted and the OpenAI integration is
                not installed.
        """
        if llm is not None:
            chosen_llm = llm
        else:
            try:
                from llama_index.llms.openai import OpenAI
            except ImportError:
                raise ImportError(
                    "Missing OpenAI LLMs. Please run `pip install llama-index-llms-openai`."
                )
            chosen_llm = OpenAI(model="gpt-4-turbo-preview", temperature=0)

        return cls(
            llm=chosen_llm,
            max_iterations=max_iterations,
            callback_manager=callback_manager,
            verbose=verbose,
            **kwargs,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Create the first step and the task's working memory.

        The working memory (``task.extra_state["new_memory"]``) holds a copy
        of the task's chat history followed by the new user input. The step
        state starts with an iteration ``count`` of 0.
        """
        working_memory = ChatMemoryBuffer.from_defaults()
        for earlier_message in task.memory.get():
            working_memory.put(earlier_message)
        # the new input becomes the latest message, i.e. the text to reflect on
        working_memory.put(ChatMessage(content=task.input, role=MessageRole.USER))

        task.extra_state.update({"new_memory": working_memory, "sources": []})

        first_step_id = str(uuid.uuid4())
        return TaskStep(
            task_id=task.task_id,
            step_id=first_step_id,
            input=task.input,
            step_state={"count": 0},
        )

    def _remove_correction_str_prefix(self, correct_msg: str) -> str:
        """Strip ``CORRECT_RESPONSE_PREFIX`` from a correction message.

        NOTE(review): ``str.replace`` removes every occurrence of the prefix
        text, not only a leading one.
        """
        prefix_to_drop = CORRECT_RESPONSE_PREFIX
        return correct_msg.replace(prefix_to_drop, "")

    @dispatcher.span
    def _reflect(
        self, chat_history: List[ChatMessage]
    ) -> Tuple[Reflection, ChatMessage]:
        """Ask the LLM to critique the current trajectory.

        Returns:
            A ``(reflection, message)`` pair. ``message`` is a user
            ``ChatMessage`` holding the critique rendered through
            ``REFLECTION_RESPONSE_TEMPLATE``.
        """
        reflection_prompt = PromptTemplate(REFLECTION_PROMPT_TEMPLATE)
        rendered_history = messages_to_prompt(chat_history)
        reflection = self._llm.structured_predict(
            Reflection, reflection_prompt, chat_history=rendered_history
        )

        if self._verbose:
            print(f"> Reflection: {reflection.model_dump()}")

        reflection_output_str = (
            f"Is Done: {reflection.is_done}\nCritique: {reflection.feedback}"
        )
        critique_text = REFLECTION_RESPONSE_TEMPLATE.format(
            reflection_output=reflection_output_str
        )
        critique_msg = ChatMessage.from_str(critique_text, role="user")
        return reflection, critique_msg

    @dispatcher.span
    def _correct(self, input_str: str, critique: str) -> ChatMessage:
        """Ask the LLM for a corrected version of ``input_str`` given ``critique``.

        Returns:
            An assistant ``ChatMessage`` whose content is the correction
            rendered through ``CORRECT_RESPONSE_FSTRING``.
        """
        correct_prompt = PromptTemplate(CORRECT_PROMPT_TEMPLATE)
        correction = self._llm.structured_predict(
            Correction, correct_prompt, input_str=input_str, feedback=critique
        )
        corrected_text = correction.correction

        rendered = CORRECT_RESPONSE_FSTRING.format(correction=corrected_text)
        if self._verbose:
            print(f"Correction: {corrected_text}", flush=True)
        return ChatMessage.from_str(rendered, role="assistant")

    @dispatcher.span
    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run one reflection/correction cycle.

        Reflects on the latest message in the task's working memory.

        - If the reflection judges it done, that message (with the correction
          prefix stripped) becomes the final answer.
        - Otherwise a new correction is generated. It is final once
          ``max_iterations`` cycles have run. Before that, it is stored and
          another step is scheduled.
        """
        state = step.step_state
        state["count"] += 1
        # `>=` rather than `==`: with a non-positive `max_iterations` the
        # equality never holds and the loop would only stop once the LLM
        # judged a response done.
        is_final_iteration = state["count"] >= self.max_iterations

        memory = task.extra_state["new_memory"]
        # new_memory should at the very least contain the user input
        messages = memory.get()
        prev_correct_str_without_prefix = self._remove_correction_str_prefix(
            messages[-1].content
        )

        # reflect phase
        reflection, reflection_msg = self._reflect(chat_history=messages)
        is_done = reflection.is_done
        memory.put(
            ChatMessage(role=MessageRole.USER, content=reflection_msg.content)
        )

        new_steps = []
        if is_done:
            # no correction to be made, prev correction is sufficient
            memory.put(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=prev_correct_str_without_prefix,
                )
            )
            agent_response = AgentChatResponse(
                response=prev_correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
        else:
            # correction phase: generate a new correction
            correct_msg = self._correct(
                input_str=prev_correct_str_without_prefix,
                critique=reflection_msg.content,
            )
            correct_str_without_prefix = self._remove_correction_str_prefix(
                correct_msg.content
            )

            if is_final_iteration:
                # this will be the last iteration
                memory.put(
                    ChatMessage(
                        role=MessageRole.ASSISTANT,
                        content=correct_str_without_prefix,
                    )
                )
                agent_response = AgentChatResponse(
                    response=correct_str_without_prefix,
                    sources=task.extra_state["sources"],
                )
            else:
                # another round of reflection/correction will take place
                memory.put(correct_msg)
                agent_response = AgentChatResponse(response=str(correct_msg))
                new_steps = [
                    step.get_next_step(
                        step_id=str(uuid.uuid4()),
                        # NOTE: input is unused
                        input=None,
                        step_state=state,
                    )
                ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done or is_final_iteration,
            next_steps=new_steps,
        )

    # Async methods
    @dispatcher.span
    async def _areflect(
        self, chat_history: List[ChatMessage]
    ) -> Tuple[Reflection, ChatMessage]:
        """Reflect on the trajectory (async).

        Asks the LLM for a structured ``Reflection`` over the rendered chat
        history, then wraps its verdict and feedback into a user-role
        critique message.

        Args:
            chat_history: Messages accumulated so far for this task.

        Returns:
            A ``(reflection, critique_message)`` pair.
        """
        prompt = PromptTemplate(REFLECTION_PROMPT_TEMPLATE)
        rendered_history = messages_to_prompt(chat_history)
        reflection = await self._llm.astructured_predict(
            Reflection, prompt, chat_history=rendered_history
        )

        if self._verbose:
            print(f"> Reflection: {reflection.model_dump()}")

        # end state: the critique goes back into the conversation as a user message
        critique_text = REFLECTION_RESPONSE_TEMPLATE.format(
            reflection_output=(
                f"Is Done: {reflection.is_done}\nCritique: {reflection.feedback}"
            )
        )
        critique_message = ChatMessage.from_str(critique_text, role="user")
        return reflection, critique_message

    @dispatcher.span
    async def _acorrect(self, input_str: str, critique: str) -> ChatMessage:
        """Produce a corrected version of ``input_str`` given ``critique`` (async).

        Args:
            input_str: The response to be corrected.
            critique: The reflection feedback, passed to the prompt as ``feedback``.

        Returns:
            An assistant-role message whose content is the correction rendered
            through ``CORRECT_RESPONSE_FSTRING``.
        """
        correction = await self._llm.astructured_predict(
            Correction,
            PromptTemplate(CORRECT_PROMPT_TEMPLATE),
            input_str=input_str,
            feedback=critique,
        )
        corrected_text = correction.correction

        message_text = CORRECT_RESPONSE_FSTRING.format(correction=corrected_text)
        if self._verbose:
            print(f"Correction: {corrected_text}", flush=True)
        return ChatMessage.from_str(message_text, role="assistant")

    @dispatcher.span
    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run one reflection/correction cycle (async).

        Reflects on the latest message in the task's ``new_memory``. If the
        reflection reports it is done, that response becomes final. Otherwise
        a correction is generated; it is returned as final once the iteration
        budget is used up, or queued for another cycle otherwise.

        Args:
            step: Current step; ``step.step_state["count"]`` counts cycles run.
            task: Task whose ``extra_state`` holds ``new_memory`` and ``sources``.

        Returns:
            The step output, with a follow-up step when another cycle is needed.
        """
        state = step.step_state
        state["count"] += 1
        # `>=` rather than `==`: with a non-positive max_iterations the 1-based
        # count would never match, and the loop could only end via reflection.
        reached_max_iterations = state["count"] >= self.max_iterations

        # new_memory should at the very least contain the user input
        messages = task.extra_state["new_memory"].get()
        prev_correct_str = messages[-1].content
        prev_correct_str_without_prefix = self._remove_correction_str_prefix(
            prev_correct_str
        )

        # reflect
        reflection, reflection_msg = await self._areflect(chat_history=messages)
        is_done = reflection.is_done

        critique_msg = ChatMessage(
            role=MessageRole.USER, content=reflection_msg.content
        )
        task.extra_state["new_memory"].put(critique_msg)

        # correction phase
        if is_done:
            # no correction to be made: the previous response is sufficient
            agent_response = AgentChatResponse(
                response=prev_correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=prev_correct_str_without_prefix,
                )
            )
            new_steps = []
        else:
            # generate a new correction
            correct_msg = await self._acorrect(
                input_str=prev_correct_str_without_prefix,
                critique=reflection_msg.content,
            )
            correct_str_without_prefix = self._remove_correction_str_prefix(
                correct_msg.content
            )

            if reached_max_iterations:
                # this is the last iteration: store and return the bare correction
                task.extra_state["new_memory"].put(
                    ChatMessage(
                        role=MessageRole.ASSISTANT,
                        content=correct_str_without_prefix,
                    )
                )
                # attach sources the same way the early-stop branch does
                agent_response = AgentChatResponse(
                    response=correct_str_without_prefix,
                    sources=task.extra_state["sources"],
                )
                new_steps = []
            else:
                # another round of reflection/correction will take place
                task.extra_state["new_memory"].put(correct_msg)
                agent_response = AgentChatResponse(response=str(correct_msg))
                new_steps = [
                    step.get_next_step(
                        step_id=str(uuid.uuid4()),
                        # NOTE: input is unused
                        input=None,
                        step_state=state,
                    )
                ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=bool(is_done) or reached_max_iterations,
            next_steps=new_steps,
        )

    # Stream methods
    @dispatcher.span
    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Streaming is not supported by this worker.

        Raises:
            NotImplementedError: Always.
        """
        message = "Stream not supported for self reflection agent"
        raise NotImplementedError(message)

    @dispatcher.span
    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Async streaming is not supported by this worker.

        Raises:
            NotImplementedError: Always.
        """
        message = "Stream not supported for self reflection agent"
        raise NotImplementedError(message)

    def get_all_messages(self, task: Task) -> List[ChatMessage]:
        """Return prefix messages, stored history, and the task's working messages.

        Returns:
            ``prefix_messages`` (empty when the worker defines none), followed by
            ``task.memory.get()``, followed by every message in the task's
            ``new_memory`` buffer.
        """
        # NOTE(review): `prefix_messages` is not declared in the visible part of
        # this worker; read it defensively so a worker without it returns no
        # prefix instead of raising AttributeError. Confirm against the class.
        prefix_messages = list(getattr(self, "prefix_messages", None) or [])
        # NOTE(review): initialize_step seeds `new_memory` with task.memory.get(),
        # so while a task is in progress the history appears twice here.
        return (
            prefix_messages
            + task.memory.get()
            + task.extra_state["new_memory"].get_all()
        )

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Commit the task's working memory once all steps are completed.

        Replaces ``task.memory`` with every message in the task's
        ``new_memory`` buffer, then clears that buffer.
        """
        working_memory = task.extra_state["new_memory"]
        task.memory.set(working_memory.get_all())
        working_memory.reset()

from_defaults classmethod #

from_defaults(llm: Optional[LLM] = None, max_iterations: int = DEFAULT_MAX_ITERATIONS, callback_manager: Optional[CallbackManager] = None, verbose: bool = False, **kwargs: Any) -> SelfReflectionAgentWorker

Convenience constructor.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/self_reflection.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
@classmethod
def from_defaults(
    cls,
    llm: Optional[LLM] = None,
    max_iterations: int = DEFAULT_MAX_ITERATIONS,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "SelfReflectionAgentWorker":
    """Convenience constructor.

    Args:
        llm: LLM used for reflection and correction. When None, an OpenAI
            ``gpt-4-turbo-preview`` model with temperature 0 is used.
        max_iterations: Max number of reflection/correction cycles.
        callback_manager: Optional callback manager, forwarded as-is.
        verbose: Whether execution should be verbose.
        **kwargs: Forwarded to the class constructor.

    Raises:
        ImportError: If ``llm`` is None and ``llama-index-llms-openai`` is not
            installed.
    """
    if llm is None:
        try:
            from llama_index.llms.openai import OpenAI
        except ImportError as exc:
            # chain the original error so the real import failure stays visible
            raise ImportError(
                "Missing OpenAI LLMs. Please run `pip install llama-index-llms-openai`."
            ) from exc
        llm = OpenAI(model="gpt-4-turbo-preview", temperature=0)

    return cls(
        llm=llm,
        max_iterations=max_iterations,
        callback_manager=callback_manager,
        verbose=verbose,
        **kwargs,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/self_reflection.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Create the first step for ``task``.

    Seeds a fresh ``ChatMemoryBuffer`` with the task's existing history
    followed by the new user input. Stores that buffer (plus an empty
    ``sources`` list) in ``task.extra_state`` and returns a step whose
    cycle ``count`` starts at 0.
    """
    # scratch memory for this task's messages
    working_memory = ChatMemoryBuffer.from_defaults()
    for past_message in task.memory.get():
        working_memory.put(past_message)
    working_memory.put(ChatMessage(content=task.input, role=MessageRole.USER))

    task.extra_state.update({"new_memory": working_memory, "sources": []})

    first_step_id = str(uuid.uuid4())
    return TaskStep(
        task_id=task.task_id,
        step_id=first_step_id,
        input=task.input,
        step_state={"count": 0},
    )

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/self_reflection.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
@dispatcher.span
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run one reflection/correction cycle.

    Reflects on the latest message in the task's ``new_memory``. If the
    reflection reports it is done, that response becomes final. Otherwise
    a correction is generated; it is returned as final once the iteration
    budget is used up, or queued for another cycle otherwise.

    Args:
        step: Current step; ``step.step_state["count"]`` counts cycles run.
        task: Task whose ``extra_state`` holds ``new_memory`` and ``sources``.

    Returns:
        The step output, with a follow-up step when another cycle is needed.
    """
    state = step.step_state
    state["count"] += 1
    # `>=` rather than `==`: with a non-positive max_iterations the 1-based
    # count would never match, and the loop could only end via reflection.
    reached_max_iterations = state["count"] >= self.max_iterations

    # new_memory should at the very least contain the user input
    messages = task.extra_state["new_memory"].get()
    prev_correct_str = messages[-1].content
    prev_correct_str_without_prefix = self._remove_correction_str_prefix(
        prev_correct_str
    )

    # reflect phase
    reflection, reflection_msg = self._reflect(chat_history=messages)
    is_done = reflection.is_done

    critique_msg = ChatMessage(
        role=MessageRole.USER, content=reflection_msg.content
    )
    task.extra_state["new_memory"].put(critique_msg)

    # correction phase
    if is_done:
        # no correction to be made: the previous response is sufficient
        agent_response = AgentChatResponse(
            response=prev_correct_str_without_prefix,
            sources=task.extra_state["sources"],
        )
        task.extra_state["new_memory"].put(
            ChatMessage(
                role=MessageRole.ASSISTANT,
                content=prev_correct_str_without_prefix,
            )
        )
        new_steps = []
    else:
        # generate a new correction
        correct_msg = self._correct(
            input_str=prev_correct_str_without_prefix,
            critique=reflection_msg.content,
        )
        correct_str_without_prefix = self._remove_correction_str_prefix(
            correct_msg.content
        )

        if reached_max_iterations:
            # this is the last iteration: store and return the bare correction
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=correct_str_without_prefix,
                )
            )
            # attach sources the same way the early-stop branch does
            agent_response = AgentChatResponse(
                response=correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
            new_steps = []
        else:
            # another round of reflection/correction will take place
            task.extra_state["new_memory"].put(correct_msg)
            agent_response = AgentChatResponse(response=str(correct_msg))
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        is_last=bool(is_done) or reached_max_iterations,
        next_steps=new_steps,
    )

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/self_reflection.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
@dispatcher.span
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run one reflection/correction cycle (async).

    Reflects on the latest message in the task's ``new_memory``. If the
    reflection reports it is done, that response becomes final. Otherwise
    a correction is generated; it is returned as final once the iteration
    budget is used up, or queued for another cycle otherwise.

    Args:
        step: Current step; ``step.step_state["count"]`` counts cycles run.
        task: Task whose ``extra_state`` holds ``new_memory`` and ``sources``.

    Returns:
        The step output, with a follow-up step when another cycle is needed.
    """
    state = step.step_state
    state["count"] += 1
    # `>=` rather than `==`: with a non-positive max_iterations the 1-based
    # count would never match, and the loop could only end via reflection.
    reached_max_iterations = state["count"] >= self.max_iterations

    # new_memory should at the very least contain the user input
    messages = task.extra_state["new_memory"].get()
    prev_correct_str = messages[-1].content
    prev_correct_str_without_prefix = self._remove_correction_str_prefix(
        prev_correct_str
    )

    # reflect
    reflection, reflection_msg = await self._areflect(chat_history=messages)
    is_done = reflection.is_done

    critique_msg = ChatMessage(
        role=MessageRole.USER, content=reflection_msg.content
    )
    task.extra_state["new_memory"].put(critique_msg)

    # correction phase
    if is_done:
        # no correction to be made: the previous response is sufficient
        agent_response = AgentChatResponse(
            response=prev_correct_str_without_prefix,
            sources=task.extra_state["sources"],
        )
        task.extra_state["new_memory"].put(
            ChatMessage(
                role=MessageRole.ASSISTANT,
                content=prev_correct_str_without_prefix,
            )
        )
        new_steps = []
    else:
        # generate a new correction
        correct_msg = await self._acorrect(
            input_str=prev_correct_str_without_prefix,
            critique=reflection_msg.content,
        )
        correct_str_without_prefix = self._remove_correction_str_prefix(
            correct_msg.content
        )

        if reached_max_iterations:
            # this is the last iteration: store and return the bare correction
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=correct_str_without_prefix,
                )
            )
            # attach sources the same way the early-stop branch does
            agent_response = AgentChatResponse(
                response=correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
            new_steps = []
        else:
            # another round of reflection/correction will take place
            task.extra_state["new_memory"].put(correct_msg)
            agent_response = AgentChatResponse(response=str(correct_msg))
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        is_last=bool(is_done) or reached_max_iterations,
        next_steps=new_steps,
    )

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/self_reflection.py
451
452
453
454
455
@dispatcher.span
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Streaming is not supported by this worker.

    Raises:
        NotImplementedError: Always.
    """
    message = "Stream not supported for self reflection agent"
    raise NotImplementedError(message)

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/self_reflection.py
457
458
459
460
461
462
463
@dispatcher.span
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Async streaming is not supported by this worker.

    Raises:
        NotImplementedError: Always.
    """
    message = "Stream not supported for self reflection agent"
    raise NotImplementedError(message)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/self_reflection.py
472
473
474
475
476
477
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Commit the task's working memory once all steps are completed.

    Replaces ``task.memory`` with every message in the task's
    ``new_memory`` buffer, then clears that buffer.
    """
    working_memory = task.extra_state["new_memory"]
    task.memory.set(working_memory.get_all())
    working_memory.reset()

ToolInteractiveReflectionAgentWorker #

Bases: BaseModel, BaseAgentWorker

Tool-Interactive Reflection Agent Worker.

This agent worker implements the CRITIC reflection framework introduced by Gou, Zhibin, et al. (2024) ICLR. (source: https://arxiv.org/pdf/2305.11738)

CRITIC stands for "Correcting with tool-interactive critiquing". It works by performing a reflection on a response to a task/query using external tools (e.g., fact checking using a Google search tool) and subsequently using the critique to generate a corrected response. It cycles through tool-interactive reflection and correction until a specific stopping criterion has been met or a max number of iterations has been reached.

This agent delegates the critique subtask to a user-supplied critique_agent_worker that is of FunctionCallingAgentWorker type, i.e., it uses tools to perform tasks. For correction, it uses a user-specified correction_llm with a PydanticProgram (determined dynamically with llm.structured_predict) in order to produce a structured output, namely Correction, which contains the correction generated by the correction_llm.

Attributes:

Name Type Description
critique_agent_worker FunctionCallingAgentWorker

Critique agent responsible for performing the critique reflection.

critique_template str

The template containing instructions for how the Critique agent should perform the reflection.

max_iterations int

The max number of reflection & correction cycles permitted. Defaults to DEFAULT_MAX_ITERATIONS = 5.

stopping_callable Optional[StoppingCallable]

An optional stopping condition that operates over the critique reflection string and returns a boolean to determine if the latest correction is sufficient. Defaults to None.

correction_llm Optional[LLM]

The LLM used for producing corrected responses against a critique or reflection. Defaults to None.

callback_manager Optional[CallbackManager]

Callback manager. Defaults to None.

verbose bool

Whether execution should be verbose. Defaults to False.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/tool_interactive_reflection.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
class ToolInteractiveReflectionAgentWorker(BaseModel, BaseAgentWorker):
    """Tool-Interactive Reflection Agent Worker.

    This agent worker implements the CRITIC reflection framework introduced
    by Gou, Zhibin, et al. (2024) ICLR. (source: https://arxiv.org/pdf/2305.11738)

    CRITIC stands for `Correcting with tool-interactive critiquing`. It works
    by performing a reflection on a response to a task/query using external tools
    (e.g., fact checking using a Google search tool) and subsequently using
    the critique to generate a corrected response. It cycles through
    tool-interactive reflection and correction until a specific stopping
    criterion has been met or a max number of iterations has been reached.

    This agent delegates the critique subtask to a user-supplied `critique_agent_worker`
    that is of `FunctionCallingAgentWorker` type, i.e., it uses tools to perform
    tasks. For correction, it uses a user-specified `correction_llm` with a
    PydanticProgram (determined dynamically with llm.structured_predict)
    in order to produce a structured output, namely `Correction`, which
    contains the correction generated by the `correction_llm`.

    Attributes:
        critique_agent_worker (FunctionCallingAgentWorker): Critique agent responsible
            for performing the critique reflection.
        critique_template (str): The template containing instructions for how the
            Critique agent should perform the reflection. It is filled with
            ``str.format(input_str=...)``.
        max_iterations (int, optional): The max number of reflection & correction
            cycles permitted. Defaults to DEFAULT_MAX_ITERATIONS = 5.
        stopping_callable (Optional[StoppingCallable], optional): An optional stopping
            condition that operates over the critique reflection string and returns
            a boolean to determine if the latest correction is sufficient. Defaults to None.
        correction_llm (Optional[LLM], optional): The LLM used for producing corrected
            responses against a critique or reflection. Defaults to None; the
            constructor does not substitute a default (``from_defaults`` does).
        callback_manager (Optional[CallbackManager], optional): Callback manager. Defaults to None.
        verbose (bool, optional): Whether execution should be verbose. Defaults to False.
    """

    # default_factory gives each instance its own manager instead of sharing
    # one CallbackManager created at class-definition time
    callback_manager: CallbackManager = Field(
        default_factory=lambda: CallbackManager([])
    )
    max_iterations: int = Field(default=DEFAULT_MAX_ITERATIONS)
    stopping_callable: Optional[StoppingCallable] = Field(
        default=None,
        description="Optional function that operates on critique string to see if no more corrections are needed.",
    )
    _critique_agent_worker: FunctionCallingAgentWorker = PrivateAttr()
    _critique_template: str = PrivateAttr()
    _correction_llm: LLM = PrivateAttr()
    _verbose: bool = PrivateAttr()

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        critique_agent_worker: FunctionCallingAgentWorker,
        critique_template: str,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        stopping_callable: Optional[StoppingCallable] = None,
        correction_llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize the worker.

        Args:
            critique_agent_worker: Tool-using worker that produces critiques.
            critique_template: Critique instructions, formatted with ``input_str``.
            max_iterations: Max number of critique/correction cycles.
            stopping_callable: Optional predicate over the critique string; a
                truthy result accepts the previous response as final.
            correction_llm: LLM used to generate corrections.
            callback_manager: Optional callback manager; an empty one is used
                when None.
            verbose: Whether to print critiques and corrections.
            **kwargs: Forwarded to the pydantic model constructor.
        """
        # The `callback_manager` field is a non-Optional CallbackManager, so
        # forwarding None would fail pydantic validation on direct construction.
        if callback_manager is None:
            callback_manager = CallbackManager([])
        super().__init__(
            callback_manager=callback_manager,
            max_iterations=max_iterations,
            stopping_callable=stopping_callable,
            **kwargs,
        )
        self._critique_agent_worker = critique_agent_worker
        self._critique_template = critique_template
        self._verbose = verbose
        self._correction_llm = correction_llm

    @classmethod
    def from_defaults(
        cls,
        critique_agent_worker: FunctionCallingAgentWorker,
        critique_template: str,
        correction_llm: Optional[LLM] = None,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        stopping_callable: Optional[StoppingCallable] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "ToolInteractiveReflectionAgentWorker":
        """Convenience constructor with an OpenAI fallback for ``correction_llm``.

        When ``correction_llm`` is None, an OpenAI ``gpt-4-turbo-preview`` model
        with temperature 0 is used.

        Raises:
            ImportError: If ``correction_llm`` is None and
                ``llama-index-llms-openai`` is not installed.
        """
        if correction_llm is None:
            try:
                from llama_index.llms.openai import OpenAI
            except ImportError as exc:
                # chain the original error so the real import failure stays visible
                raise ImportError(
                    "Missing OpenAI LLMs. Please run `pip install llama-index-llms-openai`."
                ) from exc
            correction_llm = OpenAI(model="gpt-4-turbo-preview", temperature=0)

        return cls(
            critique_agent_worker=critique_agent_worker,
            critique_template=critique_template,
            correction_llm=correction_llm,
            max_iterations=max_iterations,
            stopping_callable=stopping_callable,
            callback_manager=callback_manager or CallbackManager([]),
            verbose=verbose,
            **kwargs,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task.

        Seeds a fresh memory buffer with the task's history plus the new user
        input, and stores it with an empty ``sources`` list in ``task.extra_state``.
        """
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # put current history in new memory
        messages = task.memory.get()
        for message in messages:
            new_memory.put(message)
        # inject new input into memory
        new_memory.put(ChatMessage(content=task.input, role=MessageRole.USER))

        # initialize task state
        task_state = {
            "new_memory": new_memory,
            "sources": [],
        }
        task.extra_state.update(task_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state={"count": 0},
        )

    def _remove_correction_str_prefix(self, correct_msg: str) -> str:
        """Helper function to format correction message for final response."""
        return correct_msg.replace(CORRECT_RESPONSE_PREFIX, "")

    @dispatcher.span
    def _critique(self, input_str: str) -> AgentChatResponse:
        """Run the critique agent over ``input_str`` and return its response."""
        agent = self._critique_agent_worker.as_agent(verbose=self._verbose)
        critique = agent.chat(self._critique_template.format(input_str=input_str))
        if self._verbose:
            print(f"Critique: {critique.response}", flush=True)
        return critique

    @dispatcher.span
    def _correct(self, input_str: str, critique: str) -> ChatMessage:
        """Ask the correction LLM to revise ``input_str`` given ``critique``."""
        correction = self._correction_llm.structured_predict(
            Correction,
            PromptTemplate(CORRECT_PROMPT_TEMPLATE),
            input_str=input_str,
            critique=critique,
        )

        correct_response_str = CORRECT_RESPONSE_FSTRING.format(
            correction=correction.correction
        )
        if self._verbose:
            print(f"Correction: {correction.correction}", flush=True)
        return ChatMessage.from_str(correct_response_str, role="assistant")

    # Shared step helpers (used by both run_step and arun_step)
    def _reached_max_iterations(self, count: int) -> bool:
        """Return True once ``count`` cycles have exhausted the iteration budget."""
        # `>=` rather than `==`: with a non-positive max_iterations the 1-based
        # count would never match, and only the stopping condition could end it.
        return count >= self.max_iterations

    def _begin_step(self, step: TaskStep, task: Task) -> str:
        """Bump the cycle counter and return the latest response without prefix."""
        step.step_state["count"] += 1
        # the last message is the user input (first cycle) or the previous
        # correction (later cycles)
        messages = task.extra_state["new_memory"].get()
        return self._remove_correction_str_prefix(messages[-1].content)

    def _record_critique(
        self, task: Task, critique_response: AgentChatResponse
    ) -> bool:
        """Store a critique's sources and message; return whether to stop correcting."""
        task.extra_state["sources"].extend(critique_response.sources)

        is_done = False
        if self.stopping_callable:
            is_done = bool(
                self.stopping_callable(critique_str=critique_response.response)
            )

        task.extra_state["new_memory"].put(
            ChatMessage(role=MessageRole.USER, content=critique_response.response)
        )
        return is_done

    def _finish_step(
        self,
        step: TaskStep,
        task: Task,
        prev_correct_str_without_prefix: str,
        critique_response: AgentChatResponse,
        correct_msg: Optional[ChatMessage],
    ) -> TaskStepOutput:
        """Record the cycle's outcome in memory and build its TaskStepOutput.

        Args:
            step: The current step.
            task: The task being run.
            prev_correct_str_without_prefix: The response that was critiqued.
            critique_response: This cycle's critique.
            correct_msg: The new correction, or None when the stopping
                condition accepted the previous response as-is.
        """
        state = step.step_state
        new_memory = task.extra_state["new_memory"]
        new_steps: List[TaskStep] = []

        if correct_msg is None:
            # no correction to be made: the previous response is sufficient
            is_last = True
            agent_response = AgentChatResponse(
                response=prev_correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
            new_memory.put(
                ChatMessage(
                    role=MessageRole.ASSISTANT, content=prev_correct_str_without_prefix
                )
            )
        elif self._reached_max_iterations(state["count"]):
            # reached max iterations, no further reflection/correction cycles
            is_last = True
            correct_str_without_prefix = self._remove_correction_str_prefix(
                correct_msg.content
            )
            new_memory.put(
                ChatMessage(
                    role=MessageRole.ASSISTANT, content=correct_str_without_prefix
                )
            )
            # include every source gathered across cycles, like the early-stop branch
            agent_response = AgentChatResponse(
                response=correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
        else:
            # another round of reflection/correction will take place
            is_last = False
            new_memory.put(correct_msg)
            agent_response = AgentChatResponse(
                response=str(correct_msg), sources=critique_response.sources
            )
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_last,
            next_steps=new_steps,
        )

    @dispatcher.span
    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run one critique/correction cycle.

        The critique agent reviews the latest response. If the stopping
        condition accepts it, it becomes final; otherwise the correction LLM
        revises it, and the revision is either final (iteration budget
        exhausted) or queued for another cycle.
        """
        prev_correct_str_without_prefix = self._begin_step(step, task)

        # critique phase
        critique_response = self._critique(input_str=prev_correct_str_without_prefix)
        is_done = self._record_critique(task, critique_response)

        # correction phase (skipped once the stopping condition is met)
        correct_msg: Optional[ChatMessage] = None
        if not is_done:
            correct_msg = self._correct(
                input_str=prev_correct_str_without_prefix,
                critique=critique_response.response,
            )

        return self._finish_step(
            step=step,
            task=task,
            prev_correct_str_without_prefix=prev_correct_str_without_prefix,
            critique_response=critique_response,
            correct_msg=correct_msg,
        )

    # Async Methods
    @dispatcher.span
    async def _acritique(self, input_str: str) -> AgentChatResponse:
        """Run the critique agent over ``input_str`` (async)."""
        agent = self._critique_agent_worker.as_agent(verbose=self._verbose)
        critique = await agent.achat(
            self._critique_template.format(input_str=input_str)
        )
        if self._verbose:
            print(f"Critique: {critique.response}", flush=True)
        return critique

    @dispatcher.span
    async def _acorrect(self, input_str: str, critique: str) -> ChatMessage:
        """Ask the correction LLM to revise ``input_str`` given ``critique`` (async)."""
        correction = await self._correction_llm.astructured_predict(
            Correction,
            PromptTemplate(CORRECT_PROMPT_TEMPLATE),
            input_str=input_str,
            critique=critique,
        )

        correct_response_str = CORRECT_RESPONSE_FSTRING.format(
            correction=correction.correction
        )
        if self._verbose:
            print(f"Correction: {correction.correction}", flush=True)
        return ChatMessage.from_str(correct_response_str, role="assistant")

    @dispatcher.span
    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run one critique/correction cycle (async); mirrors ``run_step``."""
        prev_correct_str_without_prefix = self._begin_step(step, task)

        # critique phase
        critique_response = await self._acritique(
            input_str=prev_correct_str_without_prefix
        )
        is_done = self._record_critique(task, critique_response)

        # correction phase (skipped once the stopping condition is met)
        correct_msg: Optional[ChatMessage] = None
        if not is_done:
            correct_msg = await self._acorrect(
                input_str=prev_correct_str_without_prefix,
                critique=critique_response.response,
            )

        return self._finish_step(
            step=step,
            task=task,
            prev_correct_str_without_prefix=prev_correct_str_without_prefix,
            critique_response=critique_response,
            correct_msg=correct_msg,
        )

    # Stream methods
    @dispatcher.span
    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream). Not supported; always raises NotImplementedError."""
        raise NotImplementedError(
            "Stream not supported for tool-interactive reflection agent"
        )

    @dispatcher.span
    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream). Not supported; always raises NotImplementedError."""
        raise NotImplementedError(
            "Stream not supported for tool-interactive reflection agent"
        )

    def get_all_messages(self, task: Task) -> List[ChatMessage]:
        """Return prefix messages, stored history, and the task's working messages."""
        # This class declares no `prefix_messages` field; read it defensively so
        # the call does not raise AttributeError when no base class provides it.
        prefix_messages = list(getattr(self, "prefix_messages", None) or [])
        # NOTE(review): initialize_step seeds `new_memory` with task.memory.get(),
        # so while a task is in progress the history appears twice here.
        return (
            prefix_messages
            + task.memory.get()
            + task.extra_state["new_memory"].get_all()
        )

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed."""
        # add new messages to memory
        task.memory.set(task.extra_state["new_memory"].get_all())
        # reset new memory
        task.extra_state["new_memory"].reset()

from_defaults classmethod #

from_defaults(critique_agent_worker: FunctionCallingAgentWorker, critique_template: str, correction_llm: Optional[LLM] = None, max_iterations: int = DEFAULT_MAX_ITERATIONS, stopping_callable: Optional[StoppingCallable] = None, callback_manager: Optional[CallbackManager] = None, verbose: bool = False, **kwargs: Any) -> ToolInteractiveReflectionAgentWorker

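Convenience constructor. If correction_llm is not provided, an OpenAI gpt-4-turbo-preview LLM (temperature 0) is used; an ImportError is raised if llama-index-llms-openai is not installed.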

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/tool_interactive_reflection.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@classmethod
def from_defaults(
    cls,
    critique_agent_worker: FunctionCallingAgentWorker,
    critique_template: str,
    correction_llm: Optional[LLM] = None,
    max_iterations: int = DEFAULT_MAX_ITERATIONS,
    stopping_callable: Optional[StoppingCallable] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "ToolInteractiveReflectionAgentWorker":
    """Build a worker from a critique agent worker and a critique template.

    Args:
        critique_agent_worker: Worker used to critique each candidate response.
        critique_template: Template used for the critique prompt.
        correction_llm: LLM that produces corrections. If ``None``, an OpenAI
            ``gpt-4-turbo-preview`` model with ``temperature=0`` is created,
            which requires the ``llama-index-llms-openai`` package.
        max_iterations: Maximum number of critique/correction cycles.
        stopping_callable: Optional callable invoked with ``critique_str``;
            a truthy result ends reflection early.
        callback_manager: Callback manager; an empty ``CallbackManager`` is
            created when ``None``.
        verbose: Passed through to the constructor.
        **kwargs: Forwarded unchanged to the class constructor.

    Returns:
        A new worker instance.

    Raises:
        ImportError: If ``correction_llm`` is ``None`` and the OpenAI LLM
            integration is not installed.
    """
    if correction_llm is None:
        try:
            from llama_index.llms.openai import OpenAI
        except ImportError as err:
            # chain explicitly so the original import failure stays visible
            raise ImportError(
                "Missing OpenAI LLMs. Please run `pip install llama-index-llms-openai`."
            ) from err
        correction_llm = OpenAI(model="gpt-4-turbo-preview", temperature=0)

    return cls(
        critique_agent_worker=critique_agent_worker,
        critique_template=critique_template,
        correction_llm=correction_llm,
        max_iterations=max_iterations,
        stopping_callable=stopping_callable,
        callback_manager=callback_manager or CallbackManager([]),
        verbose=verbose,
        **kwargs,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/tool_interactive_reflection.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize the first step of ``task``.

    Populates ``task.extra_state`` with:

    * ``"new_memory"``: a fresh ``ChatMemoryBuffer`` holding the task's full
      chat history followed by ``task.input`` as a USER message;
    * ``"sources"``: an empty list that collects critique sources.

    Args:
        task: Task being started.
        **kwargs: Unused.

    Returns:
        The first TaskStep, carrying ``step_state={"count": 0}``.
    """
    # temporary working memory for this task
    new_memory = ChatMemoryBuffer.from_defaults()

    # Copy the *entire* history via ``get_all()``. ``finalize_task`` replaces
    # ``task.memory`` with this buffer's contents, so copying only what
    # ``get()`` returns (which, depending on the memory implementation, may
    # be a token-limited or retrieved subset) would silently drop messages.
    for message in task.memory.get_all():
        new_memory.put(message)
    # inject new input into memory
    new_memory.put(ChatMessage(content=task.input, role=MessageRole.USER))

    # initialize task state
    task.extra_state.update({"new_memory": new_memory, "sources": []})

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state={"count": 0},
    )

run_step #

run_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/tool_interactive_reflection.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
@dispatcher.span
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run one critique/correction cycle.

    The latest message in the working memory (``task.extra_state["new_memory"]``)
    is taken as the candidate response, with any correction prefix removed.
    It is critiqued via ``self._critique``; the critique is stored in working
    memory and its sources are accumulated in ``task.extra_state["sources"]``.

    * If ``self.stopping_callable`` accepts the critique, the candidate is
      returned as the final response.
    * Otherwise ``self._correct`` produces a new candidate. Once
      ``self.max_iterations`` cycles have run, that candidate is the final
      response; otherwise another step is scheduled.

    Args:
        step: Current step; ``step.step_state["count"]`` counts cycles run.
        task: Task whose ``extra_state`` was prepared by ``initialize_step``.
        **kwargs: Unused.

    Returns:
        TaskStepOutput for this cycle; ``is_last`` is set when no further
        cycle will run.
    """
    state = step.step_state
    state["count"] += 1
    new_memory = task.extra_state["new_memory"]
    all_sources = task.extra_state["sources"]
    # ``>=`` rather than ``==`` so a max_iterations below 1 still terminates
    # after a single cycle instead of looping indefinitely.
    reached_max_iterations = state["count"] >= self.max_iterations

    # Only the most recent message is needed. ``get()`` may trim the history
    # to the buffer's token limit (and presumably could return an empty
    # list), whereas ``get_all()[-1]`` is always that same latest message.
    prev_correct_str = new_memory.get_all()[-1].content
    prev_correct_str_without_prefix = self._remove_correction_str_prefix(
        prev_correct_str
    )

    # critique phase
    critique_response = self._critique(input_str=prev_correct_str_without_prefix)
    all_sources.extend(critique_response.sources)

    is_done = False
    if self.stopping_callable:
        is_done = self.stopping_callable(critique_str=critique_response.response)

    new_memory.put(
        ChatMessage(role=MessageRole.USER, content=critique_response.response)
    )

    # correction phase
    if is_done:
        # no correction to be made: the previous candidate is sufficient
        agent_response = AgentChatResponse(
            response=prev_correct_str_without_prefix,
            sources=all_sources,
        )
        new_memory.put(
            ChatMessage(
                role=MessageRole.ASSISTANT, content=prev_correct_str_without_prefix
            )
        )
        new_steps = []
    else:
        # generate a new correction
        correct_msg = self._correct(
            input_str=prev_correct_str_without_prefix,
            critique=critique_response.response,
        )
        correct_str_without_prefix = self._remove_correction_str_prefix(
            correct_msg.content
        )

        if reached_max_iterations:
            # final answer: store it without the correction prefix and, like
            # the ``is_done`` branch, attach all accumulated sources
            new_memory.put(
                ChatMessage(
                    role=MessageRole.ASSISTANT, content=correct_str_without_prefix
                )
            )
            agent_response = AgentChatResponse(
                response=correct_str_without_prefix,
                sources=all_sources,
            )
            new_steps = []
        else:
            # another round of reflection/correction will take place; the raw
            # correction message becomes the next step's candidate
            new_memory.put(correct_msg)
            agent_response = AgentChatResponse(
                response=str(correct_msg), sources=critique_response.sources
            )
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        is_last=bool(is_done) or reached_max_iterations,
        next_steps=new_steps,
    )

arun_step async #

arun_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/tool_interactive_reflection.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
@dispatcher.span
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run one critique/correction cycle (async).

    The latest message in the working memory (``task.extra_state["new_memory"]``)
    is taken as the candidate response, with any correction prefix removed.
    It is critiqued via ``self._acritique``; the critique is stored in working
    memory and its sources are accumulated in ``task.extra_state["sources"]``.

    * If ``self.stopping_callable`` accepts the critique, the candidate is
      returned as the final response.
    * Otherwise ``self._acorrect`` produces a new candidate. Once
      ``self.max_iterations`` cycles have run, that candidate is the final
      response; otherwise another step is scheduled.

    Args:
        step: Current step; ``step.step_state["count"]`` counts cycles run.
        task: Task whose ``extra_state`` was prepared by ``initialize_step``.
        **kwargs: Unused.

    Returns:
        TaskStepOutput for this cycle; ``is_last`` is set when no further
        cycle will run.
    """
    state = step.step_state
    state["count"] += 1
    new_memory = task.extra_state["new_memory"]
    all_sources = task.extra_state["sources"]
    # ``>=`` rather than ``==`` so a max_iterations below 1 still terminates
    # after a single cycle instead of looping indefinitely.
    reached_max_iterations = state["count"] >= self.max_iterations

    # Only the most recent message is needed. ``get()`` may trim the history
    # to the buffer's token limit (and presumably could return an empty
    # list), whereas ``get_all()[-1]`` is always that same latest message.
    prev_correct_str = new_memory.get_all()[-1].content
    prev_correct_str_without_prefix = self._remove_correction_str_prefix(
        prev_correct_str
    )

    # critique phase
    critique_response = await self._acritique(
        input_str=prev_correct_str_without_prefix
    )
    all_sources.extend(critique_response.sources)

    is_done = False
    if self.stopping_callable:
        is_done = self.stopping_callable(critique_str=critique_response.response)

    new_memory.put(
        ChatMessage(role=MessageRole.USER, content=critique_response.response)
    )

    # correction phase
    if is_done:
        # no correction to be made: the previous candidate is sufficient
        agent_response = AgentChatResponse(
            response=prev_correct_str_without_prefix,
            sources=all_sources,
        )
        new_memory.put(
            ChatMessage(
                role=MessageRole.ASSISTANT, content=prev_correct_str_without_prefix
            )
        )
        new_steps = []
    else:
        # generate a new correction
        correct_msg = await self._acorrect(
            input_str=prev_correct_str_without_prefix,
            critique=critique_response.response,
        )
        correct_str_without_prefix = self._remove_correction_str_prefix(
            correct_msg.content
        )

        if reached_max_iterations:
            # final answer: store it without the correction prefix and, like
            # the ``is_done`` branch, attach all accumulated sources
            new_memory.put(
                ChatMessage(
                    role=MessageRole.ASSISTANT, content=correct_str_without_prefix
                )
            )
            agent_response = AgentChatResponse(
                response=correct_str_without_prefix,
                sources=all_sources,
            )
            new_steps = []
        else:
            # another round of reflection/correction will take place; the raw
            # correction message becomes the next step's candidate
            new_memory.put(correct_msg)
            agent_response = AgentChatResponse(
                response=str(correct_msg), sources=critique_response.sources
            )
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        is_last=bool(is_done) or reached_max_iterations,
        next_steps=new_steps,
    )

stream_step #

stream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (stream).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/tool_interactive_reflection.py
423
424
425
426
427
428
429
@dispatcher.span
@trace_method("run_step")
def stream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (stream).

    Streaming is not available for this worker; calling this method
    always raises ``NotImplementedError``.
    """
    unsupported_msg = "Stream not supported for tool-interactive reflection agent"
    raise NotImplementedError(unsupported_msg)

astream_step async #

astream_step(step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput

Run step (async stream).

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/tool_interactive_reflection.py
431
432
433
434
435
436
437
438
439
@dispatcher.span
@trace_method("run_step")
async def astream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (async stream).

    Async streaming is not available for this worker; awaiting this
    coroutine always raises ``NotImplementedError``.
    """
    unsupported_msg = "Stream not supported for tool-interactive reflection agent"
    raise NotImplementedError(unsupported_msg)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama-index-integrations/agent/llama-index-agent-introspective/llama_index/agent/introspective/reflective/tool_interactive_reflection.py
448
449
450
451
452
453
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Commit the task's working memory back to ``task.memory``.

    ``task.memory`` is replaced with every message accumulated in
    ``task.extra_state["new_memory"]``; that working buffer is then cleared.
    """
    working_memory = task.extra_state["new_memory"]
    task.memory.set(working_memory.get_all())
    working_memory.reset()