Server - Pipeline

Lavender Data’s pipeline consists of several components that process your data from storage to application.

Pipeline Overview

There are four main components:

  • Filters: Determine which samples to include/exclude during iteration
  • Categorizers: Group samples into batches based on a specific criterion
  • Collaters: Control how individual samples are combined into batches
  • Preprocessors: Transform batches before they’re returned to your application
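
To make the flow concrete, the sketch below imitates the four stages with plain Python functions. It does not use the Lavender Data API: the function names, the sample fields, and the stage order (filter, categorize, collate, preprocess) are assumptions made for illustration, and batch size is ignored.

```python
from collections import defaultdict

# Hypothetical samples; the field names are illustrative only.
samples = [
    {"uid": 0, "width": 512, "height": 512},
    {"uid": 1, "width": 512, "height": 768},
    {"uid": 2, "width": 512, "height": 512},
    {"uid": 4, "width": 512, "height": 768},
]


def keep(sample: dict) -> bool:
    """Filter stage: keep samples with an even uid."""
    return sample["uid"] % 2 == 0


def bucket_key(sample: dict) -> str:
    """Categorizer stage: samples with the same key end up in the same batch."""
    return f"{sample['width']}x{sample['height']}"


def collate(group: list[dict]) -> dict:
    """Collater stage: turn a list of samples into a dict of lists."""
    return {key: [s[key] for s in group] for key in group[0]}


def preprocess(batch: dict) -> dict:
    """Preprocessor stage: derive a new column from existing ones."""
    batch["area"] = [w * h for w, h in zip(batch["width"], batch["height"])]
    return batch


def run_pipeline(items: list[dict]) -> dict[str, dict]:
    """Chain the four stages and return one batch per bucket key."""
    groups = defaultdict(list)
    for sample in items:
        if keep(sample):
            groups[bucket_key(sample)].append(sample)
    return {key: preprocess(collate(group)) for key, group in groups.items()}


batches = run_pipeline(samples)
```

Here the sample with uid 1 is dropped by the filter, and the remaining samples are split into a "512x512" batch and a "512x768" batch before the extra column is added.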

Building the Pipeline

Lavender Data lets you build your own pipeline by defining filters, categorizers, collaters, and preprocessors.

Module Directory

  1. First, create a directory to store your components.

    mkdir -p ~/.lavender-data/modules
  2. Then, set the environment variable.

    export LAVENDER_DATA_MODULES_DIR=~/.lavender-data/modules
  3. After adding or modifying components, restart the Lavender Data server for the changes to take effect.

    lavender-data server restart

Filters

Filters determine which samples to include or exclude during iteration. To create a filter:

  1. Create a new Python file in your modules directory.

  2. Define a class that inherits from Filter with a unique name.

  3. Implement the filter method that returns a boolean.

    For example, to create a filter that only includes samples with even UIDs:

    modules/filter.py
    from lavender_data.server import Filter


    class UidModFilter(Filter, name="uid_mod"):
        def filter(self, sample: dict, *, mod: int = 2) -> bool:
            """
            Only include samples where uid % mod == 0

            Args:
                sample: The individual sample to filter
                mod: The modulus to use for filtering (default: 2)

            Returns:
                bool: True if the sample should be included, False otherwise
            """
            return sample["uid"] % mod == 0
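
Because the filtering rule is an ordinary predicate, it can be checked outside the server before the module is deployed. The sketch below copies the logic into a standalone function; `uid_mod_filter` and the sample list are hypothetical names used only for this check, not part of Lavender Data.

```python
def uid_mod_filter(sample: dict, *, mod: int = 2) -> bool:
    """Same predicate as UidModFilter.filter, written as a plain function."""
    return sample["uid"] % mod == 0


# Hypothetical samples with uids 0 through 5.
samples = [{"uid": uid} for uid in range(6)]

# Default modulus keeps even uids; mod=3 keeps every third uid.
even_uids = [s["uid"] for s in samples if uid_mod_filter(s)]
every_third = [s["uid"] for s in samples if uid_mod_filter(s, mod=3)]
```

The keyword-only `mod` argument is what makes the same filter reusable with different settings.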

Categorizers

Categorizers group samples into batches based on a specific criterion. To create a categorizer:

  1. Create a new Python file in your modules directory.

  2. Define a class that inherits from Categorizer with a unique name.

  3. Implement the categorize method that takes a sample and returns a string.

    For example, to create a categorizer that buckets samples by image dimensions, so that every batch shares one size and aspect ratio:

    modules/categorizer.py
    from lavender_data.server import Categorizer


    class AspectRatioCategorizer(Categorizer, name="aspect_ratio"):
        def categorize(self, sample: dict) -> str:
            """
            Categorize samples by their width and height

            Args:
                sample: The sample to categorize

            Returns:
                str: The bucket key, e.g. "512x768"
            """
            return f"{sample['width']}x{sample['height']}"
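
The categorizer above keys on the exact width and height, so 1024x768 and 512x384 land in different buckets even though both are 4:3. If grouping by the ratio itself is what you want, one possible approach is to reduce the dimensions by their greatest common divisor. The standalone function below is a sketch under that assumption; `aspect_ratio_key` is a hypothetical name, and it expects positive integer dimensions.

```python
from math import gcd


def aspect_ratio_key(sample: dict) -> str:
    """Return the reduced width:height ratio of a sample as a bucket key.

    Assumes sample["width"] and sample["height"] are positive integers.
    """
    width, height = sample["width"], sample["height"]
    divisor = gcd(width, height)
    return f"{width // divisor}:{height // divisor}"


key_large = aspect_ratio_key({"width": 1024, "height": 768})
key_small = aspect_ratio_key({"width": 512, "height": 384})
key_square = aspect_ratio_key({"width": 512, "height": 512})
```

Whether mixed resolutions may share a batch depends on what the collater does with them; if samples are later stacked into arrays of one shape, the exact width-and-height key is the safer choice.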

Collaters

Collaters control how individual samples are combined into batches. To create a collater:

  1. Create a new Python file in your modules directory.

  2. Define a class that inherits from Collater with a unique name.

  3. Implement the collate method that takes a list of samples and returns a dictionary.

    For example, to create a simple Python list collater:

    modules/collater.py
    from lavender_data.server import Collater


    class PyListCollater(Collater, name="pylist"):
        def collate(self, samples: list[dict]) -> dict:
            """
            Collate samples into a dictionary of lists

            Args:
                samples: List of samples to collate

            Returns:
                dict: Dictionary with lists for each field
            """
            result = {}
            for key in samples[0].keys():
                result[key] = [sample[key] for sample in samples]
            return result
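
Note that `collate` reads the field names from `samples[0]`, so it assumes a non-empty list in which every sample has the same keys. The sketch below shows the same logic as a standalone function with those assumptions made explicit; `collate_to_lists` is a hypothetical name, and returning an empty dict for an empty input is one possible choice rather than documented Lavender Data behavior.

```python
def collate_to_lists(samples: list[dict]) -> dict:
    """Collate samples into a dict of lists, checking that keys are consistent."""
    if not samples:
        # Assumption: an empty input yields an empty batch.
        return {}
    keys = samples[0].keys()
    for sample in samples:
        if sample.keys() != keys:
            raise ValueError(f"inconsistent sample keys: {sorted(sample)}")
    return {key: [sample[key] for sample in samples] for key in keys}


batch = collate_to_lists([
    {"uid": 1, "text": "a"},
    {"uid": 2, "text": "b"},
])
```

Each field becomes one list whose order matches the order of the input samples.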

Preprocessors

Preprocessors transform batches before they’re returned to your application. To create a preprocessor:

  1. Create a new Python file in your modules directory.

  2. Define a class that inherits from Preprocessor with a unique name.

  3. Implement the process method that takes a batch and returns a processed batch.

    For example, to add a new column to each batch:

    modules/preprocessor.py
    from lavender_data.server import Preprocessor


    class AppendNewColumn(Preprocessor, name="append_new_column"):
        def process(self, batch: dict) -> dict:
            """
            Add a new column to the batch

            Args:
                batch: The batch to process

            Returns:
                dict: The processed batch with a new column
            """
            batch["new_column"] = []
            for uid in batch["uid"]:
                batch["new_column"].append(f"{uid}_processed")
            return batch
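
The example above modifies `batch` in place and returns it. If you would rather leave the incoming batch untouched, a variant can build a new dict instead. The function below is a standalone sketch of that idea; `append_new_column` is a hypothetical name, and whether the server cares about in-place modification is not something this page states.

```python
def append_new_column(batch: dict) -> dict:
    """Return a copy of the batch with a derived "new_column" added."""
    # Shallow copy: the existing column lists are shared, not duplicated.
    processed = dict(batch)
    processed["new_column"] = [f"{uid}_processed" for uid in batch["uid"]]
    return processed


batch = {"uid": [1, 2, 3]}
result = append_new_column(batch)
```

The new column has one entry per uid, and the original `batch` dict gains no extra key.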