Coverage for orchestr_ant_ion / pipeline / types.py: 100%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
"""Shared data structures for monitoring pipelines."""
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING


if TYPE_CHECKING:
    from collections import deque
# Inclusive bounds enforced by CameraConfig._validate().
# The same resolution bounds apply to both width and height
# (presumably pixels -- confirm against the capture code).
RESOLUTION_MIN = 64
RESOLUTION_MAX = 7680
# Accepted frame-rate range for CameraConfig.fps.
FPS_MIN = 1
FPS_MAX = 240
# CameraConfig.device_index has a lower bound only.
DEVICE_INDEX_MIN = 0
class CaptureBackend(Enum):
    """Video capture backend options.

    Each member's value is the lowercase backend name, so a member can be
    looked up from its string form, e.g. ``CaptureBackend("opencv")``.
    """

    OPENCV = "opencv"
    GSTREAMER = "gstreamer"
@dataclass
class CameraConfig:
    """Camera configuration settings with validation.

    Range checks run once, from ``__post_init__``. The dataclass is not
    frozen, so values assigned to an instance after construction are not
    re-checked.

    Attributes:
        device_index: Capture device index; must be >= ``DEVICE_INDEX_MIN``.
        width: Frame width; must lie in
            [``RESOLUTION_MIN``, ``RESOLUTION_MAX``] (inclusive).
        height: Frame height; same inclusive bounds as ``width``.
        fps: Frame rate; must lie in [``FPS_MIN``, ``FPS_MAX``] (inclusive).
        backend: Capture backend to use. Not validated by this class.

    Raises:
        ValueError: If ``device_index``, ``width``, ``height`` or ``fps`` is
            outside its allowed range.
    """

    device_index: int = 0
    width: int = 1920
    height: int = 1080
    fps: int = 30
    backend: CaptureBackend = CaptureBackend.OPENCV

    def __post_init__(self) -> None:
        """Validate the field values right after dataclass initialization."""
        self._validate()

    def _validate(self) -> None:
        """Check each numeric field against the module-level bounds.

        Fields are checked in declaration order, so the first field that is
        out of range determines the error message.

        Raises:
            ValueError: If a field is outside its allowed range.
        """
        if self.device_index < DEVICE_INDEX_MIN:
            raise ValueError(f"device_index must be >= {DEVICE_INDEX_MIN}")
        if not RESOLUTION_MIN <= self.width <= RESOLUTION_MAX:
            raise ValueError(f"width must be in [{RESOLUTION_MIN}, {RESOLUTION_MAX}]")
        if not RESOLUTION_MIN <= self.height <= RESOLUTION_MAX:
            raise ValueError(f"height must be in [{RESOLUTION_MIN}, {RESOLUTION_MAX}]")
        if not FPS_MIN <= self.fps <= FPS_MAX:
            raise ValueError(f"fps must be in [{FPS_MIN}, {FPS_MAX}]")
@dataclass
class SystemStats:
    """Container for system statistics.

    A plain mutable record with no validation. Every numeric field defaults
    to ``0.0`` and ``gpu_name`` defaults to ``"N/A"``, so an instance can be
    created empty and filled in field by field.

    Units are presumably those named by each field's suffix (``_percent``,
    ``_gb``, ``_celsius``, ``_watts``) -- confirm against the code that
    populates this record.
    """

    # CPU / host memory
    cpu_percent: float = 0.0
    ram_percent: float = 0.0
    ram_used_gb: float = 0.0
    ram_total_gb: float = 0.0
    # GPU
    gpu_percent: float = 0.0
    gpu_memory_used_gb: float = 0.0
    gpu_memory_total_gb: float = 0.0
    gpu_memory_percent: float = 0.0
    gpu_temp_celsius: float = 0.0
    gpu_power_watts: float = 0.0
    gpu_name: str = "N/A"
@dataclass
class PerformanceMetrics:
    """Container for performance metrics.

    A plain mutable record with no validation; every field defaults to
    ``0.0``.

    Units are presumably those named by each field's suffix (``_fps``,
    ``_ms``, ``_percent``) -- confirm against the code that computes them.
    """

    camera_fps: float = 0.0
    inference_ms: float = 0.0
    inference_capacity_fps: float = 0.0
    frame_budget_percent: float = 0.0
    actual_throughput_fps: float = 0.0
@dataclass
class Track:
    """Tracked object with normalized trajectory points.

    All three fields are required; there are no defaults and no validation.

    Attributes:
        track_id: Unique identifier for this track.
        points_norm: Deque of (x, y) normalized coordinates (0.0-1.0) showing
            the object's recent trajectory. New points are appended to the right.
        last_seen_ts: Timestamp of the most recent detection associated with
            this track. Used to expire stale tracks.
    """

    track_id: int
    points_norm: deque[tuple[float, float]]
    last_seen_ts: float