Coverage for orchestr_ant_ion / pipeline / capture / opencv.py: 30%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 08:36 +0000

1"""OpenCV capture backend.""" 

2 

3from __future__ import annotations 

4 

5from contextlib import suppress 

6from typing import TYPE_CHECKING 

7 

8import cv2 

9from loguru import logger 

10 

11 

12if TYPE_CHECKING: 

13 import numpy as np 

14 

15 from orchestr_ant_ion.pipeline.types import CameraConfig 

16 

17 

18class OpenCVCapture: 

19 """OpenCV video capture wrapper.""" 

20 

21 def __init__(self, config: CameraConfig) -> None: 

22 """Create an OpenCV capture instance.""" 

23 self.config = config 

24 self.cap: cv2.VideoCapture | None = None 

25 self.actual_width = 0 

26 self.actual_height = 0 

27 self.actual_fps = 0.0 

28 

29 def open(self) -> bool: 

30 """Open the camera device and configure capture settings.""" 

31 logger.info("Opening camera {} with OpenCV...", self.config.device_index) 

32 

33 self.cap = cv2.VideoCapture(self.config.device_index) 

34 if not self.cap.isOpened(): 

35 logger.error("Cannot open camera with OpenCV!") 

36 return False 

37 

38 self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.config.width) 

39 self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.config.height) 

40 self.cap.set(cv2.CAP_PROP_FPS, self.config.fps) 

41 with suppress(Exception): 

42 self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) 

43 

44 self.actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) 

45 self.actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) 

46 self.actual_fps = float(self.cap.get(cv2.CAP_PROP_FPS)) 

47 

48 logger.success( 

49 "Camera opened: {}x{} @ {:.1f} FPS", 

50 self.actual_width, 

51 self.actual_height, 

52 self.actual_fps, 

53 ) 

54 return True 

55 

56 def read(self) -> tuple[bool, np.ndarray | None]: 

57 """Read a frame from the camera.""" 

58 if self.cap is None: 

59 return False, None 

60 return self.cap.read() 

61 

62 def release(self) -> None: 

63 """Release the OpenCV capture handle.""" 

64 if self.cap: 

65 self.cap.release() 

66 self.cap = None 

67 

68 def is_opened(self) -> bool: 

69 """Return True if the camera is open.""" 

70 return self.cap is not None and self.cap.isOpened() 

71 

72 def get_info(self) -> dict: 

73 """Return backend metadata for diagnostics.""" 

74 return { 

75 "backend": "OpenCV", 

76 "pipeline": f"OpenCV DirectShow (device {self.config.device_index})", 

77 "width": self.actual_width, 

78 "height": self.actual_height, 

79 "fps": self.actual_fps, 

80 }