Coverage for orchestr_ant_ion / pipeline / metrics / performance.py: 26%
34 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
1"""Performance tracking helpers for monitoring pipelines."""
3from __future__ import annotations
5import time
6from collections import deque
8from orchestr_ant_ion.pipeline.types import PerformanceMetrics
11class PerformanceTracker:
12 """Track performance metrics with moving averages."""
14 def __init__(self, avg_frames: int = 30) -> None:
15 """Initialize the tracker with a rolling window size."""
16 self.avg_frames = avg_frames
17 self.camera_times: deque[float] = deque(maxlen=avg_frames)
18 self.inference_times: deque[float] = deque(maxlen=avg_frames)
19 self.last_camera_time: float | None = None
20 self.frame_count = 0
21 self.start_time = time.perf_counter()
23 def tick_camera(self) -> None:
24 """Record a camera frame tick for FPS estimation."""
25 now = time.perf_counter()
26 if self.last_camera_time is not None:
27 self.camera_times.append(now - self.last_camera_time)
28 self.last_camera_time = now
29 self.frame_count += 1
31 def add_inference_time(self, elapsed_ms: float) -> None:
32 """Record a single inference duration in milliseconds."""
33 self.inference_times.append(elapsed_ms)
35 def get_metrics(self) -> PerformanceMetrics:
36 """Compute aggregated performance metrics."""
37 metrics = PerformanceMetrics()
39 if self.camera_times:
40 avg_camera_time = sum(self.camera_times) / len(self.camera_times)
41 metrics.camera_fps = 1.0 / avg_camera_time if avg_camera_time > 0 else 0.0
43 if self.inference_times:
44 metrics.inference_ms = sum(self.inference_times) / len(self.inference_times)
45 metrics.inference_capacity_fps = (
46 1000.0 / metrics.inference_ms if metrics.inference_ms > 0 else 0.0
47 )
49 if metrics.camera_fps > 0:
50 frame_budget_ms = 1000.0 / metrics.camera_fps
51 metrics.frame_budget_percent = (
52 metrics.inference_ms / frame_budget_ms
53 ) * 100
55 elapsed = time.perf_counter() - self.start_time
56 metrics.actual_throughput_fps = (
57 self.frame_count / elapsed if elapsed > 0 else 0.0
58 )
60 return metrics