
Processors

aana.processors.remote

run_remote

run_remote(func)

Wrap a function to run it remotely on Ray.

PARAMETER DESCRIPTION
func

the function to wrap

TYPE: Callable

RETURNS DESCRIPTION
Callable

the wrapped function

TYPE: Callable

Source code in aana/processors/remote.py
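import inspect
from collections.abc import Callable

import ray


def run_remote(func: Callable) -> Callable:
    """Wrap a function to run it remotely on Ray.

    Args:
        func (Callable): the function to wrap

    Returns:
        Callable: the wrapped function
    """
    # Create the Ray remote function once, at wrap time.
    remote_func = ray.remote(func)

    async def generator_wrapper(*args, **kwargs):
        # A generator task returns a stream of ObjectRefs; await each one
        # to resolve its value before yielding it to the caller.
        async for item in remote_func.remote(*args, **kwargs):
            yield await item

    if inspect.isgeneratorfunction(func):
        return generator_wrapper
    # Plain functions: calling .remote() returns an awaitable ObjectRef.
    return remote_func.remote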
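Because run_remote needs Ray, the dispatch it performs is easier to try with a local stand-in. The sketch below is a hypothetical run_local (not part of aana) that keeps the same shape but uses asyncio.to_thread instead of Ray tasks: a plain function becomes a callable returning an awaitable, and a generator function becomes an async generator. The helper functions square and count_up are illustrative only.

```python
import asyncio
import inspect
from collections.abc import AsyncIterator, Callable
from typing import Any


def run_local(func: Callable) -> Callable:
    """Hypothetical thread-based stand-in for run_remote (no Ray involved)."""

    async def generator_wrapper(*args: Any, **kwargs: Any) -> AsyncIterator[Any]:
        # Drain the generator in a worker thread, then yield its items.
        items = await asyncio.to_thread(lambda: list(func(*args, **kwargs)))
        for item in items:
            yield item

    async def call_wrapper(*args: Any, **kwargs: Any) -> Any:
        # Run a plain function in a worker thread and await its result.
        return await asyncio.to_thread(func, *args, **kwargs)

    # Same dispatch as run_remote: generator functions get an async generator.
    if inspect.isgeneratorfunction(func):
        return generator_wrapper
    return call_wrapper


def square(x: int) -> int:
    return x * x


def count_up(n: int):
    yield from range(n)


async def main() -> tuple[int, list[int]]:
    value = await run_local(square)(4)
    items = [item async for item in run_local(count_up)(3)]
    return value, items


print(asyncio.run(main()))  # (16, [0, 1, 2])
```

One difference from Ray: this sketch drains the whole generator before yielding, whereas Ray generator tasks can stream items as they are produced.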

aana.processors.video

extract_audio

extract_audio(video)

Extract the audio file from a Video and return an Audio object.

PARAMETER DESCRIPTION
video

The video to extract audio from.

TYPE: Video

RETURNS DESCRIPTION
Audio

an Audio object containing the extracted audio.

TYPE: Audio

Source code in aana/processors/video.py
def extract_audio(video: Video) -> Audio:
    """Extract the audio file from a Video and return an Audio object.

    Args:
        video (Video): The video to extract audio from.

    Returns:
        Audio: an Audio object containing the extracted audio.
    """
    audio_bytes = load_audio(video.path)

    # The Audio keeps the video's URL, title, and description; only the
    # content (the extracted audio bytes) and media_id ("audio_" prefix) differ.
    return Audio(
        url=video.url,
        media_id=f"audio_{video.media_id}",
        content=audio_bytes,
        title=video.title,
        description=video.description,
    )

aana.processors.batch

BatchProcessor

BatchProcessor(process_batch, batch_size, num_threads)

Class for processing data in parallel batches.

The BatchProcessor class encapsulates the logic required to take a large collection of data, split it into manageable batches, process these batches in parallel, and then combine the results into a single cohesive output.

Batching works by iterating through the input request, which is a dictionary where each key maps to a list-like collection of data. The class splits each collection into sublists of length up to batch_size, ensuring that corresponding elements across the collections remain grouped together in their respective batches.

Merging takes the output from each processed batch, which is also a dictionary, and combines these into a single dictionary. Lists are extended, numpy arrays are concatenated, and dictionaries are updated. If a key that appears in more than one batch holds a value of any other type, a NotImplementedError is raised, so the implementer must specify how that type should be merged.

This class is particularly useful for batching requests to a machine learning model.

The thread pool for parallel processing is managed internally and is shut down automatically when the BatchProcessor instance is garbage collected.

ATTRIBUTE DESCRIPTION
process_batch

A function to process each batch.

TYPE: Callable

batch_size

The size of each batch to be processed.

TYPE: int

num_threads

The number of threads in the thread pool for parallel processing.

TYPE: int

PARAMETER DESCRIPTION
process_batch

Function that processes each batch.

TYPE: Callable

batch_size

Size of the batches.

TYPE: int

num_threads

Number of threads in the pool.

TYPE: int

Source code in aana/processors/batch.py
def __init__(self, process_batch: Callable, batch_size: int, num_threads: int):
    """Constructor.

    Args:
        process_batch (Callable): Function that processes each batch.
        batch_size (int): Size of the batches.
        num_threads (int): Number of threads in the pool.
    """
    self.process_batch = process_batch
    self.batch_size = batch_size
    self.pool = ThreadPoolExecutor(num_threads)

batch_iterator

batch_iterator(request)

Converts request into an iterator of batches.

Iterates over the input request, breaking it into smaller batches for processing. Each batch is a dictionary with the same keys as the input request, but the values are sublists containing only the elements for that batch.

Example:

request = {
    'images': [img1, img2, img3, img4, img5],
    'texts': ['text1', 'text2', 'text3', 'text4', 'text5']
}
# Assuming a batch size of 2, this iterator would yield:
# 1st iteration: {'images': [img1, img2], 'texts': ['text1', 'text2']}
# 2nd iteration: {'images': [img3, img4], 'texts': ['text3', 'text4']}
# 3rd iteration: {'images': [img5], 'texts': ['text5']}

PARAMETER DESCRIPTION
request

The request data to split into batches.

TYPE: dict[str, list[Any]]

YIELDS DESCRIPTION
dict[str, list[Any]]

An iterator over the batched requests.

Source code in aana/processors/batch.py
def batch_iterator(self, request: dict[str, Any]) -> Iterator[dict[str, list[Any]]]:
    """Converts request into an iterator of batches.

    Iterates over the input request, breaking it into smaller batches for processing.
    Each batch is a dictionary with the same keys as the input request, but the values
    are sublists containing only the elements for that batch.

    Example:
    ```python
    request = {
        'images': [img1, img2, img3, img4, img5],
        'texts': ['text1', 'text2', 'text3', 'text4', 'text5']
    }
    # Assuming a batch size of 2, this iterator would yield:
    # 1st iteration: {'images': [img1, img2], 'texts': ['text1', 'text2']}
    # 2nd iteration: {'images': [img3, img4], 'texts': ['text3', 'text4']}
    # 3rd iteration: {'images': [img5], 'texts': ['text5']}
    ```

    Args:
        request (dict[str, list[Any]]): The request data to split into batches.

    Yields:
        Iterator[dict[str, list[Any]]]: An iterator over the batched requests.
    """
    lengths = [len(value) for value in request.values()]
    if len(set(lengths)) > 1:
        raise ValueError("All inputs must have the same length")  # noqa: TRY003

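    # Ceiling division; default=0 lets an empty request yield no batches
    # instead of raising on max() of an empty sequence.
    num_items = max(lengths, default=0)
    total_batches = (num_items + self.batch_size - 1) // self.batch_size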
    for i in range(total_batches):
        start = i * self.batch_size
        end = start + self.batch_size
        yield {key: value[start:end] for key, value in request.items()}
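The batch count in batch_iterator is a ceiling division, ceil(n / batch_size), computed as (n + batch_size - 1) // batch_size; for 5 items and a batch size of 2 that gives (5 + 1) // 2 = 3 batches. The sketch below is a hypothetical standalone function, iter_batches (not part of aana), that mirrors the method outside the class so it can be run directly; it treats an empty request as zero batches.

```python
from collections.abc import Iterator
from typing import Any


def iter_batches(
    request: dict[str, list[Any]], batch_size: int
) -> Iterator[dict[str, list[Any]]]:
    """Hypothetical standalone version of BatchProcessor.batch_iterator."""
    lengths = {len(value) for value in request.values()}
    if len(lengths) > 1:
        raise ValueError("All inputs must have the same length")
    total = lengths.pop() if lengths else 0
    # Ceiling division: the number of batches needed to cover `total` items.
    total_batches = (total + batch_size - 1) // batch_size
    for i in range(total_batches):
        start = i * batch_size
        # Slice every key with the same window so elements stay aligned.
        yield {key: value[start : start + batch_size] for key, value in request.items()}


request = {"ids": [1, 2, 3, 4, 5], "texts": ["a", "b", "c", "d", "e"]}
for batch in iter_batches(request, batch_size=2):
    print(batch)
# {'ids': [1, 2], 'texts': ['a', 'b']}
# {'ids': [3, 4], 'texts': ['c', 'd']}
# {'ids': [5], 'texts': ['e']}
```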

process

process(request)

Process a request.

Splits the input request into batches, processes each batch in parallel, and then merges the results into a single dictionary.

PARAMETER DESCRIPTION
request

The request data to process.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

The merged results from processing all batches.

Source code in aana/processors/batch.py
async def process(self, request: dict[str, Any]) -> dict[str, Any]:
    """Process a request.

    Splits the input request into batches, processes each batch in parallel, and then merges
    the results into a single dictionary.

    Args:
        request (dict[str, Any]): The request data to process.

    Returns:
        dict[str, Any]: The merged results from processing all batches.
    """
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(self.pool, self.process_batch, batch)
        for batch in self.batch_iterator(request)
    ]
    outputs = await asyncio.gather(*futures)
    return self.merge_outputs(outputs)
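process relies on two asyncio properties: loop.run_in_executor runs each blocking process_batch call in a pool thread without blocking the event loop, and asyncio.gather returns results in the order the awaitables were passed, not the order they finish, so the merged outputs line up with the input. The sketch below demonstrates this with a hypothetical slow_double batch function whose batches deliberately finish out of order; process_batches is an illustrative helper, not part of aana.

```python
import asyncio
import time
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor
from typing import Any


def slow_double(batch: dict[str, list[int]]) -> dict[str, list[int]]:
    # Hypothetical blocking batch function (stands in for a model call).
    # Sleep times differ per batch so they complete out of submission order.
    time.sleep(0.01 * (3 - batch["x"][0] % 3))
    return {"y": [2 * v for v in batch["x"]]}


async def process_batches(
    batches: Sequence[dict[str, Any]],
    process_batch: Callable[[dict[str, Any]], dict[str, Any]],
    pool: ThreadPoolExecutor,
) -> list[dict[str, Any]]:
    loop = asyncio.get_running_loop()
    # Each blocking call runs in a pool thread; the event loop stays free.
    futures = [loop.run_in_executor(pool, process_batch, b) for b in batches]
    # gather returns results in submission order, not completion order.
    return await asyncio.gather(*futures)


batches = [{"x": [1, 2]}, {"x": [3, 4]}, {"x": [5]}]
with ThreadPoolExecutor(max_workers=3) as pool:
    outputs = asyncio.run(process_batches(batches, slow_double, pool))
print(outputs)  # [{'y': [2, 4]}, {'y': [6, 8]}, {'y': [10]}]
```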

merge_outputs

merge_outputs(outputs)

Merge outputs.

Combine processed batch outputs into a single dictionary. It handles various data types by extending lists, updating dictionaries, and concatenating numpy arrays.

Example:

outputs = [
    {'images': [processed_img1, processed_img2], 'labels': ['cat', 'dog']},
    {'images': [processed_img3, processed_img4], 'labels': ['bird', 'mouse']},
    {'images': [processed_img5], 'labels': ['fish']}
]
# The merged result would be:
# {
#     'images': [processed_img1, processed_img2, processed_img3, processed_img4, processed_img5],
#     'labels': ['cat', 'dog', 'bird', 'mouse', 'fish']
# }

PARAMETER DESCRIPTION
outputs

List of outputs from the processed batches.

TYPE: list[dict[str, Any]]

RETURNS DESCRIPTION
dict[str, Any]

The merged result.

Source code in aana/processors/batch.py
def merge_outputs(self, outputs: list[dict[str, Any]]) -> dict[str, Any]:
    """Merge outputs.

    Combine processed batch outputs into a single dictionary. It handles various data types
    by extending lists, updating dictionaries, and concatenating numpy arrays.

    Example:
    ```python
    outputs = [
        {'images': [processed_img1, processed_img2], 'labels': ['cat', 'dog']},
        {'images': [processed_img3, processed_img4], 'labels': ['bird', 'mouse']},
        {'images': [processed_img5], 'labels': ['fish']}
    ]
    # The merged result would be:
    # {
    #     'images': [processed_img1, processed_img2, processed_img3, processed_img4, processed_img5],
    #     'labels': ['cat', 'dog', 'bird', 'mouse', 'fish']
    # }
    ```

    Args:
        outputs (list[dict[str, Any]]): List of outputs from the processed batches.

    Returns:
        dict[str, Any]: The merged result.
    """
    merged_output: dict[str, Any] = {}
    for output in outputs:
        for key, value in output.items():
            if key not in merged_output:
                # Copy lists and dicts so that later merges do not mutate
                # the first batch's output in place.
                if isinstance(value, (list, dict)):
                    merged_output[key] = value.copy()
                else:
                    merged_output[key] = value
            elif isinstance(value, list):
                merged_output[key].extend(value)
            elif isinstance(value, dict):
                merged_output[key].update(value)
            elif isinstance(value, np.ndarray):
                # Concatenate along the first (batch) axis.
                merged_output[key] = np.concatenate((merged_output[key], value))
            else:
                raise NotImplementedError(
                    f"Merging of {type(value).__name__} is not implemented"
                )
    return merged_output
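The docstring example above only exercises lists. The sketch below is a hypothetical standalone merge_batch_outputs (not part of aana) that follows the same rules, applied to numpy arrays (concatenated along the first axis) and dictionaries (updated, so a later batch wins if keys clash). The key names embeddings, labels, and meta are illustrative.

```python
from typing import Any

import numpy as np


def merge_batch_outputs(outputs: list[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical standalone version of BatchProcessor.merge_outputs."""
    merged: dict[str, Any] = {}
    for output in outputs:
        for key, value in output.items():
            if key not in merged:
                # Copy containers so the first batch's output is not mutated.
                merged[key] = value.copy() if isinstance(value, (list, dict)) else value
            elif isinstance(value, list):
                merged[key].extend(value)
            elif isinstance(value, dict):
                merged[key].update(value)  # later batches win on key clashes
            elif isinstance(value, np.ndarray):
                merged[key] = np.concatenate((merged[key], value))  # along axis 0
            else:
                raise NotImplementedError(f"Cannot merge {type(value).__name__}")
    return merged


outputs = [
    {"embeddings": np.zeros((2, 3)), "labels": ["cat", "dog"], "meta": {"a": 1}},
    {"embeddings": np.ones((1, 3)), "labels": ["bird"], "meta": {"b": 2}},
]
merged = merge_batch_outputs(outputs)
print(merged["embeddings"].shape)  # (3, 3)
print(merged["labels"])  # ['cat', 'dog', 'bird']
print(merged["meta"])  # {'a': 1, 'b': 2}
```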

aana.processors.speaker.PostProcessingForDiarizedAsr

Class to handle post-processing for diarized ASR output by combining diarization and transcription segments.

The post-processing involves assigning speaker labels to transcription segments and words, aligning speakers with punctuation, optionally merging homogeneous speaker segments, and reassigning confidence information to the segments.

ATTRIBUTE DESCRIPTION
diarized_segments

Contains speaker diarization segments.

TYPE: list[SpeakerDiarizationSegment]

transcription_segments

Transcription segments.

TYPE: list[AsrSegment]

merge

Whether to merge the same speaker segments in the final output.

TYPE: bool

process

process(diarized_segments, transcription_segments, merge=False)

Executes the post-processing pipeline that combines diarization and transcription segments.

This method performs the following steps:
1. Assign speaker labels to each segment and word in the transcription based on the diarization output.
2. Align speakers with punctuation.
3. Create new transcription segments by combining the speaker-labeled words.
4. Add confidence and no_speech_confidence to the new segments.
5. Optionally, merge consecutive segments from the same speaker.
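The source does not show how _assign_word_speakers (step 1) decides which speaker a word belongs to. A common rule, assumed here and not confirmed by the source, is maximum temporal overlap: each word takes the speaker of the diarization segment it overlaps for the longest time. DiarSegment, Word, and assign_speakers are hypothetical simplified stand-ins, not aana classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DiarSegment:  # hypothetical stand-in for SpeakerDiarizationSegment
    start: float
    end: float
    speaker: str


@dataclass
class Word:  # hypothetical stand-in for an ASR word with timestamps
    text: str
    start: float
    end: float
    speaker: Optional[str] = None


def assign_speakers(words: list[Word], segments: list[DiarSegment]) -> list[Word]:
    """Label each word with the speaker it overlaps most (assumed rule)."""
    for word in words:
        best_overlap = 0.0
        for seg in segments:
            # Length of the intersection of the word and segment intervals;
            # negative or zero means they do not overlap.
            overlap = min(word.end, seg.end) - max(word.start, seg.start)
            if overlap > best_overlap:
                best_overlap = overlap
                word.speaker = seg.speaker
    return words


segments = [DiarSegment(0.0, 2.0, "SPEAKER_00"), DiarSegment(2.0, 4.0, "SPEAKER_01")]
words = [Word("hello", 0.2, 0.6), Word("there", 1.8, 2.5), Word("hi", 3.0, 3.4)]
for w in assign_speakers(words, segments):
    print(w.text, w.speaker)
# hello SPEAKER_00
# there SPEAKER_01
# hi SPEAKER_01
```

A word with no overlapping segment keeps speaker None in this sketch; how the real implementation handles that case is not stated in the source.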

PARAMETER DESCRIPTION
diarized_segments

Contains speaker diarization segments.

TYPE: list[SpeakerDiarizationSegment]

transcription_segments

Transcription segments.

TYPE: list[AsrSegment]

merge

If True, merges consecutive speaker segments in the final output. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
list[AsrSegment]

Updated transcription segments with speaker information per segment and word.

Source code in aana/processors/speaker.py
@classmethod
def process(
    cls,
    diarized_segments: list[SpeakerDiarizationSegment],
    transcription_segments: list[AsrSegment],
    merge: bool = False,
) -> list[AsrSegment]:
    """Executes the post-processing pipeline that combines diarization and transcription segments.

    This method performs the following steps:
    1. Assign speaker labels to each segment and word in the transcription based on the diarization output.
    2. Align speakers with punctuation.
    3. Create new transcription segments by combining the speaker-labeled words.
    4. Add confidence and no_speech_confidence to the new segments.
    5. Optionally, merge consecutive segments from the same speaker.

    Args:
        diarized_segments (list[SpeakerDiarizationSegment]): Contains speaker diarization segments.
        transcription_segments (list[AsrSegment]): Transcription segments.
        merge (bool): If True, merges consecutive speaker segments in the final output. Defaults to False.

    Returns:
        list[AsrSegment]: Updated transcription segments with speaker information per segment and word.
    """
    # intro: validation checks
    if not transcription_segments or not diarized_segments:
        return transcription_segments

    # Check if inputs are valid:
    for segment in transcription_segments:
        if segment.text and not segment.words:
            raise ValueError("Word-level timestamps are required for diarized ASR.")  # noqa: TRY003

    # 1. Assign speaker labels to each segment and word
    speaker_labelled_transcription = cls._assign_word_speakers(
        diarized_segments, transcription_segments
    )

    # 2. Align speakers with punctuation
    word_speaker_mapping = cls._align_with_punctuation(
        speaker_labelled_transcription
    )

    # 3. Create new transcription segments with speaker information
    segments = cls._create_speaker_segments(word_speaker_mapping)

    # 4. Assign confidence variables to the new segments
    segments = cls._add_segment_variables(segments, transcription_segments)

    # Optional: Merge consecutive speaker segments
    if merge:
        segments = cls._combine_homeogeneous_speaker_asr_segments(segments)

    return segments
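With merge=True, consecutive segments from the same speaker are combined by _combine_homeogeneous_speaker_asr_segments, whose rule is not shown here. The sketch below assumes the simplest rule: adjacent segments with the same speaker are joined into one segment spanning both time ranges, with their texts concatenated. SpeakerSegment and merge_consecutive are hypothetical simplified stand-ins; a real AsrSegment also carries words and confidence values that would need merging.

```python
from dataclasses import dataclass


@dataclass
class SpeakerSegment:  # hypothetical stand-in for a speaker-labelled AsrSegment
    speaker: str
    start: float
    end: float
    text: str


def merge_consecutive(segments: list[SpeakerSegment]) -> list[SpeakerSegment]:
    """Merge adjacent segments that share a speaker (assumed merge rule)."""
    merged: list[SpeakerSegment] = []
    for seg in segments:
        if merged and merged[-1].speaker == seg.speaker:
            last = merged[-1]
            # Extend the previous segment instead of starting a new one.
            merged[-1] = SpeakerSegment(
                last.speaker, last.start, seg.end, f"{last.text} {seg.text}"
            )
        else:
            merged.append(seg)
    return merged


segments = [
    SpeakerSegment("SPEAKER_00", 0.0, 1.5, "Hello."),
    SpeakerSegment("SPEAKER_00", 1.5, 3.0, "How are you?"),
    SpeakerSegment("SPEAKER_01", 3.0, 4.0, "Fine, thanks."),
]
for seg in merge_consecutive(segments):
    print(seg)
```

Building a new SpeakerSegment on each merge, rather than mutating the previous one, leaves the input list unchanged.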