Python SDK#

Llama Deploy provides a Python SDK for interacting with deployed systems. The SDK supports both synchronous and asynchronous operations through a unified client interface. The asynchronous API is recommended for production use.

Getting started#

Creating a client is as simple as this:

import llama_deploy

client = llama_deploy.Client()

Client Configuration#

The client can be configured either through constructor arguments or environment variables. To set a configuration parameter through an environment variable, use the uppercase version of the parameter name, prefixed with the string LLAMA_DEPLOY_:

import os
import llama_deploy

# Set `disable_ssl` to False with an environment variable
os.environ["LLAMA_DEPLOY_DISABLE_SSL"] = "False"


async def check_status():
    # Pass other config settings to the Client constructor
    client = llama_deploy.Client(
        api_server_url="http://localhost:4501", timeout=10
    )
    status = await client.apiserver.status()
    print(status)

Note

For a list of all the available configuration parameters, see the dedicated API Reference section.

Client Components#

The client provides access to two main components:

- client.apiserver: interacts with the API Server (for example, client.apiserver.status())
- client.core: interacts with the core services of a deployed system (for example, client.core.sessions)

Each component exposes specific methods for managing and interacting with the deployed system.
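
The snippet below is a minimal sketch combining both components; it only uses calls that appear in the examples on this page, and the function name use_components is arbitrary:

import llama_deploy


async def use_components():
    client = llama_deploy.Client()

    # `apiserver` component: query the status of the API Server
    status = await client.apiserver.status()
    print(status)

    # `core` component: manage sessions on the deployed system
    session = await client.core.sessions.create()
    await client.core.sessions.delete(session.id)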

Note

For a complete list of available methods and detailed API reference, see the API Reference section.

Usage Examples#

Asynchronous Operations#

import llama_deploy


async def check_status():
    client = llama_deploy.Client()
    status = await client.apiserver.status()
    print(status)
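
To run the coroutine from synchronous code, for example a script's entry point, a standard asyncio driver can be used (a minimal sketch, assuming the check_status function defined above):

import asyncio

asyncio.run(check_status())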

Synchronous Operations#

import llama_deploy

client = llama_deploy.Client()
status = client.sync.apiserver.status()
print(status)

Important

The synchronous API (client.sync) cannot be used within an async event loop. Use the async methods directly in that case.
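
As an illustration of this constraint, the sketch below shows where each style applies; inside_event_loop and outside_event_loop are hypothetical names:

import llama_deploy


async def inside_event_loop():
    client = llama_deploy.Client()
    # An event loop is running here: use the async API and await it
    status = await client.apiserver.status()
    print(status)
    # Calling client.sync.apiserver.status() at this point would fail,
    # since the synchronous wrapper cannot run inside a running event loop


def outside_event_loop():
    client = llama_deploy.Client()
    # No event loop is running here: the synchronous API is fine
    status = client.sync.apiserver.status()
    print(status)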

A more complex example#

This is an example of how you would use the recommended async version of the client to run a deployed workflow and collect the events it streams.

import llama_deploy


async def stream_events(services):
    client = llama_deploy.Client(timeout=10)

    # Create a new session
    session = await client.core.sessions.create()

    # Assuming there's a workflow called `streaming_workflow`, run it in the background
    task_id = await session.run_nowait(
        "streaming_workflow", arg="Hello, world!"
    )

    # The workflow is supposed to stream events signalling its progress
    async for event in session.get_task_result_stream(task_id):
        if "progress" in event:
            print(f'Workflow Progress: {event["progress"]}')

    # When done, collect the workflow output
    final_result = await session.get_task_result(task_id)
    print(final_result)

    # Clean up the session
    await client.core.sessions.delete(session.id)

The equivalent synchronous version would be the following:

import llama_deploy


def stream_events(services):
    client = llama_deploy.Client(timeout=10)

    # Create a new session
    session = client.sync.core.sessions.create()

    # Assuming there's a workflow called `streaming_workflow`, run it
    task_id = session.run_nowait("streaming_workflow", arg="Hello, world!")

    # The workflow is supposed to stream events signalling its progress.
    # Since this is a synchronous call, by the time it returns all the
    # events have already been streamed and collected into a list.
    for event in session.get_task_result_stream(task_id):
        if "progress" in event:
            print(f'Workflow Progress: {event["progress"]}')

    # Collect the workflow output
    final_result = session.get_task_result(task_id)
    print(final_result)

    # Clean up the session
    client.sync.core.sessions.delete(session.id)