Coverage for orchestr_ant_ion / streaming / capture.py: 0%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 08:36 +0000

1"""Camera capture utilities for Raspberry Pi Picamera2.""" 

2 

3from __future__ import annotations 

4 

5import importlib 

6import queue 

7import threading 

8import time 

9from functools import lru_cache 

10from typing import TYPE_CHECKING, Protocol, cast 

11 

12import cv2 

13import numpy as np 

14from loguru import logger 

15 

16 

17if TYPE_CHECKING: 

18 from types import ModuleType 

19 

20 

21@lru_cache(maxsize=1) 

22def _get_picamera2() -> ModuleType: 

23 """Import and cache the Picamera2 module.""" 

24 return importlib.import_module("picamera2") 

25 

26 

27def initialize_camera() -> object: 

28 """Initialize and start the Picamera2 device.""" 

29 picamera2 = _get_picamera2() 

30 camera = picamera2.Picamera2() 

31 camera.configure("preview") 

32 camera.start() 

33 logger.info("Camera initialized successfully.") 

34 return camera 

35 

36 

37class _CameraProtocol(Protocol): 

38 """Protocol for camera objects used by this module.""" 

39 

40 def capture_array(self) -> np.ndarray | None: ... 

41 

42 def stop(self) -> None: ... 

43 

44 

45class FrameCapture: 

46 """Background frame capture using Picamera2 with a bounded queue.""" 

47 

48 def __init__(self, queue_size: int = 10, capture_interval: float = 0.03) -> None: 

49 """Create a frame capture worker. 

50 

51 Args: 

52 queue_size: Maximum number of frames to buffer. 

53 capture_interval: Sleep between captures in seconds. 

54 """ 

55 self.camera: _CameraProtocol = cast("_CameraProtocol", initialize_camera()) 

56 self.capture_interval = capture_interval 

57 self.frame_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=queue_size) 

58 self.running = True 

59 self._camera_lock = threading.Lock() 

60 self.thread = threading.Thread(target=self.update, daemon=True) 

61 self.thread.start() 

62 

63 def update(self) -> None: 

64 """Continuously capture frames into the queue.""" 

65 while self.running: 

66 try: 

67 with self._camera_lock: 

68 if not self.running: 

69 break 

70 frame = self.camera.capture_array() 

71 

72 if frame is None or frame.size == 0: 

73 logger.warning("Received empty frame, retrying...") 

74 continue 

75 

76 logger.debug("Frame captured successfully with size {}", frame.shape) 

77 frame = cv2.rotate(frame, cv2.ROTATE_180) 

78 frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) 

79 

80 if not self.frame_queue.full(): 

81 self.frame_queue.put(frame) 

82 logger.debug( 

83 "Frame added to queue. Queue size: {}", 

84 self.frame_queue.qsize(), 

85 ) 

86 else: 

87 logger.warning("Frame queue full, discarding frame.") 

88 except (OSError, RuntimeError) as exc: 

89 logger.error("Error during frame capture: {}", exc) 

90 self.restart_camera() 

91 time.sleep(self.capture_interval) 

92 

93 def get_frame(self) -> np.ndarray | None: 

94 """Return the latest frame from the queue, if available.""" 

95 if not self.frame_queue.empty(): 

96 frame = self.frame_queue.get_nowait() 

97 logger.debug("Frame retrieved from queue.") 

98 return frame 

99 

100 logger.warning("Queue is empty, waiting for frames...") 

101 return None 

102 

103 def stop(self) -> None: 

104 """Stop the capture thread.""" 

105 self.running = False 

106 self.thread.join() 

107 

108 def restart_camera(self) -> None: 

109 """Restart the camera after a capture failure.""" 

110 logger.info("Restarting camera...") 

111 with self._camera_lock: 

112 self.camera = cast("_CameraProtocol", initialize_camera()) 

113 

114 @staticmethod 

115 def get_fallback_frame() -> np.ndarray: 

116 """Return a fallback black frame when no data is available.""" 

117 return np.zeros((480, 640, 3), dtype=np.uint8)