Coverage for orchestr_ant_ion / pipeline / capture / gstreamer.py: 13%
233 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
1"""GStreamer subprocess capture backend."""
3from __future__ import annotations
5import os
6import platform
7import shlex
8import shutil
9import subprocess
10import threading
11import time
12from contextlib import suppress
13from pathlib import Path
14from queue import Empty, Queue
15from typing import TYPE_CHECKING
17import numpy as np
18from loguru import logger
20from orchestr_ant_ion.pipeline.constants import (
21 GST_DEFAULT_TIMEOUT_SECONDS,
22 GST_FALLBACK_HEIGHT,
23 GST_FALLBACK_WIDTH,
24 GST_FRAME_QUEUE_TIMEOUT,
25 GST_PROCESS_STARTUP_DELAY,
26 GST_PROCESS_WAIT_TIMEOUT,
27 WINDOWS_GSTREAMER_PATHS,
28)
31if TYPE_CHECKING:
32 from orchestr_ant_ion.pipeline.types import CameraConfig
35def find_gstreamer_launch() -> tuple[str | None, str]:
36 """Find the gst-launch-1.0 executable."""
37 exe_name = (
38 "gst-launch-1.0.exe" if platform.system() == "Windows" else "gst-launch-1.0"
39 )
41 resolved = shutil.which("gst-launch-1.0")
42 if resolved:
43 return resolved, "Found in PATH"
45 if platform.system() == "Windows":
46 config_paths = _get_config_gstreamer_paths()
47 search_paths = config_paths or WINDOWS_GSTREAMER_PATHS
48 for base_path in search_paths:
49 exe_path = Path(base_path) / "bin" / exe_name
50 if exe_path.exists():
51 return str(exe_path), f"Found at {exe_path}"
53 return None, "gst-launch-1.0 not found"
56def _get_config_gstreamer_paths() -> list[str] | None:
57 """Get GStreamer paths from environment configuration."""
58 env_path = os.environ.get("KATAGLYPHIS_GSTREAMER_PATHS", "")
59 if not env_path:
60 return None
61 return [p.strip() for p in env_path.split(os.pathsep) if p.strip()]
64def get_gstreamer_env() -> dict[str, str]:
65 """Get minimal environment variables needed for GStreamer.
67 Only allowlists specific GStreamer-related environment variables.
68 """
69 env = {
70 "PATH": os.environ.get("PATH", ""),
71 "HOME": os.environ.get("HOME", ""),
72 "USER": os.environ.get("USER", ""),
73 "TMPDIR": os.environ.get("TMPDIR", ""),
74 "TEMP": os.environ.get("TEMP", ""),
75 }
77 if platform.system() != "Windows":
78 return env
80 search_paths = _get_config_gstreamer_paths() or [
81 p for p in WINDOWS_GSTREAMER_PATHS if Path(p).exists()
82 ]
84 for gst_root in search_paths:
85 root_path = Path(gst_root)
86 if not root_path.exists():
87 continue
88 bin_path = root_path / "bin"
89 lib_path = root_path / "lib"
90 plugin_path = lib_path / "gstreamer-1.0"
92 current_path = env.get("PATH", "")
93 if str(bin_path) not in current_path:
94 env["PATH"] = str(bin_path) + os.pathsep + current_path
96 env["GST_PLUGIN_PATH"] = str(plugin_path)
97 env["GST_PLUGIN_SYSTEM_PATH"] = str(plugin_path)
99 logger.debug("GStreamer environment configured from: {}", root_path)
100 break
102 return env
105class GStreamerSubprocessCapture:
106 """GStreamer video capture using subprocess and raw frames on stdout."""
108 def __init__(self, config: CameraConfig) -> None:
109 """Initialize the capture backend."""
110 self.config = config
111 self.process: subprocess.Popen[bytes] | None = None
112 self.frame_queue: Queue[np.ndarray] = Queue(maxsize=2)
113 self.running = False
114 self.reader_thread: threading.Thread | None = None
115 self._frame_buffer: np.ndarray | None = None
117 self.actual_width = config.width
118 self.actual_height = config.height
119 self.actual_fps = float(config.fps)
120 self.pipeline_string = ""
121 self.frame_size = 0
123 self.gst_launch_path, self.gst_status = find_gstreamer_launch()
124 self.gst_env = get_gstreamer_env()
126 def _allocate_frame_buffer(self) -> None:
127 """Pre-allocate reusable frame buffer for zero-copy reading."""
128 frame_size = self.actual_width * self.actual_height * 3
129 self._frame_buffer = np.empty(frame_size, dtype=np.uint8)
131 def _build_pipeline_string(
132 self,
133 source: str,
134 width: int,
135 height: int,
136 fps: int,
137 pipeline_type: str = "strict",
138 *,
139 with_device: bool = True,
140 include_size: bool = True,
141 include_fps: bool = True,
142 include_format: bool = True,
143 ) -> str:
144 """Build a GStreamer pipeline string for the requested settings."""
145 sink = "fdsink fd=1 sync=false"
146 device = f" device-index={self.config.device_index}" if with_device else ""
147 source_prefix = f"{source}{device}"
149 caps_parts = []
150 if include_size:
151 caps_parts.append(f"width={width}")
152 caps_parts.append(f"height={height}")
153 if include_fps:
154 caps_parts.append(f"framerate={fps}/1")
155 if include_format:
156 caps_parts.append("format=BGR")
158 caps = f"video/x-raw,{','.join(caps_parts)}" if caps_parts else "video/x-raw"
160 if pipeline_type == "strict":
161 return f"{source_prefix} ! {caps} ! videoconvert ! {caps} ! {sink}"
162 if pipeline_type == "flexible":
163 return (
164 f"{source_prefix} ! "
165 f"videoconvert ! videoscale ! "
166 f"{caps} ! "
167 f"videorate ! {caps} ! "
168 f"videoconvert ! {caps} ! {sink}"
169 )
170 return f"{source_prefix} ! videoconvert ! videoscale ! {caps} ! {sink}"
172 def _frame_reader(self) -> None:
173 frame_size = self.actual_width * self.actual_height * 3
175 while self.running and self.process and self.process.poll() is None:
176 try:
177 if self.process.stdout is None:
178 break
180 if (
181 self._frame_buffer is not None
182 and self._frame_buffer.size == frame_size
183 ):
184 stdout = self.process.stdout
185 if hasattr(stdout, "readinto"):
186 bytes_read = stdout.readinto(self._frame_buffer) # type: ignore[union-attr]
187 if bytes_read != frame_size:
188 if bytes_read == 0:
189 logger.warning("GStreamer process ended (no data)")
190 break
191 logger.warning(
192 "Incomplete frame: {}/{}", bytes_read, frame_size
193 )
194 continue
195 frame = self._frame_buffer.reshape(
196 (self.actual_height, self.actual_width, 3)
197 )
198 else:
199 raw_data = stdout.read(frame_size)
200 if len(raw_data) != frame_size:
201 if len(raw_data) == 0:
202 logger.warning("GStreamer process ended (no data)")
203 break
204 logger.warning(
205 "Incomplete frame: {}/{}", len(raw_data), frame_size
206 )
207 continue
208 frame = np.frombuffer(raw_data, dtype=np.uint8).reshape(
209 (self.actual_height, self.actual_width, 3)
210 )
211 else:
212 raw_data = self.process.stdout.read(frame_size)
213 if len(raw_data) != frame_size:
214 if len(raw_data) == 0:
215 logger.warning("GStreamer process ended (no data)")
216 break
217 logger.warning(
218 "Incomplete frame: {}/{}", len(raw_data), frame_size
219 )
220 continue
221 frame = np.frombuffer(raw_data, dtype=np.uint8).reshape(
222 (self.actual_height, self.actual_width, 3)
223 )
225 if self.frame_queue.full():
226 with suppress(Empty):
227 self.frame_queue.get_nowait()
229 self.frame_queue.put(np.ascontiguousarray(frame), block=False)
231 except (OSError, ValueError) as exc:
232 if self.running:
233 logger.error("Frame reader error: {}", exc)
234 break
236 self.running = False
238 def _start_pipeline(self, pipeline_str: str) -> bool:
239 if self.gst_launch_path is None:
240 return False
242 cmd = [self.gst_launch_path, "-q", *shlex.split(pipeline_str)]
243 try:
244 self.process = subprocess.Popen(
245 cmd,
246 stdout=subprocess.PIPE,
247 stderr=subprocess.PIPE,
248 bufsize=0,
249 env=self.gst_env,
250 )
251 except OSError as exc:
252 logger.warning("Failed to start gst-launch: {}", exc)
253 self.process = None
254 return False
256 time.sleep(GST_PROCESS_STARTUP_DELAY)
257 if self.process.poll() is not None:
258 stderr_text = ""
259 if self.process.stderr is not None:
260 stderr_text = self.process.stderr.read().decode(
261 "utf-8", errors="ignore"
262 )
263 logger.warning("Pipeline failed: {}", stderr_text[:300])
264 self.release()
265 return False
267 self.running = True
268 self.reader_thread = threading.Thread(target=self._frame_reader, daemon=True)
269 self.reader_thread.start()
271 try:
272 frame = self.frame_queue.get(timeout=GST_FRAME_QUEUE_TIMEOUT)
273 except Empty:
274 logger.warning("No frames from pipeline")
275 self.release()
276 return False
278 if frame.shape[0] != self.actual_height or frame.shape[1] != self.actual_width:
279 logger.info("Adjusting dimensions: {}", frame.shape[:2])
280 self.actual_height, self.actual_width = frame.shape[:2]
281 self.frame_size = self.actual_width * self.actual_height * 3
283 self.frame_queue.put(frame)
284 self._allocate_frame_buffer()
285 return True
287 def _try_pipelines(
288 self,
289 sources: list[tuple[str, bool]],
290 specs: list[tuple[str, bool, bool, bool]],
291 width: int,
292 height: int,
293 fps: int,
294 deadline: float,
295 ) -> bool:
296 """Try multiple pipeline configurations until one succeeds or deadline passes."""
297 for source, with_device in sources:
298 for pipeline_type, include_size, include_fps, include_format in specs:
299 if time.monotonic() > deadline:
300 return False
302 pipeline_str = self._build_pipeline_string(
303 source,
304 width,
305 height,
306 fps,
307 pipeline_type,
308 with_device=with_device,
309 include_size=include_size,
310 include_fps=include_fps,
311 include_format=include_format,
312 )
313 logger.debug("Pipeline: {}", pipeline_str)
315 if self._start_pipeline(pipeline_str):
316 self.pipeline_string = pipeline_str
317 return True
319 logger.warning("Pipeline failed: {} ({})", pipeline_type, source)
320 return False
322 def open(self, timeout: float = GST_DEFAULT_TIMEOUT_SECONDS) -> bool:
323 """Start the GStreamer subprocess and begin frame capture.
325 Args:
326 timeout: Maximum wall-clock seconds to spend trying pipelines.
327 """
328 if self.gst_launch_path is None:
329 logger.error("GStreamer not available: {}", self.gst_status)
330 return False
332 logger.info("GStreamer: {}", self.gst_status)
333 logger.debug("Executable: {}", self.gst_launch_path)
335 self.frame_size = self.actual_width * self.actual_height * 3
336 deadline = time.monotonic() + timeout
338 sources: list[tuple[str, bool]] = [
339 ("mfvideosrc", True),
340 ("ksvideosrc", True),
341 ("dshowvideosrc", False),
342 ("autovideosrc", False),
343 ]
344 pipeline_specs: list[tuple[str, bool, bool, bool]] = [
345 ("strict", True, True, True),
346 ("strict", True, False, True),
347 ("strict", False, False, True),
348 ("strict", False, False, False),
349 ("flexible", True, True, True),
350 ("flexible", True, False, True),
351 ("flexible", False, False, True),
352 ("auto", True, True, True),
353 ("auto", False, False, True),
354 ("auto", False, False, False),
355 ]
357 if self._try_pipelines(
358 sources,
359 pipeline_specs,
360 self.actual_width,
361 self.actual_height,
362 int(self.actual_fps),
363 deadline,
364 ):
365 logger.success(
366 "GStreamer started: {}x{}", self.actual_width, self.actual_height
367 )
368 return True
370 if time.monotonic() > deadline:
371 logger.warning("GStreamer pipeline search timed out after {}s", timeout)
372 return False
374 fallback_width = GST_FALLBACK_WIDTH
375 fallback_height = GST_FALLBACK_HEIGHT
376 fallback_fps = int(self.actual_fps) if self.actual_fps > 0 else 30
377 logger.info("Trying fallback pipeline ({}x{})", fallback_width, fallback_height)
379 self.actual_width = fallback_width
380 self.actual_height = fallback_height
381 self.frame_size = self.actual_width * self.actual_height * 3
383 if self._try_pipelines(
384 sources,
385 pipeline_specs,
386 fallback_width,
387 fallback_height,
388 fallback_fps,
389 deadline,
390 ):
391 logger.success(
392 "GStreamer started (fallback): {}x{}",
393 self.actual_width,
394 self.actual_height,
395 )
396 return True
398 logger.error("All GStreamer pipelines failed!")
399 return False
401 def read(self) -> tuple[bool, np.ndarray | None]:
402 """Read a frame from the capture queue."""
403 if not self.running:
404 return False, None
406 try:
407 frame = self.frame_queue.get(timeout=1.0)
408 except Empty:
409 return False, None
410 else:
411 return True, frame
413 def release(self) -> None:
414 """Stop capture and release subprocess resources."""
415 self.running = False
416 if self.process is not None:
417 try:
418 self.process.terminate()
419 self.process.wait(timeout=GST_PROCESS_WAIT_TIMEOUT)
420 except subprocess.TimeoutExpired:
421 self.process.kill()
422 except Exception as exc:
423 logger.debug("Failed to terminate GStreamer process: {}", exc)
424 self.process = None
426 if self.reader_thread and self.reader_thread.is_alive():
427 self.reader_thread.join(timeout=1.0)
429 while not self.frame_queue.empty():
430 with suppress(Empty):
431 self.frame_queue.get_nowait()
433 def is_opened(self) -> bool:
434 """Return True if the subprocess is running."""
435 return self.running and self.process is not None and self.process.poll() is None
437 def get_info(self) -> dict:
438 """Return backend metadata for diagnostics."""
439 return {
440 "backend": "GStreamer (subprocess)",
441 "pipeline": self.pipeline_string,
442 "width": self.actual_width,
443 "height": self.actual_height,
444 "fps": self.actual_fps,
445 }