Skip to content

Nested workflows#

Another way to extend workflows is to nest additional workflows. It's possible to create explicit slots in existing flows where you can supply an entire additional workflow. For example, let's say we had a query that used an LLM to reflect on the quality of that query. The author might expect that you would want to modify the reflection step, and leave a slot for you to do that.

Here's our base workflow:

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from llama_index.utils.workflow import draw_all_possible_flows


class Step2Event(Event):
    query: str


class MainWorkflow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent, reflection_workflow: Workflow
    ) -> Step2Event:
        print("Need to run reflection")
        res = await reflection_workflow.run(query=ev.query)

        return Step2Event(query=res)

    @step
    async def step_two(self, ctx: Context, ev: Step2Event) -> StopEvent:
        print("Query is ", ev.query)
        # do something with the query here
        return StopEvent(result=ev.query)

This workflow by itself will not run; it needs a valid workflow for the reflection step. Let's create one:

class ReflectionFlow(Workflow):
    @step
    async def sub_start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        print("Doing custom reflection")
        return StopEvent(result="Improved query")

Now we can run the main workflow by supplying this custom reflection nested flow using the add_workflows method, to which we pass an instance of the ReflectionFlow class:

w = MainWorkflow(timeout=10, verbose=False)
w.add_workflows(reflection_workflow=ReflectionFlow())
result = await w.run(query="Initial query")
print(result)

Note that because the nested flow is a totally different workflow rather than a step, draw_all_possible_flows will only draw the flow of MainWorkflow.

Default workflows#

If you're creating a workflow with multiple slots for nested workflows, you might want to provide default workflows for each slot. You can do this by setting the default value of the slot to an instance of the workflow class. Here's an example.

First, let's create a default sub-workflow to use:

class DefaultSubflow(Workflow):
    @step()
    async def sub_start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        print("Doing basic reflection")
        return StopEvent(result="Improved query")

Now we can modify the MainWorkflow to include a default sub-workflow:

class MainWorkflow(Workflow):
    @step()
    async def start(
        self,
        ctx: Context,
        ev: StartEvent,
        reflection_workflow: Workflow = DefaultSubflow(),
    ) -> Step2Event:
        print("Need to run reflection")
        res = await reflection_workflow.run(query=ev.query)

        return Step2Event(query=res)

Now, if you run the workflow without providing a custom reflection workflow, it will use the default one. This can be very useful for providing a good "out of the box" experience for users who may not want to customize everything.

Finally, let's take a look at observability and debugging in workflows.