Skip to content

Integrations

aana.integrations.haystack

AanaDeploymentComponent

AanaDeploymentComponent(deployment_handle, method_name)

Wrapper that lets Aana deployments run as Haystack components.

Example
deployment_handle = await AanaDeploymentHandle.create("my_deployment")
haystack_component = AanaDeploymentComponent(deployment_handle, "my_method")
haystack_component.warm_up()  # This is currently a no-op, but subject to change.
component_result = haystack_component.run(my_input_prompt="This is an input prompt")
PARAMETER DESCRIPTION
deployment_handle

the Aana Ray deployment to be wrapped (must be a class Deployment)

TYPE: AanaDeploymentHandle

method_name

the name of the method on the deployment to call inside the component's run() method.

TYPE: str

Source code in aana/integrations/haystack/deployment_component.py
def __init__(self, deployment_handle: AanaDeploymentHandle, method_name: str):
    """Wrap one method of an Aana deployment as a Haystack component.

    Arguments:
        deployment_handle (AanaDeploymentHandle): handle of the Aana Ray deployment to wrap (must be a class Deployment)
        method_name (str): name of the deployment method that the component's `run()` method calls.

    Raises:
        AttributeError: if the method lookup returns a falsy value.
    """
    self._deployment_handle = deployment_handle

    # Resolve the method that `run()` delegates to.
    # Will raise if the function is not defined (e.g. if you pass a function deployment)
    self.run_method = self._get_method(method_name)
    if not self.run_method:
        raise AttributeError(name=method_name, obj=self._deployment_handle)

    # The method's type hints determine the input and output types for `run()`.
    input_types, output_types = typehints_to_component_types(
        get_type_hints(self.run_method)
    )

    # `set_input_types()` and `set_output_types()` take the instance as a
    # positional argument and the types as keyword arguments.
    component.set_input_types(self, **input_types)
    component.set_output_types(self, **output_types)

warm_up

warm_up()

Warms up the deployment to a ready state.

As we run off an existing deployment handle, this is currently a no-op.

Source code in aana/integrations/haystack/deployment_component.py
def warm_up(self):
    """Mark the component as warmed up.

    The component runs off an already existing deployment handle, so there is
    currently nothing to prepare: only the internal `_warm` flag is set.
    """
    self._warm = True

run

run(*args, **kwargs)

Run the component. This is the primary interface for Haystack Components.

PARAMETER DESCRIPTION
*args

the arguments to pass to the deployment run function

DEFAULT: ()

**kwargs

the keyword arguments to pass to the deployment run function

DEFAULT: {}

RETURNS DESCRIPTION

The return value of the deployment's run function

Source code in aana/integrations/haystack/deployment_component.py
def run(self, *args, **kwargs):
    """Execute the wrapped deployment method; the main entry point for Haystack Components.

    Arguments:
        *args: positional arguments forwarded to the deployment run function
        **kwargs: keyword arguments forwarded to the deployment run function

    Returns:
        The return value of the deployment's run function
    """
    # Function may (must?) be a coroutine. Resolve it if so.
    pending_call = self._call(*args, **kwargs)
    return run_async(pending_call)

aana.integrations.external.av

pyAVWrapper

Bases: AbstractAudioLibrary

Class for audio handling using PyAV library.

read_file

read_file(path, sample_rate=16000)

Read an audio file from path and return it as a numpy array.

PARAMETER DESCRIPTION
path

The path of the file to read.

TYPE: Path

sample_rate

sample rate of the audio, default is 16000.

TYPE: int DEFAULT: 16000

RETURNS DESCRIPTION
ndarray

np.ndarray: The audio file as a numpy array.

Source code in aana/integrations/external/av.py
@classmethod
def read_file(cls, path: Path, sample_rate: int = 16000) -> np.ndarray:
    """Decode the audio file at `path` into a float32 numpy array.

    Audio stream 0 is decoded, resampled to mono signed 16-bit samples at
    `sample_rate`, and finally divided by 32768.

    Args:
        path (Path): The path of the file to read.
        sample_rate (int): sample rate of the audio, default is 16000.

    Returns:
        np.ndarray: The audio file as a numpy array.
    """
    pcm_buffer = io.BytesIO()
    sample_dtype = None

    resampler = av.audio.resampler.AudioResampler(
        format="s16", layout="mono", rate=sample_rate
    )

    with av.open(str(path), mode="r", metadata_errors="ignore") as container:
        # Lazy pipeline: decode -> drop invalid frames -> regroup -> resample.
        pipeline = resample_frames(
            group_frames(ignore_invalid_frames(container.decode(audio=0)), 500000),
            resampler,
        )

        for frame in pipeline:
            samples = frame.to_ndarray()
            sample_dtype = samples.dtype
            pcm_buffer.write(samples)

    # It appears that some objects related to the resampler are not freed
    # unless the garbage collector is manually run.
    del resampler
    gc.collect()

    pcm = np.frombuffer(pcm_buffer.getbuffer(), dtype=sample_dtype)
    # Convert s16 back to f32.
    return pcm.astype(np.float32) / 32768.0

read_from_bytes

read_from_bytes(content, sample_rate=16000)

Read audio bytes and return as a numpy array.

PARAMETER DESCRIPTION
content

The content of the file to read.

TYPE: bytes

sample_rate

sample rate of the audio, default is 16000.

TYPE: int DEFAULT: 16000

RETURNS DESCRIPTION
ndarray

np.ndarray: The file as a numpy array.

Source code in aana/integrations/external/av.py
@classmethod
def read_from_bytes(cls, content: bytes, sample_rate: int = 16000) -> np.ndarray:
    """Decode in-memory audio bytes into a float32 numpy array.

    Audio stream 0 is decoded, resampled to mono signed 16-bit samples at
    `sample_rate`, and finally divided by 32768.

    Args:
        content (bytes): The content of the file to read.
        sample_rate (int): sample rate of the audio, default is 16000.

    Returns:
        np.ndarray: The file as a numpy array.
    """
    # Expose the bytes through an in-memory file-like object.
    source_io = io.BytesIO(content)
    pcm_buffer = io.BytesIO()
    sample_dtype = None

    resampler = av.audio.resampler.AudioResampler(
        format="s16", layout="mono", rate=sample_rate
    )

    with av.open(source_io, mode="r", metadata_errors="ignore") as container:
        # Lazy pipeline: decode -> drop invalid frames -> regroup -> resample.
        pipeline = resample_frames(
            group_frames(ignore_invalid_frames(container.decode(audio=0)), 500000),
            resampler,
        )

        for frame in pipeline:
            samples = frame.to_ndarray()
            sample_dtype = samples.dtype
            pcm_buffer.write(samples)

    # It appears that some objects related to the resampler are not freed
    # unless the garbage collector is manually run.
    del resampler
    gc.collect()

    pcm = np.frombuffer(pcm_buffer.getbuffer(), dtype=sample_dtype)
    # Convert s16 back to f32.
    return pcm.astype(np.float32) / 32768.0

write_file

write_file(path, audio, sample_rate=16000)

Write an audio file in wav format to the path from numpy array.

PARAMETER DESCRIPTION
path

The path of the file to write.

TYPE: Path

audio

The audio to write.

TYPE: ndarray

sample_rate

The sample rate of the audio to save, default is 16000.

TYPE: int DEFAULT: 16000

Source code in aana/integrations/external/av.py
@classmethod
def write_file(cls, path: Path, audio: np.ndarray, sample_rate: int = 16000):
    """Write an audio file in wav format to the path from numpy array.

    The samples are multiplied by 32768 and stored as signed 16-bit PCM in a
    single-channel stream.

    Args:
        path (Path): The path of the file to write.
        audio (np.ndarray): The audio to write.
        sample_rate (int): The sample rate of the audio to save, default is 16000.
    """
    # Clip before the cast so that full-scale samples (e.g. exactly 1.0, which
    # scales to 32768) saturate instead of overflowing int16.
    audio = np.clip(audio * 32768.0, -32768, 32767).astype(np.int16)
    # Open the container as a context manager so that it is closed even when
    # encoding or muxing raises.
    with av.open(str(path), "w", format="wav") as container:
        # Add an audio stream
        stream = container.add_stream("pcm_s16le", rate=sample_rate)
        stream.channels = 1
        # Write audio frames to the stream
        # NOTE(review): confirm that `AudioFrame.from_ndarray` accepts `rate`
        # and returns an iterable of frames in the PyAV version in use.
        for frame in av.AudioFrame.from_ndarray(
            audio, format="s16", layout="mono", rate=sample_rate
        ):
            for packet in stream.encode(frame):
                container.mux(packet)
        # Encoding `None` flushes the packets still buffered in the encoder.
        for packet in stream.encode(None):
            container.mux(packet)

write_to_bytes

write_to_bytes(audio)

Write bytes using the audio library from numpy array.

PARAMETER DESCRIPTION
audio

The audio to write.

TYPE: ndarray

RETURNS DESCRIPTION
bytes

The audio as bytes.

TYPE: bytes

Source code in aana/integrations/external/av.py
@classmethod
def write_to_bytes(cls, audio: np.ndarray) -> bytes:
    """Serialize a numpy audio array to bytes through a PyAV audio frame.

    The array is cast to int16 as-is.
    NOTE(review): unlike `write_file`, no scaling by 32768 is applied here -
    confirm which value range callers are expected to pass.

    Args:
        audio (np.ndarray): The audio to write.

    Returns:
        bytes: The audio as bytes.
    """
    frame = av.AudioFrame(format="s16", layout="mono", samples=len(audio))
    pcm_bytes = audio.astype(np.int16).tobytes()
    # The frame is created with the "s16" format and "mono" layout; its first
    # plane receives the samples.
    plane = frame.planes[0]
    plane.update(pcm_bytes)
    return plane.to_bytes()

write_audio_bytes

write_audio_bytes(path, audio, sample_rate=16000)

Write an audio file in wav format to path from the normalized audio bytes.

PARAMETER DESCRIPTION
path

The path of the file to write.

TYPE: Path

audio

The audio to write, as 16-bit PCM bytes.

TYPE: bytes

sample_rate

The sample rate of the audio, default is 16000.

TYPE: int DEFAULT: 16000

Source code in aana/integrations/external/av.py
@classmethod
def write_audio_bytes(cls, path: Path, audio: bytes, sample_rate: int = 16000):
    """Store raw 16-bit PCM audio bytes at `path` as a mono wav file.

    Args:
        path (Path): The path of the file to write.
        audio (bytes): The audio to write, as 16-bit PCM bytes.
        sample_rate (int): The sample rate of the audio, default is 16000.
    """
    channel_count = 1  # Mono audio
    sample_width_bytes = 2  # 16-bit audio

    with wave.open(str(path), "wb") as wav_writer:
        # The header fields must be set before any frames are written.
        wav_writer.setframerate(sample_rate)
        wav_writer.setsampwidth(sample_width_bytes)
        wav_writer.setnchannels(channel_count)
        wav_writer.writeframes(audio)

load_audio

load_audio(file, sample_rate=16000)

Open an audio file and read as mono waveform, resampling as necessary.

PARAMETER DESCRIPTION
file

The audio/video file to open.

TYPE: Path

sample_rate

The sample rate to resample the audio if necessary.

TYPE: int DEFAULT: 16000

RETURNS DESCRIPTION
bytes

The content of the audio as bytes.

TYPE: bytes

RAISES DESCRIPTION
RuntimeError

if ffmpeg fails to convert and load the audio.

Source code in aana/integrations/external/av.py
def load_audio(file: Path, sample_rate: int = 16000) -> bytes:
    """Open an audio file and read as mono waveform, resampling as necessary.

    The audio is resampled to mono signed 16-bit samples at `sample_rate` and
    returned as raw bytes. A file without any audio stream yields `b""`.

    Args:
        file (Path): The audio/video file to open.
        sample_rate (int): The sample rate to resample the audio if necessary.

    Returns:
        bytes: The content of the audio as bytes.

    Raises:
        RuntimeError: if opening or decoding the audio fails; the original
            exception is chained as the cause.
    """
    resampler = av.audio.resampler.AudioResampler(
        format="s16",
        layout="mono",
        rate=sample_rate,
    )

    raw_buffer = io.BytesIO()

    # Try loading audio and check for empty audio in one shot.
    try:
        with av.open(str(file), mode="r", metadata_errors="ignore") as container:
            # check for empty audio
            if not container.streams.audio:
                return b""

            frames = container.decode(audio=0)
            frames = ignore_invalid_frames(frames)
            frames = group_frames(frames, 500000)
            frames = resample_frames(frames, resampler)

            for frame in frames:
                raw_buffer.write(frame.to_ndarray())

        return raw_buffer.getvalue()

    except Exception as e:
        raise RuntimeError(f"{e!s}") from e

    finally:
        # It appears that some objects related to the resampler are not freed
        # unless the garbage collector is manually run. Doing this in
        # `finally` also covers the empty-audio return and the error path.
        del resampler
        gc.collect()

ignore_invalid_frames

ignore_invalid_frames(frames)

Filter out invalid frames from the input generator.

PARAMETER DESCRIPTION
frames

The input generator of frames.

TYPE: Generator

YIELDS DESCRIPTION
Generator

av.audio.frame.AudioFrame: Valid audio frames.

RAISES DESCRIPTION
StopIteration

When the input generator is exhausted.

Source code in aana/integrations/external/av.py
def ignore_invalid_frames(frames: Generator) -> Generator:
    """Pass frames through, skipping the ones that raise an invalid-data error.

    Iteration ends normally once the input is exhausted.

    Args:
        frames (Generator): The input generator of frames.

    Yields:
        av.audio.frame.AudioFrame: Valid audio frames.
    """
    source = iter(frames)

    while True:
        try:
            yield next(source)
        except StopIteration:  # noqa: PERF203
            # Input exhausted: finish this generator as well.
            return
        except av.error.InvalidDataError:
            # Skip the frame that failed and move on to the next one.
            continue

group_frames

group_frames(frames, num_samples=None)

Group audio frames and yield groups of frames based on the specified number of samples.

PARAMETER DESCRIPTION
frames

The input generator of audio frames.

TYPE: Generator

num_samples

The target number of samples for each group.

TYPE: int | None DEFAULT: None

YIELDS DESCRIPTION
Generator

av.audio.frame.AudioFrame: Grouped audio frames.

Source code in aana/integrations/external/av.py
def group_frames(frames: Generator, num_samples: int | None = None) -> Generator:
    """Buffer audio frames in a FIFO and emit them as larger grouped frames.

    With `num_samples` set, the FIFO content is emitted each time it holds at
    least that many samples. Whatever remains after the input is exhausted is
    emitted as a final group.

    Args:
        frames (Generator): The input generator of audio frames.
        num_samples (int | None): The target number of samples for each group.

    Yields:
        av.audio.frame.AudioFrame: Grouped audio frames.
    """
    sample_fifo = av.audio.fifo.AudioFifo()
    flush_on_threshold = num_samples is not None

    for frame in frames:
        frame.pts = None  # Ignore timestamp check.
        sample_fifo.write(frame)

        if flush_on_threshold and sample_fifo.samples >= num_samples:
            yield sample_fifo.read()

    # Emit the samples still buffered once the input is exhausted.
    if sample_fifo.samples > 0:
        yield sample_fifo.read()

resample_frames

resample_frames(frames, resampler)

Resample audio frames using the provided resampler.

PARAMETER DESCRIPTION
frames

The input generator of audio frames.

TYPE: Generator

resampler

The audio resampler.

YIELDS DESCRIPTION
Generator

av.audio.frame.AudioFrame: Resampled audio frames.

Source code in aana/integrations/external/av.py
def resample_frames(frames: Generator, resampler) -> Generator:
    """Feed every frame through `resampler` and yield what it produces.

    After the last input frame the resampler is called once more with `None`
    so that it can flush its remaining output.

    Args:
        frames (Generator): The input generator of audio frames.
        resampler: The audio resampler.

    Yields:
        av.audio.frame.AudioFrame: Resampled audio frames.
    """
    for frame in frames:
        yield from resampler.resample(frame)

    # Pass None to flush the resampler.
    yield from resampler.resample(None)

aana.integrations.external.decord

FramesDict

Bases: TypedDict

Represents a set of frames with ids, timestamps and total duration.

ATTRIBUTE DESCRIPTION
frames

the extracted frames

TYPE: list[Image]

timestamps

the timestamps of the extracted frames

TYPE: list[float]

duration

the total duration of the video

TYPE: float

frame_ids

the ids of the extracted frames

TYPE: list[int]

extract_frames

extract_frames(video, params)

Extract frames from a video using decord.

PARAMETER DESCRIPTION
video

the video to extract frames from

TYPE: Video

params

the parameters of the video extraction

TYPE: VideoParams

RETURNS DESCRIPTION
FramesDict

a dictionary containing the extracted frames, frame_ids, timestamps, and duration

TYPE: FramesDict

Source code in aana/integrations/external/decord.py
def extract_frames(video: Video, params: VideoParams) -> FramesDict:
    """Extract frames from a video using decord.

    When the file cannot be opened as video it is tried as audio; in that case
    the result has no frames and carries only the audio duration.

    Args:
        video (Video): the video to extract frames from
        params (VideoParams): the parameters of the video extraction

    Returns:
        FramesDict: a dictionary containing the extracted frames, frame_ids, timestamps, and duration.
            `frame_ids` are the positions `0..len(frames) - 1` of the extracted frames.

    Raises:
        VideoReadingException: if the file can be read neither as video nor as audio
    """
    device = decord.cpu(0)
    num_threads = 1  # TODO: see if we can use more threads

    num_fps: float = params.extract_fps
    try:
        video_reader = decord.VideoReader(
            str(video.path), ctx=device, num_threads=num_threads
        )
    except DECORDError as video_reader_exception:
        # Not readable as video: fall back to an audio-only result.
        try:
            audio_reader = decord.AudioReader(str(video.path), ctx=device)
            return FramesDict(
                frames=[],
                timestamps=[],
                duration=audio_reader.duration(),
                frame_ids=[],
            )
        except DECORDError:
            raise VideoReadingException(video) from video_reader_exception

    video_fps = video_reader.get_avg_fps()
    num_frames = len(video_reader)
    duration = num_frames / video_fps

    if params.fast_mode_enabled:
        indexes = video_reader.get_key_indices()
    else:
        # num_fps can be smaller than 1 (e.g. 0.5 means 1 frame every 2 seconds)
        # Clamp the stride to at least 1: when the requested fps exceeds the
        # video's fps the truncation gives 0, and np.arange rejects a zero step.
        step = max(1, int(video_fps / num_fps))
        indexes = np.arange(0, num_frames, step)
    timestamps = video_reader.get_frame_timestamp(indexes)[:, 0].tolist()

    frames_array = video_reader.get_batch(indexes).asnumpy()
    frames = [
        Image(numpy=frame, media_id=f"{video.media_id}_frame_{frame_id}")
        for frame_id, frame in enumerate(frames_array)
    ]

    return FramesDict(
        frames=frames,
        timestamps=timestamps,
        duration=duration,
        frame_ids=list(range(len(frames))),
    )

get_video_duration

get_video_duration(video)

Extract video duration using decord.

PARAMETER DESCRIPTION
video

the video to get its duration

TYPE: Video

RETURNS DESCRIPTION
float

duration of the video

TYPE: float

RAISES DESCRIPTION
VideoReadingException

if the file is not readable or a valid multimedia file

Source code in aana/integrations/external/decord.py
def get_video_duration(video: Video) -> float:
    """Compute the duration of a media file using decord.

    For video the duration is the frame count divided by the average fps.
    When the file cannot be opened as video, the duration reported by the
    audio reader is returned instead.

    Args:
        video (Video): the video to get its duration

    Returns:
        float: duration of the video

    Raises:
        VideoReadingException: if the file is not readable or a valid multimedia file
    """
    ctx = decord.cpu(0)
    try:
        reader = decord.VideoReader(str(video.path), ctx=ctx, num_threads=1)
    except DECORDError as video_reader_exception:
        # Not readable as video: try the file as audio.
        try:
            return decord.AudioReader(str(video.path), ctx=ctx).duration()
        except DECORDError:
            raise VideoReadingException(video) from video_reader_exception

    avg_fps = reader.get_avg_fps()
    frame_count = len(reader)
    return frame_count / avg_fps

generate_frames

generate_frames(video, params, batch_size=8)

Generate frames from a video using decord.

PARAMETER DESCRIPTION
video

the video to extract frames from

TYPE: Video

params

the parameters of the video extraction

TYPE: VideoParams

batch_size

the number of frames to yield at each iteration

TYPE: int DEFAULT: 8

YIELDS DESCRIPTION
FramesDict

a dictionary containing the extracted frames, frame ids, timestamps, and duration for each batch

TYPE: FramesDict

RAISES DESCRIPTION
VideoReadingException

if the file is not readable or not a valid multimedia file

Source code in aana/integrations/external/decord.py
def generate_frames(
    video: Video, params: VideoParams, batch_size: int = 8
) -> Generator[FramesDict, None, None]:
    """Generate frames from a video using decord.

    When the file cannot be opened as video it is tried as audio; in that case
    a single result without frames, carrying only the audio duration, is yielded.

    Args:
        video (Video): the video to extract frames from
        params (VideoParams): the parameters of the video extraction
        batch_size (int): the number of frames to yield at each iteration

    Yields:
        FramesDict: a dictionary containing the extracted frames, frame ids, timestamps,
                    and duration for each batch

    Raises:
        VideoReadingException: if the file is not readable or a valid multimedia file
    """
    device = decord.cpu(0)
    num_threads = 1  # TODO: see if we can use more threads

    num_fps: float = params.extract_fps
    try:
        video_reader = decord.VideoReader(
            str(video.path), ctx=device, num_threads=num_threads
        )
    except DECORDError as video_reader_exception:
        # Not readable as video: fall back to a single audio-only result.
        try:
            audio_reader = decord.AudioReader(str(video.path), ctx=device)
            audio_duration = audio_reader.duration()
        except DECORDError:
            raise VideoReadingException(video) from video_reader_exception
        yield FramesDict(
            frames=[],
            timestamps=[],
            duration=audio_duration,
            frame_ids=[],
        )
        return

    video_fps = video_reader.get_avg_fps()
    num_frames = len(video_reader)
    duration = num_frames / video_fps

    if params.fast_mode_enabled:
        indexes = video_reader.get_key_indices()
    else:
        # num_fps can be smaller than 1 (e.g. 0.5 means 1 frame every 2 seconds)
        # Clamp the stride to at least 1: when the requested fps exceeds the
        # video's fps the truncation gives 0, and np.arange rejects a zero step.
        step = max(1, int(video_fps / num_fps))
        indexes = np.arange(0, num_frames, step)
    timestamps = video_reader.get_frame_timestamp(indexes)[:, 0].tolist()

    for i in range(0, len(indexes), batch_size):
        batch = indexes[i : i + batch_size]
        batch_frames_array = video_reader.get_batch(batch).asnumpy()
        # Frame ids continue across batches, hence the `i` offset.
        batch_frames = [
            Image(numpy=frame, media_id=f"{video.media_id}_frame_{i + frame_id}")
            for frame_id, frame in enumerate(batch_frames_array)
        ]

        batch_timestamps = timestamps[i : i + batch_size]
        yield FramesDict(
            frames=batch_frames,
            frame_ids=list(range(i, i + len(batch_frames))),
            timestamps=batch_timestamps,
            duration=duration,
        )

is_audio

is_audio(path)

Checks if it's a valid audio.

Source code in aana/integrations/external/decord.py
def is_audio(path: Path) -> bool:
    """Report whether decord can open the file at `path` with its audio reader."""
    try:
        decord.AudioReader(str(path))
    except DECORDError:
        readable = False
    else:
        readable = True
    return readable

aana.integrations.external.opencv

OpenCVWrapper

Bases: AbstractImageLibrary

Wrapper class for OpenCV functions.

read_file

read_file(path)

Read a file using OpenCV.

PARAMETER DESCRIPTION
path

The path of the file to read.

TYPE: Path

RETURNS DESCRIPTION
ndarray

np.ndarray: The file as a numpy array in RGB format.

Source code in aana/integrations/external/opencv.py
@classmethod
def read_file(cls, path: Path) -> np.ndarray:
    """Load an image from disk with OpenCV and convert it from BGR to RGB.

    Args:
        path (Path): The path of the file to read.

    Returns:
        np.ndarray: The file as a numpy array in RGB format.
    """
    bgr_image = cv2.imread(str(path))
    return cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

read_from_bytes

read_from_bytes(content)

Read bytes using OpenCV.

PARAMETER DESCRIPTION
content

The content of the file to read.

TYPE: bytes

RETURNS DESCRIPTION
ndarray

np.ndarray: The file as a numpy array in RGB format.

Source code in aana/integrations/external/opencv.py
@classmethod
def read_from_bytes(cls, content: bytes) -> np.ndarray:
    """Decode image bytes with OpenCV and convert the result from BGR to RGB.

    Args:
        content (bytes): The content of the file to read.

    Returns:
        np.ndarray: The file as a numpy array in RGB format.
    """
    encoded = np.frombuffer(content, np.uint8)
    bgr_image = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    return cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

write_file

write_file(path, img)

Write a file using OpenCV.

PARAMETER DESCRIPTION
path

The path of the file to write.

TYPE: Path

img

The image to write.

TYPE: ndarray

Source code in aana/integrations/external/opencv.py
@classmethod
def write_file(cls, path: Path, img: np.ndarray):
    """Convert an RGB image to BGR and save it to disk with OpenCV.

    Args:
        path (Path): The path of the file to write.
        img (np.ndarray): The image to write.
    """
    bgr_image = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    cv2.imwrite(str(path), bgr_image)

write_to_bytes

write_to_bytes(img)

Write bytes using OpenCV.

PARAMETER DESCRIPTION
img

The image to write.

TYPE: ndarray

RETURNS DESCRIPTION
bytes

The image as bytes.

TYPE: bytes

Source code in aana/integrations/external/opencv.py
@classmethod
def write_to_bytes(cls, img: np.ndarray) -> bytes:
    """Convert an RGB image to BGR and encode it as ".bmp" bytes with OpenCV.

    Args:
        img (np.ndarray): The image to write.

    Returns:
        bytes: The image as bytes.
    """
    bgr_image = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    # imencode returns a pair; only its second item (the buffer) is used.
    encoded = cv2.imencode(".bmp", bgr_image)[1]
    return encoded.tobytes()

aana.integrations.external.yt_dlp

get_video_metadata

get_video_metadata(video_url)

Fetch video's metadata for a url.

PARAMETER DESCRIPTION
video_url

the video input url

TYPE: str

RETURNS DESCRIPTION
metadata

the metadata of the video

TYPE: VideoMetadata

RAISES DESCRIPTION
DownloadException

Request does not succeed.

Source code in aana/integrations/external/yt_dlp.py
def get_video_metadata(video_url: str) -> VideoMetadata:
    """Look up title, description and duration of the video behind a url.

    The information is extracted with yt-dlp without downloading the video.

    Args:
        video_url (str): the video input url

    Returns:
        metadata (VideoMetadata): the metadata of the video

    Raises:
        DownloadException: Request does not succeed.
    """
    ydl_options = {
        "extract_flat": True,
        "hls_prefer_native": True,
        "extractor_args": {"youtube": {"skip": ["hls", "dash"]}},
    }
    try:
        with yt_dlp.YoutubeDL(ydl_options) as ydl:
            info = ydl.extract_info(video_url, download=False)
            # Title and description fall back to an empty string when missing.
            return VideoMetadata(
                title=info.get("title", ""),
                description=info.get("description", ""),
                duration=info.get("duration"),
            )
    except DownloadError as e:
        # Keep only the part of the message before the first ";".
        error_message, _, _ = e.msg.partition(";")
        raise DownloadException(url=video_url, msg=error_message) from e

download_video

download_video(video_input)

Downloads videos for a VideoInput object.

PARAMETER DESCRIPTION
video_input

the video input to download

TYPE: VideoInput

RETURNS DESCRIPTION
Video

the video object

TYPE: Video

RAISES DESCRIPTION
DownloadException

Request does not succeed.

Source code in aana/integrations/external/yt_dlp.py
def download_video(video_input: VideoInput | Video) -> Video:
    """Turn a video input into a Video object, downloading it when it has a url.

    A `Video` is returned unchanged. An input without a url is converted with
    its own `convert_input_to_object()`. Otherwise the url is downloaded with
    yt-dlp into the configured video directory, unless the target file is
    already present there.

    Args:
        video_input (VideoInput): the video input to download

    Returns:
        Video: the video object

    Raises:
        DownloadException: Request does not succeed.
    """
    # Guard clauses: nothing to download for these two cases.
    if isinstance(video_input, Video):
        return video_input
    if video_input.url is None:
        return video_input.convert_input_to_object()

    source_url = video_input.url
    video_dir = settings.video_dir
    # The file name is derived from the url, so one url always maps to one file.
    url_hash = hashlib.md5(source_url.encode(), usedforsecurity=False).hexdigest()

    # we use yt_dlp to download the video
    # it works not only for youtube videos, but also for other websites and direct links
    ydl_options = {
        "outtmpl": f"{video_dir}/{url_hash}.%(ext)s",
        "extract_flat": True,
        "hls_prefer_native": True,
        "extractor_args": {"youtube": {"skip": ["hls", "dash"]}},
    }
    try:
        with yt_dlp.YoutubeDL(ydl_options) as ydl:
            info = ydl.extract_info(source_url, download=False)
            title = info.get("title", "")
            description = info.get("description", "")
            path = Path(ydl.prepare_filename(info))
            # Download only when the target file is not on disk yet.
            if not path.exists():
                ydl.download([source_url])
            # Still missing after the download attempt: report a failure.
            if not path.exists():
                raise DownloadException(source_url)
            return Video(
                path=path,
                url=source_url,
                media_id=video_input.media_id,
                title=title,
                description=description,
            )
    except DownloadError as e:
        # removes the yt-dlp request to file an issue
        error_message = e.msg.partition(";")[0]
        raise DownloadException(url=source_url, msg=error_message) from e