## Human in the loop
You can also define tools that bring a human into the loop. This is useful for tasks that require human input, such as confirming a tool call or providing feedback.
As we'll see in our Workflows tutorial, the way Workflows work under the hood of AgentWorkflow is by running steps which both emit and receive events. Here's a diagram of the steps (in blue) that make up an AgentWorkflow and the events (in green) that pass data between them. You'll recognize these events; they're the same ones we were handling in the output stream earlier.
To get a human in the loop, we'll get our tool to emit an event that isn't received by any other step in the workflow. We'll then tell our tool to wait until it receives a specific "reply" event.
We have built-in `InputRequiredEvent` and `HumanResponseEvent` events to use for this purpose. If you want to capture different forms of human input, you can subclass these events to match your own preferences. Let's import them:
```python
from llama_index.core.workflow import (
    InputRequiredEvent,
    HumanResponseEvent,
)
```
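For instance, here's a minimal sketch of what subclassed events could look like. The `ApprovalRequiredEvent`/`ApprovalResponseEvent` names and their extra fields are hypothetical, not part of LlamaIndex:

```python
# Hypothetical subclasses -- a sketch, not part of the built-in API.
class ApprovalRequiredEvent(InputRequiredEvent):
    tool_name: str  # tell the human which tool wants confirmation


class ApprovalResponseEvent(HumanResponseEvent):
    approved: bool  # structured yes/no instead of free-form text
```

A tool would then emit `ApprovalRequiredEvent` as its waiter event and wait for an `ApprovalResponseEvent` instead of the built-in types.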
Next we'll create a tool that performs a hypothetical dangerous task. There are a couple of new things happening here:
- `wait_for_event` is used to wait for a `HumanResponseEvent`.
- The `waiter_event` is the event that is written to the event stream, to let the caller know that we're waiting for a response.
- `waiter_id` is a unique identifier for this specific wait call. It helps ensure that we only send one `waiter_event` for each `waiter_id`.
- The `requirements` argument is used to specify that we want to wait for a `HumanResponseEvent` with a specific `user_name`.
```python
async def dangerous_task(ctx: Context) -> str:
    """A dangerous task that requires human confirmation."""

    # emit a waiter event (InputRequiredEvent here)
    # and wait until we see a HumanResponseEvent
    question = "Are you sure you want to proceed? "
    response = await ctx.wait_for_event(
        HumanResponseEvent,
        waiter_id=question,
        waiter_event=InputRequiredEvent(
            prefix=question,
            user_name="Laurie",
        ),
        requirements={"user_name": "Laurie"},
    )

    # act on the input from the event
    if response.response.strip().lower() == "yes":
        return "Dangerous task completed successfully."
    else:
        return "Dangerous task aborted."
```
We create our agent as usual, passing it the tool we just defined:
```python
workflow = FunctionAgent(
    tools=[dangerous_task],
    llm=llm,
    system_prompt="You are a helpful assistant that can perform dangerous tasks.",
)
```
Now we can run the workflow, handling the `InputRequiredEvent` just like any other streaming event, and responding with a `HumanResponseEvent` passed in using the `send_event` method:
```python
handler = workflow.run(user_msg="I want to proceed with the dangerous task.")

async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        # capture keyboard input
        response = input(event.prefix)
        # send our response back
        handler.ctx.send_event(
            HumanResponseEvent(
                response=response,
                user_name=event.user_name,
            )
        )

response = await handler
print(str(response))
```
As usual, you can see the full code of this example.
You can do anything you want to capture the input; you could use a GUI, or audio input, or even get another, separate agent involved. If your input is going to take a while, or happen in another process, you might want to serialize the context and save it to a database or file so that you can resume the workflow later.
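As a rough sketch of that last idea, saving and restoring the context might look something like this. It assumes the `JsonSerializer` and `Context.to_dict`/`Context.from_dict` helpers described in the Workflows documentation, and the file name is only an illustration:

```python
import json

from llama_index.core.workflow import Context, JsonSerializer

# while waiting for the human, snapshot the workflow state to disk
ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
with open("saved_context.json", "w") as f:
    json.dump(ctx_dict, f)

# later (possibly in another process), restore the context and resume
with open("saved_context.json") as f:
    restored_ctx = Context.from_dict(
        workflow, json.load(f), serializer=JsonSerializer()
    )
handler = workflow.run(ctx=restored_ctx)
```

You'd then stream events from the resumed handler and send the `HumanResponseEvent` exactly as above.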
Speaking of getting other agents involved brings us to our next section, multi-agent systems.