Skip to content

Endpoint

aana.api.Endpoint

Endpoint(name, path, summary, admin_required=False, active_subscription_required=False, defer_option=DeferOption.OPTIONAL, initialized=False, event_handlers=None)

Class used to represent an endpoint.

ATTRIBUTE DESCRIPTION
name

Name of the endpoint.

TYPE: str

path

Path of the endpoint (e.g. "/video/transcribe").

TYPE: str

summary

Description of the endpoint that will be shown in the API documentation.

TYPE: str

admin_required

Flag indicating if the endpoint requires admin access.

TYPE: bool

active_subscription_required

Flag indicating if the endpoint requires an active subscription.

TYPE: bool

defer_option

Defer option for the endpoint (always, never, optional).

TYPE: DeferOption

event_handlers

The list of event handlers to register for the endpoint.

TYPE: list[EventHandler] | None

initialize

initialize()

Initialize the endpoint.

Redefine this method to add initialization logic for the endpoint (e.g. create a handle to the deployment). Call super().initialize() to ensure the endpoint is initialized.

Example
async def initialize(self):
    """Run the base initialization, then create the handle this endpoint uses."""
    await super().initialize()
    # Create the handle for the deployment named "whisper_deployment" and keep
    # it on the instance so later calls can reuse it.
    whisper_handle = await AanaDeploymentHandle.create("whisper_deployment")
    self.asr_handle = whisper_handle
Source code in aana/api/api_generation.py
async def initialize(self):
    """Mark the endpoint as initialized.

    This is the hook subclasses override to set up whatever the endpoint
    needs (e.g. creating a handle to a deployment). An override must await
    ``super().initialize()`` so that the ``initialized`` flag is still set.

    Example:
        ```python
        async def initialize(self):
            await super().initialize()
            self.asr_handle = await AanaDeploymentHandle.create("whisper_deployment")
        ```
    """
    # The base implementation only flips the flag; all real setup lives in overrides.
    self.initialized = True

run

run(*args, **kwargs)

The main method of the endpoint that is called when the endpoint receives a request.

Redefine this method to implement the logic of the endpoint.

Example
async def run(self, video: VideoInput) -> WhisperOutput:
    """Download the video, extract its audio and return the transcription."""
    video_obj: Video = await run_remote(download_video)(video_input=video)
    # `transcribe` is called with an `audio` argument, so the audio has to be
    # derived from the downloaded video before it can be passed on.
    # NOTE(review): assumes the SDK's `extract_audio(video=...)` helper is
    # imported alongside `download_video` - confirm the import path.
    audio: Audio = extract_audio(video=video_obj)
    transcription = await self.asr_handle.transcribe(audio=audio)
    return transcription
Source code in aana/api/api_generation.py
async def run(self, *args, **kwargs):
    """The main method of the endpoint that is called when the endpoint receives a request.

    Redefine this method to implement the logic of the endpoint. The base
    implementation accepts any arguments and always raises.

    Example:
        ```python
        async def run(self, video: VideoInput) -> WhisperOutput:
            video_obj: Video = await run_remote(download_video)(video_input=video)
            # NOTE(review): assumes the SDK's `extract_audio` helper is imported.
            audio: Audio = extract_audio(video=video_obj)
            transcription = await self.asr_handle.transcribe(audio=audio)
            return transcription
        ```

    Raises:
        NotImplementedError: Always, unless the method is overridden.
    """
    raise NotImplementedError

register

register(app, custom_schemas, event_manager)

Register the endpoint in the FastAPI application.

PARAMETER DESCRIPTION
app

FastAPI application.

TYPE: FastAPI

custom_schemas

Dictionary containing custom schemas.

TYPE: dict[str, dict]

event_manager

Event manager for the application.

TYPE: EventManager

Source code in aana/api/api_generation.py
def register(
    self, app: FastAPI, custom_schemas: dict[str, dict], event_manager: EventManager
):
    """Attach this endpoint to a FastAPI application as a POST route.

    The request and response models are generated from the endpoint, any
    configured event handlers are registered for the endpoint path, and the
    JSON schema of the request model is stored in ``custom_schemas`` under
    the endpoint name.

    Args:
        app (FastAPI): FastAPI application.
        custom_schemas (dict[str, dict]): Dictionary containing custom schemas;
            updated in place with this endpoint's request schema.
        event_manager (EventManager): Event manager for the application.
    """
    request_model = self.get_request_model()
    response_model = self.get_response_model()

    # A missing or empty handler list simply registers nothing.
    for event_handler in self.event_handlers or ():
        event_manager.register_handler_for_events(event_handler, [self.path])

    endpoint_func = self.__create_endpoint_func(
        RequestModel=request_model,
        event_manager=event_manager,
    )

    # Build the POST route decorator first, then apply it to the handler.
    post_route = app.post(
        self.path,
        name=self.name,
        summary=self.summary,
        operation_id=self.name,
        response_model=response_model,
        responses={
            400: {"model": ExceptionResponseModel},
        },
    )
    post_route(endpoint_func)

    custom_schemas[self.name] = request_model.model_json_schema()

get_request_model

get_request_model()

Generate the request Pydantic model for the endpoint.

RETURNS DESCRIPTION
type[BaseModel]

Request Pydantic model.

Source code in aana/api/api_generation.py
def get_request_model(self) -> type[BaseModel]:
    """Build the Pydantic model that describes the endpoint's request.

    The model name is generated with the "Request" suffix and its fields come
    from the endpoint's input fields.

    Returns:
        type[BaseModel]: Request Pydantic model.
    """
    return create_model(
        self.__generate_model_name("Request"),
        **self.__get_input_fields(),
    )

get_response_model

get_response_model()

Generate the response Pydantic model for the endpoint.

RETURNS DESCRIPTION
type[BaseModel]

Response Pydantic model.

Source code in aana/api/api_generation.py
def get_response_model(self) -> type[BaseModel]:
    """Build the Pydantic model that describes the endpoint's response.

    The model name is generated with the "Response" suffix and its fields
    come from the endpoint's output fields.

    Returns:
        type[BaseModel]: Response Pydantic model.
    """
    response_name = self.__generate_model_name("Response")
    fields = self.__get_output_fields()
    # extra="forbid" makes the model reject attributes that are not declared fields.
    strict_config = ConfigDict(extra="forbid")
    return create_model(response_name, **fields, __config__=strict_config)

is_streaming_response

is_streaming_response()

Check if the endpoint returns a streaming response.

RETURNS DESCRIPTION
bool

True if the endpoint returns a streaming response, False otherwise.

TYPE: bool

Source code in aana/api/api_generation.py
@classmethod
def is_streaming_response(cls) -> bool:
    """Tell whether the endpoint streams its response.

    An endpoint is treated as streaming when its ``run`` method is an
    asynchronous generator function.

    Returns:
        bool: True if the endpoint returns a streaming response, False otherwise.
    """
    run_method = cls.run
    return isasyncgenfunction(run_method)

send_usage_event

send_usage_event(api_key, metric, properties)

Send a usage event to the LAGO API service.

PARAMETER DESCRIPTION
api_key

The API key information.

TYPE: ApiKey

metric

The metric code.

TYPE: str

properties

The properties of the event (usage data, e.g. {"count": 10}).

TYPE: dict

Source code in aana/api/api_generation.py
def send_usage_event(
    self, api_key: ApiKey, metric: str, properties: dict[str, Any]
):
    """Send a usage event to the LAGO API service.

    The call is best-effort: if the API service is disabled or not fully
    configured the event is skipped with a warning, and a failure to deliver
    the event is logged instead of being raised to the caller.

    Args:
        api_key (ApiKey): The API key information.
        metric (str): The metric code.
        properties (dict): The properties of the event (usage data, e.g. {"count": 10}).
    """
    # Check the configuration before anything else so that the lazy imports
    # below are only paid for when an event is actually going to be sent.
    if not aana_settings.api_service.enabled:
        logger.warning("API service is not enabled. Skipping usage event.")
        return

    if (
        not aana_settings.api_service.lago_url
        or not aana_settings.api_service.lago_api_key
    ):
        logger.warning(
            "LAGO API service URL or API key is not set. Skipping usage event."
        )
        return

    from lago_python_client.client import Client
    from lago_python_client.models import Event
    from tenacity import (
        retry,
        retry_if_exception_type,
        stop_after_attempt,
    )

    # reraise=True surfaces the last underlying exception once the attempts
    # are exhausted, so the log below names the real failure rather than
    # tenacity's RetryError wrapper.
    @retry(
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type(Exception),
        reraise=True,
    )
    def send_event_with_retry(client, event):
        return client.events.create(event)

    try:
        client = Client(
            api_key=aana_settings.api_service.lago_api_key,
            api_url=aana_settings.api_service.lago_url,
        )

        event = Event(
            # A fresh UUID per call gives every event its own transaction id.
            transaction_id=str(uuid.uuid4()),
            code=metric,
            external_subscription_id=api_key.subscription_id,
            timestamp=time.time(),
            properties=properties,
        )

        send_event_with_retry(client, event)
    except Exception as e:
        # Usage reporting must not break the caller, so log and move on.
        logger.error(
            f"Failed to send usage event after retries: {e}", exc_info=True
        )