Coverage for orchestr_ant_ion / pipeline / capture / opencv.py: 30%
40 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
1"""OpenCV capture backend."""
3from __future__ import annotations
5from contextlib import suppress
6from typing import TYPE_CHECKING
8import cv2
9from loguru import logger
12if TYPE_CHECKING:
13 import numpy as np
15 from orchestr_ant_ion.pipeline.types import CameraConfig
class OpenCVCapture:
    """Wrapper around ``cv2.VideoCapture`` for one camera device.

    The device index and the requested width, height and FPS come from the
    ``config`` object.  After a successful :meth:`open`, the values read back
    from the capture are stored in ``actual_width``, ``actual_height`` and
    ``actual_fps``.
    """

    def __init__(self, config: CameraConfig) -> None:
        """Store the configuration; no device is opened until :meth:`open`."""
        self.config = config
        self.cap: cv2.VideoCapture | None = None
        # Populated from the capture's own properties by open(); zero until then.
        self.actual_width = 0
        self.actual_height = 0
        self.actual_fps = 0.0

    def open(self) -> bool:
        """Open the camera device and configure capture settings.

        A handle left over from an earlier call is released before the new
        one is created, so calling this method repeatedly does not keep two
        handles alive at once.

        Returns:
            True if the device opened and was configured, False otherwise.
            On failure ``self.cap`` is left as ``None``.
        """
        # Drop any previous handle first: creating the new capture while the
        # old one is still held would leave two handles on the device.
        self.release()

        logger.info("Opening camera {} with OpenCV...", self.config.device_index)

        cap = cv2.VideoCapture(self.config.device_index)
        if not cap.isOpened():
            logger.error("Cannot open camera with OpenCV!")
            # Do not keep a handle that never opened.
            cap.release()
            return False
        self.cap = cap

        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.config.width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.config.height)
        cap.set(cv2.CAP_PROP_FPS, self.config.fps)
        # Best-effort: any error raised while setting the buffer size is
        # ignored (presumably a small buffer keeps latency low - confirm).
        with suppress(Exception):
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        # Read the properties back and record what the capture reports.
        self.actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.actual_fps = float(cap.get(cv2.CAP_PROP_FPS))

        logger.success(
            "Camera opened: {}x{} @ {:.1f} FPS",
            self.actual_width,
            self.actual_height,
            self.actual_fps,
        )
        return True

    def read(self) -> tuple[bool, np.ndarray | None]:
        """Read a frame from the camera.

        Returns:
            ``(False, None)`` when no capture handle exists; otherwise
            whatever the underlying capture's ``read()`` returns.
        """
        if self.cap is None:
            return False, None
        return self.cap.read()

    def release(self) -> None:
        """Release the OpenCV capture handle, if any, and forget it.

        Safe to call when nothing is open and safe to call repeatedly.
        """
        if self.cap is not None:
            self.cap.release()
            self.cap = None

    def is_opened(self) -> bool:
        """Return True if a capture handle exists and reports itself open."""
        return self.cap is not None and self.cap.isOpened()

    def get_info(self) -> dict:
        """Return backend metadata for diagnostics.

        The width, height and FPS are the values recorded by the last
        successful :meth:`open` (zero before that).
        """
        # NOTE(review): the label says "DirectShow", but open() does not
        # request a specific backend from cv2.VideoCapture - confirm the
        # wording is still accurate on every platform this runs on.
        return {
            "backend": "OpenCV",
            "pipeline": f"OpenCV DirectShow (device {self.config.device_index})",
            "width": self.actual_width,
            "height": self.actual_height,
            "fps": self.actual_fps,
        }