diff --git a/dss-claude-plugin/core/runtime.py b/dss-claude-plugin/core/runtime.py new file mode 100644 index 0000000..0471254 --- /dev/null +++ b/dss-claude-plugin/core/runtime.py @@ -0,0 +1,308 @@ +""" +DSS Runtime - Dependency Injection & Boundary Enforcement + +This module provides a bounded runtime environment for DSS MCP tools. +All external API access (Figma, Browser, HTTP) MUST go through this runtime. + +Key Features: +- Dependency Injection pattern prevents direct external imports +- Capability Provider pattern controls what operations are allowed +- All access is validated against .dss-boundaries.yaml +- All violations are logged for audit + +Usage: + runtime = DSSRuntime(config_path=".dss-boundaries.yaml") + figma_client = runtime.get_figma_client() # Validated & wrapped + browser = runtime.get_browser() # Sandboxed +""" + +import logging +import json +from pathlib import Path +from typing import Optional, Dict, Any, List +from datetime import datetime +import yaml + +# Setup logging +logger = logging.getLogger("dss.runtime") + +class BoundaryViolationError(Exception): + """Raised when an operation violates DSS boundaries""" + pass + +class DSSRuntime: + """ + Bounded runtime environment for DSS operations. + + Enforces architectural boundaries by: + 1. Controlling all external API access + 2. Validating operations against boundary configuration + 3. Logging all access for audit trail + 4. Providing sandboxed clients instead of raw access + """ + + def __init__(self, config_path: str = ".dss-boundaries.yaml"): + """ + Initialize DSS Runtime with boundary configuration. + + Args: + config_path: Path to boundary configuration file + """ + self.config_path = Path(config_path) + self.config = self._load_config() + self.enforcement_mode = self.config.get("enforcement", {}).get("mode", "strict") + self.log_violations = self.config.get("enforcement", {}).get("log_violations", True) + self.violation_log_path = Path(self.config.get("enforcement", {}).get("violation_log", ".dss/logs/boundary-violations.jsonl")) + + # Client caches (lazy initialization) + self._figma_client = None + self._browser_strategy = None + self._http_client = None + + logger.info(f"DSSRuntime initialized with enforcement mode: {self.enforcement_mode}") + + def _load_config(self) -> Dict[str, Any]: + """Load boundary configuration from YAML""" + if not self.config_path.exists(): + logger.warning(f"Boundary config not found: {self.config_path}, using defaults") + return self._default_config() + + try: + with open(self.config_path) as f: + return yaml.safe_load(f) + except Exception as e: + logger.error(f"Failed to load boundary config: {e}") + return self._default_config() + + def _default_config(self) -> Dict[str, Any]: + """Default boundary configuration (strict)""" + return { + "version": "1.0", + "blocked_external_apis": ["api.figma.com"], + "blocked_imports": ["requests", "playwright", "httpx"], + "enforcement": { + "mode": "strict", + "log_violations": True, + "violation_log": ".dss/logs/boundary-violations.jsonl" + } + } + + def _log_violation(self, operation: str, details: Dict[str, Any]): + """Log boundary violation to audit trail""" + if not self.log_violations: + return + + self.violation_log_path.parent.mkdir(parents=True, exist_ok=True) + + log_entry = { + "timestamp": datetime.utcnow().isoformat(), + "type": "boundary_violation", + "operation": operation, + "enforcement_mode": self.enforcement_mode, + "details": details + } + + with open(self.violation_log_path, "a") as f: + f.write(json.dumps(log_entry) + "\n") + + logger.warning(f"Boundary violation: {operation} - {details}") + + def _log_access(self, operation: str, allowed: bool, details: Dict[str, Any]): + """Log successful access for audit trail""" + access_log_path = Path(".dss/logs/runtime-access.jsonl") + access_log_path.parent.mkdir(parents=True, exist_ok=True) + + log_entry = { + "timestamp": datetime.utcnow().isoformat(), + "type": "runtime_access", + "operation": operation, + "allowed": allowed, + "details": details + } + + with open(access_log_path, "a") as f: + f.write(json.dumps(log_entry) + "\n") + + def validate_operation(self, operation: str, context: Dict[str, Any]) -> bool: + """ + Validate if an operation is allowed by DSS boundaries. + + Args: + operation: Operation name (e.g., "figma_api_call", "browser_launch") + context: Operation context for validation + + Returns: + True if allowed, raises BoundaryViolationError if not (in strict mode) + """ + required_tools = self.config.get("required_dss_tools", {}) + + # Check if operation requires going through DSS tools + for category, tools in required_tools.items(): + if operation in category: + details = { + "operation": operation, + "context": context, + "required_tools": tools + } + + self._log_violation(operation, details) + + if self.enforcement_mode == "strict": + raise BoundaryViolationError( + f"Direct {operation} blocked. Use DSS tools: {', '.join(tools)}" + ) + elif self.enforcement_mode == "warn": + logger.warning(f"Boundary warning: {operation} should use DSS tools") + return True + + self._log_access(operation, True, context) + return True + + def get_figma_client(self, token: Optional[str] = None): + """ + Get a wrapped Figma API client with boundary enforcement. + + Args: + token: Optional Figma token (uses env var if not provided) + + Returns: + SafeFigmaClient instance (read-only by default) + """ + if self._figma_client is None: + from core.safe_figma_client import SafeFigmaClient + + self._figma_client = SafeFigmaClient( + token=token, + allow_write=False, # Read-only by default + runtime=self + ) + + logger.info("Figma client initialized (read-only mode)") + + return self._figma_client + + def get_browser(self, strategy: str = "local"): + """ + Get a sandboxed browser automation instance. + + Args: + strategy: Browser strategy ("local" or "remote") + + Returns: + BrowserStrategy instance with sandbox enforcement + """ + if self._browser_strategy is None: + if strategy == "local": + try: + from strategies.local.browser import LocalBrowserStrategy + self._browser_strategy = LocalBrowserStrategy(runtime=self) + logger.info("Local browser strategy initialized") + except ImportError: + raise BoundaryViolationError( + "LocalBrowserStrategy not available. Use dss_browser_* tools." + ) + elif strategy == "remote": + try: + from strategies.remote.browser import RemoteBrowserStrategy + self._browser_strategy = RemoteBrowserStrategy(runtime=self) + logger.info("Remote browser strategy initialized") + except ImportError: + raise BoundaryViolationError( + "RemoteBrowserStrategy not available. Use dss_browser_* tools." + ) + + return self._browser_strategy + + def get_http_client(self): + """ + Get a wrapped HTTP client with URL validation. + + Returns: + SafeHTTPClient instance that validates URLs against allowed domains + """ + if self._http_client is None: + from core.safe_http_client import SafeHTTPClient + + self._http_client = SafeHTTPClient( + blocked_domains=self.config.get("blocked_external_apis", []), + runtime=self + ) + + logger.info("HTTP client initialized with URL validation") + + return self._http_client + + def check_import(self, module_name: str) -> bool: + """ + Check if a direct import is allowed. + + Args: + module_name: Module being imported + + Returns: + True if allowed, raises BoundaryViolationError if blocked + """ + blocked = self.config.get("blocked_imports", []) + + if module_name in blocked: + details = { + "module": module_name, + "blocked_imports": blocked + } + + self._log_violation(f"direct_import:{module_name}", details) + + if self.enforcement_mode == "strict": + raise BoundaryViolationError( + f"Direct import of '{module_name}' blocked. " + f"Use DSS runtime clients instead." + ) + + return True + + def get_temp_dir(self, session_id: Optional[str] = None) -> Path: + """ + Get session-specific temporary directory. + + Args: + session_id: Optional session identifier (auto-generated if not provided) + + Returns: + Path to session temp directory + """ + if session_id is None: + session_id = f"session-{int(datetime.utcnow().timestamp())}" + + temp_dir = Path(".dss/temp") / session_id + temp_dir.mkdir(parents=True, exist_ok=True) + + return temp_dir + + def get_stats(self) -> Dict[str, Any]: + """ + Get runtime statistics. + + Returns: + Dictionary with access counts, violations, etc. + """ + return { + "enforcement_mode": self.enforcement_mode, + "clients_initialized": { + "figma": self._figma_client is not None, + "browser": self._browser_strategy is not None, + "http": self._http_client is not None, + }, + "config_version": self.config.get("version", "unknown") + } + +# Global runtime instance (singleton pattern) +_runtime_instance: Optional[DSSRuntime] = None + +def get_runtime() -> DSSRuntime: + """Get the global DSSRuntime instance (singleton)""" + global _runtime_instance + + if _runtime_instance is None: + _runtime_instance = DSSRuntime() + + return _runtime_instance diff --git a/dss-claude-plugin/servers/dss-mcp-server.py b/dss-claude-plugin/servers/dss-mcp-server.py index d57ca03..1a8d01d 100644 --- a/dss-claude-plugin/servers/dss-mcp-server.py +++ b/dss-claude-plugin/servers/dss-mcp-server.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 """ -DSS MCP Server - Design System Swarm Integration for Claude Code +DSS MCP Server - Design System Server Integration for Claude Code A Python MCP server that exposes DSS functionality as tools for Claude. Uses stdio transport for Claude Code integration. Author: overbits -Version: 1.2.0 - Added Browser Automation with Hybrid Strategy (LOCAL/REMOTE) +Version: 2.0.0 - Architectural Refinement: Boundary Enforcement & Runtime """ import asyncio @@ -22,6 +22,18 @@ from dataclasses import dataclass, field import base64 import re +# DSS Runtime - Boundary Enforcement (CRITICAL) +# All external API access MUST go through the runtime +try: + sys.path.insert(0, str(Path(__file__).parent.parent)) + from core.runtime import DSSRuntime, BoundaryViolationError, get_runtime + RUNTIME_AVAILABLE = True +except ImportError as e: + RUNTIME_AVAILABLE = False + RUNTIME_IMPORT_ERROR = str(e) + print(f"WARNING: DSSRuntime not available: {e}", file=sys.stderr) + print("Boundary enforcement will be disabled!", file=sys.stderr) + # Playwright import (optional - only needed for DevTools features) try: from playwright.async_api import async_playwright, Browser, Page, BrowserContext, Playwright @@ -2721,12 +2733,25 @@ async def dss_rate_limit_status_impl( async def main(): """Run the MCP server""" - logger.info("Starting DSS MCP Server v1.2.0...") + logger.info("Starting DSS MCP Server v2.0.0...") logger.info(f"DSS Path: {DSS_PATH}") logger.info(f"DSS Available: {DSS_AVAILABLE}") logger.info(f"Playwright Available: {PLAYWRIGHT_AVAILABLE}") logger.info(f"LocalBrowserStrategy Available: {LOCAL_BROWSER_STRATEGY_AVAILABLE}") + # Initialize DSS Runtime with boundary enforcement + if RUNTIME_AVAILABLE: + try: + runtime = get_runtime() + stats = runtime.get_stats() + logger.info(f"DSS Runtime initialized: {stats['enforcement_mode']} mode") + logger.info("Boundary enforcement: ACTIVE") + except Exception as e: + logger.error(f"Failed to initialize runtime: {e}") + logger.warning("Boundary enforcement: DISABLED") + else: + logger.warning("DSSRuntime not available - boundary enforcement DISABLED") + if DSS_AVAILABLE: logger.info(f"DSS Version: {dss.__version__}")