Coverage for orchestr_ant_ion / pipeline / capture / gstreamer.py: 13%

233 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 08:36 +0000

1"""GStreamer subprocess capture backend.""" 

2 

3from __future__ import annotations 

4 

5import os 

6import platform 

7import shlex 

8import shutil 

9import subprocess 

10import threading 

11import time 

12from contextlib import suppress 

13from pathlib import Path 

14from queue import Empty, Queue 

15from typing import TYPE_CHECKING 

16 

17import numpy as np 

18from loguru import logger 

19 

20from orchestr_ant_ion.pipeline.constants import ( 

21 GST_DEFAULT_TIMEOUT_SECONDS, 

22 GST_FALLBACK_HEIGHT, 

23 GST_FALLBACK_WIDTH, 

24 GST_FRAME_QUEUE_TIMEOUT, 

25 GST_PROCESS_STARTUP_DELAY, 

26 GST_PROCESS_WAIT_TIMEOUT, 

27 WINDOWS_GSTREAMER_PATHS, 

28) 

29 

30 

31if TYPE_CHECKING: 

32 from orchestr_ant_ion.pipeline.types import CameraConfig 

33 

34 

def find_gstreamer_launch() -> tuple[str | None, str]:
    """Locate the ``gst-launch-1.0`` executable.

    The ``PATH`` is searched first. On Windows only, the ``bin`` directory of
    each configured root (or, when none are configured, of each entry in
    ``WINDOWS_GSTREAMER_PATHS``) is probed as a fallback.

    Returns:
        A ``(path, status)`` pair. ``path`` is ``None`` when nothing was
        found; ``status`` is a short human-readable description.
    """
    on_path = shutil.which("gst-launch-1.0")
    if on_path:
        return on_path, "Found in PATH"

    # The install-root fallback only exists for Windows.
    if platform.system() != "Windows":
        return None, "gst-launch-1.0 not found"

    candidate_roots = _get_config_gstreamer_paths() or WINDOWS_GSTREAMER_PATHS
    for root in candidate_roots:
        candidate = Path(root) / "bin" / "gst-launch-1.0.exe"
        if candidate.exists():
            return str(candidate), f"Found at {candidate}"

    return None, "gst-launch-1.0 not found"

54 

55 

def _get_config_gstreamer_paths() -> list[str] | None:
    """Read GStreamer install roots from ``KATAGLYPHIS_GSTREAMER_PATHS``.

    The variable holds ``os.pathsep``-separated directories. Surrounding
    whitespace is trimmed and blank entries are dropped.

    Returns:
        ``None`` when the variable is unset or empty, otherwise the list of
        cleaned entries (which may itself be empty).
    """
    raw_value = os.environ.get("KATAGLYPHIS_GSTREAMER_PATHS", "")
    if raw_value == "":
        return None

    trimmed = (piece.strip() for piece in raw_value.split(os.pathsep))
    return [piece for piece in trimmed if piece]

62 

63 

def get_gstreamer_env() -> dict[str, str]:
    """Build the minimal environment handed to the ``gst-launch`` child.

    Only an allowlist of variables is copied from the current process:
    ``PATH``, ``HOME``, ``USER``, ``TMPDIR`` and ``TEMP`` (unset ones become
    empty strings). On non-Windows platforms that is the whole result.

    On Windows the result additionally carries ``SYSTEMROOT`` when it is set,
    and is pointed at the first existing GStreamer install root (configured
    roots first, otherwise ``WINDOWS_GSTREAMER_PATHS``): its ``bin``
    directory is prepended to ``PATH`` and its plugin directory is exported
    as ``GST_PLUGIN_PATH`` / ``GST_PLUGIN_SYSTEM_PATH``.

    Returns:
        A fresh dict suitable for ``subprocess.Popen(env=...)``.
    """
    env = {
        name: os.environ.get(name, "")
        for name in ("PATH", "HOME", "USER", "TMPDIR", "TEMP")
    }

    if platform.system() != "Windows":
        return env

    # The subprocess documentation requires a valid SystemRoot in any custom
    # env passed to a Windows child; without it the child may fail to start
    # or to load system libraries.
    system_root = os.environ.get("SYSTEMROOT")
    if system_root:
        env["SYSTEMROOT"] = system_root

    for gst_root in _get_config_gstreamer_paths() or WINDOWS_GSTREAMER_PATHS:
        root_path = Path(gst_root)
        if not root_path.exists():
            continue

        bin_dir = str(root_path / "bin")
        plugin_dir = str(root_path / "lib" / "gstreamer-1.0")

        # Compare whole PATH entries (case-normalised) instead of a substring
        # test, so "...\\bin" is not considered present because of "...\\bin2".
        path_entries = [entry for entry in env["PATH"].split(os.pathsep) if entry]
        wanted = os.path.normcase(bin_dir)
        if all(os.path.normcase(entry) != wanted for entry in path_entries):
            env["PATH"] = os.pathsep.join([bin_dir, *path_entries])

        env["GST_PLUGIN_PATH"] = plugin_dir
        env["GST_PLUGIN_SYSTEM_PATH"] = plugin_dir

        logger.debug("GStreamer environment configured from: {}", root_path)
        break  # only the first existing root is used

    return env

103 

104 

105class GStreamerSubprocessCapture: 

106 """GStreamer video capture using subprocess and raw frames on stdout.""" 

107 

108 def __init__(self, config: CameraConfig) -> None: 

109 """Initialize the capture backend.""" 

110 self.config = config 

111 self.process: subprocess.Popen[bytes] | None = None 

112 self.frame_queue: Queue[np.ndarray] = Queue(maxsize=2) 

113 self.running = False 

114 self.reader_thread: threading.Thread | None = None 

115 self._frame_buffer: np.ndarray | None = None 

116 

117 self.actual_width = config.width 

118 self.actual_height = config.height 

119 self.actual_fps = float(config.fps) 

120 self.pipeline_string = "" 

121 self.frame_size = 0 

122 

123 self.gst_launch_path, self.gst_status = find_gstreamer_launch() 

124 self.gst_env = get_gstreamer_env() 

125 

126 def _allocate_frame_buffer(self) -> None: 

127 """Pre-allocate reusable frame buffer for zero-copy reading.""" 

128 frame_size = self.actual_width * self.actual_height * 3 

129 self._frame_buffer = np.empty(frame_size, dtype=np.uint8) 

130 

131 def _build_pipeline_string( 

132 self, 

133 source: str, 

134 width: int, 

135 height: int, 

136 fps: int, 

137 pipeline_type: str = "strict", 

138 *, 

139 with_device: bool = True, 

140 include_size: bool = True, 

141 include_fps: bool = True, 

142 include_format: bool = True, 

143 ) -> str: 

144 """Build a GStreamer pipeline string for the requested settings.""" 

145 sink = "fdsink fd=1 sync=false" 

146 device = f" device-index={self.config.device_index}" if with_device else "" 

147 source_prefix = f"{source}{device}" 

148 

149 caps_parts = [] 

150 if include_size: 

151 caps_parts.append(f"width={width}") 

152 caps_parts.append(f"height={height}") 

153 if include_fps: 

154 caps_parts.append(f"framerate={fps}/1") 

155 if include_format: 

156 caps_parts.append("format=BGR") 

157 

158 caps = f"video/x-raw,{','.join(caps_parts)}" if caps_parts else "video/x-raw" 

159 

160 if pipeline_type == "strict": 

161 return f"{source_prefix} ! {caps} ! videoconvert ! {caps} ! {sink}" 

162 if pipeline_type == "flexible": 

163 return ( 

164 f"{source_prefix} ! " 

165 f"videoconvert ! videoscale ! " 

166 f"{caps} ! " 

167 f"videorate ! {caps} ! " 

168 f"videoconvert ! {caps} ! {sink}" 

169 ) 

170 return f"{source_prefix} ! videoconvert ! videoscale ! {caps} ! {sink}" 

171 

172 def _frame_reader(self) -> None: 

173 frame_size = self.actual_width * self.actual_height * 3 

174 

175 while self.running and self.process and self.process.poll() is None: 

176 try: 

177 if self.process.stdout is None: 

178 break 

179 

180 if ( 

181 self._frame_buffer is not None 

182 and self._frame_buffer.size == frame_size 

183 ): 

184 stdout = self.process.stdout 

185 if hasattr(stdout, "readinto"): 

186 bytes_read = stdout.readinto(self._frame_buffer) # type: ignore[union-attr] 

187 if bytes_read != frame_size: 

188 if bytes_read == 0: 

189 logger.warning("GStreamer process ended (no data)") 

190 break 

191 logger.warning( 

192 "Incomplete frame: {}/{}", bytes_read, frame_size 

193 ) 

194 continue 

195 frame = self._frame_buffer.reshape( 

196 (self.actual_height, self.actual_width, 3) 

197 ) 

198 else: 

199 raw_data = stdout.read(frame_size) 

200 if len(raw_data) != frame_size: 

201 if len(raw_data) == 0: 

202 logger.warning("GStreamer process ended (no data)") 

203 break 

204 logger.warning( 

205 "Incomplete frame: {}/{}", len(raw_data), frame_size 

206 ) 

207 continue 

208 frame = np.frombuffer(raw_data, dtype=np.uint8).reshape( 

209 (self.actual_height, self.actual_width, 3) 

210 ) 

211 else: 

212 raw_data = self.process.stdout.read(frame_size) 

213 if len(raw_data) != frame_size: 

214 if len(raw_data) == 0: 

215 logger.warning("GStreamer process ended (no data)") 

216 break 

217 logger.warning( 

218 "Incomplete frame: {}/{}", len(raw_data), frame_size 

219 ) 

220 continue 

221 frame = np.frombuffer(raw_data, dtype=np.uint8).reshape( 

222 (self.actual_height, self.actual_width, 3) 

223 ) 

224 

225 if self.frame_queue.full(): 

226 with suppress(Empty): 

227 self.frame_queue.get_nowait() 

228 

229 self.frame_queue.put(np.ascontiguousarray(frame), block=False) 

230 

231 except (OSError, ValueError) as exc: 

232 if self.running: 

233 logger.error("Frame reader error: {}", exc) 

234 break 

235 

236 self.running = False 

237 

238 def _start_pipeline(self, pipeline_str: str) -> bool: 

239 if self.gst_launch_path is None: 

240 return False 

241 

242 cmd = [self.gst_launch_path, "-q", *shlex.split(pipeline_str)] 

243 try: 

244 self.process = subprocess.Popen( 

245 cmd, 

246 stdout=subprocess.PIPE, 

247 stderr=subprocess.PIPE, 

248 bufsize=0, 

249 env=self.gst_env, 

250 ) 

251 except OSError as exc: 

252 logger.warning("Failed to start gst-launch: {}", exc) 

253 self.process = None 

254 return False 

255 

256 time.sleep(GST_PROCESS_STARTUP_DELAY) 

257 if self.process.poll() is not None: 

258 stderr_text = "" 

259 if self.process.stderr is not None: 

260 stderr_text = self.process.stderr.read().decode( 

261 "utf-8", errors="ignore" 

262 ) 

263 logger.warning("Pipeline failed: {}", stderr_text[:300]) 

264 self.release() 

265 return False 

266 

267 self.running = True 

268 self.reader_thread = threading.Thread(target=self._frame_reader, daemon=True) 

269 self.reader_thread.start() 

270 

271 try: 

272 frame = self.frame_queue.get(timeout=GST_FRAME_QUEUE_TIMEOUT) 

273 except Empty: 

274 logger.warning("No frames from pipeline") 

275 self.release() 

276 return False 

277 

278 if frame.shape[0] != self.actual_height or frame.shape[1] != self.actual_width: 

279 logger.info("Adjusting dimensions: {}", frame.shape[:2]) 

280 self.actual_height, self.actual_width = frame.shape[:2] 

281 self.frame_size = self.actual_width * self.actual_height * 3 

282 

283 self.frame_queue.put(frame) 

284 self._allocate_frame_buffer() 

285 return True 

286 

287 def _try_pipelines( 

288 self, 

289 sources: list[tuple[str, bool]], 

290 specs: list[tuple[str, bool, bool, bool]], 

291 width: int, 

292 height: int, 

293 fps: int, 

294 deadline: float, 

295 ) -> bool: 

296 """Try multiple pipeline configurations until one succeeds or deadline passes.""" 

297 for source, with_device in sources: 

298 for pipeline_type, include_size, include_fps, include_format in specs: 

299 if time.monotonic() > deadline: 

300 return False 

301 

302 pipeline_str = self._build_pipeline_string( 

303 source, 

304 width, 

305 height, 

306 fps, 

307 pipeline_type, 

308 with_device=with_device, 

309 include_size=include_size, 

310 include_fps=include_fps, 

311 include_format=include_format, 

312 ) 

313 logger.debug("Pipeline: {}", pipeline_str) 

314 

315 if self._start_pipeline(pipeline_str): 

316 self.pipeline_string = pipeline_str 

317 return True 

318 

319 logger.warning("Pipeline failed: {} ({})", pipeline_type, source) 

320 return False 

321 

322 def open(self, timeout: float = GST_DEFAULT_TIMEOUT_SECONDS) -> bool: 

323 """Start the GStreamer subprocess and begin frame capture. 

324 

325 Args: 

326 timeout: Maximum wall-clock seconds to spend trying pipelines. 

327 """ 

328 if self.gst_launch_path is None: 

329 logger.error("GStreamer not available: {}", self.gst_status) 

330 return False 

331 

332 logger.info("GStreamer: {}", self.gst_status) 

333 logger.debug("Executable: {}", self.gst_launch_path) 

334 

335 self.frame_size = self.actual_width * self.actual_height * 3 

336 deadline = time.monotonic() + timeout 

337 

338 sources: list[tuple[str, bool]] = [ 

339 ("mfvideosrc", True), 

340 ("ksvideosrc", True), 

341 ("dshowvideosrc", False), 

342 ("autovideosrc", False), 

343 ] 

344 pipeline_specs: list[tuple[str, bool, bool, bool]] = [ 

345 ("strict", True, True, True), 

346 ("strict", True, False, True), 

347 ("strict", False, False, True), 

348 ("strict", False, False, False), 

349 ("flexible", True, True, True), 

350 ("flexible", True, False, True), 

351 ("flexible", False, False, True), 

352 ("auto", True, True, True), 

353 ("auto", False, False, True), 

354 ("auto", False, False, False), 

355 ] 

356 

357 if self._try_pipelines( 

358 sources, 

359 pipeline_specs, 

360 self.actual_width, 

361 self.actual_height, 

362 int(self.actual_fps), 

363 deadline, 

364 ): 

365 logger.success( 

366 "GStreamer started: {}x{}", self.actual_width, self.actual_height 

367 ) 

368 return True 

369 

370 if time.monotonic() > deadline: 

371 logger.warning("GStreamer pipeline search timed out after {}s", timeout) 

372 return False 

373 

374 fallback_width = GST_FALLBACK_WIDTH 

375 fallback_height = GST_FALLBACK_HEIGHT 

376 fallback_fps = int(self.actual_fps) if self.actual_fps > 0 else 30 

377 logger.info("Trying fallback pipeline ({}x{})", fallback_width, fallback_height) 

378 

379 self.actual_width = fallback_width 

380 self.actual_height = fallback_height 

381 self.frame_size = self.actual_width * self.actual_height * 3 

382 

383 if self._try_pipelines( 

384 sources, 

385 pipeline_specs, 

386 fallback_width, 

387 fallback_height, 

388 fallback_fps, 

389 deadline, 

390 ): 

391 logger.success( 

392 "GStreamer started (fallback): {}x{}", 

393 self.actual_width, 

394 self.actual_height, 

395 ) 

396 return True 

397 

398 logger.error("All GStreamer pipelines failed!") 

399 return False 

400 

401 def read(self) -> tuple[bool, np.ndarray | None]: 

402 """Read a frame from the capture queue.""" 

403 if not self.running: 

404 return False, None 

405 

406 try: 

407 frame = self.frame_queue.get(timeout=1.0) 

408 except Empty: 

409 return False, None 

410 else: 

411 return True, frame 

412 

413 def release(self) -> None: 

414 """Stop capture and release subprocess resources.""" 

415 self.running = False 

416 if self.process is not None: 

417 try: 

418 self.process.terminate() 

419 self.process.wait(timeout=GST_PROCESS_WAIT_TIMEOUT) 

420 except subprocess.TimeoutExpired: 

421 self.process.kill() 

422 except Exception as exc: 

423 logger.debug("Failed to terminate GStreamer process: {}", exc) 

424 self.process = None 

425 

426 if self.reader_thread and self.reader_thread.is_alive(): 

427 self.reader_thread.join(timeout=1.0) 

428 

429 while not self.frame_queue.empty(): 

430 with suppress(Empty): 

431 self.frame_queue.get_nowait() 

432 

433 def is_opened(self) -> bool: 

434 """Return True if the subprocess is running.""" 

435 return self.running and self.process is not None and self.process.poll() is None 

436 

437 def get_info(self) -> dict: 

438 """Return backend metadata for diagnostics.""" 

439 return { 

440 "backend": "GStreamer (subprocess)", 

441 "pipeline": self.pipeline_string, 

442 "width": self.actual_width, 

443 "height": self.actual_height, 

444 "fps": self.actual_fps, 

445 }