Coverage for orchestr_ant_ion / pipeline / metrics / performance.py: 26%

34 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 08:36 +0000

1"""Performance tracking helpers for monitoring pipelines.""" 

2 

3from __future__ import annotations 

4 

5import time 

6from collections import deque 

7 

8from orchestr_ant_ion.pipeline.types import PerformanceMetrics 

9 

10 

class PerformanceTracker:
    """Collect camera and inference timings over a bounded rolling window.

    Camera tick intervals (``time.perf_counter`` deltas, i.e. seconds) and
    inference durations (milliseconds, as supplied by the caller) are kept in
    two deques capped at ``avg_frames`` entries, so averages only reflect the
    most recent samples.
    """

    def __init__(self, avg_frames: int = 30) -> None:
        """Create an empty tracker.

        Args:
            avg_frames: Maximum number of samples retained in each rolling
                window (used as ``maxlen`` of both deques).
        """
        self.avg_frames = avg_frames
        # Intervals between consecutive camera ticks, in seconds.
        self.camera_times: deque[float] = deque(maxlen=avg_frames)
        # Inference durations as reported by callers, in milliseconds.
        self.inference_times: deque[float] = deque(maxlen=avg_frames)
        # Timestamp of the previous tick; None until the first tick arrives.
        self.last_camera_time: float | None = None
        self.frame_count = 0
        # Reference point for the overall throughput computation.
        self.start_time = time.perf_counter()

    def tick_camera(self) -> None:
        """Register one camera frame and store the interval since the last one.

        The very first tick has no predecessor, so it only sets the reference
        timestamp and bumps the frame counter without adding an interval.
        """
        timestamp = time.perf_counter()
        previous = self.last_camera_time
        if previous is not None:
            self.camera_times.append(timestamp - previous)
        self.last_camera_time = timestamp
        self.frame_count += 1

    def add_inference_time(self, elapsed_ms: float) -> None:
        """Append one inference duration to the rolling window.

        Args:
            elapsed_ms: Duration of a single inference, in milliseconds.
        """
        self.inference_times.append(elapsed_ms)

    def get_metrics(self) -> PerformanceMetrics:
        """Build a ``PerformanceMetrics`` snapshot from the current samples.

        Fields whose rolling window is still empty are left at whatever value
        ``PerformanceMetrics()`` initialises them to.

        Returns:
            A freshly constructed ``PerformanceMetrics`` with camera FPS,
            mean inference time, inference capacity, frame-budget usage and
            overall throughput filled in where data is available.
        """
        snapshot = PerformanceMetrics()

        intervals = self.camera_times
        if intervals:
            mean_interval = sum(intervals) / len(intervals)
            if mean_interval > 0:
                snapshot.camera_fps = 1.0 / mean_interval
            else:
                # A zero mean interval would divide by zero; report 0 FPS.
                snapshot.camera_fps = 0.0

        durations = self.inference_times
        if durations:
            snapshot.inference_ms = sum(durations) / len(durations)

            if snapshot.inference_ms > 0:
                snapshot.inference_capacity_fps = 1000.0 / snapshot.inference_ms
            else:
                snapshot.inference_capacity_fps = 0.0

            # Share of the per-frame time budget consumed by inference; only
            # meaningful once a positive camera rate is known.
            if snapshot.camera_fps > 0:
                budget_ms = 1000.0 / snapshot.camera_fps
                snapshot.frame_budget_percent = (
                    snapshot.inference_ms / budget_ms
                ) * 100

        # Throughput is measured over the tracker's whole lifetime, not the
        # rolling window.
        lifetime = time.perf_counter() - self.start_time
        if lifetime > 0:
            snapshot.actual_throughput_fps = self.frame_count / lifetime
        else:
            snapshot.actual_throughput_fps = 0.0

        return snapshot