orchestrators
BaseOrchestrator
Bases: ABC
Base class for an orchestrator.
An orchestrator manages the flow of messages between services.
Given a task and the current state, it determines the next messages to publish. Once those messages have been processed, it updates the state with the results.
Source code in llama_deploy/orchestrators/base.py
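The contract described above can be illustrated with a minimal sketch. This is not the library's source: `TaskDefinition`, `QueueMessage`, and `TaskResult` are simplified stand-in dataclasses, and `SketchOrchestrator`, `EchoOrchestrator`, and `run_once` are hypothetical names introduced only for illustration. Only the two method signatures follow the documentation on this page.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


# Stand-in types: simplified placeholders, not the llama_deploy classes.
@dataclass
class TaskDefinition:
    task_id: str
    input: str


@dataclass
class QueueMessage:
    type: str  # name of the destination service
    data: Dict[str, Any]


@dataclass
class TaskResult:
    task_id: str
    result: str


class SketchOrchestrator(ABC):
    """Mirrors the two abstract coroutines documented on this page."""

    @abstractmethod
    async def get_next_messages(
        self, task_def: TaskDefinition, state: Dict[str, Any]
    ) -> Tuple[List[QueueMessage], Dict[str, Any]]: ...

    @abstractmethod
    async def add_result_to_state(
        self, result: TaskResult, state: Dict[str, Any]
    ) -> Dict[str, Any]: ...


class EchoOrchestrator(SketchOrchestrator):
    """Hypothetical subclass: one message per task, sent to a service named 'echo'."""

    async def get_next_messages(self, task_def, state):
        if task_def.task_id in state:
            return [], state  # result already recorded, nothing left to publish
        message = QueueMessage(type="echo", data={"input": task_def.input})
        return [message], state

    async def add_result_to_state(self, result, state):
        # Return a new dict that also holds the result for this task.
        return {**state, result.task_id: result.result}


async def run_once():
    orchestrator = EchoOrchestrator()
    task = TaskDefinition(task_id="t1", input="hello")
    messages, state = await orchestrator.get_next_messages(task, {})
    # Pretend the 'echo' service processed the message and replied.
    reply = TaskResult(task_id="t1", result=messages[0].data["input"])
    state = await orchestrator.add_result_to_state(reply, state)
    remaining, state = await orchestrator.get_next_messages(task, state)
    return messages, state, remaining
```

Calling `asyncio.run(run_once())` walks through one full cycle: ask for messages, record the result, then ask again and find nothing left to publish.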
get_next_messages (abstractmethod, async)
get_next_messages(task_def: TaskDefinition, state: Dict[str, Any]) -> Tuple[List[QueueMessage], Dict[str, Any]]
Get the next messages to process. Returns the messages and the new state.
Source code in llama_deploy/orchestrators/base.py
add_result_to_state (abstractmethod, async)
add_result_to_state(result: TaskResult, state: Dict[str, Any]) -> Dict[str, Any]
Add the result of processing a message to the state. Returns the new state.
Source code in llama_deploy/orchestrators/base.py
SimpleOrchestrator
Bases: BaseOrchestrator
A simple orchestrator that handles orchestration between a service and a user.
Currently, the final message is published to the human message queue for final processing.
Source code in llama_deploy/orchestrators/simple.py
get_next_messages (async)
get_next_messages(task_def: TaskDefinition, state: Dict[str, Any]) -> Tuple[List[QueueMessage], Dict[str, Any]]
Get the next messages to process. Returns the messages and the new state.
Assumes the agent_id (i.e. the service name) is the destination for the next message.
Runs the required service, then sends the result to the final message type.
Source code in llama_deploy/orchestrators/simple.py
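The routing described for this method can be sketched roughly as follows. This is an illustration under stated assumptions, not the library's implementation: the dataclasses are simplified stand-ins, `simple_next_messages` and `result_key` are hypothetical names, and falling back to a queue named "human" when no final message type is configured is an assumption based on the class description.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


# Stand-in types: simplified placeholders, not the llama_deploy classes.
@dataclass
class TaskDefinition:
    task_id: str
    agent_id: str  # name of the service that should handle the task
    input: str


@dataclass
class QueueMessage:
    type: str  # name of the destination queue
    data: Dict[str, Any]


def result_key(task_id: str) -> str:
    """Hypothetical naming scheme for where a task's result lives in the state."""
    return f"result_{task_id}"


async def simple_next_messages(
    task_def: TaskDefinition,
    state: Dict[str, Any],
    final_message_type: Optional[str] = None,
) -> Tuple[List[QueueMessage], Dict[str, Any]]:
    key = result_key(task_def.task_id)
    if key not in state:
        # No result yet: route the task to the service named by agent_id.
        message = QueueMessage(type=task_def.agent_id, data={"input": task_def.input})
        return [message], state
    # Result available: forward it to the final destination.
    # Assumption: use a queue named "human" when none is configured.
    destination = final_message_type or "human"
    return [QueueMessage(type=destination, data={"result": state[key]})], state


task = TaskDefinition(task_id="t1", agent_id="summarizer", input="some text")
first, _ = asyncio.run(simple_next_messages(task, {}))
final, _ = asyncio.run(simple_next_messages(task, {"result_t1": "a summary"}))
```

In this sketch, `first` holds a single message addressed to the `summarizer` service, and `final` holds a single message addressed to the final destination once a result is present in the state.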
add_result_to_state (async)
add_result_to_state(result: TaskResult, state: Dict[str, Any]) -> Dict[str, Any]
Add the result of processing a message to the state. Returns the new state.
Source code in llama_deploy/orchestrators/simple.py
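As a small illustration of "returns the new state", the sketch below records a result under a per-task key and returns an updated dictionary. The `TaskResult` dataclass is a stand-in, `add_result` is a hypothetical name, and both the key scheme and the choice to copy rather than mutate are assumptions; the documentation here does not say which approach the library takes.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class TaskResult:  # stand-in, not the llama_deploy class
    task_id: str
    result: str


async def add_result(result: TaskResult, state: Dict[str, Any]) -> Dict[str, Any]:
    # Copy rather than mutate, so the caller's dict is left untouched.
    new_state = dict(state)
    # Hypothetical key scheme: one entry per task id.
    new_state[f"result_{result.task_id}"] = result.result
    return new_state


old_state = {"session": "s1"}
new_state = asyncio.run(add_result(TaskResult("t1", "done"), old_state))
```

Copying keeps earlier snapshots of the state intact, which can make retries and debugging easier to reason about, at the cost of an extra allocation per result.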
SimpleOrchestratorConfig
Bases: BaseSettings
Parameters:
Name | Type | Description | Default |
---|---|---|---|
max_retries | int | | 3 |
final_message_type | str \| None | | None |
Source code in llama_deploy/orchestrators/simple.py
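The two settings and their defaults can be mirrored in a small stand-in to show how they might be used. `SketchConfig` is a plain dataclass, not the pydantic settings class, and `can_retry` is a hypothetical helper; how the library actually applies `max_retries` is not described on this page.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchConfig:
    """Stand-in with the documented fields and defaults (not the pydantic class)."""

    max_retries: int = 3
    final_message_type: Optional[str] = None


def can_retry(config: SketchConfig, attempts_so_far: int) -> bool:
    """Hypothetical helper: allow another attempt while under the retry budget."""
    return attempts_so_far < config.max_retries


default_config = SketchConfig()
custom_config = SketchConfig(max_retries=1, final_message_type="reviewer")
```

Because the real class derives from `BaseSettings`, field values can likewise be passed as keyword arguments when constructing it, for example `SimpleOrchestratorConfig(max_retries=1)`.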