""" DSS Runtime - Dependency Injection & Boundary Enforcement This module provides a bounded runtime environment for DSS MCP tools. All external API access (Figma, Browser, HTTP) MUST go through this runtime. Key Features: - Dependency Injection pattern prevents direct external imports - Capability Provider pattern controls what operations are allowed - All access is validated against .dss-boundaries.yaml - All violations are logged for audit Usage: runtime = DSSRuntime(config_path=".dss-boundaries.yaml") figma_client = runtime.get_figma_client() # Validated & wrapped browser = runtime.get_browser() # Sandboxed """ import logging import json from pathlib import Path from typing import Optional, Dict, Any, List from datetime import datetime import yaml # Setup logging logger = logging.getLogger("dss.runtime") class BoundaryViolationError(Exception): """Raised when an operation violates DSS boundaries""" pass class DSSRuntime: """ Bounded runtime environment for DSS operations. Enforces architectural boundaries by: 1. Controlling all external API access 2. Validating operations against boundary configuration 3. Logging all access for audit trail 4. Providing sandboxed clients instead of raw access """ def __init__(self, config_path: str = ".dss-boundaries.yaml"): """ Initialize DSS Runtime with boundary configuration. Args: config_path: Path to boundary configuration file """ self.config_path = Path(config_path) self.config = self._load_config() self.enforcement_mode = self.config.get("enforcement", {}).get("mode", "strict") self.log_violations = self.config.get("enforcement", {}).get("log_violations", True) self.violation_log_path = Path(self.config.get("enforcement", {}).get("violation_log", ".dss/logs/boundary-violations.jsonl")) # Client caches (lazy initialization) self._figma_client = None self._browser_strategy = None self._http_client = None logger.info(f"DSSRuntime initialized with enforcement mode: {self.enforcement_mode}") def _load_config(self) -> Dict[str, Any]: """Load boundary configuration from YAML""" if not self.config_path.exists(): logger.warning(f"Boundary config not found: {self.config_path}, using defaults") return self._default_config() try: with open(self.config_path) as f: return yaml.safe_load(f) except Exception as e: logger.error(f"Failed to load boundary config: {e}") return self._default_config() def _default_config(self) -> Dict[str, Any]: """Default boundary configuration (strict)""" return { "version": "1.0", "blocked_external_apis": ["api.figma.com"], "blocked_imports": ["requests", "playwright", "httpx"], "enforcement": { "mode": "strict", "log_violations": True, "violation_log": ".dss/logs/boundary-violations.jsonl" } } def _log_violation(self, operation: str, details: Dict[str, Any]): """Log boundary violation to audit trail""" if not self.log_violations: return self.violation_log_path.parent.mkdir(parents=True, exist_ok=True) log_entry = { "timestamp": datetime.utcnow().isoformat(), "type": "boundary_violation", "operation": operation, "enforcement_mode": self.enforcement_mode, "details": details } with open(self.violation_log_path, "a") as f: f.write(json.dumps(log_entry) + "\n") logger.warning(f"Boundary violation: {operation} - {details}") def _log_access(self, operation: str, allowed: bool, details: Dict[str, Any]): """Log successful access for audit trail""" access_log_path = Path(".dss/logs/runtime-access.jsonl") access_log_path.parent.mkdir(parents=True, exist_ok=True) log_entry = { "timestamp": datetime.utcnow().isoformat(), "type": "runtime_access", "operation": operation, "allowed": allowed, "details": details } with open(access_log_path, "a") as f: f.write(json.dumps(log_entry) + "\n") def validate_operation(self, operation: str, context: Dict[str, Any]) -> bool: """ Validate if an operation is allowed by DSS boundaries. Args: operation: Operation name (e.g., "figma_api_call", "browser_launch") context: Operation context for validation Returns: True if allowed, raises BoundaryViolationError if not (in strict mode) """ required_tools = self.config.get("required_dss_tools", {}) # Check if operation requires going through DSS tools for category, tools in required_tools.items(): if operation in category: details = { "operation": operation, "context": context, "required_tools": tools } self._log_violation(operation, details) if self.enforcement_mode == "strict": raise BoundaryViolationError( f"Direct {operation} blocked. Use DSS tools: {', '.join(tools)}" ) elif self.enforcement_mode == "warn": logger.warning(f"Boundary warning: {operation} should use DSS tools") return True self._log_access(operation, True, context) return True def get_figma_client(self, token: Optional[str] = None): """ Get a wrapped Figma API client with boundary enforcement. Args: token: Optional Figma token (uses env var if not provided) Returns: SafeFigmaClient instance (read-only by default) """ if self._figma_client is None: from core.safe_figma_client import SafeFigmaClient self._figma_client = SafeFigmaClient( token=token, allow_write=False, # Read-only by default runtime=self ) logger.info("Figma client initialized (read-only mode)") return self._figma_client def get_browser(self, strategy: str = "local"): """ Get a sandboxed browser automation instance. Args: strategy: Browser strategy ("local" or "remote") Returns: BrowserStrategy instance with sandbox enforcement """ if self._browser_strategy is None: if strategy == "local": try: from strategies.local.browser import LocalBrowserStrategy self._browser_strategy = LocalBrowserStrategy(runtime=self) logger.info("Local browser strategy initialized") except ImportError: raise BoundaryViolationError( "LocalBrowserStrategy not available. Use dss_browser_* tools." ) elif strategy == "remote": try: from strategies.remote.browser import RemoteBrowserStrategy self._browser_strategy = RemoteBrowserStrategy(runtime=self) logger.info("Remote browser strategy initialized") except ImportError: raise BoundaryViolationError( "RemoteBrowserStrategy not available. Use dss_browser_* tools." ) return self._browser_strategy def get_http_client(self): """ Get a wrapped HTTP client with URL validation. Returns: SafeHTTPClient instance that validates URLs against allowed domains """ if self._http_client is None: from core.safe_http_client import SafeHTTPClient self._http_client = SafeHTTPClient( blocked_domains=self.config.get("blocked_external_apis", []), runtime=self ) logger.info("HTTP client initialized with URL validation") return self._http_client def check_import(self, module_name: str) -> bool: """ Check if a direct import is allowed. Args: module_name: Module being imported Returns: True if allowed, raises BoundaryViolationError if blocked """ blocked = self.config.get("blocked_imports", []) if module_name in blocked: details = { "module": module_name, "blocked_imports": blocked } self._log_violation(f"direct_import:{module_name}", details) if self.enforcement_mode == "strict": raise BoundaryViolationError( f"Direct import of '{module_name}' blocked. " f"Use DSS runtime clients instead." ) return True def get_temp_dir(self, session_id: Optional[str] = None) -> Path: """ Get session-specific temporary directory. Args: session_id: Optional session identifier (auto-generated if not provided) Returns: Path to session temp directory """ if session_id is None: session_id = f"session-{int(datetime.utcnow().timestamp())}" temp_dir = Path(".dss/temp") / session_id temp_dir.mkdir(parents=True, exist_ok=True) return temp_dir def get_stats(self) -> Dict[str, Any]: """ Get runtime statistics. Returns: Dictionary with access counts, violations, etc. """ return { "enforcement_mode": self.enforcement_mode, "clients_initialized": { "figma": self._figma_client is not None, "browser": self._browser_strategy is not None, "http": self._http_client is not None, }, "config_version": self.config.get("version", "unknown") } # Global runtime instance (singleton pattern) _runtime_instance: Optional[DSSRuntime] = None def get_runtime() -> DSSRuntime: """Get the global DSSRuntime instance (singleton)""" global _runtime_instance if _runtime_instance is None: _runtime_instance = DSSRuntime() return _runtime_instance