Coverage for orchestr_ant_ion / pipeline / logging / __init__.py: 37%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 08:36 +0000

1"""Logging helpers for monitoring pipelines.""" 

2 

3from __future__ import annotations 

4 

5import os 

6import sys 

7from collections import deque 

8from pathlib import Path 

9 

10from loguru import logger 

11 

12 

13CONSOLE_FORMAT = ( 

14 "<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | " 

15 "<cyan>{name}:{function}:{line}</cyan> | <level>{message}</level>" 

16) 

17FILE_FORMAT = ( 

18 "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {process}:{thread} | " 

19 "{name}:{function}:{line} | {message}" 

20) 

21 

22 

23def configure_logging( 

24 log_level: str = "DEBUG", 

25 log_dir: str = "logs", 

26 *, 

27 json_logs: bool = False, 

28) -> None: 

29 """Configure loguru logging for the monitor.""" 

30 log_level = os.getenv("KATAGLYPHIS_LOG_LEVEL", log_level).upper() 

31 Path(log_dir).mkdir(parents=True, exist_ok=True) 

32 

33 logger.remove() 

34 logger.add( 

35 sink=sys.stdout, 

36 format=CONSOLE_FORMAT, 

37 level=log_level, 

38 ) 

39 log_path = Path(log_dir) / "yolo_{time:YYYY-MM-DD}.log" 

40 logger.add( 

41 str(log_path), 

42 rotation="10 MB", 

43 retention="7 days", 

44 level=log_level, 

45 format=FILE_FORMAT, 

46 ) 

47 if json_logs: 

48 json_path = Path(log_dir) / "yolo_{time:YYYY-MM-DD}.jsonl" 

49 logger.add( 

50 str(json_path), 

51 rotation="10 MB", 

52 retention="7 days", 

53 level=log_level, 

54 serialize=True, 

55 ) 

56 

57 

58def create_log_buffer(max_lines: int = 200) -> deque[str]: 

59 """Create a bounded log buffer for recent messages.""" 

60 if max_lines <= 0: 

61 msg = "max_lines must be positive" 

62 raise ValueError(msg) 

63 return deque(maxlen=max_lines) 

64 

65 

66def attach_log_buffer(buffer: deque[str], level: str = "INFO") -> int: 

67 """Attach a loguru sink that appends messages to a deque.""" 

68 

69 def _sink(message: object) -> None: 

70 if hasattr(message, "rstrip"): 

71 text = message.rstrip("\n") # type: ignore[union-attr] 

72 else: 

73 text = str(message).rstrip("\n") 

74 buffer.append(text) 

75 

76 return logger.add( 

77 _sink, 

78 level=level, 

79 format="{time:HH:mm:ss} | {level: <8} | {message}", 

80 )