Skip to content

deploy

deploy_core (async)

deploy_core(control_plane_config: ControlPlaneConfig | None = None, message_queue_config: BaseSettings | None = None, orchestrator_config: SimpleOrchestratorConfig | None = None, disable_message_queue: bool = False, disable_control_plane: bool = False) -> None

Deploy the core components of the llama_deploy system.

This function sets up and launches the message queue, control plane, and orchestrator. It handles the initialization and connection of these core components.

Parameters:

Name Type Description Default
control_plane_config Optional[ControlPlaneConfig]

Configuration for the control plane.

None
message_queue_config Optional[BaseSettings]

Configuration for the message queue. Defaults to a local SimpleMessageQueue.

None
orchestrator_config Optional[SimpleOrchestratorConfig]

Configuration for the orchestrator. If not provided, a default SimpleOrchestratorConfig will be used.

None
disable_message_queue bool

Whether to disable deploying the message queue. Defaults to False.

False
disable_control_plane bool

Whether to disable deploying the control plane. Defaults to False.

False

Raises:

Type Description
ValueError

If an unknown message queue type is specified in the config.

Exception

If any of the launched tasks encounters an error.

Source code in llama_deploy/deploy/deploy.py (lines 69–133)
async def deploy_core(
    control_plane_config: ControlPlaneConfig | None = None,
    message_queue_config: BaseSettings | None = None,
    orchestrator_config: SimpleOrchestratorConfig | None = None,
    disable_message_queue: bool = False,
    disable_control_plane: bool = False,
) -> None:
    """
    Deploy the core components of the llama_deploy system.

    This function sets up and launches the message queue, control plane, and orchestrator.
    It handles the initialization and connection of these core components, then waits
    on the launched tasks until they finish, one of them fails, or this coroutine is
    cancelled.

    Args:
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue. Defaults to a local SimpleMessageQueue.
        orchestrator_config (Optional[SimpleOrchestratorConfig]): Configuration for the orchestrator.
            If not provided, a default SimpleOrchestratorConfig will be used.
        disable_message_queue (bool): Whether to disable deploying the message queue. Defaults to False.
        disable_control_plane (bool): Whether to disable deploying the control plane. Defaults to False.

    Raises:
        ValueError: If an unknown message queue type is specified in the config.
        Exception: If any of the launched tasks (or the startup steps) encounter an error.
            The message queue client is cleaned up and all still-running tasks are
            cancelled and awaited before the exception is re-raised.
        asyncio.CancelledError: If this coroutine is cancelled; the same cleanup runs
            first and the cancellation is then propagated to the caller.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    message_queue_config = message_queue_config or SimpleMessageQueueConfig()
    orchestrator_config = orchestrator_config or SimpleOrchestratorConfig()

    tasks: list[asyncio.Task] = []

    message_queue_client = _get_message_queue_client(message_queue_config)

    # The startup steps live inside the try so that a failure while registering
    # (or a cancellation during the boot sleeps) still tears down tasks that
    # were already launched, instead of leaking them.
    try:
        # If needed, start the SimpleMessageQueueServer
        if (
            isinstance(message_queue_config, SimpleMessageQueueConfig)
            and not disable_message_queue
        ):
            queue = SimpleMessageQueueServer(message_queue_config)
            tasks.append(asyncio.create_task(queue.launch_server()))
            # let message queue boot up
            await asyncio.sleep(2)

        if not disable_control_plane:
            control_plane = ControlPlaneServer(
                message_queue_client,
                SimpleOrchestrator(**orchestrator_config.model_dump()),
                config=control_plane_config,
            )
            tasks.append(asyncio.create_task(control_plane.launch_server()))
            # let service spin up
            await asyncio.sleep(2)
            # register the control plane as a consumer
            control_plane_consumer_fn = await control_plane.register_to_message_queue()
            tasks.append(asyncio.create_task(control_plane_consumer_fn()))

        # let things run
        await asyncio.gather(*tasks)
    except (Exception, asyncio.CancelledError):
        await message_queue_client.cleanup()
        for task in tasks:
            if not task.done():
                task.cancel()

        # Wait for the cancelled tasks to actually finish; their own
        # CancelledError/exceptions are collected rather than raised here.
        await asyncio.gather(*tasks, return_exceptions=True)
        # Re-raise so callers see the failure and asyncio cancellation is not
        # swallowed (swallowing it breaks asyncio.run / task cancellation).
        raise

deploy_workflow (async)

deploy_workflow(workflow: Workflow, workflow_config: WorkflowServiceConfig, control_plane_config: ControlPlaneConfig | None = None) -> None

Deploy a workflow as a service within the llama_deploy system.

This function sets up a workflow as a service, connects it to the message queue, and registers it with the control plane.

Parameters:

Name Type Description Default
workflow Workflow

The workflow to be deployed as a service.

required
workflow_config WorkflowServiceConfig

Configuration for the workflow service.

required
control_plane_config Optional[ControlPlaneConfig]

Configuration for the control plane.

None

Raises:

Type Description
HTTPError

If there's an error communicating with the control plane.

ValueError

If an invalid message queue config is encountered.

Exception

If any of the launched tasks encounters an error.

Source code in llama_deploy/deploy/deploy.py (lines 136–202)
async def deploy_workflow(
    workflow: Workflow,
    workflow_config: WorkflowServiceConfig,
    control_plane_config: ControlPlaneConfig | None = None,
) -> None:
    """
    Deploy a workflow as a service within the llama_deploy system.

    This function sets up a workflow as a service, connects it to the message queue,
    and registers it with the control plane. It then waits on the service server and
    message-queue consumer tasks until they finish, one fails, or this coroutine is
    cancelled.

    Args:
        workflow (Workflow): The workflow to be deployed as a service.
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.

    Raises:
        httpx.HTTPError: If there's an error communicating with the control plane,
            including an error HTTP status from the `/queue_config` endpoint.
        ValueError: If an invalid message queue config is encountered.
        Exception: If any of the launched tasks (or the registration steps) encounter
            an error. All still-running tasks are cancelled and awaited before the
            exception is re-raised.
        asyncio.CancelledError: If this coroutine is cancelled; the launched tasks are
            cancelled and awaited first.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    control_plane_url = control_plane_config.url

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{control_plane_url}/queue_config")
        # Fail loudly on 4xx/5xx instead of parsing an error body as a queue config.
        response.raise_for_status()
        queue_config_dict = response.json()

    message_queue_config = _get_message_queue_config(queue_config_dict)
    message_queue_client = _get_message_queue_client(message_queue_config)

    # override the service manager, while maintaining dict of existing services
    workflow._service_manager = NetworkServiceManager(
        workflow._service_manager._services
    )

    service = WorkflowService(
        workflow=workflow,
        message_queue=message_queue_client,
        config=workflow_config,
    )

    tasks: list[asyncio.Task] = [asyncio.create_task(service.launch_server())]

    # Registration happens inside the try so that a failure there does not leave
    # the already-started service server running in the background.
    try:
        # let service spin up
        await asyncio.sleep(1)

        # register to control plane
        await service.register_to_control_plane(control_plane_url)

        # register to message queue
        consumer_fn = await service.register_to_message_queue()

        # create consumer task
        tasks.append(asyncio.create_task(consumer_fn()))

        # let things sync up
        await asyncio.sleep(1)

        # Propagate the exception if any of the tasks exited with an error
        await asyncio.gather(*tasks)
    except (Exception, asyncio.CancelledError):
        for task in tasks:
            if not task.done():
                task.cancel()

        # Wait for the cancelled tasks to finish; their own outcomes are
        # collected here so the original error is the one re-raised below.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise