Coverage for orchestr_ant_ion / streaming / capture.py: 0%
68 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
1"""Camera capture utilities for Raspberry Pi Picamera2."""
3from __future__ import annotations
5import importlib
6import queue
7import threading
8import time
9from functools import lru_cache
10from typing import TYPE_CHECKING, Protocol, cast
12import cv2
13import numpy as np
14from loguru import logger
17if TYPE_CHECKING:
18 from types import ModuleType
21@lru_cache(maxsize=1)
22def _get_picamera2() -> ModuleType:
23 """Import and cache the Picamera2 module."""
24 return importlib.import_module("picamera2")
27def initialize_camera() -> object:
28 """Initialize and start the Picamera2 device."""
29 picamera2 = _get_picamera2()
30 camera = picamera2.Picamera2()
31 camera.configure("preview")
32 camera.start()
33 logger.info("Camera initialized successfully.")
34 return camera
37class _CameraProtocol(Protocol):
38 """Protocol for camera objects used by this module."""
40 def capture_array(self) -> np.ndarray | None: ...
42 def stop(self) -> None: ...
45class FrameCapture:
46 """Background frame capture using Picamera2 with a bounded queue."""
48 def __init__(self, queue_size: int = 10, capture_interval: float = 0.03) -> None:
49 """Create a frame capture worker.
51 Args:
52 queue_size: Maximum number of frames to buffer.
53 capture_interval: Sleep between captures in seconds.
54 """
55 self.camera: _CameraProtocol = cast("_CameraProtocol", initialize_camera())
56 self.capture_interval = capture_interval
57 self.frame_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=queue_size)
58 self.running = True
59 self._camera_lock = threading.Lock()
60 self.thread = threading.Thread(target=self.update, daemon=True)
61 self.thread.start()
63 def update(self) -> None:
64 """Continuously capture frames into the queue."""
65 while self.running:
66 try:
67 with self._camera_lock:
68 if not self.running:
69 break
70 frame = self.camera.capture_array()
72 if frame is None or frame.size == 0:
73 logger.warning("Received empty frame, retrying...")
74 continue
76 logger.debug("Frame captured successfully with size {}", frame.shape)
77 frame = cv2.rotate(frame, cv2.ROTATE_180)
78 frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
80 if not self.frame_queue.full():
81 self.frame_queue.put(frame)
82 logger.debug(
83 "Frame added to queue. Queue size: {}",
84 self.frame_queue.qsize(),
85 )
86 else:
87 logger.warning("Frame queue full, discarding frame.")
88 except (OSError, RuntimeError) as exc:
89 logger.error("Error during frame capture: {}", exc)
90 self.restart_camera()
91 time.sleep(self.capture_interval)
93 def get_frame(self) -> np.ndarray | None:
94 """Return the latest frame from the queue, if available."""
95 if not self.frame_queue.empty():
96 frame = self.frame_queue.get_nowait()
97 logger.debug("Frame retrieved from queue.")
98 return frame
100 logger.warning("Queue is empty, waiting for frames...")
101 return None
103 def stop(self) -> None:
104 """Stop the capture thread."""
105 self.running = False
106 self.thread.join()
108 def restart_camera(self) -> None:
109 """Restart the camera after a capture failure."""
110 logger.info("Restarting camera...")
111 with self._camera_lock:
112 self.camera = cast("_CameraProtocol", initialize_camera())
114 @staticmethod
115 def get_fallback_frame() -> np.ndarray:
116 """Return a fallback black frame when no data is available."""
117 return np.zeros((480, 640, 3), dtype=np.uint8)