Coverage for orchestr_ant_ion / pipeline / logging / __init__.py: 37%
30 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 08:36 +0000
1"""Logging helpers for monitoring pipelines."""
3from __future__ import annotations
5import os
6import sys
7from collections import deque
8from pathlib import Path
10from loguru import logger
# Record layout handed to the stdout sink by configure_logging. The
# <green>/<cyan>/<level> tags are loguru colour markup; fields are joined
# with " | " in the order: time, padded level, call site, message.
CONSOLE_FORMAT = " | ".join(
    (
        "<green>{time:HH:mm:ss}</green>",
        "<level>{level: <8}</level>",
        "<cyan>{name}:{function}:{line}</cyan>",
        "<level>{message}</level>",
    )
)

# Record layout handed to the plain-text file sink by configure_logging.
# No colour markup; carries a full timestamp plus process and thread ids.
FILE_FORMAT = " | ".join(
    (
        "{time:YYYY-MM-DD HH:mm:ss.SSS}",
        "{level: <8}",
        "{process}:{thread}",
        "{name}:{function}:{line}",
        "{message}",
    )
)
def configure_logging(
    log_level: str = "DEBUG",
    log_dir: str = "logs",
    *,
    json_logs: bool = False,
) -> None:
    """Configure loguru logging for the monitor.

    Calls ``logger.remove()`` to drop the handlers registered so far, then
    adds a stdout sink and a text file sink under ``log_dir``. When
    ``json_logs`` is true, a third sink writing serialized records to a
    ``.jsonl`` file is added as well. Both file sinks rotate at "10 MB" and
    use a "7 days" retention setting.

    Args:
        log_level: Level name applied to every sink. It is overridden by the
            ``KATAGLYPHIS_LOG_LEVEL`` environment variable when that variable
            holds a non-blank value. Surrounding whitespace is ignored and
            the name is upper-cased.
        log_dir: Directory for the log files; created (with parents) if it
            does not exist.
        json_logs: Also write a serialized ``.jsonl`` log file.
    """
    # A variable that is set but blank (e.g. `KATAGLYPHIS_LOG_LEVEL=`) must not
    # replace the caller's level with "", which is not a valid level name.
    env_level = os.getenv("KATAGLYPHIS_LOG_LEVEL", "").strip()
    level = (env_level or log_level).strip().upper()

    directory = Path(log_dir)
    directory.mkdir(parents=True, exist_ok=True)

    logger.remove()
    logger.add(
        sink=sys.stdout,
        format=CONSOLE_FORMAT,
        level=level,
    )

    # The "{time:...}" placeholder is left in the path for loguru to expand.
    text_log = directory / "yolo_{time:YYYY-MM-DD}.log"
    logger.add(
        str(text_log),
        rotation="10 MB",
        retention="7 days",
        level=level,
        format=FILE_FORMAT,
    )

    if json_logs:
        json_log = directory / "yolo_{time:YYYY-MM-DD}.jsonl"
        logger.add(
            str(json_log),
            rotation="10 MB",
            retention="7 days",
            level=level,
            serialize=True,
        )
def create_log_buffer(max_lines: int = 200) -> deque[str]:
    """Return an empty deque that keeps at most ``max_lines`` entries.

    Args:
        max_lines: Maximum number of entries retained; passed to the deque
            as ``maxlen``.

    Raises:
        ValueError: If ``max_lines`` is zero or negative.
    """
    if max_lines <= 0:
        raise ValueError("max_lines must be positive")

    recent: deque[str] = deque(maxlen=max_lines)
    return recent
def attach_log_buffer(buffer: deque[str], level: str = "INFO") -> int:
    """Register a loguru sink that appends each formatted message to ``buffer``.

    Args:
        buffer: Deque that receives one entry per emitted message, with
            trailing newlines removed.
        level: Level passed to ``logger.add`` for this sink.

    Returns:
        The value returned by ``logger.add`` for the new sink.
    """

    def _append_record(message: object) -> None:
        # Objects exposing ``rstrip`` (e.g. str) are trimmed directly;
        # anything else is converted with str() first.
        rendered = message if hasattr(message, "rstrip") else str(message)
        buffer.append(rendered.rstrip("\n"))  # type: ignore[attr-defined]

    handler_id = logger.add(
        _append_record,
        level=level,
        format="{time:HH:mm:ss} | {level: <8} | {message}",
    )
    return handler_id