Skip to content

Decorators

step #

step(*args: Any, workflow: Optional[Type[Workflow]] = None, pass_context: bool = False, num_workers: int = 4, retry_policy: Optional[RetryPolicy] = None) -> Callable

Decorator used to mark methods and functions as workflow steps.

Decorators are evaluated at import time, but the communication channels cannot be started until runtime. For this reason, we temporarily store the list of events that will be consumed by this step in the function object itself.

Parameters:

Name Type Description Default
workflow Optional[Type[Workflow]]

Workflow class to which the decorated step will be added. Only needed when using the decorator on free functions instead of class methods.

None
num_workers int

The number of workers that will process events for the decorated step. The default value works most of the time.

4
retry_policy Optional[RetryPolicy]

The policy used to retry a step that encountered an error while running.

None
Source code in llama-index-core/llama_index/core/workflow/decorators.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def step(
    *args: Any,
    workflow: Optional[Type["Workflow"]] = None,
    pass_context: bool = False,
    num_workers: int = 4,
    retry_policy: Optional[RetryPolicy] = None,
) -> Callable:
    """Mark a method or free function as a workflow step.

    The decorator can be applied bare (``@step``) or with keyword options
    (``@step(num_workers=2)``). Decoration happens at import time, while the
    communication channels are only started at runtime, so the step's
    configuration (including the events it consumes) is stored on the
    function object itself until it is needed.

    Args:
        *args: When the decorator is used without parentheses, the first
            positional argument is the function being decorated.
        workflow: Workflow class to which the decorated step will be added.
            Only needed when decorating a free function rather than a
            class method.
        pass_context: Accepted in the signature but not read by this
            function.
        num_workers: Number of workers that will process events for the
            decorated step. Must be an integer greater than 0.
        retry_policy: Policy used to retry a step that encountered an error
            while running.

    Returns:
        The decorated function itself when used bare, otherwise a decorator
        that returns the function it is applied to.

    Raises:
        WorkflowValidationError: If ``num_workers`` is not a positive
            integer, or if a free function is decorated without passing
            ``workflow``. Signature validation may raise as well.
    """

    def decorator(func: Callable) -> Callable:
        # Checked at decoration time, before the signature is inspected.
        if not (isinstance(num_workers, int) and num_workers > 0):
            raise WorkflowValidationError(
                "num_workers must be an integer greater than 0"
            )

        # Signature validation reports the specific failure by raising.
        signature_spec = inspect_signature(func)
        validate_step_signature(signature_spec)

        # Only the first entry of the accepted-events mapping is used.
        first_entry = next(iter(signature_spec.accepted_events.items()))
        event_name, accepted_events = first_entry

        # Keep the configuration on the function object for later use.
        config = StepConfig(
            accepted_events=accepted_events,
            event_name=event_name,
            return_types=signature_spec.return_types,
            context_parameter=signature_spec.context_parameter,
            num_workers=num_workers,
            requested_services=signature_spec.requested_services or [],
            retry_policy=retry_policy,
        )
        func.__step_config = config  # type: ignore[attr-defined]

        # Only free functions have to be registered explicitly below.
        if not is_free_function(func.__qualname__):
            return func

        if workflow is None:
            raise WorkflowValidationError(
                f"To decorate {func.__name__} please pass a workflow class to the @step decorator."
            )
        workflow.add_step(func)
        return func

    if not args:
        # Used with parentheses, like `@step(...)`: hand back the decorator.
        return decorator

    # Used without parentheses, like `@step`: decorate right away.
    target = args[0]
    decorator(target)
    return target