Coverage for orchestr_ant_ion / pipeline / monitoring / power.py: 17%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 08:36 +0000

1"""Power monitoring utilities.""" 

2 

3from __future__ import annotations 

4 

5import os 

6import platform 

7import threading 

8import time 

9from contextlib import suppress 

10from pathlib import Path 

11 

12import psutil 

13from loguru import logger 

14 

15 

class PowerMonitor:
    """Best-effort power reader with OS-specific backends and fallback estimation.

    CPU power is taken from Linux RAPL energy counters when they are readable,
    or from a cached OS sample on Windows/macOS. Whenever no measured value is
    available, a TDP-based estimate is used instead. Energy is accumulated in
    watt-hours across calls to :meth:`update`.

    Environment variables:
        KATAGLYPHIS_ENABLE_OS_POWER: truthy values (``1``, ``true``, ``yes``,
            ``on``; case-insensitive) enable OS power sampling, any other
            non-empty value disables it. When unset or empty, sampling is
            enabled only on Linux.
        KATAGLYPHIS_OS_POWER_INTERVAL: seconds between background samples on
            Windows/macOS (default 2.0).
    """

    # Interval used when the environment variable is unset or unusable.
    _DEFAULT_POLL_INTERVAL_S = 2.0
    # Lower bound on the polling interval so the background loop cannot spin.
    _MIN_POLL_INTERVAL_S = 0.1
    # Normalised (stripped, case-folded) flag values that enable OS sampling.
    _TRUTHY_FLAGS = frozenset({"1", "true", "yes", "on"})

    def __init__(self) -> None:
        """Initialize power monitoring state and background polling."""
        self._platform = platform.system()
        self._last_energy_uj: int | None = None
        self._last_energy_ts: float | None = None
        self._energy_wh = 0.0
        self._last_os_power_ts = 0.0
        self._os_power_enabled = self._resolve_os_power_enabled()
        self._os_power_cache: float | None = None
        self._os_power_warned: set[str] = set()
        self._stop_event = threading.Event()
        self._poll_thread: threading.Thread | None = None
        self._rapl_energy_paths = self._find_rapl_energy_paths()
        self._start_polling()

    def _start_polling(self) -> None:
        """Start the background polling thread if OS power is enabled.

        Polling only runs on Windows and macOS; nothing happens when a poll
        thread is already alive.
        """
        if not self._os_power_enabled:
            return
        if self._platform not in {"Windows", "Darwin"}:
            return
        if self._poll_thread is not None and self._poll_thread.is_alive():
            return
        # A previous shutdown() leaves the event set; clear it so the new
        # thread does not exit on its first loop check.
        self._stop_event.clear()
        self._poll_thread = threading.Thread(
            target=self._poll_os_power,
            name="power-poll",
            daemon=True,
        )
        self._poll_thread.start()

    def shutdown(self) -> None:
        """Stop background polling.

        Safe to call repeatedly. Waits at most one second for the poll thread
        to finish; the thread is a daemon, so a slow reader cannot block
        interpreter exit.
        """
        self._stop_event.set()
        thread = self._poll_thread
        if thread is None:
            return
        if thread.is_alive():
            thread.join(timeout=1.0)
        if not thread.is_alive():
            self._poll_thread = None

    def _poll_interval_seconds(self) -> float:
        """Return the polling interval from the environment, validated.

        Unset, empty, unparsable, NaN or infinite values fall back to the
        default; finite values are clamped to a small positive minimum so the
        polling loop never busy-waits.
        """
        raw = (os.getenv("KATAGLYPHIS_OS_POWER_INTERVAL") or "").strip()
        if not raw:
            return self._DEFAULT_POLL_INTERVAL_S
        try:
            interval = float(raw)
        except ValueError:
            return self._DEFAULT_POLL_INTERVAL_S
        # NaN is the only float that differs from itself.
        if interval != interval or interval in (float("inf"), float("-inf")):
            return self._DEFAULT_POLL_INTERVAL_S
        return max(interval, self._MIN_POLL_INTERVAL_S)

    def _poll_os_power(self) -> None:
        """Poll OS power metrics on supported platforms.

        Runs until the stop event is set. Only positive readings replace the
        cached value, so a failed sample keeps the last good one.
        """
        interval = self._poll_interval_seconds()
        while not self._stop_event.is_set():
            power = 0.0
            if self._platform == "Windows":
                power = self._read_windows_ohm_power(blocking=False)
            elif self._platform == "Darwin":
                power = self._read_macos_powermetrics_power(blocking=False)
            if power > 0.0:
                self._os_power_cache = power
            # Sleep on the event so shutdown() can wake the loop immediately.
            self._stop_event.wait(interval)

    def _resolve_os_power_enabled(self) -> bool:
        """Determine whether OS power sampling should be enabled.

        A non-empty ``KATAGLYPHIS_ENABLE_OS_POWER`` decides explicitly
        (case-insensitive truthy values enable, anything else disables);
        otherwise sampling defaults to enabled on Linux only.
        """
        flag = os.getenv("KATAGLYPHIS_ENABLE_OS_POWER", "")
        if flag:
            return flag.strip().casefold() in self._TRUTHY_FLAGS
        return self._platform == "Linux"

    def update(
        self,
        sys_gpu_power: float,
        cpu_util_percent: float,
        cpu_tdp_watts: float,
        freq_ratio: float,
        dt_seconds: float,
    ) -> dict[str, float]:
        """Update power metrics and return aggregated values.

        Args:
            sys_gpu_power: GPU power in watts; falsy values count as 0.0.
            cpu_util_percent: CPU utilisation in percent.
            cpu_tdp_watts: CPU TDP in watts, used for the fallback estimate.
            freq_ratio: Current/maximum CPU frequency ratio.
            dt_seconds: Time elapsed since the previous update.

        Returns:
            A dict with ``system_power_watts``, ``cpu_power_watts``,
            ``gpu_power_watts`` and the cumulative ``energy_wh``.
        """
        cpu_power = self._read_cpu_power(
            cpu_util_percent=cpu_util_percent,
            cpu_tdp_watts=cpu_tdp_watts,
            freq_ratio=freq_ratio,
            dt_seconds=dt_seconds,
        )
        gpu_power = float(sys_gpu_power or 0.0)
        system_power = cpu_power + gpu_power
        # Only integrate energy over a positive draw and a positive interval.
        if system_power > 0.0 and dt_seconds > 0.0:
            self._energy_wh += (system_power * dt_seconds) / 3600.0

        return {
            "system_power_watts": system_power,
            "cpu_power_watts": cpu_power,
            "gpu_power_watts": gpu_power,
            "energy_wh": self._energy_wh,
        }

    def _read_cpu_power(
        self,
        cpu_util_percent: float,
        cpu_tdp_watts: float,
        freq_ratio: float,
        dt_seconds: float,
    ) -> float:
        """Estimate CPU power, preferring OS and RAPL sources.

        A positive measured value (RAPL on Linux, cached OS sample on
        Windows/macOS) is returned as-is. Otherwise the estimate is
        ``TDP * utilisation * max(0.2, freq_ratio)`` with utilisation clamped
        to 0-100 %, and a 1 W floor whenever the CPU is busier than 0.5 %.
        """
        measured = 0.0
        if self._platform == "Linux":
            measured = self._read_linux_rapl_power(dt_seconds)
        elif self._platform in {"Darwin", "Windows"}:
            measured = float(self._os_power_cache or 0.0)
        if measured > 0.0:
            return measured

        # Clamp so out-of-range utilisation cannot yield negative power or
        # more than the TDP-scaled maximum.
        utilisation = min(100.0, max(0.0, cpu_util_percent)) / 100.0
        estimate = 0.0
        if cpu_tdp_watts > 0.0:
            estimate = cpu_tdp_watts * utilisation * max(0.2, freq_ratio)
        # A busy CPU never reports less than 1 W, even with an unknown TDP.
        if cpu_util_percent > 0.5 and estimate < 1.0:
            estimate = 1.0
        return estimate

    def _find_rapl_energy_paths(self) -> tuple[Path, ...]:
        """Find available Linux RAPL energy counters.

        Returns the ``energy_uj`` files of every ``intel-rapl:*`` entry under
        ``/sys/class/powercap`` whose ``name`` contains "package" or "cpu".
        An empty tuple is returned when the powercap directory is missing.
        """
        root = Path("/sys/class/powercap")
        if not root.exists():
            return ()
        paths = []
        for entry in root.glob("intel-rapl:*"):
            name_file = entry / "name"
            energy_file = entry / "energy_uj"
            if not energy_file.exists() or not name_file.exists():
                continue
            try:
                name = name_file.read_text(encoding="utf-8").strip().lower()
            except OSError as exc:
                logger.debug("Skipping RAPL path {}: {}", name_file, exc)
                continue
            if "package" in name or "cpu" in name:
                paths.append(energy_file)
        return tuple(paths)

    def _read_linux_rapl_power(self, dt_seconds: float) -> float:
        """Read power from Linux RAPL counters.

        Sums all tracked energy counters and converts the change since the
        previous call into watts using this method's own monotonic timestamps.
        Returns 0.0 on the first call, when counters cannot be read or parsed,
        or when the summed counter did not increase (for example after a
        counter reset), so the caller falls back to its estimate.
        """
        if not self._rapl_energy_paths or dt_seconds <= 0.0:
            return 0.0
        try:
            total_uj = sum(
                int(energy_path.read_text(encoding="utf-8").strip())
                for energy_path in self._rapl_energy_paths
            )
        except (OSError, ValueError):
            return 0.0

        now_ts = time.perf_counter()
        previous_uj = self._last_energy_uj
        previous_ts = self._last_energy_ts
        self._last_energy_uj = total_uj
        self._last_energy_ts = now_ts
        if previous_uj is None or previous_ts is None:
            # First sample only establishes the baseline.
            return 0.0

        delta_uj = total_uj - previous_uj
        delta_ts = now_ts - previous_ts
        if delta_uj <= 0 or delta_ts <= 0:
            return 0.0
        # Microjoules -> joules, divided by seconds gives watts.
        return (delta_uj * 1e-6) / delta_ts

    def _disabled_os_reading(
        self, backend: str, message: str, *, blocking: bool
    ) -> float:
        """Shared body of the OS samplers, which are currently disabled.

        Updates the last-sample timestamp (rate-limited to one update every
        two seconds when ``blocking`` is true), logs ``message`` once per
        ``backend`` and always returns 0.0 watts.
        """
        if blocking and time.perf_counter() - self._last_os_power_ts < 2.0:
            return 0.0
        self._last_os_power_ts = time.perf_counter()
        if backend not in self._os_power_warned:
            logger.debug(message)
            self._os_power_warned.add(backend)
        return 0.0

    def _read_macos_powermetrics_power(self, *, blocking: bool = True) -> float:
        """Read CPU power using powermetrics on macOS.

        Subprocess sampling is disabled, so this always returns 0.0.
        """
        return self._disabled_os_reading(
            "macos",
            "powermetrics subprocess sampling disabled; returning 0.0 watts",
            blocking=blocking,
        )

    def _read_windows_ohm_power(self, *, blocking: bool = True) -> float:
        """Read CPU power via OpenHardwareMonitor on Windows.

        Subprocess sampling is disabled, so this always returns 0.0.
        """
        return self._disabled_os_reading(
            "windows",
            "OpenHardwareMonitor subprocess sampling disabled; returning 0.0 watts",
            blocking=blocking,
        )

194 

195 

def get_cpu_freq_ratio() -> float:
    """Return the current/maximum CPU frequency ratio, clamped to [0, 1].

    Falls back to 1.0 when psutil reports no frequency data or no maximum
    frequency, or when the query raises ``OSError``/``RuntimeError``.
    """
    try:
        reading = psutil.cpu_freq()
        if reading and reading.max:
            # A missing current frequency counts as 0.0.
            current = reading.current or 0.0
            ratio = current / reading.max
            return max(0.0, min(1.0, ratio))
    except (OSError, RuntimeError):
        pass
    return 1.0