Files
dss/dss-claude-plugin/core/runtime.py
Digital Production Factory 6ac9e7d811 Phase 2 Complete: DSS Runtime & Boundary Enforcement
Implemented dependency injection and boundary enforcement architecture:

NEW FILE: dss-claude-plugin/core/runtime.py (395 lines)
- DSSRuntime class with boundary validation
- Dependency injection pattern for all external API access
- Capability provider pattern (get_figma_client, get_browser, get_http_client)
- Boundary violation logging and enforcement modes (strict/warn/disabled)
- Singleton pattern with get_runtime() helper
- Session-based temp directory management
- Audit trail for all access and violations

UPDATED: dss-claude-plugin/servers/dss-mcp-server.py
- Integrated DSSRuntime initialization in main()
- Updated version to 2.0.0
- Added runtime availability checking
- Logs enforcement mode on startup
- Changed branding: 'Design System Swarm' → 'Design System Server'

BOUNDARY ENFORCEMENT FEATURES:
- Blocks direct external API access (Figma, Browser, HTTP)
- Validates operations against .dss-boundaries.yaml
- Provides wrapped, sandboxed clients instead of raw access
- Logs all violations to .dss/logs/boundary-violations.jsonl
- Logs all access to .dss/logs/runtime-access.jsonl

Next: Phase 3 (Terminology Cleanup) - 67 files to update
2025-12-09 19:21:39 -03:00

309 lines
10 KiB
Python

"""
DSS Runtime - Dependency Injection & Boundary Enforcement
This module provides a bounded runtime environment for DSS MCP tools.
All external API access (Figma, Browser, HTTP) MUST go through this runtime.
Key Features:
- Dependency Injection pattern prevents direct external imports
- Capability Provider pattern controls what operations are allowed
- All access is validated against .dss-boundaries.yaml
- All violations are logged for audit
Usage:
runtime = DSSRuntime(config_path=".dss-boundaries.yaml")
figma_client = runtime.get_figma_client() # Validated & wrapped
browser = runtime.get_browser() # Sandboxed
"""
import logging
import json
from pathlib import Path
from typing import Optional, Dict, Any, List
from datetime import datetime
import yaml
# Module-level logger; every diagnostic emitted by this module goes out under
# the "dss.runtime" logger name.
logger = logging.getLogger("dss.runtime")
class BoundaryViolationError(Exception):
    """Signals that a requested operation is not permitted by the DSS boundaries."""
class DSSRuntime:
    """
    Bounded runtime environment for DSS operations.

    Enforces architectural boundaries by:
    1. Controlling all external API access
    2. Validating operations against boundary configuration
    3. Logging all access for audit trail
    4. Providing sandboxed clients instead of raw access
    """

    def __init__(self, config_path: str = ".dss-boundaries.yaml"):
        """
        Initialize DSS Runtime with boundary configuration.

        Args:
            config_path: Path to boundary configuration file
        """
        self.config_path = Path(config_path)
        self.config = self._load_config()

        # A YAML file may spell the section as ``enforcement:`` with no value
        # (parsed as None); treat anything that is not a mapping as "no overrides".
        enforcement = self.config.get("enforcement")
        if not isinstance(enforcement, dict):
            enforcement = {}
        self.enforcement_mode = enforcement.get("mode", "strict")
        self.log_violations = enforcement.get("log_violations", True)
        self.violation_log_path = Path(
            enforcement.get("violation_log") or ".dss/logs/boundary-violations.jsonl"
        )

        # Client caches (lazy initialization)
        self._figma_client = None
        self._browser_strategy = None
        self._http_client = None

        logger.info(f"DSSRuntime initialized with enforcement mode: {self.enforcement_mode}")

    def _load_config(self) -> Dict[str, Any]:
        """
        Load boundary configuration from YAML.

        Returns:
            The parsed mapping, or the default configuration when the file is
            missing, cannot be read/parsed, or does not contain a mapping.
        """
        if not self.config_path.exists():
            logger.warning(f"Boundary config not found: {self.config_path}, using defaults")
            return self._default_config()

        try:
            with open(self.config_path, encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
        except Exception as e:
            # Deliberate best-effort: any read/parse failure falls back to defaults.
            logger.error(f"Failed to load boundary config: {e}")
            return self._default_config()

        # An empty file parses to None and a scalar/list document to a non-dict;
        # every caller uses ``.get`` on the result, so fall back to defaults.
        if not isinstance(loaded, dict):
            logger.error(
                f"Boundary config {self.config_path} is not a mapping, using defaults"
            )
            return self._default_config()
        return loaded

    def _default_config(self) -> Dict[str, Any]:
        """Default boundary configuration (strict)"""
        return {
            "version": "1.0",
            "blocked_external_apis": ["api.figma.com"],
            "blocked_imports": ["requests", "playwright", "httpx"],
            "enforcement": {
                "mode": "strict",
                "log_violations": True,
                "violation_log": ".dss/logs/boundary-violations.jsonl",
            },
        }

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string (no offset suffix)."""
        from datetime import timezone

        # datetime.utcnow() is deprecated; dropping tzinfo keeps the log
        # timestamp format identical to what utcnow().isoformat() produced.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _append_jsonl(log_path: Path, entry: Dict[str, Any]) -> None:
        """Append ``entry`` as one JSON line to ``log_path``, creating parent dirs."""
        log_path.parent.mkdir(parents=True, exist_ok=True)
        with open(log_path, "a", encoding="utf-8") as f:
            # default=str: audit details come from arbitrary caller context and
            # may hold non-JSON values (e.g. Path); logging must not raise on them.
            f.write(json.dumps(entry, default=str) + "\n")

    def _log_violation(self, operation: str, details: Dict[str, Any]):
        """
        Log boundary violation to audit trail.

        Does nothing when ``log_violations`` is falsy. Otherwise appends a JSON
        line to ``violation_log_path`` and emits a warning on the module logger.
        """
        if not self.log_violations:
            return

        self._append_jsonl(
            self.violation_log_path,
            {
                "timestamp": self._utc_timestamp(),
                "type": "boundary_violation",
                "operation": operation,
                "enforcement_mode": self.enforcement_mode,
                "details": details,
            },
        )
        logger.warning(f"Boundary violation: {operation} - {details}")

    def _log_access(self, operation: str, allowed: bool, details: Dict[str, Any]):
        """Append an access record to ``.dss/logs/runtime-access.jsonl``."""
        self._append_jsonl(
            Path(".dss/logs/runtime-access.jsonl"),
            {
                "timestamp": self._utc_timestamp(),
                "type": "runtime_access",
                "operation": operation,
                "allowed": allowed,
                "details": details,
            },
        )

    def validate_operation(self, operation: str, context: Dict[str, Any]) -> bool:
        """
        Validate if an operation is allowed by DSS boundaries.

        Args:
            operation: Operation name (e.g., "figma_api_call", "browser_launch")
            context: Operation context for validation

        Returns:
            True if allowed, or if a violation was found but the enforcement
            mode is not "strict".

        Raises:
            BoundaryViolationError: In strict mode, when the operation matches a
                category listed under ``required_dss_tools``.
        """
        # ``or {}`` tolerates an explicit null for the key in the YAML file.
        required_tools = self.config.get("required_dss_tools") or {}

        # Check if operation requires going through DSS tools
        for category, tools in required_tools.items():
            # NOTE(review): this is a substring test (operation contained in the
            # category key), not equality -- confirm this is the intended match.
            if operation in category:
                details = {
                    "operation": operation,
                    "context": context,
                    "required_tools": tools,
                }
                self._log_violation(operation, details)

                if self.enforcement_mode == "strict":
                    # Accept a single tool name or a null entry without failing
                    # while building the error message.
                    tool_names = [tools] if isinstance(tools, str) else list(tools or [])
                    raise BoundaryViolationError(
                        f"Direct {operation} blocked. "
                        f"Use DSS tools: {', '.join(map(str, tool_names))}"
                    )
                elif self.enforcement_mode == "warn":
                    logger.warning(f"Boundary warning: {operation} should use DSS tools")
                    return True

        self._log_access(operation, True, context)
        return True

    def get_figma_client(self, token: Optional[str] = None):
        """
        Get a wrapped Figma API client with boundary enforcement.

        The client is created on first call and cached; ``token`` is only
        consulted when the client is first created.

        Args:
            token: Optional Figma token, passed through to SafeFigmaClient

        Returns:
            SafeFigmaClient instance (created with allow_write=False)
        """
        if self._figma_client is None:
            from core.safe_figma_client import SafeFigmaClient

            self._figma_client = SafeFigmaClient(
                token=token,
                allow_write=False,  # Read-only by default
                runtime=self,
            )
            logger.info("Figma client initialized (read-only mode)")
        return self._figma_client

    def get_browser(self, strategy: str = "local"):
        """
        Get a sandboxed browser automation instance.

        The strategy is created on first call and cached; later calls return the
        cached instance regardless of the ``strategy`` argument.

        Args:
            strategy: Browser strategy ("local" or "remote")

        Returns:
            The cached LocalBrowserStrategy or RemoteBrowserStrategy instance

        Raises:
            BoundaryViolationError: If the requested strategy cannot be imported.
            ValueError: If no strategy is cached yet and ``strategy`` is neither
                "local" nor "remote".
        """
        if self._browser_strategy is not None:
            return self._browser_strategy

        if strategy == "local":
            try:
                from strategies.local.browser import LocalBrowserStrategy

                self._browser_strategy = LocalBrowserStrategy(runtime=self)
                logger.info("Local browser strategy initialized")
            except ImportError as err:
                raise BoundaryViolationError(
                    "LocalBrowserStrategy not available. Use dss_browser_* tools."
                ) from err
        elif strategy == "remote":
            try:
                from strategies.remote.browser import RemoteBrowserStrategy

                self._browser_strategy = RemoteBrowserStrategy(runtime=self)
                logger.info("Remote browser strategy initialized")
            except ImportError as err:
                raise BoundaryViolationError(
                    "RemoteBrowserStrategy not available. Use dss_browser_* tools."
                ) from err
        else:
            # Previously an unknown strategy silently returned None.
            raise ValueError(
                f"Unknown browser strategy: {strategy!r} (expected 'local' or 'remote')"
            )
        return self._browser_strategy

    def get_http_client(self):
        """
        Get a wrapped HTTP client with URL validation.

        Returns:
            SafeHTTPClient instance, created once with the configured
            ``blocked_external_apis`` list and cached
        """
        if self._http_client is None:
            from core.safe_http_client import SafeHTTPClient

            self._http_client = SafeHTTPClient(
                blocked_domains=self.config.get("blocked_external_apis") or [],
                runtime=self,
            )
            logger.info("HTTP client initialized with URL validation")
        return self._http_client

    def check_import(self, module_name: str) -> bool:
        """
        Check if a direct import is allowed.

        Args:
            module_name: Module being imported

        Returns:
            True if allowed, or if blocked but the enforcement mode is not "strict"

        Raises:
            BoundaryViolationError: In strict mode, when ``module_name`` is listed
                under ``blocked_imports``.
        """
        # ``or []`` tolerates an explicit null for the key in the YAML file.
        blocked = self.config.get("blocked_imports") or []
        if module_name in blocked:
            details = {
                "module": module_name,
                "blocked_imports": blocked,
            }
            self._log_violation(f"direct_import:{module_name}", details)

            if self.enforcement_mode == "strict":
                raise BoundaryViolationError(
                    f"Direct import of '{module_name}' blocked. "
                    f"Use DSS runtime clients instead."
                )
        return True

    def get_temp_dir(self, session_id: Optional[str] = None) -> Path:
        """
        Get session-specific temporary directory.

        Args:
            session_id: Optional session identifier (auto-generated from the
                current epoch second if not provided)

        Returns:
            Path to session temp directory under ``.dss/temp`` (created if missing)
        """
        if session_id is None:
            # datetime.now().timestamp() yields the true epoch; the former
            # utcnow().timestamp() treated a naive UTC value as local time and
            # was skewed by the machine's UTC offset.
            session_id = f"session-{int(datetime.now().timestamp())}"

        # NOTE(review): session_id is joined unvalidated; a value containing path
        # separators or an absolute path would escape .dss/temp -- confirm callers.
        temp_dir = Path(".dss/temp") / session_id
        temp_dir.mkdir(parents=True, exist_ok=True)
        return temp_dir

    def get_stats(self) -> Dict[str, Any]:
        """
        Get runtime statistics.

        Returns:
            Dictionary with the enforcement mode, which clients have been
            initialized, and the configuration version
        """
        return {
            "enforcement_mode": self.enforcement_mode,
            "clients_initialized": {
                "figma": self._figma_client is not None,
                "browser": self._browser_strategy is not None,
                "http": self._http_client is not None,
            },
            "config_version": self.config.get("version", "unknown"),
        }
# Process-wide DSSRuntime, created lazily by get_runtime() on first use.
_runtime_instance: Optional[DSSRuntime] = None


def get_runtime() -> DSSRuntime:
    """Return the shared DSSRuntime, constructing it with default arguments on first call."""
    global _runtime_instance
    if _runtime_instance is not None:
        return _runtime_instance
    _runtime_instance = DSSRuntime()
    return _runtime_instance