Coverage for orchestr_ant_ion / pipeline / monitoring / power.py: 17%
133 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
1"""Power monitoring utilities."""
3from __future__ import annotations
5import os
6import platform
7import threading
8import time
9from contextlib import suppress
10from pathlib import Path
12import psutil
13from loguru import logger
class PowerMonitor:
    """Best-effort power reader with OS-specific backends and fallback estimation.

    CPU power is taken from the first source that yields a positive value:

    * Linux: the delta of the RAPL ``energy_uj`` counters found under
      ``/sys/class/powercap`` between two consecutive reads.
    * Windows / macOS: the value cached by a background polling thread.  The
      readers behind that thread are currently disabled and always return
      ``0.0``, so on these platforms the estimate below is what callers get.
    * Any platform: ``cpu_tdp_watts * utilisation * max(0.2, freq_ratio)``.

    Environment variables read by this class:

    * ``KATAGLYPHIS_ENABLE_OS_POWER`` -- enables/disables the background
      poller (see :meth:`_resolve_os_power_enabled`).
    * ``KATAGLYPHIS_OS_POWER_INTERVAL`` -- poll period in seconds.
    """

    # Poll period used when the environment variable is unset or unusable.
    _DEFAULT_POLL_INTERVAL_S = 2.0
    # Minimum gap between two *blocking* OS reads.
    _OS_READ_MIN_GAP_S = 2.0
    # Accepted spellings (after strip + lower-casing) that enable OS sampling.
    _TRUTHY_FLAGS = frozenset({"1", "true", "yes", "on"})

    def __init__(self) -> None:
        """Initialize power monitoring state and background polling."""
        self._platform = platform.system()
        # Previous RAPL sample (summed microjoules and its timestamp).
        self._last_energy_uj: int | None = None
        self._last_energy_ts: float | None = None
        # Energy accumulated by update(), in watt-hours.
        self._energy_wh = 0.0
        self._last_os_power_ts = 0.0
        self._os_power_enabled = self._resolve_os_power_enabled()
        # Last positive reading produced by the polling thread, if any.
        self._os_power_cache: float | None = None
        # Backends that already logged their "disabled" notice.
        self._os_power_warned: set[str] = set()
        self._stop_event = threading.Event()
        self._poll_thread: threading.Thread | None = None
        self._rapl_energy_paths = self._find_rapl_energy_paths()
        self._start_polling()

    def _start_polling(self) -> None:
        """Start the background poller on Windows/macOS when OS power is enabled.

        Does nothing if sampling is disabled, the platform has no polled
        backend, or a poller thread is already alive.
        """
        if not self._os_power_enabled:
            return
        if self._platform not in {"Windows", "Darwin"}:
            return
        if self._poll_thread and self._poll_thread.is_alive():
            return
        self._poll_thread = threading.Thread(
            target=self._poll_os_power,
            name="power-poll",
            daemon=True,
        )
        self._poll_thread.start()

    def shutdown(self) -> None:
        """Stop background polling, waiting at most one second for the thread."""
        if self._poll_thread and self._poll_thread.is_alive():
            self._stop_event.set()
            self._poll_thread.join(timeout=1.0)

    @staticmethod
    def _parse_poll_interval(raw: str) -> float | None:
        """Parse a poll interval in seconds.

        Returns:
            The interval when ``raw`` is a positive, finite number; ``None``
            for empty, non-numeric, zero, negative, NaN or infinite input.
        """
        try:
            value = float(raw)
        except (TypeError, ValueError):
            return None
        # The chained comparison is False for NaN, so NaN is rejected too.
        if 0.0 < value < float("inf"):
            return value
        return None

    def _poll_os_power(self) -> None:
        """Poll OS power metrics on supported platforms until stopped.

        The period comes from ``KATAGLYPHIS_OS_POWER_INTERVAL``.  An unusable
        value falls back to the default instead of raising inside the thread
        (which would end polling) or spinning with a zero/negative wait.
        """
        raw_interval = os.getenv("KATAGLYPHIS_OS_POWER_INTERVAL", "")
        interval = self._parse_poll_interval(raw_interval)
        if interval is None:
            if raw_interval.strip():
                logger.warning(
                    "Ignoring invalid KATAGLYPHIS_OS_POWER_INTERVAL={!r}; using {}s",
                    raw_interval,
                    self._DEFAULT_POLL_INTERVAL_S,
                )
            interval = self._DEFAULT_POLL_INTERVAL_S

        readers = {
            "Windows": self._read_windows_ohm_power,
            "Darwin": self._read_macos_powermetrics_power,
        }
        reader = readers.get(self._platform)
        while not self._stop_event.is_set():
            power = reader(blocking=False) if reader is not None else 0.0
            # Only positive readings replace the cache; failures keep the
            # previous value.
            if power > 0.0:
                self._os_power_cache = power
            # wait() doubles as the sleep and returns early on shutdown().
            self._stop_event.wait(interval)

    def _resolve_os_power_enabled(self) -> bool:
        """Determine whether OS power sampling should be enabled.

        ``KATAGLYPHIS_ENABLE_OS_POWER`` is compared case-insensitively after
        stripping whitespace; ``1``, ``true``, ``yes`` and ``on`` enable
        sampling and any other non-empty value disables it.  When the variable
        is unset or blank, sampling defaults to enabled on Linux only.
        """
        flag = os.getenv("KATAGLYPHIS_ENABLE_OS_POWER", "").strip().lower()
        if flag:
            return flag in self._TRUTHY_FLAGS
        return self._platform == "Linux"

    def update(
        self,
        sys_gpu_power: float,
        cpu_util_percent: float,
        cpu_tdp_watts: float,
        freq_ratio: float,
        dt_seconds: float,
    ) -> dict[str, float]:
        """Update power metrics and return aggregated values.

        Args:
            sys_gpu_power: GPU power in watts; a falsy value counts as 0.
            cpu_util_percent: CPU utilisation in percent.
            cpu_tdp_watts: CPU TDP in watts, used by the fallback estimate.
            freq_ratio: Current/maximum CPU frequency ratio.
            dt_seconds: Time covered by this sample, in seconds.

        Returns:
            A dict with ``system_power_watts``, ``cpu_power_watts``,
            ``gpu_power_watts`` and the running total ``energy_wh``.
        """
        cpu_power = self._read_cpu_power(
            cpu_util_percent=cpu_util_percent,
            cpu_tdp_watts=cpu_tdp_watts,
            freq_ratio=freq_ratio,
            dt_seconds=dt_seconds,
        )
        gpu_power = float(sys_gpu_power or 0.0)
        system_power = cpu_power + gpu_power
        # Integrate watts over the sample window into watt-hours.
        if system_power > 0.0 and dt_seconds > 0.0:
            self._energy_wh += (system_power * dt_seconds) / 3600.0

        return {
            "system_power_watts": system_power,
            "cpu_power_watts": cpu_power,
            "gpu_power_watts": gpu_power,
            "energy_wh": self._energy_wh,
        }

    def _read_cpu_power(
        self,
        cpu_util_percent: float,
        cpu_tdp_watts: float,
        freq_ratio: float,
        dt_seconds: float,
    ) -> float:
        """Estimate CPU power, preferring OS and RAPL sources.

        Returns the measured value when it is positive; otherwise a
        TDP-scaled estimate, floored at 1 W whenever utilisation exceeds
        0.5 %.
        """
        if self._platform == "Linux":
            measured = self._read_linux_rapl_power(dt_seconds)
        elif self._platform in {"Darwin", "Windows"}:
            measured = float(self._os_power_cache or 0.0)
        else:
            measured = 0.0
        if measured > 0.0:
            return measured

        estimate = 0.0
        if cpu_tdp_watts > 0.0:
            # The frequency factor is floored at 0.2.
            estimate = (
                cpu_tdp_watts * (cpu_util_percent / 100.0) * max(0.2, freq_ratio)
            )
        if cpu_util_percent > 0.5 and estimate < 1.0:
            estimate = 1.0
        return estimate

    def _find_rapl_energy_paths(self) -> tuple[Path, ...]:
        """Find available Linux RAPL energy counters.

        Returns the ``energy_uj`` files of every ``intel-rapl:*`` entry whose
        ``name`` contains ``package`` or ``cpu``; an empty tuple when the
        powercap directory does not exist.
        """
        root = Path("/sys/class/powercap")
        if not root.exists():
            return ()
        paths = []
        # Sorted so the resulting tuple does not depend on directory order.
        for entry in sorted(root.glob("intel-rapl:*")):
            name_file = entry / "name"
            energy_file = entry / "energy_uj"
            if not energy_file.exists() or not name_file.exists():
                continue
            try:
                name = name_file.read_text(encoding="utf-8").strip().lower()
            except OSError as exc:
                logger.debug("Skipping RAPL path {}: {}", name_file, exc)
                continue
            if "package" in name or "cpu" in name:
                paths.append(energy_file)
        return tuple(paths)

    def _read_linux_rapl_power(self, dt_seconds: float) -> float:
        """Read power from Linux RAPL counters.

        Returns the average watts since the previous successful read, or
        ``0.0`` when there are no counters, ``dt_seconds`` is not positive, a
        counter cannot be read or parsed, this is the first sample, or the
        summed counter did not increase.
        """
        if not self._rapl_energy_paths or dt_seconds <= 0.0:
            return 0.0
        try:
            total_uj = sum(
                int(energy_path.read_text(encoding="utf-8").strip())
                for energy_path in self._rapl_energy_paths
            )
        except (OSError, ValueError):
            # A failed read leaves the previous sample in place.
            return 0.0

        now_ts = time.perf_counter()
        prev_uj, prev_ts = self._last_energy_uj, self._last_energy_ts
        self._last_energy_uj = total_uj
        self._last_energy_ts = now_ts
        if prev_uj is None or prev_ts is None:
            # First sample only establishes the baseline.
            return 0.0

        delta_uj = total_uj - prev_uj
        delta_ts = now_ts - prev_ts
        if delta_uj <= 0 or delta_ts <= 0:
            return 0.0
        # Microjoules -> joules, divided by seconds -> watts.
        return (delta_uj * 1e-6) / delta_ts

    def _disabled_os_read(self, backend: str, notice: str, *, blocking: bool) -> float:
        """Shared body of the disabled OS readers; always returns ``0.0``.

        Blocking calls made less than two seconds after the previous read
        return early without touching any state.  Otherwise the read timestamp
        is refreshed and ``notice`` is logged once per ``backend``.
        """
        now = time.perf_counter()
        if blocking and now - self._last_os_power_ts < self._OS_READ_MIN_GAP_S:
            return 0.0
        self._last_os_power_ts = now
        if backend not in self._os_power_warned:
            logger.debug(notice)
            self._os_power_warned.add(backend)
        return 0.0

    def _read_macos_powermetrics_power(self, *, blocking: bool = True) -> float:
        """Read CPU power using powermetrics on macOS (currently always 0.0)."""
        return self._disabled_os_read(
            "macos",
            "powermetrics subprocess sampling disabled; returning 0.0 watts",
            blocking=blocking,
        )

    def _read_windows_ohm_power(self, *, blocking: bool = True) -> float:
        """Read CPU power via OpenHardwareMonitor on Windows (currently always 0.0)."""
        return self._disabled_os_read(
            "windows",
            "OpenHardwareMonitor subprocess sampling disabled; returning 0.0 watts",
            blocking=blocking,
        )
def get_cpu_freq_ratio() -> float:
    """Return the current/maximum CPU frequency ratio, clamped to [0, 1].

    Falls back to ``1.0`` when psutil raises ``OSError`` or ``RuntimeError``,
    returns no reading, or reports no maximum frequency.
    """
    try:
        reading = psutil.cpu_freq()
    except (OSError, RuntimeError):
        return 1.0
    if not reading or not reading.max:
        return 1.0
    current = reading.current or 0.0
    ratio = current / reading.max
    return max(0.0, min(1.0, ratio))