"""
|
|
DSS Structured Logger - JSON-based logging for AI-consumable audit trails
|
|
|
|
Provides structured, machine-readable logging in JSONL format (one JSON object per line).
|
|
All DSS operations are logged with consistent fields for analysis, debugging, and compliance.
|
|
|
|
Features:
|
|
- JSONL format (newline-delimited JSON) for easy parsing
|
|
- Structured log entries with standardized fields
|
|
- Context tracking (session_id, tool_name, operation)
|
|
- Performance metrics (duration, timestamps)
|
|
- Log rotation and cleanup
|
|
- Integration with DSSRuntime
|
|
|
|
Usage:
|
|
from core.structured_logger import get_logger, LogContext
|
|
|
|
logger = get_logger("dss.tool.sync_figma")
|
|
|
|
with LogContext(session_id="abc123", tool="dss_sync_figma"):
|
|
logger.info("Starting Figma sync", extra={"file_key": "xyz"})
|
|
# ... operation ...
|
|
logger.info("Figma sync complete", extra={"tokens_extracted": 42})
|
|
"""

import json
import logging
import os
import sys
import threading
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

# Thread-local storage for context
_context = threading.local()
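
# Note: threading.local() state does not propagate to worker threads spawned
# inside a LogContext, nor across asyncio task switches; contextvars.ContextVar
# would be needed for that kind of propagation.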


class DSSJSONFormatter(logging.Formatter):
    """
    Custom JSON formatter for structured logging.

    Outputs each log record as a single-line JSON object with standardized fields:
    - timestamp: ISO 8601 UTC timestamp
    - level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    - logger: Logger name (e.g., "dss.tool.sync_figma")
    - message: Human-readable log message
    - session_id / tool / operation: Context from the active LogContext, if any
    - extra: Tool-specific extra data
    """

    def format(self, record: logging.LogRecord) -> str:
        """Format log record as single-line JSON"""

        # Build base log entry. Use the record's creation time rather than
        # datetime.now() so the timestamp reflects when the event was logged,
        # not when the handler got around to formatting it.
        log_entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        # Add context from thread-local storage
        if hasattr(_context, "session_id"):
            log_entry["session_id"] = _context.session_id
        if hasattr(_context, "tool_name"):
            log_entry["tool"] = _context.tool_name
        if hasattr(_context, "operation"):
            log_entry["operation"] = _context.operation

        # Add extra fields from record
        if hasattr(record, "extra_data"):
            log_entry["extra"] = record.extra_data

        # Add exception info if present
        if record.exc_info:
            log_entry["exception"] = {
                "type": record.exc_info[0].__name__ if record.exc_info[0] else None,
                "message": str(record.exc_info[1]) if record.exc_info[1] else None,
                "traceback": self.formatException(record.exc_info),
            }

        # Add location info for ERROR and above
        if record.levelno >= logging.ERROR:
            log_entry["location"] = {
                "file": record.pathname,
                "line": record.lineno,
                "function": record.funcName,
            }

        return json.dumps(log_entry, default=str)
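
# Illustrative output (one JSON object per line; values are hypothetical):
#   {"timestamp": "2025-01-01T12:00:00+00:00", "level": "INFO",
#    "logger": "dss.tool.sync_figma", "message": "Starting Figma sync",
#    "session_id": "abc123", "tool": "dss_sync_figma", "extra": {"file_key": "xyz"}}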


class DSSLogger(logging.Logger):
    """
    Extended logger with structured logging support.

    Wraps the standard Python logger with methods that accept extra data
    as a keyword argument for structured logging.

    Note: unlike the stdlib methods, these overrides take no %-style message
    arguments; pass structured data through the `extra` dict instead.
    """

    def _log_with_extra(self, level: int, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
        """Internal method to log with extra structured data"""
        # Calling _log() directly would bypass the isEnabledFor() check that
        # the stdlib debug()/info()/... wrappers perform, so do it here.
        if not self.isEnabledFor(level):
            return
        if extra:
            # Store extra data in a custom attribute read by DSSJSONFormatter
            extra_record = {"extra_data": extra}
            super()._log(level, msg, (), extra=extra_record, **kwargs)
        else:
            super()._log(level, msg, (), **kwargs)

    def debug(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
        """Log DEBUG message with optional extra data"""
        self._log_with_extra(logging.DEBUG, msg, extra, **kwargs)

    def info(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
        """Log INFO message with optional extra data"""
        self._log_with_extra(logging.INFO, msg, extra, **kwargs)

    def warning(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
        """Log WARNING message with optional extra data"""
        self._log_with_extra(logging.WARNING, msg, extra, **kwargs)

    def error(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
        """Log ERROR message with optional extra data"""
        self._log_with_extra(logging.ERROR, msg, extra, **kwargs)

    def critical(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
        """Log CRITICAL message with optional extra data"""
        self._log_with_extra(logging.CRITICAL, msg, extra, **kwargs)
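
# Standard logging kwargs still flow through **kwargs to _log(); for example
# (illustrative), exceptions can be captured alongside structured data:
#   try:
#       ...
#   except ValueError:
#       logger.error("Parse failed", extra={"source": "tokens.css"}, exc_info=True)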


# Configure custom logger class; this only affects loggers created after
# this module is imported.
logging.setLoggerClass(DSSLogger)


def get_logger(name: str, log_file: Optional[str] = None) -> DSSLogger:
    """
    Get or create a structured logger instance.

    Args:
        name: Logger name (e.g., "dss.tool.sync_figma")
        log_file: Optional custom log file path (defaults to .dss/logs/dss-operations.jsonl)

    Returns:
        DSSLogger instance configured for structured logging

    Example:
        logger = get_logger("dss.tool.extract_tokens")
        logger.info("Starting token extraction", extra={"source": "css"})
    """
    logger = logging.getLogger(name)

    # Only configure if not already configured
    if not logger.handlers:
        # Determine log file path
        if log_file is None:
            dss_home = os.environ.get("DSS_HOME", ".dss")
            log_dir = Path(dss_home) / "logs"
            log_dir.mkdir(parents=True, exist_ok=True)
            log_file = str(log_dir / "dss-operations.jsonl")

        # Create file handler with JSON formatter
        file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
        file_handler.setFormatter(DSSJSONFormatter())
        logger.addHandler(file_handler)

        # Also mirror logs to stderr for development (opt-in; off by default)
        if os.environ.get("DSS_LOG_CONSOLE", "false").lower() == "true":
            console_handler = logging.StreamHandler(sys.stderr)
            console_handler.setFormatter(DSSJSONFormatter())
            logger.addHandler(console_handler)

        # Set log level from environment or default to INFO
        log_level = os.environ.get("DSS_LOG_LEVEL", "INFO").upper()
        logger.setLevel(getattr(logging, log_level, logging.INFO))

        # Prevent propagation to root logger
        logger.propagate = False

    return logger
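
# Environment variables read above (values illustrative):
#   DSS_HOME=/srv/dss          # root for the logs/ directory (default: .dss)
#   DSS_LOG_LEVEL=DEBUG        # logger level (default: INFO)
#   DSS_LOG_CONSOLE=true       # mirror JSON logs to stderr (default: false)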


@contextmanager
def LogContext(session_id: Optional[str] = None, tool: Optional[str] = None, operation: Optional[str] = None):
    """
    Context manager for adding structured context to log entries.

    All log entries within this context will include the provided fields
    (session_id, tool, operation).

    Args:
        session_id: Unique session identifier
        tool: Tool name (e.g., "dss_sync_figma")
        operation: Operation being performed (e.g., "token_extraction")

    Example:
        with LogContext(session_id="abc123", tool="dss_sync_figma"):
            logger.info("Starting sync")
            # This log will include session_id and tool fields
    """
    # Store previous context
    prev_session_id = getattr(_context, "session_id", None)
    prev_tool_name = getattr(_context, "tool_name", None)
    prev_operation = getattr(_context, "operation", None)

    # Set new context
    if session_id is not None:
        _context.session_id = session_id
    if tool is not None:
        _context.tool_name = tool
    if operation is not None:
        _context.operation = operation

    try:
        yield
    finally:
        # Restore previous context; `is not None` (rather than truthiness)
        # keeps falsy-but-set values such as "" from being dropped.
        if prev_session_id is not None:
            _context.session_id = prev_session_id
        elif hasattr(_context, "session_id"):
            delattr(_context, "session_id")

        if prev_tool_name is not None:
            _context.tool_name = prev_tool_name
        elif hasattr(_context, "tool_name"):
            delattr(_context, "tool_name")

        if prev_operation is not None:
            _context.operation = prev_operation
        elif hasattr(_context, "operation"):
            delattr(_context, "operation")
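
# Nested contexts restore the outer values on exit (illustrative):
#   with LogContext(session_id="outer"):
#       with LogContext(tool="dss_extract_tokens"):
#           ...  # entries carry session_id="outer" and tool="dss_extract_tokens"
#       ...      # tool is cleared; session_id="outer" remains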


class PerformanceLogger:
    """
    Helper for logging operation performance metrics.

    Automatically measures duration and logs performance data.

    Example:
        perf = PerformanceLogger("token_extraction")
        perf.start()
        # ... operation ...
        perf.end(extra={"tokens_found": 42})
    """

    def __init__(self, operation: str, logger: Optional[DSSLogger] = None):
        """
        Initialize performance logger.

        Args:
            operation: Operation name
            logger: Optional logger (defaults to the "dss.performance" logger)
        """
        self.operation = operation
        self.logger = logger or get_logger("dss.performance")
        self.start_time = None
        self.end_time = None

    def start(self):
        """Mark operation start time"""
        self.start_time = datetime.now(timezone.utc)
        self.logger.debug(f"Started: {self.operation}", extra={
            "operation": self.operation,
            "start_time": self.start_time.isoformat(),
        })

    def end(self, extra: Optional[Dict[str, Any]] = None):
        """
        Mark operation end time and log performance.

        Args:
            extra: Additional metrics to log
        """
        self.end_time = datetime.now(timezone.utc)

        if self.start_time is None:
            self.logger.warning(f"Performance logger end() called without start() for: {self.operation}")
            return

        duration_ms = (self.end_time - self.start_time).total_seconds() * 1000

        perf_data = {
            "operation": self.operation,
            "duration_ms": round(duration_ms, 2),
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat(),
        }

        if extra:
            perf_data.update(extra)

        self.logger.info(f"Completed: {self.operation}", extra=perf_data)


def configure_log_rotation(log_dir: Optional[Path] = None, max_bytes: int = 10 * 1024 * 1024, backup_count: int = 5):
    """
    Configure log rotation for DSS log files.

    Args:
        log_dir: Log directory (defaults to .dss/logs/)
        max_bytes: Max size per log file (default: 10 MB)
        backup_count: Number of backup files to keep (default: 5)

    Note: This uses RotatingFileHandler. For production, consider
    a log rotation service such as logrotate.
    """
    from logging.handlers import RotatingFileHandler

    if log_dir is None:
        dss_home = os.environ.get("DSS_HOME", ".dss")
        log_dir = Path(dss_home) / "logs"

    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / "dss-operations.jsonl"

    # Use get_logger() so the root "dss" logger is level-configured; its plain
    # FileHandler is swapped out for the rotating one below.
    logger = get_logger("dss")

    # Remove existing file handlers (RotatingFileHandler subclasses
    # FileHandler, so repeated calls also replace a previous rotating handler)
    for handler in logger.handlers[:]:
        if isinstance(handler, logging.FileHandler):
            logger.removeHandler(handler)

    # Add rotating file handler
    rotating_handler = RotatingFileHandler(
        str(log_file),
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )
    rotating_handler.setFormatter(DSSJSONFormatter())
    logger.addHandler(rotating_handler)

    logger.info("Log rotation configured", extra={
        "max_bytes": max_bytes,
        "backup_count": backup_count,
        "log_file": str(log_file),
    })
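
# Example call (illustrative): rotate at 50 MB, keeping 10 backups:
#   configure_log_rotation(max_bytes=50 * 1024 * 1024, backup_count=10)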


# Example usage (can be removed in production)
if __name__ == "__main__":
    # Example 1: Basic logging
    logger = get_logger("dss.example")
    logger.info("DSS operation started", extra={"user": "admin"})

    # Example 2: Context-based logging
    with LogContext(session_id="session-123", tool="dss_sync_figma"):
        logger.info("Syncing Figma file", extra={"file_key": "abc123"})
        logger.info("Sync complete", extra={"tokens_extracted": 42})

    # Example 3: Performance logging
    perf = PerformanceLogger("token_extraction", logger)
    perf.start()

    # Simulate work
    import time
    time.sleep(0.1)

    perf.end(extra={"tokens_found": 100})

    print(f"\nLogs written to: {Path('.dss/logs/dss-operations.jsonl').absolute()}")
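
# To run the demo with console output mirrored to stderr (module path per the
# docstring's import; illustrative):
#   DSS_LOG_CONSOLE=true python -m core.structured_logger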