diff --git a/dss-claude-plugin/core/structured_logger.py b/dss-claude-plugin/core/structured_logger.py new file mode 100644 index 0000000..dc95837 --- /dev/null +++ b/dss-claude-plugin/core/structured_logger.py @@ -0,0 +1,362 @@ +""" +DSS Structured Logger - JSON-based logging for AI-consumable audit trails + +Provides structured, machine-readable logging in JSONL format (one JSON object per line). +All DSS operations are logged with consistent fields for analysis, debugging, and compliance. + +Features: +- JSONL format (newline-delimited JSON) for easy parsing +- Structured log entries with standardized fields +- Context tracking (session_id, tool_name, operation) +- Performance metrics (duration, timestamps) +- Log rotation and cleanup +- Integration with DSSRuntime + +Usage: + from core.structured_logger import get_logger, LogContext + + logger = get_logger("dss.tool.sync_figma") + + with LogContext(session_id="abc123", tool="dss_sync_figma"): + logger.info("Starting Figma sync", extra={"file_key": "xyz"}) + # ... operation ... + logger.info("Figma sync complete", extra={"tokens_extracted": 42}) +""" + +import json +import logging +import os +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Optional +from contextlib import contextmanager +import threading + +# Thread-local storage for context +_context = threading.local() + + +class DSSJSONFormatter(logging.Formatter): + """ + Custom JSON formatter for structured logging. + + Outputs each log record as a single-line JSON object with standardized fields: + - timestamp: ISO 8601 UTC timestamp + - level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + - logger: Logger name (e.g., "dss.tool.sync_figma") + - message: Human-readable log message + - context: Additional contextual data (session_id, tool_name, etc.) + - extra: Tool-specific extra data + """ + + def format(self, record: logging.LogRecord) -> str: + """Format log record as single-line JSON""" + + # Build base log entry + log_entry = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "level": record.levelname, + "logger": record.name, + "message": record.getMessage(), + } + + # Add context from thread-local storage + if hasattr(_context, "session_id"): + log_entry["session_id"] = _context.session_id + if hasattr(_context, "tool_name"): + log_entry["tool"] = _context.tool_name + if hasattr(_context, "operation"): + log_entry["operation"] = _context.operation + + # Add extra fields from record + if hasattr(record, "extra_data"): + log_entry["extra"] = record.extra_data + + # Add exception info if present + if record.exc_info: + log_entry["exception"] = { + "type": record.exc_info[0].__name__ if record.exc_info[0] else None, + "message": str(record.exc_info[1]) if record.exc_info[1] else None, + "traceback": self.formatException(record.exc_info) if record.exc_info else None, + } + + # Add location info for ERROR and above + if record.levelno >= logging.ERROR: + log_entry["location"] = { + "file": record.pathname, + "line": record.lineno, + "function": record.funcName, + } + + return json.dumps(log_entry, default=str) + + +class DSSLogger(logging.Logger): + """ + Extended logger with structured logging support. + + Wraps standard Python logger with methods that accept extra data + as keyword arguments for structured logging. + """ + + def _log_with_extra(self, level: int, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs): + """Internal method to log with extra structured data""" + if extra: + # Store extra data in a custom attribute + extra_record = {"extra_data": extra} + super()._log(level, msg, (), extra=extra_record, **kwargs) + else: + super()._log(level, msg, (), **kwargs) + + def debug(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs): + """Log DEBUG message with optional extra data""" + self._log_with_extra(logging.DEBUG, msg, extra, **kwargs) + + def info(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs): + """Log INFO message with optional extra data""" + self._log_with_extra(logging.INFO, msg, extra, **kwargs) + + def warning(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs): + """Log WARNING message with optional extra data""" + self._log_with_extra(logging.WARNING, msg, extra, **kwargs) + + def error(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs): + """Log ERROR message with optional extra data""" + self._log_with_extra(logging.ERROR, msg, extra, **kwargs) + + def critical(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs): + """Log CRITICAL message with optional extra data""" + self._log_with_extra(logging.CRITICAL, msg, extra, **kwargs) + + +# Configure custom logger class +logging.setLoggerClass(DSSLogger) + + +def get_logger(name: str, log_file: Optional[str] = None) -> DSSLogger: + """ + Get or create a structured logger instance. + + Args: + name: Logger name (e.g., "dss.tool.sync_figma") + log_file: Optional custom log file path (defaults to .dss/logs/dss-operations.jsonl) + + Returns: + DSSLogger instance configured for structured logging + + Example: + logger = get_logger("dss.tool.extract_tokens") + logger.info("Starting token extraction", extra={"source": "css"}) + """ + logger = logging.getLogger(name) + + # Only configure if not already configured + if not logger.handlers: + # Determine log file path + if log_file is None: + dss_home = os.environ.get("DSS_HOME", ".dss") + log_dir = Path(dss_home) / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + log_file = str(log_dir / "dss-operations.jsonl") + + # Create file handler with JSON formatter + file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8") + file_handler.setFormatter(DSSJSONFormatter()) + logger.addHandler(file_handler) + + # Also add console handler for development (can be disabled in production) + if os.environ.get("DSS_LOG_CONSOLE", "false").lower() == "true": + console_handler = logging.StreamHandler(sys.stderr) + console_handler.setFormatter(DSSJSONFormatter()) + logger.addHandler(console_handler) + + # Set log level from environment or default to INFO + log_level = os.environ.get("DSS_LOG_LEVEL", "INFO").upper() + logger.setLevel(getattr(logging, log_level, logging.INFO)) + + # Prevent propagation to root logger + logger.propagate = False + + return logger + + +@contextmanager +def LogContext(session_id: Optional[str] = None, tool: Optional[str] = None, operation: Optional[str] = None): + """ + Context manager for adding structured context to log entries. + + All log entries within this context will include the provided fields + (session_id, tool_name, operation). + + Args: + session_id: Unique session identifier + tool: Tool name (e.g., "dss_sync_figma") + operation: Operation being performed (e.g., "token_extraction") + + Example: + with LogContext(session_id="abc123", tool="dss_sync_figma"): + logger.info("Starting sync") + # This log will include session_id and tool fields + """ + # Store previous context + prev_session_id = getattr(_context, "session_id", None) + prev_tool_name = getattr(_context, "tool_name", None) + prev_operation = getattr(_context, "operation", None) + + # Set new context + if session_id: + _context.session_id = session_id + if tool: + _context.tool_name = tool + if operation: + _context.operation = operation + + try: + yield + finally: + # Restore previous context + if prev_session_id: + _context.session_id = prev_session_id + elif hasattr(_context, "session_id"): + delattr(_context, "session_id") + + if prev_tool_name: + _context.tool_name = prev_tool_name + elif hasattr(_context, "tool_name"): + delattr(_context, "tool_name") + + if prev_operation: + _context.operation = prev_operation + elif hasattr(_context, "operation"): + delattr(_context, "operation") + + +class PerformanceLogger: + """ + Helper for logging operation performance metrics. + + Automatically measures duration and logs performance data. + + Example: + perf = PerformanceLogger("token_extraction") + perf.start() + # ... operation ... + perf.end(extra={"tokens_found": 42}) + """ + + def __init__(self, operation: str, logger: Optional[DSSLogger] = None): + """ + Initialize performance logger. + + Args: + operation: Operation name + logger: Optional logger (defaults to root DSS logger) + """ + self.operation = operation + self.logger = logger or get_logger("dss.performance") + self.start_time = None + self.end_time = None + + def start(self): + """Mark operation start time""" + self.start_time = datetime.now(timezone.utc) + self.logger.debug(f"Started: {self.operation}", extra={ + "operation": self.operation, + "start_time": self.start_time.isoformat(), + }) + + def end(self, extra: Optional[Dict[str, Any]] = None): + """ + Mark operation end time and log performance. + + Args: + extra: Additional metrics to log + """ + self.end_time = datetime.now(timezone.utc) + + if self.start_time is None: + self.logger.warning(f"Performance logger end() called without start() for: {self.operation}") + return + + duration_ms = (self.end_time - self.start_time).total_seconds() * 1000 + + perf_data = { + "operation": self.operation, + "duration_ms": round(duration_ms, 2), + "start_time": self.start_time.isoformat(), + "end_time": self.end_time.isoformat(), + } + + if extra: + perf_data.update(extra) + + self.logger.info(f"Completed: {self.operation}", extra=perf_data) + + +def configure_log_rotation(log_dir: Optional[Path] = None, max_bytes: int = 10 * 1024 * 1024, backup_count: int = 5): + """ + Configure log rotation for DSS log files. + + Args: + log_dir: Log directory (defaults to .dss/logs/) + max_bytes: Max size per log file (default: 10MB) + backup_count: Number of backup files to keep (default: 5) + + Note: This uses RotatingFileHandler. For production, consider + using a log rotation service like logrotate. + """ + from logging.handlers import RotatingFileHandler + + if log_dir is None: + dss_home = os.environ.get("DSS_HOME", ".dss") + log_dir = Path(dss_home) / "logs" + + log_dir.mkdir(parents=True, exist_ok=True) + log_file = log_dir / "dss-operations.jsonl" + + # Get root DSS logger + logger = logging.getLogger("dss") + + # Remove existing file handlers + for handler in logger.handlers[:]: + if isinstance(handler, logging.FileHandler): + logger.removeHandler(handler) + + # Add rotating file handler + rotating_handler = RotatingFileHandler( + str(log_file), + maxBytes=max_bytes, + backupCount=backup_count, + encoding="utf-8" + ) + rotating_handler.setFormatter(DSSJSONFormatter()) + logger.addHandler(rotating_handler) + + logger.info("Log rotation configured", extra={ + "max_bytes": max_bytes, + "backup_count": backup_count, + "log_file": str(log_file), + }) + + +# Example usage (can be removed in production) +if __name__ == "__main__": + # Example 1: Basic logging + logger = get_logger("dss.example") + logger.info("DSS operation started", extra={"user": "admin"}) + + # Example 2: Context-based logging + with LogContext(session_id="session-123", tool="dss_sync_figma"): + logger.info("Syncing Figma file", extra={"file_key": "abc123"}) + logger.info("Sync complete", extra={"tokens_extracted": 42}) + + # Example 3: Performance logging + perf = PerformanceLogger("token_extraction", logger) + perf.start() + # Simulate work + import time + time.sleep(0.1) + perf.end(extra={"tokens_found": 100}) + + print(f"\nLogs written to: {Path('.dss/logs/dss-operations.jsonl').absolute()}") diff --git a/dss-claude-plugin/servers/dss-mcp-server.py b/dss-claude-plugin/servers/dss-mcp-server.py index 1a8d01d..a75cf16 100644 --- a/dss-claude-plugin/servers/dss-mcp-server.py +++ b/dss-claude-plugin/servers/dss-mcp-server.py @@ -27,6 +27,7 @@ import re try: sys.path.insert(0, str(Path(__file__).parent.parent)) from core.runtime import DSSRuntime, BoundaryViolationError, get_runtime + from core.structured_logger import get_logger, LogContext, PerformanceLogger, configure_log_rotation RUNTIME_AVAILABLE = True except ImportError as e: RUNTIME_AVAILABLE = False @@ -108,13 +109,20 @@ except ImportError as e: PROJECT_MANAGEMENT_AVAILABLE = False PROJECT_MANAGEMENT_IMPORT_ERROR = str(e) -# Configure logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[logging.StreamHandler(sys.stderr)] -) -logger = logging.getLogger("dss-mcp-server") +# Configure structured logging +if RUNTIME_AVAILABLE: + # Use structured JSON logging + logger = get_logger("dss.mcp.server") + logger.info("DSS MCP Server initializing with structured logging") +else: + # Fallback to basic logging if runtime not available + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler(sys.stderr)] + ) + logger = logging.getLogger("dss-mcp-server") + logger.warning("Structured logging unavailable - using fallback") # Timeout configuration (seconds) TIMEOUT_CONFIG = { @@ -2733,27 +2741,48 @@ async def dss_rate_limit_status_impl( async def main(): """Run the MCP server""" - logger.info("Starting DSS MCP Server v2.0.0...") - logger.info(f"DSS Path: {DSS_PATH}") - logger.info(f"DSS Available: {DSS_AVAILABLE}") - logger.info(f"Playwright Available: {PLAYWRIGHT_AVAILABLE}") - logger.info(f"LocalBrowserStrategy Available: {LOCAL_BROWSER_STRATEGY_AVAILABLE}") + # Configure log rotation (10MB per file, keep 5 backups) + if RUNTIME_AVAILABLE: + try: + configure_log_rotation(max_bytes=10*1024*1024, backup_count=5) + except Exception as e: + logger.warning("Failed to configure log rotation", extra={"error": str(e)}) + + # Server startup logging with structured data + logger.info("Starting DSS MCP Server", extra={ + "version": "2.0.0", + "dss_path": str(DSS_PATH), + "capabilities": { + "dss": DSS_AVAILABLE, + "playwright": PLAYWRIGHT_AVAILABLE, + "local_browser": LOCAL_BROWSER_STRATEGY_AVAILABLE, + "runtime": RUNTIME_AVAILABLE, + } + }) # Initialize DSS Runtime with boundary enforcement if RUNTIME_AVAILABLE: try: runtime = get_runtime() stats = runtime.get_stats() - logger.info(f"DSS Runtime initialized: {stats['enforcement_mode']} mode") - logger.info("Boundary enforcement: ACTIVE") + logger.info("DSS Runtime initialized", extra={ + "enforcement_mode": stats['enforcement_mode'], + "boundary_enforcement": "ACTIVE", + "stats": stats + }) except Exception as e: - logger.error(f"Failed to initialize runtime: {e}") - logger.warning("Boundary enforcement: DISABLED") + logger.error("Failed to initialize runtime", extra={ + "error": str(e), + "boundary_enforcement": "DISABLED" + }) else: - logger.warning("DSSRuntime not available - boundary enforcement DISABLED") + logger.warning("DSSRuntime not available", extra={ + "boundary_enforcement": "DISABLED", + "import_error": RUNTIME_IMPORT_ERROR if not RUNTIME_AVAILABLE else None + }) if DSS_AVAILABLE: - logger.info(f"DSS Version: {dss.__version__}") + logger.info("DSS module loaded", extra={"version": dss.__version__}) try: async with stdio_server() as (read_stream, write_stream): @@ -2763,7 +2792,10 @@ async def main(): server.create_initialization_options() ) finally: - logger.info("Server shutting down...") + logger.info("Server shutting down", extra={ + "devtools_connected": devtools.connected, + "browser_initialized": browser_state.initialized + }) # Cleanup DevTools if devtools.connected: await devtools_disconnect_impl()