Revert "chore: Remove dss-claude-plugin directory"

This reverts commit 72cb7319f5.
2025-12-10 15:54:39 -03:00
parent 72cb7319f5
commit 4de266de61
50 changed files with 10243 additions and 0 deletions


@@ -0,0 +1,32 @@
"""
DSS Core Module - Configuration and Context Management
Extended with Context Compiler for design system context resolution.
"""
from .config import DSSConfig, DSSMode
from .context import DSSContext
from .compiler import ContextCompiler, EMERGENCY_SKIN
from .mcp_extensions import (
get_active_context,
resolve_token,
validate_manifest,
list_skins,
get_compiler_status,
with_context,
COMPILER
)
__all__ = [
"DSSConfig",
"DSSMode",
"DSSContext",
"ContextCompiler",
"EMERGENCY_SKIN",
"get_active_context",
"resolve_token",
"validate_manifest",
"list_skins",
"get_compiler_status",
"with_context",
"COMPILER"
]


@@ -0,0 +1,179 @@
"""
DSS Context Compiler
Resolves project context via 3-layer cascade: Base -> Skin -> Project
Includes Safe Boot Protocol and Debug Provenance.
"""
import json
import os
import copy
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List, Union
from pathlib import Path
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("DSSCompiler")
# --- SAFE BOOT PROTOCOL ---
# Hardcoded emergency skin in case the file system or JSON parsing fails catastrophically
EMERGENCY_SKIN = {
"meta": {"id": "emergency", "version": "1.0.0"},
"tokens": {
"colors": {
"primary": "#FF0000",
"background": "#FFFFFF",
"text": "#000000"
},
"spacing": {"base": "4px"}
},
"status": "emergency_mode"
}
class ContextCompiler:
def __init__(self, skins_dir: str = "./skins"):
self.skins_dir = Path(skins_dir)
self.cache: Dict[str, Any] = {}
self._manifest_mtimes: Dict[str, float] = {} # Track file modification times
def compile(self, manifest_path: str, debug: bool = False, force_refresh: bool = False) -> Dict[str, Any]:
"""
Main entry point. Compiles context by merging:
1. Base Skin (Implicit or Explicit)
2. Extended Skin (defined in manifest)
3. Project Overrides (defined in manifest)
Args:
manifest_path: Path to ds.config.json
debug: Enable provenance tracking
force_refresh: Bypass cache and recompile (for long-running servers)
"""
try:
# Check cache with mtime validation (unless force_refresh or debug mode)
# Note: Debug mode bypasses cache because provenance must be recalculated
cache_key = f"{manifest_path}:debug={debug}"
if not force_refresh and not debug and cache_key in self.cache:
# Verify manifest hasn't changed
manifest_file = Path(manifest_path)
if manifest_file.exists():
current_mtime = manifest_file.stat().st_mtime
cached_mtime = self._manifest_mtimes.get(cache_key, 0)
if current_mtime == cached_mtime:
logger.debug(f"Cache hit for {manifest_path}")
return self.cache[cache_key]
else:
logger.info(f"Manifest modified, invalidating cache: {manifest_path}")
# 1. Load Project Manifest
manifest = self._load_json(manifest_path)
# 2. Resolve Skin
skin_id = manifest.get("extends", {}).get("skin", "classic")
skin = self._load_skin(skin_id)
# 3. Resolve Base (Single Inheritance Enforced)
# If the skin extends another, we merge that first.
# Simplified for Phase 1: We assume all skins extend 'base' implicitly unless specified
base_skin = self._load_skin("base")
            # 4. Cascade Merge: Base -> Skin -> Project
            # Collect provenance for this compilation (debug mode only)
            provenance: Optional[List[Dict[str, Any]]] = [] if debug else None
            # Merge Base + Skin
            context = self._deep_merge(base_skin, skin, path="base->skin", debug=debug, provenance=provenance)
            # Merge Result + Project Overrides
            # Project overrides must be wrapped in the same structure as the skins
            project_overrides_wrapped = {
                "tokens": manifest.get("overrides", {}).get("tokens", {})
            }
            final_context = self._deep_merge(context, project_overrides_wrapped, path="skin->project", debug=debug, provenance=provenance)
# Inject Metadata
final_context["_meta"] = {
"project_id": manifest["project"]["id"],
"compiled_at": datetime.now(timezone.utc).isoformat(),
"debug_enabled": debug,
"compiler_config": manifest.get("compiler", {})
}
            if debug:
                final_context["_provenance"] = provenance
# Cache result with mtime tracking (only cache non-debug mode results)
if not debug:
manifest_file = Path(manifest_path)
if manifest_file.exists():
cache_key = f"{manifest_path}:debug={debug}"
self.cache[cache_key] = final_context
self._manifest_mtimes[cache_key] = manifest_file.stat().st_mtime
logger.debug(f"Cached compilation result for {manifest_path}")
return final_context
except Exception as e:
logger.error(f"Compiler specific error: {e}")
logger.warning("Initiating SAFE BOOT PROTOCOL")
return self._enter_safe_mode(e)
def _load_skin(self, skin_id: str) -> Dict[str, Any]:
"""Loads a skin by ID from the skins directory."""
# Simple caching strategy
if skin_id in self.cache:
return self.cache[skin_id]
# Security: Prevent path traversal attacks
        path = (self.skins_dir / f"{skin_id}.json").resolve()
        if self.skins_dir.resolve() not in path.parents:
            raise ValueError(f"Invalid skin ID (path traversal detected): {skin_id}")
if not path.exists():
logger.warning(f"Skin {skin_id} not found, falling back to base.")
if skin_id == "base":
# Return emergency tokens if base is missing
return EMERGENCY_SKIN
return self._load_skin("base")
data = self._load_json(str(path))
self.cache[skin_id] = data
return data
def _load_json(self, path: str) -> Dict[str, Any]:
with open(path, 'r') as f:
return json.load(f)
def _deep_merge(self, base: Dict, override: Dict, path: str = "", debug: bool = False, provenance: List[Dict] = None) -> Dict:
"""
Deep merge dictionaries. Replaces arrays.
Populates provenance list if debug is True (thread-safe).
"""
        # Thread-safe: provenance travels as an explicit argument, never as instance state
        if provenance is None and debug:
            provenance = []
result = copy.deepcopy(base)
for key, value in override.items():
if isinstance(value, dict) and key in result and isinstance(result[key], dict):
# Recursive merge - pass provenance down
result[key] = self._deep_merge(result[key], value, path=f"{path}.{key}", debug=debug, provenance=provenance)
else:
# Direct replacement (Primitive or Array)
if debug and provenance is not None:
provenance.append({
"key": key,
"action": "override",
"layer": path,
"value_type": type(value).__name__
})
result[key] = copy.deepcopy(value)
return result
def _enter_safe_mode(self, error: Exception) -> Dict[str, Any]:
"""Returns the hardcoded emergency skin with error details."""
safe_context = copy.deepcopy(EMERGENCY_SKIN)
safe_context["_error"] = str(error)
return safe_context
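
A minimal usage sketch of the cascade (paths and token values are illustrative, and the "core" import path is assumed; it presumes a ds.config.json that extends the "workbench" skin):

from core.compiler import ContextCompiler

compiler = ContextCompiler(skins_dir="./skins")
# Cascade: skins/base.json -> skins/workbench.json -> overrides in ds.config.json
context = compiler.compile("./ds.config.json", debug=True)

print(context["tokens"]["colors"]["primary"])  # e.g. "#2563EB" from the workbench skin
print(context["_meta"]["project_id"])          # taken from the project manifest
for entry in context["_provenance"]:           # debug=True records which layer set each token
    print(entry["layer"], entry["key"])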


@@ -0,0 +1,161 @@
"""
DSS Configuration Module
========================
Handles configuration management for the Design System Server (DSS) Claude Plugin.
Supports local/remote mode detection, persistent configuration storage, and
environment variable overrides.
"""
import os
import json
import uuid
import asyncio
import logging
from enum import Enum
from pathlib import Path
from typing import Optional, Union, Any
import aiohttp
from pydantic import BaseModel, Field, HttpUrl, ValidationError
# Configure module-level logger
logger = logging.getLogger(__name__)
CONFIG_DIR = Path.home() / ".dss"
CONFIG_FILE = CONFIG_DIR / "config.json"
DEFAULT_REMOTE_URL = "https://dss.overbits.luz.uy"
DEFAULT_LOCAL_URL = "http://localhost:6006"
class DSSMode(str, Enum):
"""Operation modes for the DSS plugin."""
LOCAL = "local"
REMOTE = "remote"
AUTO = "auto"
class DSSConfig(BaseModel):
"""
Configuration model for DSS Plugin.
Attributes:
mode (DSSMode): The configured operation mode (default: AUTO).
remote_url (str): URL for the remote DSS API.
local_url (str): URL for the local DSS API (usually localhost).
session_id (str): Unique identifier for this client instance.
"""
mode: DSSMode = Field(default=DSSMode.AUTO, description="Operation mode preference")
remote_url: str = Field(default=DEFAULT_REMOTE_URL, description="Remote API endpoint")
local_url: str = Field(default=DEFAULT_LOCAL_URL, description="Local API endpoint")
session_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Persistent session ID")
class Config:
validate_assignment = True
extra = "ignore" # Allow forward compatibility with new config keys
@classmethod
def load(cls) -> "DSSConfig":
"""
Load configuration from ~/.dss/config.json.
Returns a default instance if the file does not exist or is invalid.
"""
if not CONFIG_FILE.exists():
logger.debug(f"No config found at {CONFIG_FILE}, using defaults.")
return cls()
try:
content = CONFIG_FILE.read_text(encoding="utf-8")
data = json.loads(content)
# Ensure complex types are handled by Pydantic validation
return cls.model_validate(data)
except (json.JSONDecodeError, ValidationError) as e:
logger.warning(f"Failed to load config from {CONFIG_FILE}: {e}. Using defaults.")
return cls()
except Exception as e:
logger.error(f"Unexpected error loading config: {e}")
return cls()
def save(self) -> None:
"""
Save the current configuration to ~/.dss/config.json.
Creates the directory if it does not exist.
"""
try:
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
# Export using mode='json' to handle enums and urls correctly
json_data = self.model_dump_json(indent=2)
CONFIG_FILE.write_text(json_data, encoding="utf-8")
logger.debug(f"Configuration saved to {CONFIG_FILE}")
except Exception as e:
logger.error(f"Failed to save config to {CONFIG_FILE}: {e}")
raise
async def get_active_mode(self) -> DSSMode:
"""
Determine the actual runtime mode based on priority rules.
Priority:
1. DSS_MODE environment variable
2. Configured 'mode' (if not AUTO)
3. Auto-detection (ping local health endpoint)
4. Fallback to REMOTE
Returns:
DSSMode: The resolved active mode (LOCAL or REMOTE).
"""
# 1. Check Environment Variable
env_mode = os.getenv("DSS_MODE")
if env_mode:
try:
# Normalize string to enum
return DSSMode(env_mode.lower())
except ValueError:
logger.warning(f"Invalid DSS_MODE env var '{env_mode}', ignoring.")
# 2. Check Configuration (if explicit)
if self.mode != DSSMode.AUTO:
return self.mode
# 3. Auto-detect
logger.info("Auto-detecting DSS mode...")
is_local_healthy = await self._check_local_health()
if is_local_healthy:
logger.info(f"Local server detected at {self.local_url}. Switching to LOCAL mode.")
return DSSMode.LOCAL
else:
logger.info("Local server unreachable. Fallback to REMOTE mode.")
# 4. Fallback
return DSSMode.REMOTE
async def _check_local_health(self) -> bool:
"""
Ping the local server health endpoint to check availability.
Returns:
bool: True if server responds with 200 OK, False otherwise.
"""
health_url = f"{self.local_url.rstrip('/')}/health"
try:
timeout = aiohttp.ClientTimeout(total=2.0) # Short timeout for responsiveness
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(health_url) as response:
if response.status == 200:
return True
logger.debug(f"Local health check returned status {response.status}")
except aiohttp.ClientError as e:
logger.debug(f"Local health check connection failed: {e}")
except Exception as e:
logger.debug(f"Unexpected error during health check: {e}")
return False
def get_api_url(self, active_mode: DSSMode) -> str:
"""
Helper to get the correct API URL for the determined mode.
"""
if active_mode == DSSMode.LOCAL:
return self.local_url
return self.remote_url
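
A short sketch of the resolution priority described in get_active_mode (the asyncio scaffolding is illustrative; behaviour follows the code above, with the "core" import path assumed):

import asyncio
from core.config import DSSConfig

async def main():
    config = DSSConfig.load()               # reads ~/.dss/config.json, or falls back to defaults
    mode = await config.get_active_mode()   # DSS_MODE env var > explicit config > local /health probe > REMOTE
    print(f"Active mode: {mode.value}, API: {config.get_api_url(mode)}")
    config.save()                           # persists settings and the generated session_id

asyncio.run(main())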


@@ -0,0 +1,181 @@
"""
DSS Context Module
==================
Singleton context manager for the DSS Plugin.
Handles configuration loading, mode detection, and strategy instantiation.
"""
import asyncio
import logging
from typing import Optional, Dict, Any
from .config import DSSConfig, DSSMode
# Logger setup
logger = logging.getLogger(__name__)
# Protocol/Type placeholder for Strategies (to be replaced by base class in next steps)
Strategy = Any
class DSSContext:
"""
Singleton context manager for the DSS Plugin.
Handles configuration loading, mode detection (Local/Remote),
and strategy instantiation.
"""
_instance: Optional['DSSContext'] = None
_lock: asyncio.Lock = asyncio.Lock()
def __init__(self) -> None:
"""
Private initializer. Use get_instance() instead.
"""
if DSSContext._instance is not None:
raise RuntimeError("DSSContext is a singleton. Use get_instance() to access it.")
self.config: Optional[DSSConfig] = None
self.active_mode: DSSMode = DSSMode.REMOTE # Default safe fallback
self._capabilities: Dict[str, bool] = {}
self._strategy_cache: Dict[str, Strategy] = {}
self.session_id: Optional[str] = None
@classmethod
async def get_instance(cls) -> 'DSSContext':
"""
Async factory method to get the singleton instance.
Ensures config is loaded and mode is detected before returning.
"""
if not cls._instance:
async with cls._lock:
# Double-check locking pattern
if not cls._instance:
instance = cls()
await instance._initialize()
cls._instance = instance
return cls._instance
@classmethod
def reset(cls) -> None:
"""
Resets the singleton instance. Useful for testing.
"""
cls._instance = None
async def _initialize(self) -> None:
"""
Internal initialization logic:
1. Load Config
2. Detect Mode
3. Cache Capabilities
"""
try:
# 1. Load Configuration
self.config = DSSConfig.load()
self.session_id = self.config.session_id
# 2. Detect Mode (Async check)
self.active_mode = await self.config.get_active_mode()
logger.info(f"DSSContext initialized. Mode: {self.active_mode.value}, Session: {self.session_id}")
# 3. Cache Capabilities
self._cache_capabilities()
except Exception as e:
logger.error(f"Failed to initialize DSSContext: {e}")
# Fallback to defaults if initialization fails
self.active_mode = DSSMode.REMOTE
self._capabilities = {"limited": True}
def _cache_capabilities(self) -> None:
"""
Determines what the plugin can do based on the active mode.
"""
# Base capabilities
caps = {
"can_read_files": False,
"can_execute_browser": False,
"can_screenshot": False,
"can_connect_remote": True
}
if self.active_mode == DSSMode.LOCAL:
# Local mode allows direct filesystem access and local browser control
caps["can_read_files"] = True
caps["can_execute_browser"] = True
caps["can_screenshot"] = True
elif self.active_mode == DSSMode.REMOTE:
# Remote mode relies on API capabilities
# Depending on remote configuration, these might differ
caps["can_execute_browser"] = False # Typically restricted in pure remote unless via API
caps["can_read_files"] = False # Security restriction
self._capabilities = caps
def get_capability(self, key: str) -> bool:
"""Check if a specific capability is active."""
return self._capabilities.get(key, False)
def get_api_url(self) -> str:
"""Get the correct API URL for the current mode."""
if self.config is None:
return "https://dss.overbits.luz.uy" # Default fallback
return self.config.get_api_url(self.active_mode)
def get_strategy(self, strategy_type: str) -> Any:
"""
Factory method to retrieve operational strategies.
Args:
strategy_type: One of 'browser', 'filesystem', 'screenshot'
Returns:
An instance of the requested strategy.
"""
# Return cached strategy if available
if strategy_type in self._strategy_cache:
return self._strategy_cache[strategy_type]
strategy_instance = None
# NOTE: Strategy classes will be implemented in the next step.
# We use local imports here to avoid circular dependency issues
# if strategies define their own types using DSSContext.
try:
if strategy_type == "browser":
# Will be implemented in Phase 2 & 3
if self.active_mode == DSSMode.LOCAL:
from ..strategies.local.browser import LocalBrowserStrategy
strategy_instance = LocalBrowserStrategy(self)
else:
from ..strategies.remote.browser import RemoteBrowserStrategy
strategy_instance = RemoteBrowserStrategy(self)
elif strategy_type == "filesystem":
# Will be implemented in Phase 2
if self.active_mode == DSSMode.LOCAL:
from ..strategies.local.filesystem import LocalFilesystemStrategy
strategy_instance = LocalFilesystemStrategy(self)
else:
from ..strategies.remote.filesystem import RemoteFilesystemStrategy
strategy_instance = RemoteFilesystemStrategy(self)
elif strategy_type == "screenshot":
# Screenshot is part of browser strategy
return self.get_strategy("browser")
else:
raise ValueError(f"Unknown strategy type: {strategy_type}")
except ImportError as e:
logger.error(f"Failed to import strategy {strategy_type}: {e}")
raise NotImplementedError(f"Strategy {strategy_type} not yet implemented") from e
# Cache and return
self._strategy_cache[strategy_type] = strategy_instance
return strategy_instance
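
A minimal sketch of acquiring the singleton and checking capabilities (the strategy modules referenced by get_strategy are not part of this commit, so only the context itself is exercised; the "core" import path is assumed):

import asyncio
from core.context import DSSContext

async def main():
    ctx = await DSSContext.get_instance()   # loads config, detects mode, caches capabilities
    print(ctx.get_api_url())
    if ctx.get_capability("can_read_files"):
        print("LOCAL mode: direct filesystem access is available")

asyncio.run(main())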


@@ -0,0 +1,113 @@
"""
MCP Extensions for Context Awareness
Implements the Factory Pattern to wrap existing tools with context
and defines 5 new tools for the Context Compiler.
"""
from typing import Any, Dict, List, Callable
import functools
import json
import os
from .compiler import ContextCompiler
# Singleton compiler instance
COMPILER = ContextCompiler(skins_dir=os.path.join(os.path.dirname(__file__), "skins"))
# --- FACTORY PATTERN: Context Wrapper ---
def with_context(default_manifest_path: str = None):
"""
Decorator that injects the compiled context into the tool's arguments.
Use this to upgrade existing 'token extractor' tools to be 'context aware'.
The manifest path is extracted from kwargs['manifest_path'] if present,
otherwise falls back to the default_manifest_path provided at decoration time.
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 1. Get manifest path (runtime kwarg or decorator default)
manifest_path = kwargs.get('manifest_path', default_manifest_path)
if not manifest_path:
raise ValueError("No manifest_path provided to context-aware tool")
# 2. Compile Context
context = COMPILER.compile(manifest_path)
# 3. Inject into kwargs
kwargs['dss_context'] = context
# 4. Execute Tool
return func(*args, **kwargs)
return wrapper
return decorator
# --- 5 NEW MCP TOOLS ---
def get_active_context(manifest_path: str, debug: bool = False, force_refresh: bool = False) -> str:
"""
[Tool 1] Returns the fully resolved JSON context for the project.
Set debug=True to see provenance (which layer defined which token).
Set force_refresh=True to bypass cache (for long-running servers).
"""
context = COMPILER.compile(manifest_path, debug=debug, force_refresh=force_refresh)
return json.dumps(context, indent=2)
def resolve_token(manifest_path: str, token_path: str, force_refresh: bool = False) -> str:
"""
[Tool 2] Resolves a specific token value (e.g. 'colors.primary')
through the cascade.
Set force_refresh=True to bypass cache (for long-running servers).
"""
context = COMPILER.compile(manifest_path, force_refresh=force_refresh)
keys = token_path.split('.')
current = context.get("tokens", {})
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return f"Token not found: {token_path}"
return str(current)
def validate_manifest(manifest_path: str) -> str:
"""
[Tool 3] Validates the ds.config.json against the schema.
"""
# In a full implementation, we would use 'jsonschema' library here.
# For now, we perform a basic structural check via the Compiler's loader.
    try:
        result = COMPILER.compile(manifest_path)
        # compile() never raises (Safe Boot catches errors), so detect the safe-mode fallback explicitly
        if result.get("status") == "emergency_mode":
            return f"Invalid: compilation fell back to safe mode ({result.get('_error', 'unknown error')})"
        return "Valid: Project manifest builds successfully."
    except Exception as e:
        return f"Invalid: {str(e)}"
def list_skins() -> str:
"""
[Tool 4] Lists all available skins in the registry.
"""
skins_path = COMPILER.skins_dir
if not skins_path.exists():
return "No skins directory found."
skins = [f.stem for f in skins_path.glob("*.json")]
return json.dumps(skins)
def get_compiler_status() -> str:
"""
[Tool 5] Returns the health and configuration of the Context Compiler.
"""
status = {
"status": "active",
"skins_directory": str(COMPILER.skins_dir),
"cached_skins": list(COMPILER.cache.keys()),
"safe_boot_ready": True
}
return json.dumps(status, indent=2)
# Instructions for Main Server File:
# 1. Import these tools
# 2. Register them with the MCP server instance
# 3. Apply @with_context wrapper to legacy tools if dynamic context is needed
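
A sketch of step 3 above: wrapping a legacy tool with the decorator so it receives the compiled context (the tool name, its arguments, and the manifest paths are hypothetical):

from core.mcp_extensions import with_context

@with_context(default_manifest_path="./ds.config.json")  # hypothetical default
def extract_tokens(source_file: str, dss_context: dict = None, **kwargs) -> str:
    # dss_context is injected by the decorator before the tool body runs
    primary = dss_context["tokens"]["colors"].get("primary")
    return f"Extracting from {source_file} against primary color {primary}"

# A per-call manifest can still be supplied; it takes precedence over the default:
print(extract_tokens("styles.css", manifest_path="./apps/web/ds.config.json"))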


@@ -0,0 +1,167 @@
"""
MCP Integration Layer for DSS Context Compiler
Provides MCP-compliant tool wrappers for the 5 new context tools.
"""
from typing import Dict, Any
import json
from . import (
get_active_context,
resolve_token,
validate_manifest,
list_skins,
get_compiler_status
)
# MCP Tool Definitions
def mcp_get_resolved_context(manifest_path: str, debug: bool = False, force_refresh: bool = False) -> str:
"""
MCP Tool: Get Active Context
Returns the fully resolved JSON context for a project.
Set debug=True to see provenance (which layer defined which token).
Set force_refresh=True to bypass cache (for long-running servers).
Args:
manifest_path: Path to ds.config.json
debug: Enable debug provenance tracking
force_refresh: Bypass cache and recompile
Returns:
JSON string with resolved context
"""
try:
return get_active_context(manifest_path, debug, force_refresh)
except Exception as e:
return json.dumps({"error": str(e), "status": "failed"})
def mcp_resolve_token(manifest_path: str, token_path: str, force_refresh: bool = False) -> str:
"""
MCP Tool: Resolve Token
Resolves a specific token value (e.g. 'colors.primary') through the cascade.
Set force_refresh=True to bypass cache (for long-running servers).
Args:
manifest_path: Path to ds.config.json
token_path: Dot-notation path to token (e.g. 'colors.primary')
force_refresh: Bypass cache and recompile
Returns:
Resolved token value or error message
"""
try:
return resolve_token(manifest_path, token_path, force_refresh)
except Exception as e:
return f"Error resolving token: {str(e)}"
def mcp_validate_manifest(manifest_path: str) -> str:
"""
MCP Tool: Validate Manifest
Validates the ds.config.json against the schema.
Args:
manifest_path: Path to ds.config.json
Returns:
Validation result message
"""
try:
return validate_manifest(manifest_path)
except Exception as e:
return f"Validation error: {str(e)}"
def mcp_list_skins() -> str:
"""
MCP Tool: List Skins
Lists all available skins in the registry.
Returns:
JSON array of skin IDs
"""
try:
return list_skins()
except Exception as e:
return json.dumps({"error": str(e), "skins": []})
def mcp_get_compiler_status() -> str:
"""
MCP Tool: Get Compiler Status
Returns the health and configuration of the Context Compiler.
Returns:
JSON object with compiler status
"""
try:
return get_compiler_status()
except Exception as e:
return json.dumps({"error": str(e), "status": "error"})
# MCP Tool Registry
# This can be imported by dss-mcp-server.py to register the tools
MCP_TOOLS = {
"dss_get_resolved_context": {
"function": mcp_get_resolved_context,
"description": "Get fully resolved design system context for a project",
"parameters": {
"manifest_path": {
"type": "string",
"description": "Path to ds.config.json",
"required": True
},
"debug": {
"type": "boolean",
"description": "Enable debug provenance tracking",
"required": False,
"default": False
}
}
},
"dss_resolve_token": {
"function": mcp_resolve_token,
"description": "Resolve a specific design token through the cascade",
"parameters": {
"manifest_path": {
"type": "string",
"description": "Path to ds.config.json",
"required": True
},
"token_path": {
"type": "string",
"description": "Dot-notation path to token (e.g. 'colors.primary')",
"required": True
}
}
},
"dss_validate_manifest": {
"function": mcp_validate_manifest,
"description": "Validate project manifest against schema",
"parameters": {
"manifest_path": {
"type": "string",
"description": "Path to ds.config.json",
"required": True
}
}
},
"dss_list_skins": {
"function": mcp_list_skins,
"description": "List all available design system skins",
"parameters": {}
},
"dss_get_compiler_status": {
"function": mcp_get_compiler_status,
"description": "Get Context Compiler health and configuration",
"parameters": {}
}
}
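
A sketch of how a server file might consume this registry (the server object and its register_tool method are placeholders, and the module name "mcp_integration" is assumed, not confirmed by this diff):

from core.mcp_integration import MCP_TOOLS

def register_dss_tools(server):
    """Register every DSS context tool on an MCP server instance (hypothetical interface)."""
    for name, spec in MCP_TOOLS.items():
        server.register_tool(                 # placeholder method name
            name=name,
            handler=spec["function"],
            description=spec["description"],
            parameters=spec["parameters"],
        )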


@@ -0,0 +1,308 @@
"""
DSS Runtime - Dependency Injection & Boundary Enforcement
This module provides a bounded runtime environment for DSS MCP tools.
All external API access (Figma, Browser, HTTP) MUST go through this runtime.
Key Features:
- Dependency Injection pattern prevents direct external imports
- Capability Provider pattern controls what operations are allowed
- All access is validated against .dss-boundaries.yaml
- All violations are logged for audit
Usage:
runtime = DSSRuntime(config_path=".dss-boundaries.yaml")
figma_client = runtime.get_figma_client() # Validated & wrapped
browser = runtime.get_browser() # Sandboxed
"""
import logging
import json
from pathlib import Path
from typing import Optional, Dict, Any, List
from datetime import datetime
import yaml
# Setup logging
logger = logging.getLogger("dss.runtime")
class BoundaryViolationError(Exception):
"""Raised when an operation violates DSS boundaries"""
pass
class DSSRuntime:
"""
Bounded runtime environment for DSS operations.
Enforces architectural boundaries by:
1. Controlling all external API access
2. Validating operations against boundary configuration
3. Logging all access for audit trail
4. Providing sandboxed clients instead of raw access
"""
def __init__(self, config_path: str = ".dss-boundaries.yaml"):
"""
Initialize DSS Runtime with boundary configuration.
Args:
config_path: Path to boundary configuration file
"""
self.config_path = Path(config_path)
self.config = self._load_config()
self.enforcement_mode = self.config.get("enforcement", {}).get("mode", "strict")
self.log_violations = self.config.get("enforcement", {}).get("log_violations", True)
self.violation_log_path = Path(self.config.get("enforcement", {}).get("violation_log", ".dss/logs/boundary-violations.jsonl"))
# Client caches (lazy initialization)
self._figma_client = None
self._browser_strategy = None
self._http_client = None
logger.info(f"DSSRuntime initialized with enforcement mode: {self.enforcement_mode}")
def _load_config(self) -> Dict[str, Any]:
"""Load boundary configuration from YAML"""
if not self.config_path.exists():
logger.warning(f"Boundary config not found: {self.config_path}, using defaults")
return self._default_config()
try:
with open(self.config_path) as f:
return yaml.safe_load(f)
except Exception as e:
logger.error(f"Failed to load boundary config: {e}")
return self._default_config()
def _default_config(self) -> Dict[str, Any]:
"""Default boundary configuration (strict)"""
return {
"version": "1.0",
"blocked_external_apis": ["api.figma.com"],
"blocked_imports": ["requests", "playwright", "httpx"],
"enforcement": {
"mode": "strict",
"log_violations": True,
"violation_log": ".dss/logs/boundary-violations.jsonl"
}
}
def _log_violation(self, operation: str, details: Dict[str, Any]):
"""Log boundary violation to audit trail"""
if not self.log_violations:
return
self.violation_log_path.parent.mkdir(parents=True, exist_ok=True)
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"type": "boundary_violation",
"operation": operation,
"enforcement_mode": self.enforcement_mode,
"details": details
}
with open(self.violation_log_path, "a") as f:
f.write(json.dumps(log_entry) + "\n")
logger.warning(f"Boundary violation: {operation} - {details}")
def _log_access(self, operation: str, allowed: bool, details: Dict[str, Any]):
"""Log successful access for audit trail"""
access_log_path = Path(".dss/logs/runtime-access.jsonl")
access_log_path.parent.mkdir(parents=True, exist_ok=True)
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"type": "runtime_access",
"operation": operation,
"allowed": allowed,
"details": details
}
with open(access_log_path, "a") as f:
f.write(json.dumps(log_entry) + "\n")
def validate_operation(self, operation: str, context: Dict[str, Any]) -> bool:
"""
Validate if an operation is allowed by DSS boundaries.
Args:
operation: Operation name (e.g., "figma_api_call", "browser_launch")
context: Operation context for validation
Returns:
True if allowed, raises BoundaryViolationError if not (in strict mode)
"""
required_tools = self.config.get("required_dss_tools", {})
# Check if operation requires going through DSS tools
for category, tools in required_tools.items():
            if category in operation:  # assumes operations are named after their category, e.g. "figma_api_call" under "figma"
details = {
"operation": operation,
"context": context,
"required_tools": tools
}
self._log_violation(operation, details)
if self.enforcement_mode == "strict":
raise BoundaryViolationError(
f"Direct {operation} blocked. Use DSS tools: {', '.join(tools)}"
)
elif self.enforcement_mode == "warn":
logger.warning(f"Boundary warning: {operation} should use DSS tools")
return True
self._log_access(operation, True, context)
return True
def get_figma_client(self, token: Optional[str] = None):
"""
Get a wrapped Figma API client with boundary enforcement.
Args:
token: Optional Figma token (uses env var if not provided)
Returns:
SafeFigmaClient instance (read-only by default)
"""
if self._figma_client is None:
from core.safe_figma_client import SafeFigmaClient
self._figma_client = SafeFigmaClient(
token=token,
allow_write=False, # Read-only by default
runtime=self
)
logger.info("Figma client initialized (read-only mode)")
return self._figma_client
def get_browser(self, strategy: str = "local"):
"""
Get a sandboxed browser automation instance.
Args:
strategy: Browser strategy ("local" or "remote")
Returns:
BrowserStrategy instance with sandbox enforcement
"""
if self._browser_strategy is None:
if strategy == "local":
try:
from strategies.local.browser import LocalBrowserStrategy
self._browser_strategy = LocalBrowserStrategy(runtime=self)
logger.info("Local browser strategy initialized")
except ImportError:
raise BoundaryViolationError(
"LocalBrowserStrategy not available. Use dss_browser_* tools."
)
elif strategy == "remote":
try:
from strategies.remote.browser import RemoteBrowserStrategy
self._browser_strategy = RemoteBrowserStrategy(runtime=self)
logger.info("Remote browser strategy initialized")
except ImportError:
raise BoundaryViolationError(
"RemoteBrowserStrategy not available. Use dss_browser_* tools."
)
return self._browser_strategy
def get_http_client(self):
"""
Get a wrapped HTTP client with URL validation.
Returns:
SafeHTTPClient instance that validates URLs against allowed domains
"""
if self._http_client is None:
from core.safe_http_client import SafeHTTPClient
self._http_client = SafeHTTPClient(
blocked_domains=self.config.get("blocked_external_apis", []),
runtime=self
)
logger.info("HTTP client initialized with URL validation")
return self._http_client
def check_import(self, module_name: str) -> bool:
"""
Check if a direct import is allowed.
Args:
module_name: Module being imported
Returns:
True if allowed, raises BoundaryViolationError if blocked
"""
blocked = self.config.get("blocked_imports", [])
if module_name in blocked:
details = {
"module": module_name,
"blocked_imports": blocked
}
self._log_violation(f"direct_import:{module_name}", details)
if self.enforcement_mode == "strict":
raise BoundaryViolationError(
f"Direct import of '{module_name}' blocked. "
f"Use DSS runtime clients instead."
)
return True
def get_temp_dir(self, session_id: Optional[str] = None) -> Path:
"""
Get session-specific temporary directory.
Args:
session_id: Optional session identifier (auto-generated if not provided)
Returns:
Path to session temp directory
"""
if session_id is None:
session_id = f"session-{int(datetime.utcnow().timestamp())}"
temp_dir = Path(".dss/temp") / session_id
temp_dir.mkdir(parents=True, exist_ok=True)
return temp_dir
def get_stats(self) -> Dict[str, Any]:
"""
Get runtime statistics.
Returns:
Dictionary with access counts, violations, etc.
"""
return {
"enforcement_mode": self.enforcement_mode,
"clients_initialized": {
"figma": self._figma_client is not None,
"browser": self._browser_strategy is not None,
"http": self._http_client is not None,
},
"config_version": self.config.get("version", "unknown")
}
# Global runtime instance (singleton pattern)
_runtime_instance: Optional[DSSRuntime] = None
def get_runtime() -> DSSRuntime:
"""Get the global DSSRuntime instance (singleton)"""
global _runtime_instance
if _runtime_instance is None:
_runtime_instance = DSSRuntime()
return _runtime_instance
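
A short sketch of routing access through the runtime instead of importing clients directly (SafeFigmaClient and SafeHTTPClient are referenced above but not included in this commit, so only validation and stats are exercised; the module name "runtime" is assumed):

from core.runtime import get_runtime, BoundaryViolationError

runtime = get_runtime()   # singleton; reads .dss-boundaries.yaml or falls back to strict defaults

try:
    runtime.check_import("requests")          # blocked by the default configuration
except BoundaryViolationError as exc:
    print(f"Blocked: {exc}")

runtime.validate_operation("token_extraction", {"source": "css"})  # allowed operations are logged to .dss/logs/runtime-access.jsonl
print(runtime.get_stats())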


@@ -0,0 +1,52 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DSS Project Manifest",
"type": "object",
"required": ["version", "project", "extends", "stack"],
"properties": {
"version": {"type": "string", "pattern": "^2\\.0\\.0$"},
"project": {
"type": "object",
"required": ["id", "name", "type"],
"properties": {
"id": {"type": "string", "pattern": "^[a-z0-9-]+$"},
"name": {"type": "string"},
"type": {"enum": ["web", "mobile", "desktop"]}
}
},
"extends": {
"type": "object",
"required": ["skin", "version"],
"properties": {
"skin": {"type": "string"},
"version": {"type": "string"}
}
},
"stack": {
"type": "object",
"required": ["framework", "styling"],
"properties": {
"framework": {"enum": ["react", "vue", "angular", "ios", "android", "flutter", "vanilla"]},
"styling": {"enum": ["tailwind", "css-modules", "styled-components", "emotion", "css-vars"]},
"icons": {"enum": ["lucide", "heroicons", "material", "custom"]},
"typescript": {"type": "boolean"}
}
},
"compiler": {
"type": "object",
"properties": {
"strict_mode": {"type": "boolean"},
"validation_level": {"enum": ["error", "warning", "info"]},
"output_format": {"enum": ["css-vars", "tailwind-config", "js-tokens"]},
"cache_strategy": {"enum": ["aggressive", "moderate", "disabled"]}
}
},
"overrides": {
"type": "object",
"properties": {
"tokens": {"type": "object"},
"files": {"type": "array", "items": {"type": "string"}}
}
}
}
}
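
A minimal manifest sketch satisfying the required fields of this schema (all values are illustrative), written in Python for consistency with the rest of the plugin code:

import json

manifest = {
    "version": "2.0.0",
    "project": {"id": "acme-web", "name": "Acme Web", "type": "web"},
    "extends": {"skin": "workbench", "version": "2.0.0"},
    "stack": {"framework": "react", "styling": "tailwind", "typescript": True},
    "overrides": {"tokens": {"colors": {"primary": "#7C3AED"}}},
}

with open("ds.config.json", "w") as f:
    json.dump(manifest, f, indent=2)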


@@ -0,0 +1,28 @@
{
"meta": {
"id": "base",
"version": "1.0.0",
"description": "Foundation tokens shared across all skins"
},
"tokens": {
"colors": {
"transparent": "transparent",
"current": "currentColor",
"white": "#ffffff",
"black": "#000000"
},
"spacing": {
"0": "0px",
"1": "4px",
"2": "8px",
"4": "16px",
"8": "32px"
},
"typography": {
"fontFamily": {
"sans": ["system-ui", "sans-serif"],
"mono": ["monospace"]
}
}
}
}


@@ -0,0 +1,21 @@
{
"meta": {
"id": "classic",
"version": "2.0.0",
"parent": "base"
},
"tokens": {
"colors": {
"primary": "#3B82F6",
"secondary": "#10B981",
"danger": "#EF4444",
"background": "#F3F4F6",
"surface": "#FFFFFF",
"text": "#1F2937"
},
"borderRadius": {
"default": "0.25rem",
"lg": "0.5rem"
}
}
}


@@ -0,0 +1,33 @@
{
"meta": {
"id": "workbench",
"version": "2.0.0",
"parent": "base",
"description": "High density technical interface skin"
},
"tokens": {
"colors": {
"primary": "#2563EB",
"secondary": "#475569",
"danger": "#DC2626",
"background": "#0F172A",
"surface": "#1E293B",
"text": "#E2E8F0"
},
"spacing": {
"1": "2px",
"2": "4px",
"4": "8px",
"8": "16px"
},
"borderRadius": {
"default": "0px",
"lg": "2px"
},
"typography": {
"fontFamily": {
"sans": ["Inter", "system-ui", "sans-serif"]
}
}
}
}
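
With the cascade implemented by the compiler above, a project extending this skin resolves base values first and then these overrides; for example (illustrative, assuming a ds.config.json whose extends.skin is "workbench" and no project-level token overrides):

from core.mcp_extensions import resolve_token

print(resolve_token("./ds.config.json", "spacing.1"))             # "2px"  - workbench overrides base's "4px"
print(resolve_token("./ds.config.json", "colors.white"))          # "#ffffff" - inherited unchanged from base
print(resolve_token("./ds.config.json", "borderRadius.default"))  # "0px"  - defined only in workbench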


@@ -0,0 +1,362 @@
"""
DSS Structured Logger - JSON-based logging for AI-consumable audit trails
Provides structured, machine-readable logging in JSONL format (one JSON object per line).
All DSS operations are logged with consistent fields for analysis, debugging, and compliance.
Features:
- JSONL format (newline-delimited JSON) for easy parsing
- Structured log entries with standardized fields
- Context tracking (session_id, tool_name, operation)
- Performance metrics (duration, timestamps)
- Log rotation and cleanup
- Integration with DSSRuntime
Usage:
from core.structured_logger import get_logger, LogContext
logger = get_logger("dss.tool.sync_figma")
with LogContext(session_id="abc123", tool="dss_sync_figma"):
logger.info("Starting Figma sync", extra={"file_key": "xyz"})
# ... operation ...
logger.info("Figma sync complete", extra={"tokens_extracted": 42})
"""
import json
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional
from contextlib import contextmanager
import threading
# Thread-local storage for context
_context = threading.local()
class DSSJSONFormatter(logging.Formatter):
"""
Custom JSON formatter for structured logging.
Outputs each log record as a single-line JSON object with standardized fields:
- timestamp: ISO 8601 UTC timestamp
- level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- logger: Logger name (e.g., "dss.tool.sync_figma")
- message: Human-readable log message
- context: Additional contextual data (session_id, tool_name, etc.)
- extra: Tool-specific extra data
"""
def format(self, record: logging.LogRecord) -> str:
"""Format log record as single-line JSON"""
# Build base log entry
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
# Add context from thread-local storage
if hasattr(_context, "session_id"):
log_entry["session_id"] = _context.session_id
if hasattr(_context, "tool_name"):
log_entry["tool"] = _context.tool_name
if hasattr(_context, "operation"):
log_entry["operation"] = _context.operation
# Add extra fields from record
if hasattr(record, "extra_data"):
log_entry["extra"] = record.extra_data
# Add exception info if present
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__ if record.exc_info[0] else None,
"message": str(record.exc_info[1]) if record.exc_info[1] else None,
"traceback": self.formatException(record.exc_info) if record.exc_info else None,
}
# Add location info for ERROR and above
if record.levelno >= logging.ERROR:
log_entry["location"] = {
"file": record.pathname,
"line": record.lineno,
"function": record.funcName,
}
return json.dumps(log_entry, default=str)
class DSSLogger(logging.Logger):
"""
Extended logger with structured logging support.
Wraps standard Python logger with methods that accept extra data
as keyword arguments for structured logging.
"""
def _log_with_extra(self, level: int, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
"""Internal method to log with extra structured data"""
if extra:
# Store extra data in a custom attribute
extra_record = {"extra_data": extra}
super()._log(level, msg, (), extra=extra_record, **kwargs)
else:
super()._log(level, msg, (), **kwargs)
def debug(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
"""Log DEBUG message with optional extra data"""
self._log_with_extra(logging.DEBUG, msg, extra, **kwargs)
def info(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
"""Log INFO message with optional extra data"""
self._log_with_extra(logging.INFO, msg, extra, **kwargs)
def warning(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
"""Log WARNING message with optional extra data"""
self._log_with_extra(logging.WARNING, msg, extra, **kwargs)
def error(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
"""Log ERROR message with optional extra data"""
self._log_with_extra(logging.ERROR, msg, extra, **kwargs)
def critical(self, msg: str, extra: Optional[Dict[str, Any]] = None, **kwargs):
"""Log CRITICAL message with optional extra data"""
self._log_with_extra(logging.CRITICAL, msg, extra, **kwargs)
# Configure custom logger class
logging.setLoggerClass(DSSLogger)
def get_logger(name: str, log_file: Optional[str] = None) -> DSSLogger:
"""
Get or create a structured logger instance.
Args:
name: Logger name (e.g., "dss.tool.sync_figma")
log_file: Optional custom log file path (defaults to .dss/logs/dss-operations.jsonl)
Returns:
DSSLogger instance configured for structured logging
Example:
logger = get_logger("dss.tool.extract_tokens")
logger.info("Starting token extraction", extra={"source": "css"})
"""
logger = logging.getLogger(name)
# Only configure if not already configured
if not logger.handlers:
# Determine log file path
if log_file is None:
dss_home = os.environ.get("DSS_HOME", ".dss")
log_dir = Path(dss_home) / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = str(log_dir / "dss-operations.jsonl")
# Create file handler with JSON formatter
file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
file_handler.setFormatter(DSSJSONFormatter())
logger.addHandler(file_handler)
# Also add console handler for development (can be disabled in production)
if os.environ.get("DSS_LOG_CONSOLE", "false").lower() == "true":
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(DSSJSONFormatter())
logger.addHandler(console_handler)
# Set log level from environment or default to INFO
log_level = os.environ.get("DSS_LOG_LEVEL", "INFO").upper()
logger.setLevel(getattr(logging, log_level, logging.INFO))
# Prevent propagation to root logger
logger.propagate = False
return logger
@contextmanager
def LogContext(session_id: Optional[str] = None, tool: Optional[str] = None, operation: Optional[str] = None):
"""
Context manager for adding structured context to log entries.
All log entries within this context will include the provided fields
(session_id, tool_name, operation).
Args:
session_id: Unique session identifier
tool: Tool name (e.g., "dss_sync_figma")
operation: Operation being performed (e.g., "token_extraction")
Example:
with LogContext(session_id="abc123", tool="dss_sync_figma"):
logger.info("Starting sync")
# This log will include session_id and tool fields
"""
# Store previous context
prev_session_id = getattr(_context, "session_id", None)
prev_tool_name = getattr(_context, "tool_name", None)
prev_operation = getattr(_context, "operation", None)
# Set new context
if session_id:
_context.session_id = session_id
if tool:
_context.tool_name = tool
if operation:
_context.operation = operation
try:
yield
finally:
# Restore previous context
if prev_session_id:
_context.session_id = prev_session_id
elif hasattr(_context, "session_id"):
delattr(_context, "session_id")
if prev_tool_name:
_context.tool_name = prev_tool_name
elif hasattr(_context, "tool_name"):
delattr(_context, "tool_name")
if prev_operation:
_context.operation = prev_operation
elif hasattr(_context, "operation"):
delattr(_context, "operation")
class PerformanceLogger:
"""
Helper for logging operation performance metrics.
Automatically measures duration and logs performance data.
Example:
perf = PerformanceLogger("token_extraction")
perf.start()
# ... operation ...
perf.end(extra={"tokens_found": 42})
"""
def __init__(self, operation: str, logger: Optional[DSSLogger] = None):
"""
Initialize performance logger.
Args:
operation: Operation name
logger: Optional logger (defaults to root DSS logger)
"""
self.operation = operation
self.logger = logger or get_logger("dss.performance")
self.start_time = None
self.end_time = None
def start(self):
"""Mark operation start time"""
self.start_time = datetime.now(timezone.utc)
self.logger.debug(f"Started: {self.operation}", extra={
"operation": self.operation,
"start_time": self.start_time.isoformat(),
})
def end(self, extra: Optional[Dict[str, Any]] = None):
"""
Mark operation end time and log performance.
Args:
extra: Additional metrics to log
"""
self.end_time = datetime.now(timezone.utc)
if self.start_time is None:
self.logger.warning(f"Performance logger end() called without start() for: {self.operation}")
return
duration_ms = (self.end_time - self.start_time).total_seconds() * 1000
perf_data = {
"operation": self.operation,
"duration_ms": round(duration_ms, 2),
"start_time": self.start_time.isoformat(),
"end_time": self.end_time.isoformat(),
}
if extra:
perf_data.update(extra)
self.logger.info(f"Completed: {self.operation}", extra=perf_data)
def configure_log_rotation(log_dir: Optional[Path] = None, max_bytes: int = 10 * 1024 * 1024, backup_count: int = 5):
"""
Configure log rotation for DSS log files.
Args:
log_dir: Log directory (defaults to .dss/logs/)
max_bytes: Max size per log file (default: 10MB)
backup_count: Number of backup files to keep (default: 5)
Note: This uses RotatingFileHandler. For production, consider
using a log rotation service like logrotate.
"""
from logging.handlers import RotatingFileHandler
if log_dir is None:
dss_home = os.environ.get("DSS_HOME", ".dss")
log_dir = Path(dss_home) / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / "dss-operations.jsonl"
# Get root DSS logger
logger = logging.getLogger("dss")
# Remove existing file handlers
for handler in logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
logger.removeHandler(handler)
# Add rotating file handler
rotating_handler = RotatingFileHandler(
str(log_file),
maxBytes=max_bytes,
backupCount=backup_count,
encoding="utf-8"
)
rotating_handler.setFormatter(DSSJSONFormatter())
logger.addHandler(rotating_handler)
logger.info("Log rotation configured", extra={
"max_bytes": max_bytes,
"backup_count": backup_count,
"log_file": str(log_file),
})
# Example usage (can be removed in production)
if __name__ == "__main__":
# Example 1: Basic logging
logger = get_logger("dss.example")
logger.info("DSS operation started", extra={"user": "admin"})
# Example 2: Context-based logging
with LogContext(session_id="session-123", tool="dss_sync_figma"):
logger.info("Syncing Figma file", extra={"file_key": "abc123"})
logger.info("Sync complete", extra={"tokens_extracted": 42})
# Example 3: Performance logging
perf = PerformanceLogger("token_extraction", logger)
perf.start()
# Simulate work
import time
time.sleep(0.1)
perf.end(extra={"tokens_found": 100})
print(f"\nLogs written to: {Path('.dss/logs/dss-operations.jsonl').absolute()}")