Skip to content

Orchestrators

BaseOrchestrator #

Bases: ABC

Base class for an orchestrator.

The general idea for an orchestrator is to manage the flow of messages between services.

Given some state and a task, figure out the next messages to publish. Then, once the messages are processed, update the state with the results.

Source code in llama_deploy/llama_deploy/orchestrators/base.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class BaseOrchestrator(ABC):
    """Base class for an orchestrator.

    The general idea for an orchestrator is to manage the flow of messages between services.

    Given some state, and task, figure out the next messages to publish. Then, once
    the messages are processed, update the state with the results.
    """

    @abstractmethod
    async def get_next_messages(
        self, task_def: TaskDefinition, state: Dict[str, Any]
    ) -> Tuple[List[QueueMessage], Dict[str, Any]]:
        """Get the next message to process. Returns the message and the new state.

        Args:
            task_def: The task being orchestrated.
            state: Mutable orchestration state shared across calls.

        Returns:
            A tuple of (queue messages to publish next, updated state).
        """
        ...

    @abstractmethod
    async def add_result_to_state(
        self, result: TaskResult, state: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Add the result of processing a message to the state. Returns the new state.

        Args:
            result: The completed task's result.
            state: Mutable orchestration state shared across calls.

        Returns:
            The updated state.
        """
        ...

get_next_messages abstractmethod async #

get_next_messages(task_def: TaskDefinition, state: Dict[str, Any]) -> Tuple[List[QueueMessage], Dict[str, Any]]

Get the next message to process. Returns the message and the new state.

Source code in llama_deploy/llama_deploy/orchestrators/base.py
17
18
19
20
21
22
@abstractmethod
async def get_next_messages(
    self, task_def: TaskDefinition, state: Dict[str, Any]
) -> Tuple[List[QueueMessage], Dict[str, Any]]:
    """Get the next message to process. Returns the message and the new state.

    Args:
        task_def: The task being orchestrated.
        state: Mutable orchestration state shared across calls.

    Returns:
        A tuple of (queue messages to publish next, updated state).
    """
    ...

add_result_to_state abstractmethod async #

add_result_to_state(result: TaskResult, state: Dict[str, Any]) -> Dict[str, Any]

Add the result of processing a message to the state. Returns the new state.

Source code in llama_deploy/llama_deploy/orchestrators/base.py
24
25
26
27
28
29
@abstractmethod
async def add_result_to_state(
    self, result: TaskResult, state: Dict[str, Any]
) -> Dict[str, Any]:
    """Add the result of processing a message to the state. Returns the new state.

    Args:
        result: The completed task's result.
        state: Mutable orchestration state shared across calls.

    Returns:
        The updated state.
    """
    ...

SimpleOrchestrator #

Bases: BaseOrchestrator

A simple orchestrator that handles orchestration between a service and a user.

Currently, the final message is published to the human message queue for final processing.

Source code in llama_deploy/llama_deploy/orchestrators/simple.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class SimpleOrchestrator(BaseOrchestrator):
    """A simple orchestrator that handles orchestration between a service and a user.

    Currently, the final message is published to the `human` message queue for final processing.
    """

    def __init__(self, max_retries: int = 3, final_message_type: str = "human") -> None:
        """Initialize the orchestrator.

        Args:
            max_retries: Maximum retry count to record in state. NOTE(review): retry
                detection is not implemented yet (see add_result_to_state), so this
                value is currently unused.
            final_message_type: Message-queue type that receives the completed-task
                message. If None, no final message is published.
        """
        self.max_retries = max_retries
        self.final_message_type = final_message_type

    async def get_next_messages(
        self, task_def: TaskDefinition, state: Dict[str, Any]
    ) -> Tuple[List[QueueMessage], Dict[str, Any]]:
        """Get the next message to process. Returns the message and the new state.

        Assumes the agent_id (i.e. the service name) is the destination for the next message.

        While no result is recorded for the task, a NEW_TASK message is addressed to
        the task's service. Once a result exists in the state, a COMPLETED_TASK
        message is addressed to ``self.final_message_type`` instead (or no message
        at all if that is None).

        Raises:
            ValueError: If the task has no agent_id, or a stored result cannot be
                coerced into a TaskResult.
        """
        destination_messages: List[QueueMessage] = []

        if task_def.agent_id is None:
            raise ValueError(
                "Task definition must have an agent_id specified as a service name"
            )

        # Ensure a per-task state bucket exists for the service to read/write.
        if task_def.task_id not in state:
            state[task_def.task_id] = {}

        result_key = get_result_key(task_def.task_id)
        result = state.get(result_key)
        if result is not None:
            # Results may arrive serialized (JSON string or plain dict) depending on
            # how the state was persisted; normalize to a TaskResult instance.
            if not isinstance(result, TaskResult):
                if isinstance(result, str):
                    result = TaskResult(**json.loads(result))
                elif isinstance(result, dict):
                    result = TaskResult(**result)
                else:
                    raise ValueError(f"Result must be a TaskResult, not {type(result)}")
                # Cache the parsed result so subsequent calls skip re-parsing.
                state[result_key] = result

            if self.final_message_type is not None:
                destination_messages = [
                    QueueMessage(
                        type=self.final_message_type,
                        action=ActionTypes.COMPLETED_TASK,
                        data=result.model_dump(),
                    )
                ]
        else:
            # No result yet: dispatch the task to its service.
            destination_messages = [
                QueueMessage(
                    type=task_def.agent_id,
                    action=ActionTypes.NEW_TASK,
                    data=NewTask(
                        task=task_def, state=state[task_def.task_id]
                    ).model_dump(),
                )
            ]

        return destination_messages, state

    async def add_result_to_state(
        self, result: TaskResult, state: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Add the result of processing a message to the state. Returns the new state.

        Also increments a global ``retries`` counter (starting at 0 on first call);
        per-task failure detection is still a TODO.
        """
        # TODO: detect failures + retries
        state["retries"] = state.get("retries", -1) + 1

        state[get_result_key(result.task_id)] = result

        return state

get_next_messages async #

get_next_messages(task_def: TaskDefinition, state: Dict[str, Any]) -> Tuple[List[QueueMessage], Dict[str, Any]]

Get the next message to process. Returns the message and the new state.

Assumes the agent_id (i.e. the service name) is the destination for the next message.

Runs the required service, then sends the result to the final message type.

Source code in llama_deploy/llama_deploy/orchestrators/simple.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
async def get_next_messages(
    self, task_def: TaskDefinition, state: Dict[str, Any]
) -> Tuple[List[QueueMessage], Dict[str, Any]]:
    """Determine the next queue messages for a task.

    The task's ``agent_id`` (the service name) is the destination while the task
    is in flight; once a result is present in the state, it is forwarded to
    ``self.final_message_type`` instead (or nothing is published if that is None).

    Returns the messages to publish and the (possibly updated) state.
    """
    if task_def.agent_id is None:
        raise ValueError(
            "Task definition must have an agent_id specified as a service name"
        )

    # Make sure a per-task state bucket exists for the service to use.
    state.setdefault(task_def.task_id, {})

    raw_result = state.get(get_result_key(task_def.task_id))
    if raw_result is None:
        # No result recorded yet: dispatch the task to its service.
        new_task = NewTask(task=task_def, state=state[task_def.task_id])
        outgoing = [
            QueueMessage(
                type=task_def.agent_id,
                action=ActionTypes.NEW_TASK,
                data=new_task.model_dump(),
            )
        ]
        return outgoing, state

    # Normalize the stored result into a TaskResult instance.
    if isinstance(raw_result, TaskResult):
        result = raw_result
    elif isinstance(raw_result, str):
        result = TaskResult(**json.loads(raw_result))
    elif isinstance(raw_result, dict):
        result = TaskResult(**raw_result)
    else:
        raise ValueError(f"Result must be a TaskResult, not {type(raw_result)}")

    assert isinstance(result, TaskResult), "Result must be a TaskResult"

    outgoing: List[QueueMessage] = []
    if self.final_message_type is not None:
        outgoing = [
            QueueMessage(
                type=self.final_message_type,
                action=ActionTypes.COMPLETED_TASK,
                data=result.model_dump(),
            )
        ]

    return outgoing, state

add_result_to_state async #

add_result_to_state(result: TaskResult, state: Dict[str, Any]) -> Dict[str, Any]

Add the result of processing a message to the state. Returns the new state.

Source code in llama_deploy/llama_deploy/orchestrators/simple.py
86
87
88
89
90
91
92
93
94
95
96
97
async def add_result_to_state(
    self, result: TaskResult, state: Dict[str, Any]
) -> Dict[str, Any]:
    """Record a completed task's result in the orchestration state.

    Increments the global ``retries`` counter (first call yields 0) and stores
    the result under the task's result key. Returns the updated state.
    """
    # TODO: detect failures + retries
    state["retries"] = state.get("retries", -1) + 1
    state[get_result_key(result.task_id)] = result
    return state

options: members: - BaseOrchestrator - SimpleOrchestrator - SimpleOrchestratorConfig