Skip to content

SDK

aana.sdk.AanaSDK

AanaSDK(name='app', migration_func=None, retryable_exceptions=None)

Aana SDK to deploy and manage Aana deployments and endpoints.

PARAMETER DESCRIPTION
name

The name of the application. Defaults to "app".

TYPE: str DEFAULT: 'app'

migration_func

The migration function to run. Defaults to None.

TYPE: Callable | None DEFAULT: None

retryable_exceptions

The exceptions that can be retried in the task queue, given as exception classes or exception class names; they are stored as class names. Defaults to ['InferenceException', 'ActorDiedError', 'OutOfMemoryError'].

TYPE: list[type[Exception] | str] | None DEFAULT: None

Source code in aana/sdk.py
def __init__(
    self,
    name: str = "app",
    migration_func: Callable | None = None,
    retryable_exceptions: list[type[Exception] | str] | None = None,
):
    """Aana SDK to deploy and manage Aana deployments and endpoints.

    If the task queue is enabled in the settings, the task queue deployment is
    registered (but not deployed) during construction.

    Args:
        name (str, optional): The name of the application. Defaults to "app".
        migration_func (Callable | None): The migration function to run. Defaults to None.
        retryable_exceptions (list[type[Exception] | str] | None): The exceptions that can be
            retried in the task queue, given as exception classes, exception instances, or
            exception class names. They are stored as class names.
            Defaults to ['InferenceException', 'ActorDiedError', 'OutOfMemoryError'].
    """
    self.name = name
    self.migration_func = migration_func
    self.endpoints: dict[str, Endpoint] = {}
    self.deployments: dict[str, Deployment] = {}
    self.routers: dict[str, APIRouter] = {}

    if retryable_exceptions is None:
        retryable_exceptions = [
            "InferenceException",
            "ActorDiedError",
            "OutOfMemoryError",
        ]

    def exception_name(exc) -> str:
        """Return the exception class name for a name, class, or instance."""
        if isinstance(exc, str):
            return exc
        if isinstance(exc, type):
            return exc.__name__
        # An exception instance was given: use the name of its class.
        return type(exc).__name__

    # Convert exceptions to their names to avoid serialization issues.
    # Building a new list also avoids aliasing the caller's list.
    self.retryable_exceptions = [exception_name(exc) for exc in retryable_exceptions]

    if aana_settings.task_queue.enabled:
        self.add_task_queue(deploy=False)

connect

connect(port=8000, host='127.0.0.1', address='auto', dashboard_host='127.0.0.1', dashboard_port=8265, show_logs=False, num_cpus=None, num_gpus=None)

Connect to a Ray cluster or start a new Ray cluster and Ray Serve.

PARAMETER DESCRIPTION
port

The port to run the Aana server on. Defaults to 8000.

TYPE: int DEFAULT: 8000

host

The host to run the Aana server on. Defaults to "127.0.0.1".

TYPE: str DEFAULT: '127.0.0.1'

address

The address of the Ray cluster. Defaults to "auto".

TYPE: str DEFAULT: 'auto'

dashboard_host

The host to bind the dashboard server to. Can either be localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces). By default, this is set to localhost to prevent access from external machines.

TYPE: str DEFAULT: '127.0.0.1'

dashboard_port

The port to bind the dashboard server to. Defaults to 8265.

TYPE: int DEFAULT: 8265

show_logs

If True, the logs will be shown, otherwise they will be hidden but can be accessed in the Ray dashboard. Defaults to False.

TYPE: bool DEFAULT: False

num_cpus

Number of CPUs the user wishes to assign to each raylet. By default, this is set based on virtual cores.

TYPE: int | None DEFAULT: None

num_gpus

Number of GPUs the user wishes to assign to each raylet. By default, this is set based on detected GPUs.

TYPE: int | None DEFAULT: None

Source code in aana/sdk.py
def connect(
    self,
    port: int = 8000,
    host: str = "127.0.0.1",
    address: str = "auto",
    dashboard_host: str = "127.0.0.1",
    dashboard_port: int = 8265,
    show_logs: bool = False,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
) -> "AanaSDK":
    """Attach to a Ray cluster (or start a local one) and ensure Ray Serve is running.

    The SDK first tries to connect to the cluster at ``address``. If that raises
    ConnectionError, a new local cluster is started. The dashboard and
    ``num_cpus``/``num_gpus`` settings only apply to that newly started cluster.
    Serve is started on ``host:port`` only if it has no proxies yet.

    Args:
        port (int, optional): The port to run the Aana server on. Defaults to 8000.
        host (str, optional): The host to run the Aana server on. Defaults to "127.0.0.1".
        address (str, optional): The address of the Ray cluster. Defaults to "auto".
        dashboard_host (str, optional): The host to bind the dashboard server to. Can either be
            localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces).
            By default, this is set to localhost to prevent access from external machines.
        dashboard_port (int, optional): The port to bind the dashboard server to. Defaults to 8265.
        show_logs (bool, optional): If True, the logs will be shown, otherwise
            they will be hidden but can be accessed in the Ray dashboard. Defaults to False.
        num_cpus (int, optional): Number of CPUs the user wishes to assign to each
            raylet. By default, this is set based on virtual cores.
        num_gpus (int, optional): Number of GPUs the user wishes to assign to each
            raylet. By default, this is set based on detected GPUs.

    Returns:
        AanaSDK: ``self``, so calls can be chained.
    """
    self.port = port
    self.host = host

    # Options shared by both ways of initializing Ray.
    shared_init_kwargs = {
        "ignore_reinit_error": True,
        "log_to_driver": show_logs,
    }

    try:
        # Attach to an already running cluster first.
        ray.init(address=address, **shared_init_kwargs)
    except ConnectionError:
        # Nothing reachable at `address`: launch a fresh local cluster instead.
        ray.init(
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            include_dashboard=True,
            dashboard_host=dashboard_host,
            dashboard_port=dashboard_port,
            **shared_init_kwargs,
        )

    # No proxies means Serve is not running yet.
    # TODO: check if the port is already in use if serve is not running yet or
    # check if the port is the same as an existing serve instance if serve is running
    if serve.status().proxies == {}:
        serve.start(http_options=HTTPOptions(port=self.port, host=self.host))

    return self

migrate

migrate()

Run Alembic migrations.

Source code in aana/sdk.py
def migrate(self):
    """Run Alembic migrations.

    If a custom ``migration_func`` was given, it is called with the settings.
    If it raises EmptyMigrationsException, the default migrations run instead.
    Without a custom function, the default migrations run directly.
    """
    if not self.migration_func:
        run_alembic_migrations(aana_settings)
        return

    try:
        self.migration_func(aana_settings)
    except EmptyMigrationsException:
        print(
            "No versions found in the custom migrations. Using default migrations."
        )
        run_alembic_migrations(aana_settings)

print_app_status

print_app_status(app_name, app_status)

Show the status of the application using simple ASCII formatting.

PARAMETER DESCRIPTION
app_name

The name of the application.

TYPE: str

app_status

The status of the application.

TYPE: ApplicationStatusOverview

Source code in aana/sdk.py
def print_app_status(self, app_name: str, app_status: ApplicationStatusOverview):
    """Print the status of each deployment of an application as plain ASCII.

    Each deployment gets a header framed by separator lines with its status and
    message below it. A closing separator is always printed, even when the
    application has no deployments.

    Args:
        app_name (str): The name of the application.
        app_status (ApplicationStatusOverview): The status of the application.
    """
    separator = "=" * 60

    for deployment_name, deployment_status in (app_status.deployments or {}).items():
        print(separator)
        print(f"{deployment_name} ({app_name})")
        print(separator)
        print(f"Status: {deployment_status.status.value}")
        print(f"Message: {deployment_status.message}")

    print(separator)

add_task_queue

add_task_queue(deploy=False)

Add a task queue deployment.

PARAMETER DESCRIPTION
deploy

If True, the deployment will be deployed immediately, otherwise it will be registered and can be deployed later when deploy() is called. Defaults to False.

TYPE: bool DEFAULT: False

Source code in aana/sdk.py
def add_task_queue(self, deploy: bool = False):
    """Register the single-replica task queue deployment as "task_queue_deployment".

    The deployment is configured with this app's name and retryable exception names.

    Args:
        deploy (bool, optional): If True, the deployment will be deployed immediately,
                otherwise it will be registered and can be deployed later when deploy() is called. Defaults to False.
    """
    from aana.deployments.task_queue_deployment import (
        TaskQueueConfig,
        TaskQueueDeployment,
    )

    queue_config = TaskQueueConfig(
        app_name=self.name,
        retryable_exceptions=self.retryable_exceptions,
    )
    configured_deployment = TaskQueueDeployment.options(
        num_replicas=1,
        user_config=queue_config.model_dump(mode="json"),
    )
    self.register_deployment(
        "task_queue_deployment", configured_deployment, deploy=deploy
    )

register_deployment

register_deployment(name, instance, deploy=False)

Register a deployment.

PARAMETER DESCRIPTION
name

The name of the deployment.

TYPE: str

instance

The instance of the deployment to be registered.

TYPE: Deployment

deploy

If True, the deployment will be deployed immediately, otherwise it will be registered and can be deployed later when deploy() is called. Defaults to False.

TYPE: bool DEFAULT: False

Source code in aana/sdk.py
def register_deployment(
    self,
    name: str,
    instance: Deployment,
    deploy: bool = False,
):
    """Register a deployment, or deploy it right away.

    Args:
        name (str): The name of the deployment. When deployed immediately, it is also
            the Serve application name, and the route prefix is ``/<name>``.
        instance (Deployment): The instance of the deployment to be registered.
        deploy (bool, optional): If True, the deployment is deployed immediately and is
            not added to ``self.deployments``. Otherwise it is stored and deployed later
            when deploy() is called. Defaults to False.
    """
    if not deploy:
        self.deployments[name] = instance
        return

    try:
        serve.api._run(
            instance.bind(),
            name=name,
            route_prefix=f"/{name}",
            _blocking=False,
        )
        self.wait_for_deployment()
    except FailedDeployment:
        # Report the failure by printing the application's status; the
        # exception is not re-raised.
        app_status = serve.status().applications[name]
        self.print_app_status(name, app_status)

get_deployment_app

get_deployment_app(name)

Get the application instance for the deployment.

PARAMETER DESCRIPTION
name

The name of the deployment.

TYPE: str

RETURNS DESCRIPTION
Application

The application instance for the deployment.

TYPE: Application

RAISES DESCRIPTION
KeyError

If the deployment is not found.

Source code in aana/sdk.py
def get_deployment_app(self, name: str) -> Application:
    """Bind the registered deployment ``name`` and return the resulting application.

    Args:
        name (str): The name of the deployment.

    Returns:
        Application: The application instance for the deployment.

    Raises:
        KeyError: If the deployment is not found.
    """
    if name not in self.deployments:
        raise KeyError(f"Deployment {name} not found.")  # noqa: TRY003
    return self.deployments[name].bind()

unregister_deployment

unregister_deployment(name)

Unregister a deployment.

PARAMETER DESCRIPTION
name

The name of the deployment to be unregistered.

TYPE: str

Source code in aana/sdk.py
def unregister_deployment(self, name: str):
    """Forget a registered deployment and delete the Serve application of that name.

    ``serve.delete`` is called even if the name was never registered locally.
    This also covers deployments that were deployed immediately.

    Args:
        name (str): The name of the deployment to be unregistered.
    """
    self.deployments.pop(name, None)
    serve.delete(name)

get_main_app

get_main_app()

Get the main application instance.

RETURNS DESCRIPTION
Application

The main application instance.

TYPE: Application

Source code in aana/sdk.py
def get_main_app(self) -> Application:
    """Get the main application instance.

    The main app is a RequestHandler with ``aana_settings.num_workers`` replicas.
    It is bound with the registered endpoints, the names of the registered
    deployments, and the registered routers.

    Returns:
        Application: The main application instance.
    """
    return RequestHandler.options(num_replicas=aana_settings.num_workers).bind(
        app_name=self.name,
        # Snapshot as a list, like the other arguments, instead of passing a live
        # dict view that would change if endpoints are (un)registered later.
        endpoints=list(self.endpoints.values()),
        deployments=list(self.deployments.keys()),
        routers=list(self.routers.values()),
    )

register_endpoint

register_endpoint(name, path, summary, endpoint_cls, admin_required=False, active_subscription_required=False, defer_option=DeferOption.OPTIONAL, event_handlers=None)

Register an endpoint.

PARAMETER DESCRIPTION
name

The name of the endpoint.

TYPE: str

path

The path of the endpoint.

TYPE: str

summary

The summary of the endpoint.

TYPE: str

endpoint_cls

The class of the endpoint.

TYPE: Type[Endpoint]

admin_required

If True, the endpoint requires admin access. Defaults to False.

TYPE: bool DEFAULT: False

active_subscription_required

If True, the endpoint requires an active subscription. Defaults to False.

TYPE: bool DEFAULT: False

defer_option

Defer option for the endpoint (always, never, optional).

TYPE: DeferOption DEFAULT: OPTIONAL

event_handlers

The event handlers to register for the endpoint.

TYPE: list[EventHandler] | None DEFAULT: None

Source code in aana/sdk.py
def register_endpoint(
    self,
    name: str,
    path: str,
    summary: str,
    endpoint_cls: type[Endpoint],
    admin_required: bool = False,
    active_subscription_required: bool = False,
    defer_option: DeferOption = DeferOption.OPTIONAL,
    event_handlers: list[EventHandler] | None = None,
):
    """Instantiate ``endpoint_cls`` and register the endpoint under ``name``.

    An endpoint already registered under the same name is replaced.

    Args:
        name (str): The name of the endpoint.
        path (str): The path of the endpoint.
        summary (str): The summary of the endpoint.
        endpoint_cls (Type[Endpoint]): The class of the endpoint.
        admin_required (bool, optional): If True, the endpoint requires admin access. Defaults to False.
        active_subscription_required (bool, optional): If True, the endpoint requires an active subscription. Defaults to False.
        defer_option (DeferOption): Defer option for the endpoint (always, never, optional).
        event_handlers (list[EventHandler], optional): The event handlers to register for the endpoint.
    """
    endpoint_kwargs = {
        "name": name,
        "path": path,
        "summary": summary,
        "admin_required": admin_required,
        "active_subscription_required": active_subscription_required,
        "defer_option": defer_option,
        "event_handlers": event_handlers,
    }
    self.endpoints[name] = endpoint_cls(**endpoint_kwargs)

unregister_endpoint

unregister_endpoint(name)

Unregister an endpoint.

PARAMETER DESCRIPTION
name

The name of the endpoint to be unregistered.

TYPE: str

Source code in aana/sdk.py
def unregister_endpoint(self, name: str):
    """Remove the endpoint registered under ``name``; unknown names are ignored.

    Args:
        name (str): The name of the endpoint to be unregistered.
    """
    self.endpoints.pop(name, None)

register_router

register_router(name, router)

Register a FastAPI router.

PARAMETER DESCRIPTION
name

The name of the router.

TYPE: str

router

The instance of the APIRouter to be registered.

TYPE: APIRouter

Source code in aana/sdk.py
def register_router(self, name: str, router: APIRouter):
    """Register a FastAPI router under ``name``, replacing any router with that name.

    Args:
        name (str): The name of the router.
        router (APIRouter): The instance of the APIRouter to be registered.
    """
    self.routers.update({name: router})

unregister_router

unregister_router(name)

Unregister a FastAPI router.

PARAMETER DESCRIPTION
name

The name of the router to be unregistered.

TYPE: str

Source code in aana/sdk.py
def unregister_router(self, name: str):
    """Remove the FastAPI router registered under ``name``; unknown names are ignored.

    Args:
        name (str): The name of the router to be unregistered.
    """
    self.routers.pop(name, None)

wait_for_deployment

wait_for_deployment()

Wait for the deployment to complete.

Source code in aana/sdk.py
def wait_for_deployment(self):  # noqa: C901
    """Wait for the deployment to complete.

    Polls ``serve.status()`` once per second until every Serve application
    reports ``RUNNING``.

    Raises:
        FailedDeployment: If any application is ``DEPLOY_FAILED`` or ``UNHEALTHY``.
            The message has one line per deployment of each failed application.
        InsufficientResources: If the Ray cluster reports pending resource demands
            on ``resource_unavailable_threshold`` (5) consecutive checks. The
            message has one line per distinct pending demand.
    """
    consecutive_resource_unavailable = 0
    # Number of consecutive checks before raising a resource unavailable error
    resource_unavailable_threshold = 5
    failed_statuses = ("DEPLOY_FAILED", "UNHEALTHY")

    def describe_demand(demand) -> str:
        """Build the error message for one pending resource demand."""
        if isinstance(demand, ResourceDemand) and demand.bundles_by_count:
            bundle = demand.bundles_by_count[0].bundle
            message = f"Error: No available node types can fulfill resource request {bundle}. "
            if "GPU" in bundle:
                message += "Might be due to insufficient or misconfigured CPU or GPU resources."
            return message
        return f"Error: {demand}"

    while True:
        applications = serve.status().applications
        if all(app.status == "RUNNING" for app in applications.values()):
            break

        failed_apps = {
            app_name: app_status
            for app_name, app_status in applications.items()
            if app_status.status in failed_statuses
        }
        if failed_apps:
            error_messages = [
                f"Error: {deployment_name} ({app_name}): {deployment_status.message}"
                for app_name, app_status in failed_apps.items()
                for deployment_name, deployment_status in app_status.deployments.items()
            ]
            raise FailedDeployment("\n".join(error_messages))

        gcs_address = ray.get_runtime_context().gcs_address
        cluster_status = get_cluster_status(gcs_address)
        demands = (
            cluster_status.resource_demands.cluster_constraint_demand
            + cluster_status.resource_demands.ray_task_actor_demand
            + cluster_status.resource_demands.placement_group_demand
        )

        # Every reported pending demand counts as a resource shortage.
        if demands:
            consecutive_resource_unavailable += 1
            if consecutive_resource_unavailable >= resource_unavailable_threshold:
                # Report every distinct pending demand (in order), not just the last one.
                resource_errors = dict.fromkeys(describe_demand(d) for d in demands)
                raise InsufficientResources("\n".join(resource_errors))
        else:
            consecutive_resource_unavailable = 0

        time.sleep(1)  # Wait for 1 second before checking again

deploy

deploy(blocking=False, sequential=False)

Deploy the application with the registered endpoints and deployments.

PARAMETER DESCRIPTION
blocking

If True, the function will block until interrupted. Defaults to False.

TYPE: bool DEFAULT: False

sequential

If True, the deployments will be deployed sequentially. Defaults to False.

TYPE: bool DEFAULT: False

Source code in aana/sdk.py
def deploy(self, blocking: bool = False, sequential: bool = False):
    """Deploy the application with the registered endpoints and deployments.

    Each registered deployment is run as its own Serve application with route
    prefix ``/<name>``. The main app is then run at ``/``, and the call waits
    until everything is running.

    On KeyboardInterrupt, Serve is shut down and the process exits.
    On DeploymentException, Serve is shut down, the failed applications' statuses
    are printed, and the exception is re-raised.
    On any other exception, Serve is shut down, the traceback is printed, and the
    exception is re-raised.

    Args:
        blocking (bool, optional): If True, the function will block until interrupted. Defaults to False.
        sequential (bool, optional): If True, the deployments will be deployed sequentially. Defaults to False.
    """
    try:
        for deployment_name in self.deployments:
            serve.api._run(
                self.get_deployment_app(deployment_name),
                name=deployment_name,
                route_prefix=f"/{deployment_name}",
                _blocking=False,
            )
            if sequential:
                self.wait_for_deployment()

        serve.api._run(
            self.get_main_app(),
            name=self.name,
            route_prefix="/",
            _blocking=False,  # blocking manually after to display the message "Deployed successfully."
        )

        self.wait_for_deployment()

        # host/port are only set by connect(). Fall back to connect()'s defaults so
        # a successful deployment is not torn down by an AttributeError here
        # (the generic handler below would call serve.shutdown()).
        host = getattr(self, "host", "127.0.0.1")
        port = getattr(self, "port", 8000)
        base_url = f"http://{host}:{port}"

        rprint("[green]Deployed successfully.[/green]")
        rprint(
            f"Documentation is available at "
            f"[link={base_url}/docs]{base_url}/docs[/link] and "
            f"[link={base_url}/redoc]{base_url}/redoc[/link]"
        )
        while blocking:
            time.sleep(10)
    except KeyboardInterrupt:
        print("Got KeyboardInterrupt, shutting down...")
        serve.shutdown()
        sys.exit()
    except DeploymentException as e:
        # Capture the status before shutting down so failed apps can be reported.
        status = serve.status()
        serve.shutdown()
        for app_name, app_status in status.applications.items():
            if (
                app_status.status == "DEPLOY_FAILED"
                or app_status.status == "UNHEALTHY"
            ):
                self.print_app_status(app_name, app_status)
        if isinstance(e, InsufficientResources):
            rprint(f"[red] {e} [/red]")
        raise
    except Exception:
        serve.shutdown()
        traceback.print_exc()
        print(
            "Received unexpected error, see console logs for more details. "
            "Shutting down..."
        )
        raise

shutdown

shutdown()

Shutdown the Aana server.

Source code in aana/sdk.py
def shutdown(self):
    """Shut down Ray Serve first, then shut down Ray."""
    for stop in (serve.shutdown, ray.shutdown):
        stop()

build

build(import_path, host='0.0.0.0', port=8000, app_config_name='app_config', config_name='config')

Build the application configuration file.

Two files will be created: app_config (.py) and config (.yaml).

PARAMETER DESCRIPTION
import_path

The import path of the application.

TYPE: str

host

The host to run the application on. Defaults to "0.0.0.0".

TYPE: str DEFAULT: '0.0.0.0'

port

The port to run the application on. Defaults to 8000.

TYPE: int DEFAULT: 8000

app_config_name

The name of the application config file. Defaults to "app_config".

TYPE: str DEFAULT: 'app_config'

config_name

The name of the config file. Defaults to "config".

TYPE: str DEFAULT: 'config'

Source code in aana/sdk.py
def build(
    self,
    import_path: str,
    host: str = "0.0.0.0",  # noqa: S104
    port: int = 8000,
    app_config_name: str = "app_config",
    config_name: str = "config",
):
    """Build the application configuration files.

    Two files are created in the directory of the application module:

    - ``<app_config_name>.py`` binds every registered deployment and the main
      app to module-level variables.
    - ``<config_name>.yaml`` is the Ray Serve config produced from that module
      by ``serve build``. It is then updated with ``host``/``port``, and each
      app is renamed after its variable.

    Args:
        import_path (str): The import path of the application, in the form
            ``module:variable`` (e.g. ``aana.projects.whisper.app:aana_app``).
        host (str): The host to run the application on. Defaults to "0.0.0.0".
        port (int): The port to run the application on. Defaults to 8000.
        app_config_name (str): The name of the application config file. Defaults to "app_config".
        config_name (str): The name of the config file. Defaults to "config".

    Raises:
        ValueError: If ``import_path`` is not of the form ``module:variable``, if the
            module's source location cannot be found, or if a deployment or app name
            is not a valid Python identifier. These names become variable names in
            the generated module.
        TypeError: If the object at ``import_path`` is not an AanaSDK instance.
    """
    import keyword

    # Split the import path into module and variable.
    # For example, aana.projects.whisper.app:aana_app will be split into
    # module "aana.projects.whisper.app" and variable "aana_app".
    try:
        app_module, app_var = import_path.split(":")
    except ValueError:
        raise ValueError(  # noqa: TRY003
            f"Error: {import_path} must be of the form 'module:variable'"
        ) from None

    # Use location of the app module as the output directory
    spec = importlib.util.find_spec(app_module)
    if spec is None or spec.origin is None:
        raise ValueError(f"Error: cannot locate module {app_module}")  # noqa: TRY003
    output_dir = Path(spec.origin).parent

    # Import AanaSDK app from the given import path
    aana_app = import_from_path(import_path)
    if not isinstance(aana_app, AanaSDK):
        raise TypeError(  # noqa: TRY003
            f"Error: {import_path} is not an AanaSDK instance, got {type(aana_app)}"
        )

    # Deployment names and the app name are written as variable names into the
    # generated module, so reject anything that would produce invalid Python.
    invalid_names = [
        var_name
        for var_name in [*aana_app.deployments, aana_app.name]
        if not var_name.isidentifier() or keyword.iskeyword(var_name)
    ]
    if invalid_names:
        raise ValueError(  # noqa: TRY003
            f"Error: names {invalid_names} are not valid Python identifiers "
            "and cannot be used in the generated app config"
        )

    # Generate the app config file
    # Example:
    #    from aana.projects.whisper.app import aana_app
    #    asr_deployment = aana_app.get_deployment_app("asr_deployment")
    #    vad_deployment = aana_app.get_deployment_app("vad_deployment")
    #    whisper_app = aana_app.get_main_app()
    app_config = ""
    app_config += f"from {app_module} import {app_var}\n\n"
    for deployment_name in aana_app.deployments:
        app_config += f"{deployment_name} = {app_var}.get_deployment_app('{deployment_name}')\n"
    app_config += f"{aana_app.name} = {app_var}.get_main_app()\n"

    # Output path for the app config file
    app_config_path = output_dir / f"{app_config_name}.py"

    # Import path for the app config file, for example aana.projects.whisper.app_config.
    # The file is written next to the app module's source file. If the app module is
    # itself a package (its origin is __init__.py), the file is inside that package.
    # Otherwise it is in the module's parent package, or at the top level (found via
    # --app-dir below) for a top-level module.
    if spec.submodule_search_locations is not None:
        config_package = app_module
    else:
        config_package = app_module.rpartition(".")[0]
    app_config_import_path = (
        f"{config_package}.{app_config_name}" if config_package else app_config_name
    )

    # Write the app config file
    with app_config_path.open("w") as f:
        f.write(app_config)

    # Build "serve build" command to generate config.yaml
    # For example,
    # serve build aana.projects.whisper.app_config:vad_deployment
    #             aana.projects.whisper.app_config:asr_deployment
    #             aana.projects.whisper.app_config:whisper_app
    #             -o /workspaces/aana_sdk/aana/projects/whisper/config.yaml
    config_path = output_dir / f"{config_name}.yaml"
    serve_options = [
        f"{app_config_import_path}:{deployment_name}"
        for deployment_name in aana_app.deployments
    ]
    serve_options += [
        f"{app_config_import_path}:{aana_app.name}",
        "--output-path",
        str(config_path),
        "--app-dir",
        str(output_dir),
    ]

    # Execute "serve build" with click CliRunner
    from click.testing import CliRunner
    from ray.serve.scripts import ServeDeploySchemaDumper, build

    result = CliRunner().invoke(build, serve_options)
    if result.exception:
        raise result.exception

    # Update the config file with the host and port and rename apps.
    # The file was just generated locally by `serve build` above, not read from
    # untrusted input.
    with config_path.open() as f:
        config = yaml.load(f, Loader=yaml.FullLoader)  # noqa: S506

    config["http_options"] = {"host": host, "port": port}

    for app in config["applications"]:
        app["name"] = app["import_path"].split(":")[-1]

    with config_path.open("w") as f:
        yaml.dump(
            config,
            f,
            Dumper=ServeDeploySchemaDumper,
            default_flow_style=False,
            sort_keys=False,
        )

    print(f"App config successfully saved to {app_config_path}")
    print(f"Config successfully saved to {config_path}")