Pipeline Architecture

Overview

The Pipeline Architecture is the core of daolite, providing a flexible way to model complex Adaptive Optics processing chains. It allows you to define components, their dependencies, and compute resources to accurately model the timing behavior of real-world AO systems.

Key Features

  • Dependency Management: Automatically handles dependencies between components

  • Heterogeneous Computing: Support for different compute resources per component

  • Flexible Component Ordering: Define components in any order, dependencies determine execution

  • Visualization: Built-in timing visualization tools

  • Custom Components: Easily add custom processing stages

Pipeline Concepts

The daolite pipeline is based on these fundamental concepts:

Components

A pipeline is composed of individual components, each representing a specific processing step in the AO chain:

from daolite import Pipeline, PipelineComponent, ComponentType
from daolite.pipeline.camera import PCOCamLink

# Create a pipeline component
camera_component = PipelineComponent(
    component_type=ComponentType.CAMERA,
    name="Camera",
    compute=cpu_resource,
    function=PCOCamLink,
    params={
        "n_pixels": 1024*1024,
        "group": 10,
        "readout": "rolling"
    }
)

Component Types

daolite defines standard component types for AO systems:

  • ComponentType.CAMERA: Camera readout and initial data acquisition

  • ComponentType.CALIBRATION: Pixel calibration operations

  • ComponentType.CENTROIDER: Wavefront sensing/centroiding

  • ComponentType.RECONSTRUCTION: Wavefront reconstruction

  • ComponentType.CONTROL: DM control operations

  • ComponentType.NETWORK: Network data transfer

  • ComponentType.TRANSFER: PCIe or other data transfer operations

  • ComponentType.CUSTOM: User-defined custom operations

Dependencies

Components can depend on other components, creating a directed graph of processing:

from daolite.pipeline.centroider import Centroider
import numpy as np

# Create a component that depends on the camera component
n_subaps = 6400
centroid_agenda = np.ones(50, dtype=int) * (n_subaps // 50)

centroider_component = PipelineComponent(
    component_type=ComponentType.CENTROIDER,
    name="Centroider",
    compute=gpu_resource,
    function=Centroider,
    params={
        "centroid_agenda": centroid_agenda,
        "n_pix_per_subap": 16*16
    },
    dependencies=["Camera"]  # This component depends on "Camera"
)

Compute Resources

Each component can use a different compute resource, allowing modeling of heterogeneous systems:

# Define different compute resources
cpu_resource = amd_epyc_7763()  # CPU for camera and control
gpu_resource = nvidia_rtx_4090()  # GPU for centroiding and reconstruction

# Use different resources for different components
pipeline.add_component(PipelineComponent(
    component_type=ComponentType.CAMERA,
    name="Camera",
    compute=cpu_resource,  # CPU resource
    # ...other parameters...
))

pipeline.add_component(PipelineComponent(
    component_type=ComponentType.CENTROIDER,
    name="Centroider",
    compute=gpu_resource,  # GPU resource
    # ...other parameters...
))

see also: Compute Resources for more details on compute resources.

Creating a Pipeline

Basic Pipeline

Here’s how to create a basic AO pipeline:

from daolite import Pipeline, PipelineComponent, ComponentType
from daolite.compute import hardware
from daolite.pipeline.camera import PCOCamLink
from daolite.pipeline.centroider import Centroider
from daolite.pipeline.reconstruction import Reconstruction
from daolite.pipeline.control import FullFrameControl
import numpy as np

# Create compute resources
cpu = hardware.amd_epyc_7763()
gpu = hardware.nvidia_rtx_4090()

# System parameters
n_subaps = 6400
n_acts = 81 * 81
n_pixels = 1024 * 1024
n_groups = 50

# Define agendas
centroid_agenda = np.ones(n_groups, dtype=int) * (n_subaps // n_groups)

# Create a new pipeline
pipeline = Pipeline(name="SCAO System")

# Add components in any order
pipeline.add_component(PipelineComponent(
    component_type=ComponentType.CAMERA,
    name="Camera",
    compute=cpu,
    function=PCOCamLink,
    params={"n_pixels": n_pixels, "group": n_groups, "readout": "rolling"}
))

pipeline.add_component(PipelineComponent(
    component_type=ComponentType.CENTROIDER,
    name="Centroider",
    compute=gpu,
    function=Centroider,
    params={"centroid_agenda": centroid_agenda, "n_pix_per_subap": 16*16},
    dependencies=["Camera"]
))

pipeline.add_component(PipelineComponent(
    component_type=ComponentType.RECONSTRUCTOR,
    name="Reconstructor",
    compute=gpu,
    function=Reconstruction,
    params={"centroid_agenda": centroid_agenda, "n_acts": n_acts},
    dependencies=["Centroider"]
))

pipeline.add_component(PipelineComponent(
    component_type=ComponentType.CONTROL,
    name="Controller",
    compute=cpu,
    function=FullFrameControl,
    params={"n_acts": n_acts, "operations": ["integration", "offset", "saturation"]},
    dependencies=["Reconstructor"]
))

# Run the pipeline
results = pipeline.run()

# Visualize the results
pipeline.visualize(title="AO Pipeline Timing")

Complex Pipelines

More complex pipelines can include branching, multiple dependencies, and transfer operations:

# Create a pipeline with branching and transfers
pipeline = Pipeline(name="MCAO System")

# Add camera component
pipeline.add_component(PipelineComponent(
    component_type=ComponentType.CAMERA,
    name="WFS Camera",
    compute=cpu,
    function=PCOCamLink,
    params={"n_pixels": 1024*1024, "group": 10, "readout": "rolling"}
))

# Add multiple centroiders for different guide stars
for i in range(3):
    centroid_agenda = np.ones(20, dtype=int) * (1024 // 20)
    pipeline.add_component(PipelineComponent(
        component_type=ComponentType.CENTROIDER,
        name=f"Centroider GS{i+1}",
        compute=gpu,
        function=Centroider,
        params={"centroid_agenda": centroid_agenda, "n_pix_per_subap": 16*16},
        dependencies=["WFS Camera"]
    ))

# Add reconstructor that depends on all centroiders
centroid_agenda_recon = np.ones(20, dtype=int) * (3072 // 20)  # Total slopes from 3 GS
pipeline.add_component(PipelineComponent(
    component_type=ComponentType.RECONSTRUCTOR,
    name="Reconstructor",
    compute=gpu,
    function=Reconstruction,
    params={"centroid_agenda": centroid_agenda_recon, "n_acts": 10000},
    dependencies=["Centroider GS1", "Centroider GS2", "Centroider GS3"]
))

# Add DM controller
pipeline.add_component(PipelineComponent(
    component_type=ComponentType.CONTROL,
    name="DM Controller",
    compute=cpu,
    function=FullFrameControl,
    params={"n_acts": 10000, "operations": ["integration", "offset", "saturation"]},
    dependencies=["Reconstructor"]
))

Running and Analyzing Pipelines

Running a Pipeline

Once a pipeline is defined, you can run it to calculate timing:

# Run the pipeline with debug information
results = pipeline.run(debug=True)

# Get the total latency
total_latency = pipeline.get_total_latency()
print(f"Total pipeline latency: {total_latency} µs")

# Get component-specific latencies
component_latencies = pipeline.get_component_latencies()
for component, latency in component_latencies.items():
    print(f"{component}: {latency} µs")

Pipeline Visualization

daolite provides built-in visualization tools for pipeline timing:

# Generate a timing visualization
fig = pipeline.visualize(
    title="AO Pipeline Timing",
    xlabel="Time (µs)",
    save_path="pipeline_timing.png",
    figsize=(12, 8)
)

# For more control over the visualization:
from daolite.utils.visualization import create_pipeline_gantt_chart

custom_fig = create_pipeline_gantt_chart(
    pipeline_results=results,
    title="Custom Pipeline Visualization",
    component_colors={
        "Camera": "blue",
        "Centroider": "green",
        "Reconstructor": "red"
    },
    show_critical_path=True
)

Pipeline Summary

Generate a summary of the pipeline performance:

# Print a summary of the pipeline
pipeline.print_summary()

# Get a dictionary with the summary data
summary = pipeline.get_summary()

# Example output:
# {
#   'total_latency': 523.45,
#   'components': {
#     'Camera': {'start': 0.0, 'end': 150.0, 'latency': 150.0, 'resource': 'CPU'},
#     'Centroider': {'start': 150.0, 'end': 320.5, 'latency': 170.5, 'resource': 'GPU'},
#     # ...more components...
#   },
#   'critical_path': ['Camera', 'Centroider', 'Reconstructor', 'Control']
# }

Custom Pipeline Functions

You can create custom functions for use in pipeline components:

def my_custom_processor(compute, n_elements, scale_factor=1.0, debug=False):
    """
    A custom processing function for use in a pipeline component.

    Args:
        compute: The compute resource to use
        n_elements: Number of elements to process
        scale_factor: Optional scaling factor
        debug: Whether to print debug information

    Returns:
        A numpy array with timing information
    """
    # Calculate processing time based on compute resource
    if compute.hardware == "GPU":
        # GPU processing model
        mem_time = (n_elements * 4) / compute.memory_bandwidth
        compute_time = (n_elements * 10) / compute.flops  # 10 ops per element
    else:
        # CPU processing model
        mem_time = (n_elements * 4) / (compute.memory_channels *
                                     compute.memory_width *
                                     compute.memory_frequency / 8)
        compute_time = (n_elements * 10) / (compute.cores *
                                          compute.core_frequency *
                                          compute.flops_per_cycle)

    # Scale by implementation efficiency
    compute_time *= scale_factor

    # Use the limiting factor
    processing_time = max(mem_time, compute_time)

    if debug:
        print(f"Memory time: {mem_time} µs")
        print(f"Compute time: {compute_time} µs")
        print(f"Processing time: {processing_time} µs")

    # Create timing array (single timing entry)
    timing = np.zeros([1, 2])
    timing[0, 1] = processing_time

    return timing

# Use in a pipeline component
pipeline.add_component(PipelineComponent(
    component_type=ComponentType.CUSTOM,
    name="My Custom Component",
    compute=cpu,
    function=my_custom_processor,
    params={
        "n_elements": 10000,
        "scale_factor": 1.2,
        "debug": True
    },
    dependencies=["Previous Component"]
))

Pipeline Examples

Here are some complete examples of different pipeline configurations:

SCAO Pipeline

A complete Single Conjugate Adaptive Optics pipeline:

from daolite import Pipeline, PipelineComponent, ComponentType
from daolite import amd_epyc_7763, nvidia_rtx_4090
from daolite.simulation.camera import simulate_camera_readout
from daolite.pipeline.centroider import weighted_center_of_gravity
from daolite.pipeline.reconstruction import mvm_reconstruction
from daolite.pipeline.control import integrator_control
from daolite.pipeline.transfer import pcie_transfer

# Create compute resources
cpu = amd_epyc_7763()
gpu = nvidia_rtx_4090()

# Create SCAO pipeline
scao = Pipeline(name="SCAO System")

# System parameters
n_subaps = 50 * 50
n_acts = 51 * 51
n_pixels = 800 * 800
n_groups = 50

# Define agendas
centroid_agenda = np.ones(n_groups, dtype=int) * (n_subaps // n_groups)

# Add camera component
scao.add_component(PipelineComponent(
    component_type=ComponentType.CAMERA,
    name="Shack-Hartmann Camera",
    compute=cpu,
    function=PCOCamLink,
    params={
        "n_pixels": n_pixels,
        "group": n_groups,
        "readout": "global"
    }
))

# Add centroider component
scao.add_component(PipelineComponent(
    component_type=ComponentType.CENTROIDER,
    name="Centroider",
    compute=gpu,
    function=Centroider,
    params={
        "centroid_agenda": centroid_agenda,
        "n_pix_per_subap": 16*16
    },
    dependencies=["Shack-Hartmann Camera"]
))

# Add reconstructor component
scao.add_component(PipelineComponent(
    component_type=ComponentType.RECONSTRUCTOR,
    name="Reconstructor",
    compute=gpu,
    function=Reconstruction,
    params={
        "centroid_agenda": centroid_agenda,
        "n_acts": n_acts
    },
    dependencies=["Centroider"]
))

# Add controller component
scao.add_component(PipelineComponent(
    component_type=ComponentType.CONTROL,
    name="Controller",
    compute=cpu,
    function=FullFrameControl,
    params={
        "n_acts": n_acts,
        "operations": ["integration", "offset", "saturation"]
    },
    dependencies=["Reconstructor"]
))

# Run the pipeline
results = scao.run()

# Show the results
scao.visualize(title="SCAO Pipeline Timing")
scao.print_summary()

MCAO Pipeline

A Multi Conjugate Adaptive Optics pipeline with multiple wavefront sensors and deformable mirrors:

from daolite import Pipeline, PipelineComponent, ComponentType
from daolite import amd_epyc_7763, nvidia_a100_80gb
from daolite.simulation.camera import simulate_camera_readout
from daolite.pipeline.centroider import weighted_center_of_gravity
from daolite.pipeline.reconstruction import tomographic_reconstruction
from daolite.pipeline.control import multi_dm_control
from daolite.pipeline.transfer import pcie_transfer

# Create compute resources
cpu = amd_epyc_7763()
gpu = nvidia_a100_80gb()

# Create MCAO pipeline
mcao = Pipeline(name="MCAO System")

# Add multiple WFS cameras
for i in range(5):
    mcao.add_component(PipelineComponent(
        component_type=ComponentType.CAMERA,
        name=f"LGS Camera {i+1}",
        compute=cpu,
        function=PCOCamLink,
        params={
            "n_pixels": 600*600,
            "group": 10,
            "readout": "global"
        }
    ))

    # Add centroider for each camera
    n_subaps = 40 * 40
    centroid_agenda = np.ones(30, dtype=int) * (n_subaps // 30)
    mcao.add_component(PipelineComponent(
        component_type=ComponentType.CENTROIDER,
        name=f"Centroider {i+1}",
        compute=gpu,
        function=Centroider,
        params={
            "centroid_agenda": centroid_agenda,
            "n_pix_per_subap": 15*15
        },
        dependencies=[f"LGS Camera {i+1}"]
    ))

# Add reconstructor that depends on all centroiders
# Combine all slopes
total_slopes = 5 * 40 * 40  # 5 guide stars
combined_agenda = np.ones(30, dtype=int) * (total_slopes // 30)
total_acts = 61*61 + 31*31 + 19*19  # 3 DMs

mcao.add_component(PipelineComponent(
    component_type=ComponentType.RECONSTRUCTOR,
    name="Reconstructor",
    compute=gpu,
    function=Reconstruction,
    params={
        "centroid_agenda": combined_agenda,
        "n_acts": total_acts
    },
    dependencies=[f"Centroider {i+1}" for i in range(5)]
))

# Add controller
mcao.add_component(PipelineComponent(
    component_type=ComponentType.CONTROL,
    name="Controller",
    compute=cpu,
    function=FullFrameControl,
    params={
        "n_acts": total_acts,
        "operations": ["integration", "offset", "saturation"]
    },
    dependencies=["Reconstructor"]
))

# Run the pipeline
results = mcao.run()

# Show the results
mcao.visualize(title="MCAO Pipeline Timing")
mcao.print_summary()

API Reference

For complete API details, see the Pipeline API Reference section.