apiserver#

Deployment #

A Deployment consists of running services and core component instances.

Every Deployment is self-contained, running a dedicated instance of the control plane and the message queue, along with any services defined in the configuration object.

Source code in llama_deploy/apiserver/deployment.py
class Deployment:
    """A Deployment consists of running services and core component instances.

    Every Deployment is self-contained, running a dedicated instance of the control plane
    and the message queue, along with any services defined in the configuration object.
    """

    def __init__(self, *, config: Config, root_path: Path) -> None:
        """Creates a Deployment instance.

        Args:
            config: The configuration object defining this deployment
            root_path: The path on the filesystem used to store deployment data
        """
        self._name = config.name
        self._path = root_path / config.name
        self._simple_message_queue: SimpleMessageQueue | None = None
        self._queue_client = self._load_message_queue_client(config.message_queue)
        self._control_plane_config = config.control_plane
        self._control_plane = ControlPlaneServer(
            self._queue_client,
            SimpleOrchestrator(**SimpleOrchestratorConfig().model_dump()),
            config=config.control_plane,
        )
        self._workflow_services: list[WorkflowService] = self._load_services(config)
        self._client = Client(control_plane_url=config.control_plane.url)
        self._default_service = config.default_service

    @property
    def default_service(self) -> str | None:
        return self._default_service

    @property
    def client(self) -> Client:
        """Returns an async client to interact with this deployment."""
        return self._client

    @property
    def name(self) -> str:
        """Returns the name of this deployment."""
        return self._name

    @property
    def path(self) -> Path:
        """Returns the absolute path to the root of this deployment."""
        return self._path.resolve()

    async def start(self) -> None:
        """The task that will be launched in this deployment asyncio loop.

        This task is responsible for launching asyncio tasks for the core components and the services.
        All the tasks are gathered before returning.
        """
        tasks = []

        # Spawn SimpleMessageQueue if needed
        if self._simple_message_queue:
            # If SimpleMessageQueue was selected in the config file, we take care of running the task
            tasks.append(
                asyncio.create_task(self._simple_message_queue.launch_server())
            )
            # the other components need the queue running before they can start, so give it some time to come up
            # FIXME: having to await a magic number of seconds is very brittle, we should rethink the bootstrap process
            await asyncio.sleep(1)

        # Control Plane
        cp_consumer_fn = await self._control_plane.register_to_message_queue()
        tasks.append(asyncio.create_task(self._control_plane.launch_server()))
        tasks.append(asyncio.create_task(cp_consumer_fn()))

        # Services
        for wfs in self._workflow_services:
            service_task = asyncio.create_task(wfs.launch_server())
            tasks.append(service_task)
            consumer_fn = await wfs.register_to_message_queue()
            control_plane_url = f"http://{self._control_plane_config.host}:{self._control_plane_config.port}"
            await wfs.register_to_control_plane(control_plane_url)
            consumer_task = asyncio.create_task(consumer_fn())
            tasks.append(consumer_task)

        # Run all the tasks concurrently
        await asyncio.gather(*tasks)

    def _load_services(self, config: Config) -> list[WorkflowService]:
        """Creates WorkflowService instances according to the configuration object."""
        workflow_services = []
        for service_id, service_config in config.services.items():
            source = service_config.source
            if source is None:
                # this is a default service, skip for now
                # TODO: check the service name is valid and supported
                # TODO: possibly start the default service if not running already
                continue

            # FIXME: Momentarily assuming everything is a workflow
            if service_config.path is None:
                msg = "path field in service definition must be set"
                raise ValueError(msg)

            if service_config.port is None:
                # This won't happen if we arrive here from Manager.deploy(), since the manager will assign a port
                msg = "port field in service definition must be set"
                raise ValueError(msg)

            if service_config.host is None:
                # This won't happen if we arrive here from Manager.deploy(), since the manager will assign a host
                msg = "host field in service definition must be set"
                raise ValueError(msg)

            # Sync the service source
            destination = self._path / service_id
            source_manager = SOURCE_MANAGERS[source.type]
            source_manager.sync(source.name, str(destination.resolve()))

            # Install dependencies
            self._install_dependencies(service_config)

            # Search for a workflow instance in the service path
            pythonpath = (destination / service_config.path).parent.resolve()
            sys.path.append(str(pythonpath))
            module_name, workflow_name = Path(service_config.path).name.split(":")
            module = importlib.import_module(module_name)
            workflow = getattr(module, workflow_name)
            workflow_config = WorkflowServiceConfig(
                host=service_config.host,
                port=service_config.port,
                internal_host="0.0.0.0",
                internal_port=service_config.port,
                service_name=service_id,
            )
            workflow_services.append(
                WorkflowService(
                    workflow=workflow,
                    message_queue=self._queue_client,
                    **workflow_config.model_dump(),
                )
            )

        return workflow_services

    @staticmethod
    def _install_dependencies(service_config: Service) -> None:
        """Runs `pip install` on the items listed under `python-dependencies` in the service configuration."""
        if not service_config.python_dependencies:
            return

        try:
            subprocess.check_call(
                [
                    sys.executable,
                    "-m",
                    "pip",
                    "install",
                    *service_config.python_dependencies,
                ]
            )
        except subprocess.CalledProcessError as e:
            msg = f"Unable to install service dependencies using command '{e.cmd}': {e.stderr}"
            raise DeploymentError(msg) from None

    def _load_message_queue_client(
        self, cfg: MessageQueueConfig | None
    ) -> BaseMessageQueue:
        # Use the SimpleMessageQueue as the default
        if cfg is None:
            # fall back to the default simple message queue configuration
            cfg = SimpleMessageQueueConfig()

        if cfg.type == "aws":
            return AWSMessageQueue(**cfg.model_dump())
        elif cfg.type == "kafka":
            return KafkaMessageQueue(cfg)  # type: ignore
        elif cfg.type == "rabbitmq":
            return RabbitMQMessageQueue(cfg)  # type: ignore
        elif cfg.type == "redis":
            return RedisMessageQueue(**cfg.model_dump())
        elif cfg.type == "simple":
            self._simple_message_queue = SimpleMessageQueue(**cfg.model_dump())
            return self._simple_message_queue.client
        elif cfg.type == "solace":
            return SolaceMessageQueue(**cfg.model_dump())
        else:
            msg = f"Unsupported message queue: {cfg.type}"
            raise ValueError(msg)
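
For orientation, here is a minimal sketch of driving a `Deployment` by hand, assuming a valid `deployment.yaml` exists and that `Config` and `Deployment` are importable from the module paths shown above; in normal operation the `Manager` performs these steps for you:

```python
import asyncio
from pathlib import Path

from llama_deploy.apiserver.config_parser import Config
from llama_deploy.apiserver.deployment import Deployment

# "deployment.yaml" is a placeholder for an actual deployment config file.
config = Config.from_yaml(Path("deployment.yaml"))
deployment = Deployment(config=config, root_path=Path(".deployments"))

# start() launches the message queue (when the simple queue is configured),
# the control plane and every workflow service, then gathers their tasks.
asyncio.run(deployment.start())
```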

client property #

client: Client

Returns an async client to interact with this deployment.

name property #

name: str

Returns the name of this deployment.

path property #

path: Path

Returns the absolute path to the root of this deployment.

start async #

start() -> None

The task that will be launched in this deployment's asyncio loop.

This task is responsible for launching asyncio tasks for the core components and the services. All the tasks are gathered before returning.

Source code in llama_deploy/apiserver/deployment.py
async def start(self) -> None:
    """The task that will be launched in this deployment asyncio loop.

    This task is responsible for launching asyncio tasks for the core components and the services.
    All the tasks are gathered before returning.
    """
    tasks = []

    # Spawn SimpleMessageQueue if needed
    if self._simple_message_queue:
        # If SimpleMessageQueue was selected in the config file, we take care of running the task
        tasks.append(
            asyncio.create_task(self._simple_message_queue.launch_server())
        )
        # the other components need the queue running before they can start, so give it some time to come up
        # FIXME: having to await a magic number of seconds is very brittle, we should rethink the bootstrap process
        await asyncio.sleep(1)

    # Control Plane
    cp_consumer_fn = await self._control_plane.register_to_message_queue()
    tasks.append(asyncio.create_task(self._control_plane.launch_server()))
    tasks.append(asyncio.create_task(cp_consumer_fn()))

    # Services
    for wfs in self._workflow_services:
        service_task = asyncio.create_task(wfs.launch_server())
        tasks.append(service_task)
        consumer_fn = await wfs.register_to_message_queue()
        control_plane_url = f"http://{self._control_plane_config.host}:{self._control_plane_config.port}"
        await wfs.register_to_control_plane(control_plane_url)
        consumer_task = asyncio.create_task(consumer_fn())
        tasks.append(consumer_task)

    # Run all the tasks concurrently
    await asyncio.gather(*tasks)

Manager #

The Manager orchestrates deployments and their runtime.

Usage example:

```python
config = Config.from_yaml(data_path / "git_service.yaml")
manager = Manager(tmp_path)
t = threading.Thread(target=asyncio.run, args=(manager.serve(),))
t.start()
manager.deploy(config)
t.join()
```
Source code in llama_deploy/apiserver/deployment.py
class Manager:
    """The Manager orchestrates deployments and their runtime.

    Usage example:
        ```python
        config = Config.from_yaml(data_path / "git_service.yaml")
        manager = Manager(tmp_path)
        t = threading.Thread(target=asyncio.run, args=(manager.serve(),))
        t.start()
        manager.deploy(config)
        t.join()
        ```
    """

    def __init__(
        self, deployments_path: Path = Path(".deployments"), max_deployments: int = 10
    ) -> None:
        """Creates a Manager instance.

        Args:
            deployments_path: The filesystem path where deployments will create their root path.
            max_deployments: The maximum number of deployments supported by this manager.
        """
        self._deployments: dict[str, Any] = {}
        self._deployments_path = deployments_path
        self._max_deployments = max_deployments
        self._pool = ThreadPool(processes=max_deployments)
        self._control_plane_port = 8002

    @property
    def deployment_names(self) -> list[str]:
        """Return a list of names for the active deployments."""
        return list(self._deployments.keys())

    def get_deployment(self, deployment_name: str) -> Deployment | None:
        return self._deployments.get(deployment_name)

    async def serve(self) -> None:
        """The server loop, it keeps the manager running."""
        event = asyncio.Event()
        try:
            # Waits indefinitely since `event` will never be set
            await event.wait()
        except asyncio.CancelledError:
            pass

    def deploy(self, config: Config) -> None:
        """Creates a Deployment instance and starts the relative runtime.

        Args:
            config: The deployment configuration.

        Raises:
            ValueError: If a deployment with the same name already exists or the maximum number of deployments is exceeded.
            DeploymentError: If it wasn't possible to create a deployment.
        """
        if config.name in self._deployments:
            msg = f"Deployment already exists: {config.name}"
            raise ValueError(msg)

        if len(self._deployments) == self._max_deployments:
            msg = "Reached the maximum number of deployments, cannot schedule more"
            raise ValueError(msg)

        self._assign_control_plane_address(config)

        deployment = Deployment(config=config, root_path=self._deployments_path)
        self._deployments[config.name] = deployment
        self._pool.apply_async(func=asyncio.run, args=(deployment.start(),))

    def _assign_control_plane_address(self, config: Config) -> None:
        for service in config.services.values():
            if not service.port:
                service.port = self._control_plane_port
                self._control_plane_port += 1
            if not service.host:
                service.host = "localhost"

deployment_names property #

deployment_names: list[str]

Return a list of names for the active deployments.

serve async #

serve() -> None

The server loop; it keeps the manager running.

Source code in llama_deploy/apiserver/deployment.py
async def serve(self) -> None:
    """The server loop, it keeps the manager running."""
    event = asyncio.Event()
    try:
        # Waits indefinitely since `event` will never be set
        await event.wait()
    except asyncio.CancelledError:
        pass

deploy #

deploy(config: Config) -> None

Creates a Deployment instance and starts its runtime.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `Config` | The deployment configuration. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If a deployment with the same name already exists or the maximum number of deployments is exceeded. |
| `DeploymentError` | If it wasn't possible to create a deployment. |

Source code in llama_deploy/apiserver/deployment.py
def deploy(self, config: Config) -> None:
    """Creates a Deployment instance and starts the relative runtime.

    Args:
        config: The deployment configuration.

    Raises:
        ValueError: If a deployment with the same name already exists or the maximum number of deployments is exceeded.
        DeploymentError: If it wasn't possible to create a deployment.
    """
    if config.name in self._deployments:
        msg = f"Deployment already exists: {config.name}"
        raise ValueError(msg)

    if len(self._deployments) == self._max_deployments:
        msg = "Reached the maximum number of deployments, cannot schedule more"
        raise ValueError(msg)

    self._assign_control_plane_address(config)

    deployment = Deployment(config=config, root_path=self._deployments_path)
    self._deployments[config.name] = deployment
    self._pool.apply_async(func=asyncio.run, args=(deployment.start(),))
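
As a hedged usage sketch, deployment names are unique per manager and capacity is bounded, so a second `deploy()` call with the same config raises `ValueError` (the config file path below is a placeholder):

```python
from pathlib import Path

from llama_deploy.apiserver.config_parser import Config
from llama_deploy.apiserver.deployment import Manager

config = Config.from_yaml(Path("deployment.yaml"))  # placeholder file
manager = Manager(deployments_path=Path(".deployments"), max_deployments=1)
manager.deploy(config)  # schedules deployment.start() on the manager's thread pool

try:
    manager.deploy(config)  # same name again
except ValueError as e:
    print(e)  # "Deployment already exists: <name>"
```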

Config #

Bases: BaseModel

Model definition mapping a deployment config file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | | *required* |
| `control_plane` | `ControlPlaneConfig` | | *required* |
| `message_queue` | `MessageQueueConfig \| None` | Union of the supported queue configs (AWS, Kafka, RabbitMQ, Redis, Simple, Solace). | `None` |
| `default_service` | `str \| None` | | `None` |
| `services` | `dict[str, Service]` | | *required* |
Source code in llama_deploy/apiserver/config_parser.py
class Config(BaseModel):
    """Model definition mapping a deployment config file."""

    name: str
    control_plane: ControlPlaneConfig = Field(alias="control-plane")
    message_queue: MessageQueueConfig | None = Field(None, alias="message-queue")
    default_service: str | None = Field(None, alias="default-service")
    services: dict[str, Service]

    @classmethod
    def from_yaml_bytes(cls, src: bytes) -> Self:
        """Read config data from bytes containing yaml code."""
        config = yaml.safe_load(src) or {}
        return cls(**config)

    @classmethod
    def from_yaml(cls, path: Path) -> Self:
        """Read config data from a yaml file."""
        with open(path, "r") as yaml_file:
            config = yaml.safe_load(yaml_file) or {}
        return cls(**config)
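
To illustrate the dashed field aliases, a small sketch parsing an inline YAML document with `from_yaml_bytes`; the service body is hypothetical, and a real one would set fields such as `source`, `path`, `host` and `port`:

```python
from llama_deploy.apiserver.config_parser import Config

# Aliases map control-plane -> control_plane, default-service -> default_service, etc.
raw = b"""
name: example
control-plane:
  port: 8000
default-service: echo
services:
  echo: {}
"""

config = Config.from_yaml_bytes(raw)
print(config.name, config.default_service)  # example echo
```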

from_yaml_bytes classmethod #

from_yaml_bytes(src: bytes) -> Self

Read config data from bytes containing yaml code.

Source code in llama_deploy/apiserver/config_parser.py
@classmethod
def from_yaml_bytes(cls, src: bytes) -> Self:
    """Read config data from bytes containing yaml code."""
    config = yaml.safe_load(src) or {}
    return cls(**config)

from_yaml classmethod #

from_yaml(path: Path) -> Self

Read config data from a yaml file.

Source code in llama_deploy/apiserver/config_parser.py
@classmethod
def from_yaml(cls, path: Path) -> Self:
    """Read config data from a yaml file."""
    with open(path, "r") as yaml_file:
        config = yaml.safe_load(yaml_file) or {}
    return cls(**config)

GitSourceManager #

A SourceManager specialized for sources of type git.

Source code in llama_deploy/apiserver/source_managers/git.py
class GitSourceManager:
    """A SourceManager specialized for sources of type `git`."""

    def sync(self, source: str, destination: str | None = None) -> None:
        """Clones the repository at URL `source` into a local path `destination`.

        Args:
            source: The URL of the git repository. It can optionally contain a branch target using the naming
                convention `git_repo_url@branch_name`. For example, "https://example.com/llama_deploy.git@branch_name".
            destination: The local filesystem path where the repository will be cloned.
        """
        if not destination:
            raise ValueError("Destination cannot be empty")

        url, branch_name = self._parse_source(source)
        kwargs: dict[str, Any] = {"url": url, "to_path": destination}
        if branch_name:
            kwargs["multi_options"] = [f"-b {branch_name}", "--single-branch"]

        Repo.clone_from(**kwargs)

    @staticmethod
    def _parse_source(source: str) -> tuple[str, str | None]:
        branch_name = None
        toks = source.split("@")
        url = toks[0]
        if len(toks) > 1:
            branch_name = toks[1]

        return url, branch_name
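
A short usage sketch of the `@branch` syntax, with placeholder URL and destination paths:

```python
from llama_deploy.apiserver.source_managers.git import GitSourceManager

manager = GitSourceManager()
# Clones only the "dev" branch (-b dev --single-branch) into /tmp/checkout.
manager.sync("https://example.com/llama_deploy.git@dev", "/tmp/checkout")
```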

sync #

sync(source: str, destination: str | None = None) -> None

Clones the repository at URL source into a local path destination.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `str` | The URL of the git repository. It can optionally contain a branch target using the naming convention `git_repo_url@branch_name`. For example, "https://example.com/llama_deploy.git@branch_name". | *required* |
| `destination` | `str \| None` | The local filesystem path where the repository will be cloned. | `None` |
Source code in llama_deploy/apiserver/source_managers/git.py
def sync(self, source: str, destination: str | None = None) -> None:
    """Clones the repository at URL `source` into a local path `destination`.

    Args:
        source: The URL of the git repository. It can optionally contain a branch target using the naming
            convention `git_repo_url@branch_name`. For example, "https://example.com/llama_deploy.git@branch_name".
        destination: The local filesystem path where the repository will be cloned.
    """
    if not destination:
        raise ValueError("Destination cannot be empty")

    url, branch_name = self._parse_source(source)
    kwargs: dict[str, Any] = {"url": url, "to_path": destination}
    if branch_name:
        kwargs["multi_options"] = [f"-b {branch_name}", "--single-branch"]

    Repo.clone_from(**kwargs)

LocalSourceManager #

A SourceManager specialized for sources of type local.

Source code in llama_deploy/apiserver/source_managers/local.py
class LocalSourceManager:
    """A SourceManager specialized for sources of type `local`."""

    def sync(self, source: str, destination: str | None = None) -> None:
        """Copies the folder with path `source` into a local path `destination`.

        Args:
            source: The filesystem path to the folder containing the source code.
            destination: The local filesystem path where the source directory will be copied.
        """
        if not destination:
            raise ValueError("Destination cannot be empty")

        try:
            shutil.copytree(source, destination, dirs_exist_ok=True)
        except shutil.Error as e:
            msg = f"Unable to copy {source} into {destination}: {e}"
            raise ValueError(msg) from e
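
And the local counterpart, again with placeholder paths:

```python
from llama_deploy.apiserver.source_managers.local import LocalSourceManager

manager = LocalSourceManager()
# Copies ./my_service into the deployment tree; existing files are
# overwritten because the copy runs with dirs_exist_ok=True.
manager.sync("./my_service", ".deployments/example/my_service")
```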

sync #

sync(source: str, destination: str | None = None) -> None

Copies the folder at path source into the local path destination.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `str` | The filesystem path to the folder containing the source code. | *required* |
| `destination` | `str \| None` | The local filesystem path where the source directory will be copied. | `None` |
Source code in llama_deploy/apiserver/source_managers/local.py
def sync(self, source: str, destination: str | None = None) -> None:
    """Copies the folder with path `source` into a local path `destination`.

    Args:
        source: The filesystem path to the folder containing the source code.
        destination: The local filesystem path where the source directory will be copied.
    """
    if not destination:
        raise ValueError("Destination cannot be empty")

    try:
        shutil.copytree(source, destination, dirs_exist_ok=True)
    except shutil.Error as e:
        msg = f"Unable to copy {source} into {destination}: {e}"
        raise ValueError(msg) from e

SourceManager #

Bases: Protocol

Protocol to be implemented by classes responsible for managing Deployment sources.

Source code in llama_deploy/apiserver/source_managers/__init__.py
class SourceManager(Protocol):
    """Protocol to be implemented by classes responsible for managing Deployment sources."""

    def sync(
        self, source: str, destination: str | None = None
    ) -> None:  # pragma: no cover
        """Fetches resources from `source` so they can be used in a deployment.

        Optionally uses `destination` to store data when this makes sense for the
        specific source type.
        """

sync #

sync(source: str, destination: str | None = None) -> None

Fetches resources from source so they can be used in a deployment.

Optionally uses destination to store data when this makes sense for the specific source type.

Source code in llama_deploy/apiserver/source_managers/__init__.py
def sync(
    self, source: str, destination: str | None = None
) -> None:  # pragma: no cover
    """Fetches resources from `source` so they can be used in a deployment.

    Optionally uses `destination` to store data when this makes sense for the
    specific source type.
    """