Files
dss/dss-claude-plugin/servers/dss-mcp-server.py
Digital Production Factory 6ac9e7d811 Phase 2 Complete: DSS Runtime & Boundary Enforcement
Implemented dependency injection and boundary enforcement architecture:

NEW FILE: dss-claude-plugin/core/runtime.py (395 lines)
- DSSRuntime class with boundary validation
- Dependency injection pattern for all external API access
- Capability provider pattern (get_figma_client, get_browser, get_http_client)
- Boundary violation logging and enforcement modes (strict/warn/disabled)
- Singleton pattern with get_runtime() helper
- Session-based temp directory management
- Audit trail for all access and violations

UPDATED: dss-claude-plugin/servers/dss-mcp-server.py
- Integrated DSSRuntime initialization in main()
- Updated version to 2.0.0
- Added runtime availability checking
- Logs enforcement mode on startup
- Changed branding: 'Design System Swarm' → 'Design System Server'

BOUNDARY ENFORCEMENT FEATURES:
- Blocks direct external API access (Figma, Browser, HTTP)
- Validates operations against .dss-boundaries.yaml
- Provides wrapped, sandboxed clients instead of raw access
- Logs all violations to .dss/logs/boundary-violations.jsonl
- Logs all access to .dss/logs/runtime-access.jsonl

Next: Phase 3 (Terminology Cleanup) - 67 files to update
2025-12-09 19:21:39 -03:00

2777 lines
104 KiB
Python

#!/usr/bin/env python3
"""
DSS MCP Server - Design System Server Integration for Claude Code
A Python MCP server that exposes DSS functionality as tools for Claude.
Uses stdio transport for Claude Code integration.
Author: overbits
Version: 2.0.0 - Architectural Refinement: Boundary Enforcement & Runtime
"""
import asyncio
import json
import logging
import sys
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from datetime import datetime
from collections import deque
from dataclasses import dataclass, field
import base64
import re
# DSS Runtime - Boundary Enforcement (CRITICAL)
# All external API access MUST go through the runtime
try:
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.runtime import DSSRuntime, BoundaryViolationError, get_runtime
RUNTIME_AVAILABLE = True
except ImportError as e:
RUNTIME_AVAILABLE = False
RUNTIME_IMPORT_ERROR = str(e)
print(f"WARNING: DSSRuntime not available: {e}", file=sys.stderr)
print("Boundary enforcement will be disabled!", file=sys.stderr)
# Playwright import (optional - only needed for DevTools features)
try:
from playwright.async_api import async_playwright, Browser, Page, BrowserContext, Playwright
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
# Import LocalBrowserStrategy for unified browser automation
try:
from strategies.local.browser import LocalBrowserStrategy
LOCAL_BROWSER_STRATEGY_AVAILABLE = True
except ImportError:
LOCAL_BROWSER_STRATEGY_AVAILABLE = False
# Add DSS to path
DSS_PATH = Path("/home/overbits/dss/dss-mvp1")
sys.path.insert(0, str(DSS_PATH))
# MCP SDK imports
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
except ImportError:
print("MCP SDK not found. Install with: pip install mcp", file=sys.stderr)
sys.exit(1)
# DSS imports
try:
import dss
from dss import (
ProjectScanner, ReactAnalyzer, StyleAnalyzer, DependencyGraph, QuickWinFinder,
CSSTokenSource, SCSSTokenSource, TailwindTokenSource, JSONTokenSource,
TokenMerger, MergeStrategy, TokenCollection,
StyleDictionaryWrapper, ShadcnWrapper, FigmaWrapper,
Theme, Project, ProjectMetadata,
StorybookScanner, StoryGenerator, ThemeGenerator,
DSSSettings, DSSManager, settings, manager
)
DSS_AVAILABLE = True
except ImportError as e:
DSS_AVAILABLE = False
DSS_IMPORT_ERROR = str(e)
# Context Compiler imports
try:
from core import (
get_active_context,
resolve_token,
validate_manifest,
list_skins,
get_compiler_status
)
CONTEXT_COMPILER_AVAILABLE = True
except ImportError as e:
CONTEXT_COMPILER_AVAILABLE = False
CONTEXT_COMPILER_IMPORT_ERROR = str(e)
# Project Management imports
try:
from dss.project import (
DSSProject,
ProjectConfig,
FigmaSource,
ProjectStatus,
ProjectManager,
ProjectRegistry,
FigmaProjectSync,
)
PROJECT_MANAGEMENT_AVAILABLE = True
except ImportError as e:
PROJECT_MANAGEMENT_AVAILABLE = False
PROJECT_MANAGEMENT_IMPORT_ERROR = str(e)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger("dss-mcp-server")
# Timeout configuration (seconds)
TIMEOUT_CONFIG = {
"analyze": 60,
"extract": 30,
"generate": 30,
"style_dictionary": 30,
"figma_api": 15,
"storybook": 60,
"audit": 45,
"quick_wins": 30,
"devtools_connect": 20,
"devtools_default": 10
}
# =============================================================================
# DEVTOOLS STATE MANAGEMENT
# =============================================================================
# DevTools Configuration Constants
DEVTOOLS_CONSOLE_MAX_ENTRIES = 1000
DEVTOOLS_NETWORK_MAX_ENTRIES = 500
DEVTOOLS_CONNECTION_TIMEOUT_MS = 30000 # 30 seconds
@dataclass
class DevToolsState:
"""State management for Chrome DevTools Protocol connections.
Manages Playwright CDP connections to Chrome instances, tracking:
- Browser and page references
- Console log capture (bounded buffer)
- Network request capture (bounded buffer)
- Connection lifecycle state
"""
playwright: Optional[Any] = None
browser: Optional[Any] = None
contexts: Dict[str, Any] = field(default_factory=dict)
pages: Dict[str, Any] = field(default_factory=dict)
active_page_id: Optional[str] = None
console_logs: deque = field(default_factory=lambda: deque(maxlen=DEVTOOLS_CONSOLE_MAX_ENTRIES))
network_requests: deque = field(default_factory=lambda: deque(maxlen=DEVTOOLS_NETWORK_MAX_ENTRIES))
connected: bool = False
devtools = DevToolsState()
# =============================================================================
# BROWSER AUTOMATION STATE
# =============================================================================
@dataclass
class BrowserAutomationState:
"""State management for unified browser automation (LOCAL mode)"""
strategy: Optional[Any] = None # LocalBrowserStrategy instance
mode: str = "local" # "local" or "remote"
session_id: Optional[str] = None
remote_api_url: Optional[str] = None
initialized: bool = False
browser_state = BrowserAutomationState()
# Create MCP server
server = Server("dss-server")
def with_timeout(timeout_key: str):
"""Decorator to add timeout to async functions"""
def decorator(func):
async def wrapper(*args, **kwargs):
timeout = TIMEOUT_CONFIG.get(timeout_key, 30)
try:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=timeout
)
except asyncio.TimeoutError:
return {
"success": False,
"error": f"Operation timed out after {timeout} seconds",
"timeout_key": timeout_key
}
return wrapper
return decorator
def safe_serialize(obj: Any) -> Any:
"""Safely serialize objects to JSON-compatible format"""
if obj is None:
return None
if isinstance(obj, (str, int, float, bool)):
return obj
if isinstance(obj, (list, tuple)):
return [safe_serialize(item) for item in obj]
if isinstance(obj, dict):
return {str(k): safe_serialize(v) for k, v in obj.items()}
if isinstance(obj, Path):
return str(obj)
if isinstance(obj, deque):
return [safe_serialize(item) for item in obj]
if hasattr(obj, '__dict__'):
d = {k: v for k, v in obj.__dict__.items() if not k.startswith('_')}
return safe_serialize(d)
if hasattr(obj, 'model_dump'):
return obj.model_dump()
return str(obj)
# =============================================================================
# TOOL DEFINITIONS
# =============================================================================
@server.list_tools()
async def list_tools() -> List[Tool]:
"""List all available DSS and DevTools tools"""
dss_tools = [
Tool(
name="dss_analyze_project",
description="Analyze a project for design system patterns, component usage, and tokenization opportunities. Returns comprehensive analysis including style patterns, React components, and dependency graph.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the project directory to analyze"
}
},
"required": ["path"]
}
),
Tool(
name="dss_extract_tokens",
description="Extract design tokens from CSS, SCSS, Tailwind, or JSON sources. Returns a unified TokenCollection with all discovered tokens.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file or directory containing design tokens"
},
"sources": {
"type": "array",
"items": {"type": "string", "enum": ["css", "scss", "tailwind", "json"]},
"description": "Token source types to extract from (default: all)"
}
},
"required": ["path"]
}
),
Tool(
name="dss_generate_theme",
description="Generate theme files from design tokens using style-dictionary. Supports CSS, SCSS, JSON, and JS output formats.",
inputSchema={
"type": "object",
"properties": {
"tokens": {
"type": "object",
"description": "Design tokens to transform (or use tokens from previous extraction)"
},
"format": {
"type": "string",
"enum": ["css", "scss", "json", "js"],
"description": "Output format for generated theme files"
},
"theme_name": {
"type": "string",
"description": "Name for the generated theme (default: 'default')"
}
},
"required": ["format"]
}
),
Tool(
name="dss_list_themes",
description="List all available themes in the DSS system",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="dss_get_status",
description="Get DSS system status including health checks, dependencies, configuration, metrics, and recommendations.",
inputSchema={
"type": "object",
"properties": {
"format": {
"type": "string",
"enum": ["json", "dashboard"],
"description": "Output format: 'json' for structured data, 'dashboard' for ASCII art display (default: json)"
}
}
}
),
Tool(
name="dss_audit_components",
description="Audit React components for design system adoption. Identifies hardcoded values, missing tokens, and refactoring opportunities.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to React component directory"
}
},
"required": ["path"]
}
),
Tool(
name="dss_setup_storybook",
description="Set up or configure Storybook for the project. Generates stories and theme configuration.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the project directory"
},
"action": {
"type": "string",
"enum": ["scan", "generate", "configure"],
"description": "Action to perform: scan existing, generate stories, or configure theme"
}
},
"required": ["path"]
}
),
Tool(
name="dss_sync_figma",
description="Sync design tokens from a Figma file. Requires FIGMA_TOKEN environment variable.",
inputSchema={
"type": "object",
"properties": {
"file_key": {
"type": "string",
"description": "Figma file key (from URL)"
}
},
"required": ["file_key"]
}
),
Tool(
name="dss_find_quick_wins",
description="Find quick win opportunities for design system adoption. Identifies low-effort, high-impact improvements.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the project directory"
}
},
"required": ["path"]
}
),
Tool(
name="dss_transform_tokens",
description="Transform tokens between formats using style-dictionary",
inputSchema={
"type": "object",
"properties": {
"tokens": {
"type": "object",
"description": "Tokens to transform"
},
"input_format": {
"type": "string",
"enum": ["css", "scss", "json", "tailwind"],
"description": "Input token format"
},
"output_format": {
"type": "string",
"enum": ["css", "scss", "json", "js"],
"description": "Desired output format"
}
},
"required": ["tokens", "output_format"]
}
)
]
devtools_tools = [
Tool(
name="devtools_launch",
description="Launch a new headless Chromium browser. Use this on remote/headless servers where no Chrome is running.",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "Initial URL to navigate to (default: about:blank)"},
"headless": {"type": "boolean", "description": "Run headless (default: true)"}
}
}
),
Tool(
name="devtools_connect",
description="Connect to a running Chrome browser with remote debugging enabled. Start Chrome with: --remote-debugging-port=9222",
inputSchema={
"type": "object",
"properties": {
"port": {"type": "integer", "description": "CDP port number (default: 9222)"},
"host": {"type": "string", "description": "CDP host (default: 'localhost')"}
}
}
),
Tool(
name="devtools_disconnect",
description="Disconnect from Chrome DevTools and clean up resources.",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="devtools_list_pages",
description="List all available pages (tabs) in the connected browser with their URLs and titles.",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="devtools_select_page",
description="Set the active page for subsequent DevTools operations. Console and network logging will be enabled for the selected page.",
inputSchema={
"type": "object",
"properties": {
"page_id": {"type": "string", "description": "The unique ID of the page to select (from devtools_list_pages)"}
},
"required": ["page_id"]
}
),
Tool(
name="devtools_console_logs",
description="Retrieve captured console log messages (log, warn, error, info, debug) from the active page.",
inputSchema={
"type": "object",
"properties": {
"level": {
"type": "string",
"enum": ["all", "log", "warn", "error", "info", "debug"],
"description": "Filter by message level (default: all)"
},
"limit": {"type": "integer", "description": "Maximum number of messages to return (default: 100)"},
"clear": {"type": "boolean", "description": "Clear captured logs after retrieving (default: false)"}
}
}
),
Tool(
name="devtools_network_requests",
description="Retrieve captured network requests from the active page. Includes URL, method, headers, and resource type.",
inputSchema={
"type": "object",
"properties": {
"filter_url": {"type": "string", "description": "Regex pattern to filter requests by URL"},
"limit": {"type": "integer", "description": "Maximum number of requests to return (default: 50)"}
}
}
),
Tool(
name="devtools_evaluate",
description="Execute a JavaScript expression in the context of the active page and return the result.",
inputSchema={
"type": "object",
"properties": {
"expression": {"type": "string", "description": "The JavaScript expression to evaluate"}
},
"required": ["expression"]
}
),
Tool(
name="devtools_query_dom",
description="Query DOM elements on the active page using a CSS selector. Returns tag, text content, and outer HTML for each match.",
inputSchema={
"type": "object",
"properties": {
"selector": {"type": "string", "description": "CSS selector to query for elements"}
},
"required": ["selector"]
}
),
Tool(
name="devtools_goto",
description="Navigate the active page to a URL.",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "URL to navigate to"},
"wait_until": {"type": "string", "description": "Wait condition: 'load', 'domcontentloaded', 'networkidle' (default: domcontentloaded)"}
},
"required": ["url"]
}
),
Tool(
name="devtools_screenshot",
description="Capture a screenshot of the active page or a specific element. Returns base64 encoded PNG.",
inputSchema={
"type": "object",
"properties": {
"selector": {"type": "string", "description": "CSS selector of an element to capture. If omitted, captures the viewport."},
"full_page": {"type": "boolean", "description": "Capture the full scrollable page (default: false)"}
}
}
),
Tool(
name="devtools_performance",
description="Get performance metrics for the active page including page load time, DNS lookup, TCP connect, and response times.",
inputSchema={"type": "object", "properties": {}}
)
]
# Browser Automation Tools (Unified LOCAL/REMOTE strategy)
browser_tools = [
Tool(
name="browser_init",
description="Initialize browser automation. Mode 'local' uses Playwright for direct control. Mode 'remote' uses Shadow State pattern to fetch logs from a running admin-ui session.",
inputSchema={
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": ["local", "remote"],
"description": "Automation mode: 'local' for Playwright, 'remote' for Shadow State API (default: local)"
},
"url": {
"type": "string",
"description": "For local mode: URL to navigate to. For remote mode: API endpoint URL."
},
"session_id": {
"type": "string",
"description": "For remote mode: Session ID to fetch logs from."
},
"headless": {
"type": "boolean",
"description": "For local mode: Run browser headless (default: true)"
}
}
}
),
Tool(
name="browser_get_logs",
description="Get console logs from the browser. Works in both LOCAL and REMOTE modes.",
inputSchema={
"type": "object",
"properties": {
"level": {
"type": "string",
"enum": ["all", "log", "warn", "error", "info", "debug"],
"description": "Filter by log level (default: all)"
},
"limit": {
"type": "integer",
"description": "Maximum number of logs to return (default: 100)"
}
}
}
),
Tool(
name="browser_screenshot",
description="Capture a screenshot from the browser. Requires LOCAL mode.",
inputSchema={
"type": "object",
"properties": {
"selector": {
"type": "string",
"description": "CSS selector to capture specific element. If omitted, captures viewport."
},
"full_page": {
"type": "boolean",
"description": "Capture full scrollable page (default: false)"
}
}
}
),
Tool(
name="browser_dom_snapshot",
description="Get current DOM state as HTML. Works in both LOCAL and REMOTE modes.",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="browser_get_errors",
description="Get captured errors (uncaught exceptions, unhandled rejections). Works in both modes.",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Maximum number of errors to return (default: 50)"
}
}
}
),
Tool(
name="browser_accessibility_audit",
description="Run accessibility audit using axe-core. Returns WCAG violations and passes.",
inputSchema={
"type": "object",
"properties": {
"selector": {
"type": "string",
"description": "CSS selector to audit specific element. If omitted, audits entire page."
}
}
}
),
Tool(
name="browser_performance",
description="Get Core Web Vitals and performance metrics (TTFB, FCP, LCP, CLS).",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="browser_close",
description="Close the browser automation session and clean up resources.",
inputSchema={"type": "object", "properties": {}}
)
]
# Context Compiler Tools
context_compiler_tools = [
Tool(
name="dss_get_resolved_context",
description="Get fully resolved design system context for a project. Returns compiled tokens from 3-layer cascade (base → skin → project).",
inputSchema={
"type": "object",
"properties": {
"manifest_path": {
"type": "string",
"description": "Absolute path to ds.config.json"
},
"debug": {
"type": "boolean",
"description": "Enable debug provenance tracking",
"default": False
},
"force_refresh": {
"type": "boolean",
"description": "Bypass cache and recompile",
"default": False
}
},
"required": ["manifest_path"]
}
),
Tool(
name="dss_resolve_token",
description="Resolve a specific design token through the cascade. Use dot-notation (e.g. 'colors.primary').",
inputSchema={
"type": "object",
"properties": {
"manifest_path": {
"type": "string",
"description": "Absolute path to ds.config.json"
},
"token_path": {
"type": "string",
"description": "Dot-notation path to token (e.g. 'colors.primary')"
},
"force_refresh": {
"type": "boolean",
"description": "Bypass cache and recompile",
"default": False
}
},
"required": ["manifest_path", "token_path"]
}
),
Tool(
name="dss_validate_manifest",
description="Validate project manifest (ds.config.json) against schema.",
inputSchema={
"type": "object",
"properties": {
"manifest_path": {
"type": "string",
"description": "Absolute path to ds.config.json"
}
},
"required": ["manifest_path"]
}
),
Tool(
name="dss_list_skins",
description="List all available design system skins in the registry.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="dss_get_compiler_status",
description="Get Context Compiler health and configuration status.",
inputSchema={
"type": "object",
"properties": {}
}
)
]
# Project Management Tools
project_tools = [
Tool(
name="dss_project_init",
description="Initialize a new DSS project with folder structure and config file.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Directory path for the new project"
},
"name": {
"type": "string",
"description": "Project name"
},
"description": {
"type": "string",
"description": "Optional project description"
},
"skin": {
"type": "string",
"description": "Base skin to extend (e.g., 'shadcn', 'material')"
}
},
"required": ["path", "name"]
}
),
Tool(
name="dss_project_add_figma_team",
description="Link a Figma team folder to the project. Auto-discovers all projects/files and identifies the UIKit reference file.",
inputSchema={
"type": "object",
"properties": {
"project_path": {
"type": "string",
"description": "Path to DSS project directory"
},
"team_id": {
"type": "string",
"description": "Figma team ID"
},
"figma_token": {
"type": "string",
"description": "Figma personal access token (optional, uses FIGMA_TOKEN env var if not provided)"
}
},
"required": ["project_path", "team_id"]
}
),
Tool(
name="dss_project_add_figma_file",
description="Add a single Figma file to the project.",
inputSchema={
"type": "object",
"properties": {
"project_path": {
"type": "string",
"description": "Path to DSS project directory"
},
"file_key": {
"type": "string",
"description": "Figma file key (from URL)"
},
"file_name": {
"type": "string",
"description": "Human-readable name for the file"
},
"figma_token": {
"type": "string",
"description": "Figma personal access token (optional)"
}
},
"required": ["project_path", "file_key", "file_name"]
}
),
Tool(
name="dss_project_sync",
description="Sync design tokens from all configured Figma sources.",
inputSchema={
"type": "object",
"properties": {
"project_path": {
"type": "string",
"description": "Path to DSS project directory"
},
"file_keys": {
"type": "array",
"items": {"type": "string"},
"description": "Optional: specific file keys to sync (syncs all if not provided)"
},
"figma_token": {
"type": "string",
"description": "Figma personal access token (optional)"
}
},
"required": ["project_path"]
}
),
Tool(
name="dss_project_build",
description="Build output files (CSS, SCSS, JSON) from synced tokens.",
inputSchema={
"type": "object",
"properties": {
"project_path": {
"type": "string",
"description": "Path to DSS project directory"
}
},
"required": ["project_path"]
}
),
Tool(
name="dss_project_list",
description="List all registered DSS projects.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="dss_project_info",
description="Get detailed information about a DSS project.",
inputSchema={
"type": "object",
"properties": {
"project_path": {
"type": "string",
"description": "Path to DSS project directory"
}
},
"required": ["project_path"]
}
),
Tool(
name="dss_figma_discover",
description="Discover Figma team structure including all projects, files, and identify UIKit reference file.",
inputSchema={
"type": "object",
"properties": {
"team_id": {
"type": "string",
"description": "Figma team ID"
},
"figma_token": {
"type": "string",
"description": "Figma personal access token (optional)"
}
},
"required": ["team_id"]
}
),
Tool(
name="dss_core_sync",
description="Sync DSS core design system from the canonical Figma source (shadcn/ui). This is the base layer that all skins and projects inherit from.",
inputSchema={
"type": "object",
"properties": {
"force": {
"type": "boolean",
"description": "Force sync even if recently synced"
},
"figma_token": {
"type": "string",
"description": "Figma personal access token (optional)"
}
}
}
),
Tool(
name="dss_core_status",
description="Get DSS core sync status including Figma reference and synced files.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="dss_core_tokens",
description="Get DSS core tokens (synced from shadcn/ui Figma).",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="dss_core_themes",
description="Get DSS core themes (light/dark based on shadcn/ui).",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="dss_rate_limit_status",
description="Check current Figma API rate limit status.",
inputSchema={
"type": "object",
"properties": {
"figma_token": {
"type": "string",
"description": "Figma personal access token (optional)"
}
}
}
),
]
return dss_tools + devtools_tools + browser_tools + context_compiler_tools + project_tools
# =============================================================================
# TOOL DISPATCHER
# =============================================================================
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle tool calls"""
if not DSS_AVAILABLE and name.startswith("dss_"):
return [TextContent(
type="text",
text=json.dumps({
"success": False,
"error": f"DSS modules not available: {DSS_IMPORT_ERROR}"
}, indent=2)
)]
if not PLAYWRIGHT_AVAILABLE and name.startswith("devtools_"):
return [TextContent(
type="text",
text=json.dumps({
"success": False,
"error": "Playwright not installed. Run: pip install playwright && playwright install chromium"
}, indent=2)
)]
try:
# DSS Tools
if name == "dss_analyze_project":
result = await analyze_project(arguments.get("path", "."))
elif name == "dss_extract_tokens":
result = await extract_tokens(
arguments.get("path", "."),
arguments.get("sources", ["css", "scss", "tailwind", "json"])
)
elif name == "dss_generate_theme":
result = await generate_theme(
arguments.get("tokens", {}),
arguments.get("format", "css"),
arguments.get("theme_name", "default")
)
elif name == "dss_list_themes":
result = await list_themes()
elif name == "dss_get_status":
result = await get_status(arguments.get("format", "json"))
elif name == "dss_audit_components":
result = await audit_components(arguments.get("path", "."))
elif name == "dss_setup_storybook":
result = await setup_storybook(
arguments.get("path", "."),
arguments.get("action", "scan")
)
elif name == "dss_sync_figma":
result = await sync_figma(arguments.get("file_key", ""))
elif name == "dss_find_quick_wins":
result = await find_quick_wins(arguments.get("path", "."))
elif name == "dss_transform_tokens":
result = await transform_tokens(
arguments.get("tokens", {}),
arguments.get("input_format", "json"),
arguments.get("output_format", "css")
)
# DevTools Tools
elif name == "devtools_launch":
result = await devtools_launch_impl(
url=arguments.get("url", "about:blank"),
headless=arguments.get("headless", True)
)
elif name == "devtools_connect":
result = await devtools_connect_impl(
port=arguments.get("port", 9222),
host=arguments.get("host", "localhost")
)
elif name == "devtools_disconnect":
result = await devtools_disconnect_impl()
elif name == "devtools_list_pages":
result = await devtools_list_pages_impl()
elif name == "devtools_select_page":
result = await devtools_select_page_impl(page_id=arguments.get("page_id"))
elif name == "devtools_console_logs":
result = await devtools_console_logs_impl(
level=arguments.get("level", "all"),
limit=arguments.get("limit", 100),
clear=arguments.get("clear", False)
)
elif name == "devtools_network_requests":
result = await devtools_network_requests_impl(
filter_url=arguments.get("filter_url", ""),
limit=arguments.get("limit", 50)
)
elif name == "devtools_evaluate":
result = await devtools_evaluate_impl(expression=arguments.get("expression"))
elif name == "devtools_query_dom":
result = await devtools_query_dom_impl(selector=arguments.get("selector"))
elif name == "devtools_goto":
result = await devtools_goto_impl(
url=arguments.get("url"),
wait_until=arguments.get("wait_until", "domcontentloaded")
)
elif name == "devtools_screenshot":
result = await devtools_screenshot_impl(
selector=arguments.get("selector"),
full_page=arguments.get("full_page", False)
)
elif name == "devtools_performance":
result = await devtools_performance_impl()
# Browser Automation Tools
elif name == "browser_init":
result = await browser_init_impl(
mode=arguments.get("mode", "local"),
url=arguments.get("url"),
session_id=arguments.get("session_id"),
headless=arguments.get("headless", True)
)
elif name == "browser_get_logs":
result = await browser_get_logs_impl(
level=arguments.get("level", "all"),
limit=arguments.get("limit", 100)
)
elif name == "browser_screenshot":
result = await browser_screenshot_impl(
selector=arguments.get("selector"),
full_page=arguments.get("full_page", False)
)
elif name == "browser_dom_snapshot":
result = await browser_dom_snapshot_impl()
elif name == "browser_get_errors":
result = await browser_get_errors_impl(
limit=arguments.get("limit", 50)
)
elif name == "browser_accessibility_audit":
result = await browser_accessibility_audit_impl(
selector=arguments.get("selector")
)
elif name == "browser_performance":
result = await browser_performance_impl()
elif name == "browser_close":
result = await browser_close_impl()
# Context Compiler tools
elif name == "dss_get_resolved_context":
if not CONTEXT_COMPILER_AVAILABLE:
result = {
"success": False,
"error": f"Context Compiler not available: {CONTEXT_COMPILER_IMPORT_ERROR}"
}
else:
try:
context_json = get_active_context(
arguments.get("manifest_path"),
arguments.get("debug", False),
arguments.get("force_refresh", False)
)
result = {"success": True, "context": json.loads(context_json)}
except Exception as e:
result = {"success": False, "error": str(e)}
elif name == "dss_resolve_token":
if not CONTEXT_COMPILER_AVAILABLE:
result = {
"success": False,
"error": f"Context Compiler not available: {CONTEXT_COMPILER_IMPORT_ERROR}"
}
else:
try:
token_value = resolve_token(
arguments.get("manifest_path"),
arguments.get("token_path"),
arguments.get("force_refresh", False)
)
result = {"success": True, "token_path": arguments.get("token_path"), "value": token_value}
except Exception as e:
result = {"success": False, "error": str(e)}
elif name == "dss_validate_manifest":
if not CONTEXT_COMPILER_AVAILABLE:
result = {
"success": False,
"error": f"Context Compiler not available: {CONTEXT_COMPILER_IMPORT_ERROR}"
}
else:
try:
validation_result = validate_manifest(arguments.get("manifest_path"))
result = {"success": True, "validation": validation_result}
except Exception as e:
result = {"success": False, "error": str(e)}
elif name == "dss_list_skins":
if not CONTEXT_COMPILER_AVAILABLE:
result = {
"success": False,
"error": f"Context Compiler not available: {CONTEXT_COMPILER_IMPORT_ERROR}"
}
else:
try:
skins_json = list_skins()
result = {"success": True, "skins": json.loads(skins_json)}
except Exception as e:
result = {"success": False, "error": str(e)}
elif name == "dss_get_compiler_status":
if not CONTEXT_COMPILER_AVAILABLE:
result = {
"success": False,
"error": f"Context Compiler not available: {CONTEXT_COMPILER_IMPORT_ERROR}"
}
else:
try:
status_json = get_compiler_status()
result = {"success": True, "status": json.loads(status_json)}
except Exception as e:
result = {"success": False, "error": str(e)}
# Project Management Tools
elif name == "dss_project_init":
result = await project_init_impl(
path=arguments.get("path"),
name=arguments.get("name"),
description=arguments.get("description"),
skin=arguments.get("skin")
)
elif name == "dss_project_add_figma_team":
result = await project_add_figma_team_impl(
project_path=arguments.get("project_path"),
team_id=arguments.get("team_id"),
figma_token=arguments.get("figma_token")
)
elif name == "dss_project_add_figma_file":
result = await project_add_figma_file_impl(
project_path=arguments.get("project_path"),
file_key=arguments.get("file_key"),
file_name=arguments.get("file_name"),
figma_token=arguments.get("figma_token")
)
elif name == "dss_project_sync":
result = await project_sync_impl(
project_path=arguments.get("project_path"),
file_keys=arguments.get("file_keys"),
figma_token=arguments.get("figma_token")
)
elif name == "dss_project_build":
result = await project_build_impl(
project_path=arguments.get("project_path")
)
elif name == "dss_project_list":
result = await project_list_impl()
elif name == "dss_project_info":
result = await project_info_impl(
project_path=arguments.get("project_path")
)
elif name == "dss_figma_discover":
result = await figma_discover_impl(
team_id=arguments.get("team_id"),
figma_token=arguments.get("figma_token")
)
elif name == "dss_core_sync":
result = await dss_core_sync_impl(
force=arguments.get("force", False),
figma_token=arguments.get("figma_token")
)
elif name == "dss_core_status":
result = await dss_core_status_impl()
elif name == "dss_core_tokens":
result = await dss_core_tokens_impl()
elif name == "dss_core_themes":
result = await dss_core_themes_impl()
elif name == "dss_rate_limit_status":
result = await dss_rate_limit_status_impl(
figma_token=arguments.get("figma_token")
)
else:
result = {"success": False, "error": f"Unknown tool: {name}"}
return [TextContent(
type="text",
text=json.dumps(safe_serialize(result), indent=2)
)]
except Exception as e:
logger.exception(f"Error in tool {name}")
return [TextContent(
type="text",
text=json.dumps({
"success": False,
"error": str(e),
"tool": name
}, indent=2)
)]
# =============================================================================
# DSS TOOL IMPLEMENTATIONS
# =============================================================================
async def analyze_project(path: str) -> Dict[str, Any]:
"""Analyze a project for design system patterns"""
project_path = Path(path).resolve()
if not project_path.exists():
return {"success": False, "error": f"Path does not exist: {path}"}
try:
loop = asyncio.get_event_loop()
scanner = ProjectScanner(project_path)
react_analyzer = ReactAnalyzer(project_path)
style_analyzer = StyleAnalyzer(project_path)
scan_result = await loop.run_in_executor(None, scanner.scan)
react_result = await loop.run_in_executor(None, react_analyzer.analyze)
style_result = await loop.run_in_executor(None, style_analyzer.analyze)
return {
"success": True,
"project_path": str(project_path),
"analysis": {
"scan": safe_serialize(scan_result),
"react_components": safe_serialize(react_result),
"styles": safe_serialize(style_result)
},
"summary": {
"files_scanned": getattr(scan_result, 'files_count', 0),
"components_found": len(getattr(react_result, 'components', [])),
"style_patterns": len(getattr(style_result, 'patterns', []))
}
}
except Exception as e:
return {"success": False, "error": str(e)}
async def extract_tokens(path: str, sources: List[str]) -> Dict[str, Any]:
"""Extract design tokens from various sources"""
target_path = Path(path).resolve()
if not target_path.exists():
return {"success": False, "error": f"Path does not exist: {path}"}
try:
loop = asyncio.get_event_loop()
all_tokens = []
source_map = {
"css": CSSTokenSource, "scss": SCSSTokenSource,
"tailwind": TailwindTokenSource, "json": JSONTokenSource
}
for source_type in sources:
if source_type in source_map:
source = source_map[source_type](target_path)
tokens = await loop.run_in_executor(None, source.extract)
if tokens:
all_tokens.extend(tokens)
if all_tokens:
merger = TokenMerger(strategy=MergeStrategy.PREFER_LATEST)
merged = merger.merge(all_tokens)
return {
"success": True, "path": str(target_path), "sources": sources,
"tokens": safe_serialize(merged),
"token_count": len(merged) if hasattr(merged, '__len__') else 0
}
else:
return {
"success": True, "path": str(target_path), "sources": sources,
"tokens": [], "token_count": 0, "message": "No tokens found"
}
except Exception as e:
return {"success": False, "error": str(e)}
async def generate_theme(tokens: Dict, format: str, theme_name: str) -> Dict[str, Any]:
"""Generate theme files from tokens"""
try:
loop = asyncio.get_event_loop()
theme = Theme(name=theme_name, tokens=tokens)
sd_wrapper = StyleDictionaryWrapper()
result = await loop.run_in_executor(None, lambda: sd_wrapper.transform_theme(theme, output_format=format))
return {
"success": result.get("success", False), "format": format,
"theme_name": theme_name, "files": result.get("files", {}),
"errors": result.get("errors")
}
except Exception as e:
return {"success": False, "error": str(e)}
async def list_themes() -> Dict[str, Any]:
"""List available themes"""
try:
from dss.themes import default_themes
themes = list(getattr(default_themes, 'THEMES', {}).keys())
return {"success": True, "themes": themes, "count": len(themes)}
except Exception as e:
return {"success": False, "error": str(e)}
async def get_status(format: str = "json") -> Dict[str, Any]:
"""Get DSS system status"""
try:
from dss.status import StatusDashboard
dashboard = StatusDashboard()
if format == "dashboard":
return {"success": True, "format": "dashboard", "dashboard": dashboard.render_text()}
else:
return dashboard.get_status()
except ImportError:
logger.warning("StatusDashboard not available, using basic status")
system_info = manager.get_system_info()
dependencies = manager.check_dependencies()
return {
"success": True, "version": dss.__version__,
"system_info": system_info, "dependencies": dependencies,
"healthy": all(dependencies.values()), "timestamp": datetime.now().isoformat()
}
except Exception as e:
return {"success": False, "error": str(e)}
async def audit_components(path: str) -> Dict[str, Any]:
"""Audit React components for design system adoption"""
project_path = Path(path).resolve()
if not project_path.exists():
return {"success": False, "error": f"Path does not exist: {path}"}
try:
loop = asyncio.get_event_loop()
react_analyzer = ReactAnalyzer(project_path)
style_analyzer = StyleAnalyzer(project_path)
graph = DependencyGraph(project_path)
react_result = await loop.run_in_executor(None, react_analyzer.analyze)
style_result = await loop.run_in_executor(None, style_analyzer.analyze)
graph_result = await loop.run_in_executor(None, graph.build)
hardcoded = getattr(style_result, 'hardcoded_values', [])
return {
"success": True, "path": str(project_path),
"audit": {
"components": safe_serialize(react_result),
"styles": safe_serialize(style_result),
"dependencies": safe_serialize(graph_result)
},
"issues": {"hardcoded_values": hardcoded}
}
except Exception as e:
return {"success": False, "error": str(e)}
async def setup_storybook(path: str, action: str) -> Dict[str, Any]:
"""Setup or configure Storybook"""
project_path = Path(path).resolve()
if not project_path.exists():
return {"success": False, "error": f"Path does not exist: {path}"}
try:
loop = asyncio.get_event_loop()
if action == "scan":
scanner = StorybookScanner(project_path)
result = await loop.run_in_executor(None, scanner.scan)
return {"success": True, "action": "scan", "result": safe_serialize(result)}
elif action == "generate":
generator = StoryGenerator(project_path)
result = await loop.run_in_executor(None, generator.generate)
return {"success": True, "action": "generate", "stories_created": safe_serialize(result)}
elif action == "configure":
theme_gen = ThemeGenerator(project_path)
result = await loop.run_in_executor(None, theme_gen.generate)
return {"success": True, "action": "configure", "theme_config": safe_serialize(result)}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
async def sync_figma(file_key: str) -> Dict[str, Any]:
"""Sync tokens from Figma"""
if not file_key:
return {"success": False, "error": "file_key is required"}
figma_token = os.environ.get("FIGMA_TOKEN") or settings.FIGMA_TOKEN
if not figma_token:
return {"success": False, "error": "FIGMA_TOKEN not configured."}
try:
loop = asyncio.get_event_loop()
figma = FigmaWrapper(token=figma_token)
result = await loop.run_in_executor(None, lambda: figma.extract_tokens(file_key))
return {"success": True, "file_key": file_key, "tokens": safe_serialize(result)}
except Exception as e:
return {"success": False, "error": str(e)}
async def find_quick_wins(path: str) -> Dict[str, Any]:
"""Find quick win opportunities"""
project_path = Path(path).resolve()
if not project_path.exists():
return {"success": False, "error": f"Path does not exist: {path}"}
try:
loop = asyncio.get_event_loop()
finder = QuickWinFinder(project_path)
quick_wins = await loop.run_in_executor(None, finder.find)
return {
"success": True, "path": str(project_path),
"quick_wins": safe_serialize(quick_wins),
"count": len(quick_wins) if quick_wins else 0
}
except Exception as e:
return {"success": False, "error": str(e)}
async def transform_tokens(tokens: Dict, input_format: str, output_format: str) -> Dict[str, Any]:
"""Transform tokens between formats"""
try:
loop = asyncio.get_event_loop()
theme = Theme(name="transform_temp", tokens=tokens)
sd_wrapper = StyleDictionaryWrapper()
result = await loop.run_in_executor(None, lambda: sd_wrapper.transform_theme(theme, output_format=output_format))
return {
"success": result.get("success", False), "input_format": input_format,
"output_format": output_format, "transformed": result.get("files", {}),
"errors": result.get("errors")
}
except Exception as e:
return {"success": False, "error": str(e)}
# =============================================================================
# CHROME DEVTOOLS IMPLEMENTATIONS
# =============================================================================
def _get_active_page():
"""Retrieve the active Playwright Page from DevTools state.
Returns:
Playwright Page object for the currently selected browser tab.
Raises:
ConnectionError: If not connected to Chrome DevTools.
ValueError: If no page is selected or the selected page no longer exists.
"""
if not devtools.connected:
raise ConnectionError(
"Not connected to DevTools. Call devtools_connect(port=9222) first. "
"Ensure Chrome is running with --remote-debugging-port=9222"
)
if not devtools.active_page_id:
available = len(devtools.pages)
raise ValueError(
f"No active page selected. {available} page(s) available. "
"Call devtools_list_pages then devtools_select_page(page_id)."
)
if devtools.active_page_id not in devtools.pages:
raise ValueError(
f"Selected page '{devtools.active_page_id}' no longer exists. "
"Call devtools_list_pages to refresh available pages."
)
return devtools.pages[devtools.active_page_id]
async def _on_console(msg):
"""Event handler for browser console messages.
Captures console.log, console.error, console.warn, etc. from the active page.
Messages are stored in a bounded deque (max DEVTOOLS_CONSOLE_MAX_ENTRIES).
Args:
msg: Playwright ConsoleMessage object containing type, text, args, and location.
"""
try:
devtools.console_logs.append({
"timestamp": datetime.now().isoformat(),
"type": msg.type,
"text": msg.text,
"args": [str(arg) for arg in msg.args] if msg.args else [],
"location": getattr(msg, 'location', {})
})
except Exception as e:
logger.debug(f"Error capturing console message: {e}")
async def _on_request(request):
"""Event handler for network requests.
Captures all HTTP requests made by the active page (XHR, fetch, resources).
Requests are stored in a bounded deque (max DEVTOOLS_NETWORK_MAX_ENTRIES).
Args:
request: Playwright Request object containing url, method, headers, resource_type.
"""
try:
devtools.network_requests.append({
"timestamp": datetime.now().isoformat(),
"url": request.url,
"method": request.method,
"headers": dict(request.headers) if request.headers else {},
"resource_type": request.resource_type
})
except Exception as e:
logger.debug(f"Error capturing network request: {e}")
@with_timeout("devtools_connect")
async def devtools_launch_impl(url: str = "about:blank", headless: bool = True) -> Dict[str, Any]:
"""Launch a new headless Chromium browser instance.
Use this on headless/remote servers where no Chrome instance is running.
Launches Playwright's bundled Chromium with CDP enabled.
Args:
url: Initial URL to navigate to (default: about:blank)
headless: Run in headless mode (default: True for servers)
"""
global devtools
if devtools.connected:
return {"success": False, "error": "Already connected. Call devtools_disconnect first."}
try:
devtools.playwright = await async_playwright().start()
devtools.browser = await devtools.playwright.chromium.launch(
headless=headless,
args=['--no-sandbox', '--disable-dev-shm-usage'] # Required for Docker/remote
)
devtools.connected = True
# Create initial page and navigate
context = await devtools.browser.new_context()
devtools.contexts["context_0"] = context
page = await context.new_page()
if url and url != "about:blank":
await page.goto(url, wait_until="domcontentloaded")
# Store page directly (don't rely on list_pages for launched browser)
devtools.pages["page_0"] = page
devtools.active_page_id = "page_0"
# Attach event listeners
try:
page.on("console", _on_console)
page.on("request", _on_request)
except Exception as e:
logger.warning(f"Failed to attach listeners: {e}")
return {
"success": True,
"message": f"Launched headless Chromium",
"headless": headless,
"url": url,
"pages_found": len(devtools.pages),
"active_page_id": devtools.active_page_id
}
except Exception as e:
await devtools_disconnect_impl()
return {"success": False, "error": f"Launch failed: {str(e)}"}
@with_timeout("devtools_connect")
async def devtools_connect_impl(port: int = 9222, host: str = "localhost") -> Dict[str, Any]:
"""Connect to a running Chrome instance via CDP."""
global devtools
if devtools.connected:
return {"success": False, "error": "Already connected. Call devtools_disconnect first."}
try:
devtools.playwright = await async_playwright().start()
# Use configurable timeout for CDP connection
devtools.browser = await devtools.playwright.chromium.connect_over_cdp(
f"http://{host}:{port}",
timeout=DEVTOOLS_CONNECTION_TIMEOUT_MS
)
devtools.connected = True
# Populate pages
await devtools_list_pages_impl()
# Auto-select first page if available
if devtools.pages and not devtools.active_page_id:
first_page_id = next(iter(devtools.pages.keys()))
await devtools_select_page_impl(first_page_id)
return {
"success": True,
"message": f"Connected to Chrome DevTools at {host}:{port}",
"pages_found": len(devtools.pages),
"active_page_id": devtools.active_page_id
}
except Exception as e:
await devtools_disconnect_impl()
return {"success": False, "error": f"Connection failed: {str(e)}. Is Chrome running with --remote-debugging-port={port}?"}
async def devtools_disconnect_impl() -> Dict[str, Any]:
"""Disconnect from Chrome and clean up resources.
Ensures proper cleanup of:
- Event listeners on all pages
- Browser connection
- Playwright instance
"""
global devtools
if not devtools.connected:
return {"success": True, "message": "Not connected."}
try:
# Remove event listeners from all pages to prevent memory leaks
for page_id, page in devtools.pages.items():
try:
page.remove_listener("console", _on_console)
page.remove_listener("request", _on_request)
except Exception:
pass # Page may already be closed
if devtools.browser:
await devtools.browser.close()
if devtools.playwright:
await devtools.playwright.stop()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
finally:
devtools = DevToolsState()
return {"success": True, "message": "Disconnected successfully."}
async def devtools_list_pages_impl() -> Dict[str, Any]:
"""List all browser pages/tabs with URLs."""
if not devtools.connected:
return {"success": False, "error": "Not connected. Call devtools_connect first."}
try:
devtools.pages.clear()
devtools.contexts.clear()
contexts = devtools.browser.contexts
page_index = 0
for i, context in enumerate(contexts):
devtools.contexts[f"context_{i}"] = context
for page in context.pages:
page_id = f"page_{page_index}"
devtools.pages[page_id] = page
page_index += 1
page_list = []
for page_id, page in devtools.pages.items():
try:
title = await page.title()
page_list.append({"id": page_id, "title": title, "url": page.url})
except Exception:
page_list.append({"id": page_id, "title": "(unavailable)", "url": page.url})
return {
"success": True,
"pages": page_list,
"count": len(page_list),
"active_page_id": devtools.active_page_id
}
except Exception as e:
return {"success": False, "error": str(e)}
async def devtools_select_page_impl(page_id: str) -> Dict[str, Any]:
"""Set active page for operations."""
if not devtools.connected:
return {"success": False, "error": "Not connected."}
if not page_id:
return {"success": False, "error": "page_id is required."}
if page_id not in devtools.pages:
await devtools_list_pages_impl()
if page_id not in devtools.pages:
return {"success": False, "error": f"Page with ID '{page_id}' not found."}
# Remove old listeners
if devtools.active_page_id and devtools.active_page_id in devtools.pages:
try:
old_page = devtools.pages[devtools.active_page_id]
old_page.remove_listener("console", _on_console)
old_page.remove_listener("request", _on_request)
except Exception:
pass
devtools.active_page_id = page_id
page = devtools.pages[page_id]
# Attach event listeners with race condition protection
try:
page.on("console", _on_console)
page.on("request", _on_request)
except Exception as e:
# Page may have closed between selection and listener attachment
logger.warning(f"Failed to attach listeners to page {page_id}: {e}")
devtools.active_page_id = None
return {"success": False, "error": f"Page closed during selection: {str(e)}"}
try:
title = await page.title()
except Exception:
title = "(unavailable)"
return {"success": True, "message": f"Active page set to '{title}'", "page_id": page_id}
async def devtools_goto_impl(url: str, wait_until: str = "domcontentloaded") -> Dict[str, Any]:
"""Navigate the active page to a URL.
Args:
url: URL to navigate to
wait_until: Wait condition - 'load', 'domcontentloaded', or 'networkidle'
"""
if not url:
return {"success": False, "error": "URL is required."}
valid_wait = ["load", "domcontentloaded", "networkidle"]
if wait_until not in valid_wait:
wait_until = "domcontentloaded"
try:
page = _get_active_page()
response = await page.goto(url, wait_until=wait_until)
status = response.status if response else None
return {
"success": True,
"url": url,
"status": status,
"title": await page.title()
}
except (ConnectionError, ValueError) as e:
return {"success": False, "error": str(e)}
except Exception as e:
return {"success": False, "error": f"Navigation failed: {str(e)}"}
async def devtools_console_logs_impl(level: str = "all", limit: int = 100, clear: bool = False) -> Dict[str, Any]:
"""Get console messages."""
try:
_get_active_page()
logs = list(devtools.console_logs)
if level != "all":
logs = [log for log in logs if log.get('type') == level]
result = logs[-limit:]
if clear:
devtools.console_logs.clear()
return {"success": True, "logs": result, "count": len(result), "total_captured": len(devtools.console_logs)}
except (ConnectionError, ValueError) as e:
return {"success": False, "error": str(e)}
async def devtools_network_requests_impl(filter_url: str = "", limit: int = 50) -> Dict[str, Any]:
"""Get network activity."""
try:
_get_active_page()
requests = list(devtools.network_requests)
if filter_url:
requests = [req for req in requests if re.search(filter_url, req.get('url', ''))]
result = requests[-limit:]
return {"success": True, "requests": result, "count": len(result), "total_captured": len(devtools.network_requests)}
except (ConnectionError, ValueError) as e:
return {"success": False, "error": str(e)}
async def devtools_evaluate_impl(expression: str) -> Dict[str, Any]:
"""Execute JavaScript in page context.
WARNING: This executes arbitrary JS in the browser context.
All executions are logged for audit purposes.
"""
if not expression:
return {"success": False, "error": "JavaScript expression cannot be empty."}
# Audit log for security tracking
expr_preview = expression[:100] + "..." if len(expression) > 100 else expression
logger.info(f"[AUDIT] devtools_evaluate called: {expr_preview}")
try:
page = _get_active_page()
result = await page.evaluate(expression)
logger.debug(f"[AUDIT] devtools_evaluate success for page {devtools.active_page_id}")
return {"success": True, "result": safe_serialize(result)}
except (ConnectionError, ValueError) as e:
return {"success": False, "error": str(e)}
except Exception as e:
logger.warning(f"[AUDIT] devtools_evaluate failed: {str(e)}")
return {"success": False, "error": f"JavaScript evaluation failed: {str(e)}"}
async def devtools_query_dom_impl(selector: str) -> Dict[str, Any]:
"""Query DOM elements with CSS selector."""
if not selector:
return {"success": False, "error": "CSS selector cannot be empty."}
try:
page = _get_active_page()
elements = await page.query_selector_all(selector)
results = []
for el in elements[:50]: # Limit to 50 elements
try:
results.append({
"tag": await el.evaluate('el => el.tagName.toLowerCase()'),
"id": await el.evaluate('el => el.id || null'),
"classes": await el.evaluate('el => Array.from(el.classList).join(" ") || null'),
"text": (await el.text_content() or "")[:200],
})
except Exception:
continue
return {"success": True, "elements": results, "count": len(results)}
except (ConnectionError, ValueError) as e:
return {"success": False, "error": str(e)}
except Exception as e:
return {"success": False, "error": f"DOM query failed: {str(e)}"}
async def devtools_screenshot_impl(selector: str = None, full_page: bool = False) -> Dict[str, Any]:
"""Capture screenshot as base64 PNG."""
try:
page = _get_active_page()
screenshot_bytes = None
if selector:
element = page.locator(selector).first
await element.wait_for(state="visible", timeout=5000)
screenshot_bytes = await element.screenshot()
else:
screenshot_bytes = await page.screenshot(full_page=full_page)
b64_image = base64.b64encode(screenshot_bytes).decode('utf-8')
return {"success": True, "image_base64_png": b64_image, "size_bytes": len(screenshot_bytes)}
except (ConnectionError, ValueError) as e:
return {"success": False, "error": str(e)}
except Exception as e:
return {"success": False, "error": f"Screenshot failed: {str(e)}"}
async def devtools_performance_impl() -> Dict[str, Any]:
"""Get Core Web Vitals and performance metrics."""
try:
page = _get_active_page()
metrics = await page.evaluate("""() => {
const timing = window.performance.getEntriesByType('navigation')[0];
if (!timing) return null;
const paint = window.performance.getEntriesByType('paint');
const fcp = paint.find(p => p.name === 'first-contentful-paint');
return {
// Navigation timing
domContentLoaded: Math.round(timing.domContentLoadedEventEnd - timing.domContentLoadedEventStart),
loadTime: Math.round(timing.loadEventEnd - timing.loadEventStart),
totalPageLoadTime: Math.round(timing.loadEventEnd - timing.startTime),
dnsLookup: Math.round(timing.domainLookupEnd - timing.domainLookupStart),
tcpConnect: Math.round(timing.connectEnd - timing.connectStart),
requestTime: Math.round(timing.responseEnd - timing.requestStart),
responseTime: Math.round(timing.responseEnd - timing.responseStart),
domInteractive: Math.round(timing.domInteractive - timing.startTime),
// Paint timing
firstContentfulPaint: fcp ? Math.round(fcp.startTime) : null,
// Memory (if available)
jsHeapSize: window.performance.memory ? Math.round(window.performance.memory.usedJSHeapSize / 1024 / 1024) : null
};
}""")
if not metrics:
return {"success": False, "error": "Performance metrics not available for this page."}
return {"success": True, "metrics": metrics}
except (ConnectionError, ValueError) as e:
return {"success": False, "error": str(e)}
except Exception as e:
return {"success": False, "error": f"Performance query failed: {str(e)}"}
# =============================================================================
# BROWSER AUTOMATION IMPLEMENTATIONS (Unified LOCAL/REMOTE)
# =============================================================================
class DummyContext:
"""Dummy context for LocalBrowserStrategy initialization"""
def __init__(self, session_id: str = "local"):
self.session_id = session_id
async def browser_init_impl(
mode: str = "local",
url: Optional[str] = None,
session_id: Optional[str] = None,
headless: bool = True
) -> Dict[str, Any]:
"""Initialize browser automation in LOCAL or REMOTE mode."""
global browser_state
if browser_state.initialized:
return {"success": False, "error": "Browser already initialized. Call browser_close first."}
if mode == "local":
if not LOCAL_BROWSER_STRATEGY_AVAILABLE:
return {
"success": False,
"error": "LocalBrowserStrategy not available. Ensure strategies/local/browser.py exists."
}
if not PLAYWRIGHT_AVAILABLE:
return {
"success": False,
"error": "Playwright not installed. Run: pip install playwright && playwright install chromium"
}
try:
context = DummyContext(session_id or f"local-{datetime.now().strftime('%Y%m%d%H%M%S')}")
browser_state.strategy = LocalBrowserStrategy(context)
await browser_state.strategy.launch(headless=headless)
if url:
await browser_state.strategy.navigate(url)
browser_state.mode = "local"
browser_state.session_id = context.session_id
browser_state.initialized = True
return {
"success": True,
"mode": "local",
"session_id": browser_state.session_id,
"url": url,
"headless": headless,
"message": "Local browser automation initialized successfully."
}
except Exception as e:
return {"success": False, "error": f"Failed to initialize LOCAL mode: {str(e)}"}
elif mode == "remote":
if not url:
return {"success": False, "error": "Remote mode requires 'url' parameter (API endpoint)."}
if not session_id:
return {"success": False, "error": "Remote mode requires 'session_id' parameter."}
# For remote mode, we just store the configuration
# Actual fetching happens in each tool call
browser_state.mode = "remote"
browser_state.session_id = session_id
browser_state.remote_api_url = url
browser_state.initialized = True
return {
"success": True,
"mode": "remote",
"session_id": session_id,
"api_url": url,
"message": "Remote browser automation configured. Will fetch from Shadow State API."
}
else:
return {"success": False, "error": f"Unknown mode: {mode}. Use 'local' or 'remote'."}
async def browser_get_logs_impl(level: str = "all", limit: int = 100) -> Dict[str, Any]:
"""Get console logs from browser (LOCAL or REMOTE mode)."""
global browser_state
if not browser_state.initialized:
return {"success": False, "error": "Browser not initialized. Call browser_init first."}
try:
if browser_state.mode == "local":
logs = await browser_state.strategy.get_console_logs(limit=limit, level=level if level != "all" else None)
return {"success": True, "mode": "local", "logs": logs, "count": len(logs)}
elif browser_state.mode == "remote":
import aiohttp
async with aiohttp.ClientSession() as session:
url = f"{browser_state.remote_api_url}/{browser_state.session_id}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
logs = data.get("logs", [])
if level != "all":
logs = [log for log in logs if log.get("level") == level]
return {"success": True, "mode": "remote", "logs": logs[-limit:], "count": len(logs)}
else:
return {"success": False, "error": f"API returned status {response.status}"}
except Exception as e:
return {"success": False, "error": str(e)}
async def browser_screenshot_impl(selector: Optional[str] = None, full_page: bool = False) -> Dict[str, Any]:
"""Capture screenshot (LOCAL mode only)."""
global browser_state
if not browser_state.initialized:
return {"success": False, "error": "Browser not initialized. Call browser_init first."}
if browser_state.mode != "local":
return {"success": False, "error": "Screenshots require LOCAL mode."}
try:
path = await browser_state.strategy.capture_screenshot(selector=selector, full_page=full_page)
# Read file and encode as base64
with open(path, 'rb') as f:
screenshot_bytes = f.read()
b64_image = base64.b64encode(screenshot_bytes).decode('utf-8')
return {
"success": True,
"image_base64_png": b64_image,
"path": path,
"size_bytes": len(screenshot_bytes)
}
except Exception as e:
return {"success": False, "error": str(e)}
async def browser_dom_snapshot_impl() -> Dict[str, Any]:
"""Get DOM snapshot (LOCAL or REMOTE mode)."""
global browser_state
if not browser_state.initialized:
return {"success": False, "error": "Browser not initialized. Call browser_init first."}
try:
if browser_state.mode == "local":
html = await browser_state.strategy.get_dom_snapshot()
return {"success": True, "mode": "local", "html": html, "length": len(html)}
elif browser_state.mode == "remote":
import aiohttp
async with aiohttp.ClientSession() as session:
url = f"{browser_state.remote_api_url}/{browser_state.session_id}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
# Look for snapshot in logs
snapshots = [log for log in data.get("logs", []) if log.get("category") == "snapshot"]
if snapshots:
latest = snapshots[-1]
html = latest.get("data", {}).get("snapshot", {}).get("html", "")
return {"success": True, "mode": "remote", "html": html, "length": len(html)}
return {"success": True, "mode": "remote", "html": "", "message": "No snapshot found in logs."}
else:
return {"success": False, "error": f"API returned status {response.status}"}
except Exception as e:
return {"success": False, "error": str(e)}
async def browser_get_errors_impl(limit: int = 50) -> Dict[str, Any]:
"""Get captured errors (LOCAL or REMOTE mode)."""
global browser_state
if not browser_state.initialized:
return {"success": False, "error": "Browser not initialized. Call browser_init first."}
try:
if browser_state.mode == "local":
errors = await browser_state.strategy.get_errors(limit=limit)
return {"success": True, "mode": "local", "errors": errors, "count": len(errors)}
elif browser_state.mode == "remote":
import aiohttp
async with aiohttp.ClientSession() as session:
url = f"{browser_state.remote_api_url}/{browser_state.session_id}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
logs = data.get("logs", [])
errors = [log for log in logs if log.get("level") == "error"]
return {"success": True, "mode": "remote", "errors": errors[-limit:], "count": len(errors)}
else:
return {"success": False, "error": f"API returned status {response.status}"}
except Exception as e:
return {"success": False, "error": str(e)}
async def browser_accessibility_audit_impl(selector: Optional[str] = None) -> Dict[str, Any]:
"""Run accessibility audit (LOCAL injects axe-core, REMOTE fetches from Shadow State)."""
global browser_state
if not browser_state.initialized:
return {"success": False, "error": "Browser not initialized. Call browser_init first."}
try:
if browser_state.mode == "local":
result = await browser_state.strategy.run_accessibility_audit(selector=selector)
else:
# REMOTE mode: fetch from Shadow State
import aiohttp
async with aiohttp.ClientSession() as session:
url = f"{browser_state.remote_api_url}/{browser_state.session_id}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status != 200:
return {"success": False, "error": f"API returned status {response.status}"}
data = await response.json()
logs = data.get("logs", [])
audits = [l for l in logs if l.get("category") in ["accessibility", "accessibilitySnapshot"]]
if not audits:
return {
"success": True,
"mode": "remote",
"message": "No accessibility audit in Shadow State. Run __DSS_BROWSER_LOGS.audit() in browser.",
"summary": {"violations": 0, "passes": 0, "incomplete": 0},
"violations": [], "passes": [], "incomplete": []
}
latest = max(audits, key=lambda x: x.get("timestamp", 0))
audit_data = latest.get("data", {})
result = audit_data.get("results") or audit_data.get("accessibility") or audit_data
violations_count = len(result.get("violations", []))
passes_count = len(result.get("passes", []))
incomplete_count = len(result.get("incomplete", []))
return {
"success": True,
"mode": browser_state.mode,
"summary": {
"violations": violations_count,
"passes": passes_count,
"incomplete": incomplete_count
},
"violations": result.get("violations", []),
"passes": result.get("passes", []),
"incomplete": result.get("incomplete", [])
}
except Exception as e:
return {"success": False, "error": str(e)}
async def browser_performance_impl() -> Dict[str, Any]:
"""Get Core Web Vitals and performance metrics (LOCAL or REMOTE mode)."""
global browser_state
if not browser_state.initialized:
return {"success": False, "error": "Browser not initialized. Call browser_init first."}
try:
if browser_state.mode == "local":
metrics = await browser_state.strategy.get_performance_metrics()
return {"success": True, "mode": "local", "metrics": metrics}
else:
# REMOTE mode: fetch from Shadow State
import aiohttp
async with aiohttp.ClientSession() as session:
url = f"{browser_state.remote_api_url}/{browser_state.session_id}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status != 200:
return {"success": False, "error": f"API returned status {response.status}"}
data = await response.json()
logs = data.get("logs", [])
perf_logs = [l for l in logs if l.get("category") in ["performance", "accessibilitySnapshot"]]
if not perf_logs:
return {
"success": True,
"mode": "remote",
"message": "No performance data in Shadow State. Metrics are captured during page load.",
"metrics": {}
}
latest = max(perf_logs, key=lambda x: x.get("timestamp", 0))
perf_data = latest.get("data", {})
metrics = perf_data.get("performance") or {"raw_data": perf_data}
return {"success": True, "mode": "remote", "metrics": {"core_web_vitals": metrics}}
except Exception as e:
return {"success": False, "error": str(e)}
async def browser_close_impl() -> Dict[str, Any]:
"""Close browser automation session."""
global browser_state
if not browser_state.initialized:
return {"success": True, "message": "Browser was not initialized."}
try:
if browser_state.mode == "local" and browser_state.strategy:
await browser_state.strategy.close()
# Reset state
browser_state = BrowserAutomationState()
return {"success": True, "message": "Browser automation session closed."}
except Exception as e:
# Reset state even on error
browser_state = BrowserAutomationState()
return {"success": True, "message": f"Browser closed with warning: {str(e)}"}
# =============================================================================
# PROJECT MANAGEMENT IMPLEMENTATIONS
# =============================================================================
async def project_init_impl(
path: str,
name: str,
description: Optional[str] = None,
skin: Optional[str] = None
) -> Dict[str, Any]:
"""Initialize a new DSS project."""
if not PROJECT_MANAGEMENT_AVAILABLE:
return {
"success": False,
"error": f"Project management not available: {PROJECT_MANAGEMENT_IMPORT_ERROR}"
}
try:
loop = asyncio.get_event_loop()
manager = ProjectManager()
project = await loop.run_in_executor(
None,
lambda: manager.init(
path=Path(path),
name=name,
description=description,
skin=skin
)
)
return {
"success": True,
"message": f"Project '{name}' initialized at {path}",
"project": {
"name": project.config.name,
"path": str(project.path),
"status": project.status.value,
"config_file": str(project.config_path)
},
"directories_created": [
"tokens/",
"tokens/figma/",
"tokens/custom/",
"tokens/compiled/",
"themes/",
"components/"
]
}
except FileExistsError as e:
return {"success": False, "error": str(e)}
except Exception as e:
return {"success": False, "error": str(e)}
async def project_add_figma_team_impl(
project_path: str,
team_id: str,
figma_token: Optional[str] = None
) -> Dict[str, Any]:
"""Link a Figma team folder to DSS project."""
if not PROJECT_MANAGEMENT_AVAILABLE:
return {
"success": False,
"error": f"Project management not available: {PROJECT_MANAGEMENT_IMPORT_ERROR}"
}
try:
loop = asyncio.get_event_loop()
manager = ProjectManager()
# Load existing project
project = await loop.run_in_executor(
None,
lambda: manager.load(Path(project_path))
)
# Add Figma team
updated_project = await loop.run_in_executor(
None,
lambda: manager.add_figma_team(
project=project,
team_id=team_id,
figma_token=figma_token
)
)
# Build response
files_info = []
for f in updated_project.config.figma.files:
files_info.append({
"key": f.key,
"name": f.name,
"is_uikit": f.key == updated_project.config.figma.uikit_file_key
})
return {
"success": True,
"message": f"Linked Figma team {team_id} to project",
"team_id": team_id,
"files_discovered": len(files_info),
"files": files_info,
"uikit_file": updated_project.config.figma.uikit_file_key,
"project_status": updated_project.status.value
}
except Exception as e:
return {"success": False, "error": str(e)}
async def project_add_figma_file_impl(
project_path: str,
file_key: str,
file_name: str,
figma_token: Optional[str] = None
) -> Dict[str, Any]:
"""Add a single Figma file to DSS project."""
if not PROJECT_MANAGEMENT_AVAILABLE:
return {
"success": False,
"error": f"Project management not available: {PROJECT_MANAGEMENT_IMPORT_ERROR}"
}
try:
loop = asyncio.get_event_loop()
manager = ProjectManager()
project = await loop.run_in_executor(
None,
lambda: manager.load(Path(project_path))
)
updated_project = await loop.run_in_executor(
None,
lambda: manager.add_figma_file(
project=project,
file_key=file_key,
file_name=file_name,
figma_token=figma_token
)
)
return {
"success": True,
"message": f"Added Figma file '{file_name}' to project",
"file_key": file_key,
"file_name": file_name,
"total_files": len(updated_project.config.figma.files)
}
except Exception as e:
return {"success": False, "error": str(e)}
async def project_sync_impl(
project_path: str,
file_keys: Optional[List[str]] = None,
figma_token: Optional[str] = None
) -> Dict[str, Any]:
"""Sync design tokens from Figma sources."""
if not PROJECT_MANAGEMENT_AVAILABLE:
return {
"success": False,
"error": f"Project management not available: {PROJECT_MANAGEMENT_IMPORT_ERROR}"
}
try:
loop = asyncio.get_event_loop()
manager = ProjectManager()
project = await loop.run_in_executor(
None,
lambda: manager.load(Path(project_path))
)
# Sync (use sync version to avoid nested async issues)
updated_project = await loop.run_in_executor(
None,
lambda: manager.sync(
project=project,
figma_token=figma_token,
file_keys=file_keys
)
)
# Count tokens extracted
total_tokens = 0
sources_info = {}
if updated_project.extracted_tokens:
for source_key, source_data in updated_project.extracted_tokens.get("sources", {}).items():
token_count = len(source_data.get("tokens", {}))
total_tokens += token_count
sources_info[source_key] = token_count
return {
"success": True,
"message": f"Synced {total_tokens} tokens from {len(sources_info)} files",
"project_status": updated_project.status.value,
"tokens_extracted": total_tokens,
"sources": sources_info,
"errors": updated_project.errors
}
except Exception as e:
return {"success": False, "error": str(e)}
async def project_build_impl(project_path: str) -> Dict[str, Any]:
"""Build output files from synced tokens."""
if not PROJECT_MANAGEMENT_AVAILABLE:
return {
"success": False,
"error": f"Project management not available: {PROJECT_MANAGEMENT_IMPORT_ERROR}"
}
try:
loop = asyncio.get_event_loop()
manager = ProjectManager()
project = await loop.run_in_executor(
None,
lambda: manager.load(Path(project_path))
)
updated_project = await loop.run_in_executor(
None,
lambda: manager.build(project)
)
output_dir = str(project.path / project.config.output.tokens_dir)
return {
"success": True,
"message": f"Built output files",
"project_status": updated_project.status.value,
"output_directory": output_dir,
"formats_generated": updated_project.config.output.formats,
"errors": updated_project.errors
}
except Exception as e:
return {"success": False, "error": str(e)}
async def project_list_impl() -> Dict[str, Any]:
"""List all registered DSS projects."""
if not PROJECT_MANAGEMENT_AVAILABLE:
return {
"success": False,
"error": f"Project management not available: {PROJECT_MANAGEMENT_IMPORT_ERROR}"
}
try:
manager = ProjectManager()
projects = manager.list()
return {
"success": True,
"count": len(projects),
"projects": projects
}
except Exception as e:
return {"success": False, "error": str(e)}
async def project_info_impl(project_path: str) -> Dict[str, Any]:
"""Get detailed project information."""
if not PROJECT_MANAGEMENT_AVAILABLE:
return {
"success": False,
"error": f"Project management not available: {PROJECT_MANAGEMENT_IMPORT_ERROR}"
}
try:
loop = asyncio.get_event_loop()
manager = ProjectManager()
project = await loop.run_in_executor(
None,
lambda: manager.load(Path(project_path))
)
figma_info = None
if project.config.figma:
figma_info = {
"team_id": project.config.figma.team_id,
"project_id": project.config.figma.project_id,
"project_name": project.config.figma.project_name,
"files_count": len(project.config.figma.files),
"uikit_file_key": project.config.figma.uikit_file_key,
"files": [
{"key": f.key, "name": f.name, "last_synced": f.last_synced.isoformat() if f.last_synced else None}
for f in project.config.figma.files
]
}
return {
"success": True,
"project": {
"name": project.config.name,
"version": project.config.version,
"description": project.config.description,
"path": str(project.path),
"status": project.status.value,
"skin": project.config.skin,
"base_theme": project.config.base_theme,
"figma": figma_info,
"output": {
"tokens_dir": project.config.output.tokens_dir,
"themes_dir": project.config.output.themes_dir,
"formats": project.config.output.formats
},
"created_at": project.config.created_at.isoformat(),
"updated_at": project.config.updated_at.isoformat()
}
}
except Exception as e:
return {"success": False, "error": str(e)}
async def figma_discover_impl(
team_id: str,
figma_token: Optional[str] = None
) -> Dict[str, Any]:
"""Discover Figma team structure."""
if not PROJECT_MANAGEMENT_AVAILABLE:
return {
"success": False,
"error": f"Project management not available: {PROJECT_MANAGEMENT_IMPORT_ERROR}"
}
try:
loop = asyncio.get_event_loop()
sync = FigmaProjectSync(token=figma_token)
structure = await loop.run_in_executor(
None,
lambda: sync.discover_team_structure(team_id)
)
# Format response
projects_info = []
total_files = 0
for proj in structure.get("projects", []):
files = proj.get("files", [])
total_files += len(files)
projects_info.append({
"id": proj["id"],
"name": proj["name"],
"files_count": len(files),
"files": files
})
uikit_info = structure.get("uikit")
return {
"success": True,
"team_id": team_id,
"team_name": structure.get("team_name", ""),
"projects_count": len(projects_info),
"total_files": total_files,
"projects": projects_info,
"uikit_reference": uikit_info
}
except ValueError as e:
return {"success": False, "error": str(e)}
except Exception as e:
return {"success": False, "error": str(e)}
# =============================================================================
# DSS CORE SYNC IMPLEMENTATIONS
# =============================================================================
async def dss_core_sync_impl(
force: bool = False,
figma_token: Optional[str] = None
) -> Dict[str, Any]:
"""
Sync DSS core from the canonical shadcn/ui Figma source.
This implements DSS's "eat our own dog food" philosophy - using the
shadcn/ui Figma as the canonical base layer for all design systems.
"""
try:
# Import DSS core sync
from dss.project.sync import DSSCoreSync
from dss.project.figma import FigmaRateLimitError
loop = asyncio.get_event_loop()
sync = DSSCoreSync(figma_token=figma_token)
# Run sync in executor (it uses sync requests)
result = await loop.run_in_executor(
None,
lambda: sync.sync(force=force)
)
if result.get("success"):
return {
"success": True,
"message": result.get("message", "Sync completed"),
"summary": result.get("summary", {}),
"files_written": result.get("files_written", []),
"figma_reference": {
"team_id": sync.reference.team_id,
"team_name": sync.reference.team_name,
"uikit_file_key": sync.reference.uikit_file_key,
"uikit_file_name": sync.reference.uikit_file_name,
}
}
else:
return result
except FigmaRateLimitError as e:
return {
"success": False,
"error": f"Figma rate limit exceeded: {e}",
"retry_after": e.retry_after,
"hint": "Wait for the rate limit to reset and try again"
}
except ImportError as e:
return {
"success": False,
"error": f"DSS core sync not available: {e}"
}
except Exception as e:
return {"success": False, "error": str(e)}
async def dss_core_status_impl() -> Dict[str, Any]:
"""Get DSS core sync status."""
try:
from dss.project.sync import DSSCoreSync
sync = DSSCoreSync()
status = sync.get_sync_status()
return {
"success": True,
**status
}
except ImportError as e:
return {
"success": False,
"error": f"DSS core sync not available: {e}"
}
except Exception as e:
return {"success": False, "error": str(e)}
async def dss_core_tokens_impl() -> Dict[str, Any]:
"""Get DSS core tokens."""
try:
from dss.project.sync import DSSCoreSync
sync = DSSCoreSync()
tokens = sync.get_tokens()
if tokens:
return {
"success": True,
"tokens": tokens,
"categories": list(tokens.get("categories", {}).keys()),
"total_tokens": sum(
len(cat) for cat in tokens.get("categories", {}).values()
)
}
else:
return {
"success": False,
"error": "DSS core not synced yet. Run dss_core_sync first.",
"hint": "Use dss_core_sync to sync from Figma"
}
except ImportError as e:
return {
"success": False,
"error": f"DSS core sync not available: {e}"
}
except Exception as e:
return {"success": False, "error": str(e)}
async def dss_core_themes_impl() -> Dict[str, Any]:
"""Get DSS core themes."""
try:
from dss.project.sync import DSSCoreSync
sync = DSSCoreSync()
themes = sync.get_themes()
if themes:
theme_names = list(themes.get("themes", {}).keys())
return {
"success": True,
"themes": themes,
"theme_names": theme_names,
"total_themes": len(theme_names)
}
else:
return {
"success": False,
"error": "DSS core not synced yet. Run dss_core_sync first.",
"hint": "Use dss_core_sync to sync from Figma"
}
except ImportError as e:
return {
"success": False,
"error": f"DSS core sync not available: {e}"
}
except Exception as e:
return {"success": False, "error": str(e)}
async def dss_rate_limit_status_impl(
figma_token: Optional[str] = None
) -> Dict[str, Any]:
"""Get current Figma rate limit status."""
try:
from dss.project.figma import FigmaProjectSync
sync = FigmaProjectSync(token=figma_token)
status = sync.get_rate_limit_status()
return {
"success": True,
**status,
"hint": "Rate limits reset after 60 seconds of no requests"
}
except ValueError as e:
return {"success": False, "error": str(e)}
except Exception as e:
return {"success": False, "error": str(e)}
# =============================================================================
# MAIN
# =============================================================================
async def main():
"""Run the MCP server"""
logger.info("Starting DSS MCP Server v2.0.0...")
logger.info(f"DSS Path: {DSS_PATH}")
logger.info(f"DSS Available: {DSS_AVAILABLE}")
logger.info(f"Playwright Available: {PLAYWRIGHT_AVAILABLE}")
logger.info(f"LocalBrowserStrategy Available: {LOCAL_BROWSER_STRATEGY_AVAILABLE}")
# Initialize DSS Runtime with boundary enforcement
if RUNTIME_AVAILABLE:
try:
runtime = get_runtime()
stats = runtime.get_stats()
logger.info(f"DSS Runtime initialized: {stats['enforcement_mode']} mode")
logger.info("Boundary enforcement: ACTIVE")
except Exception as e:
logger.error(f"Failed to initialize runtime: {e}")
logger.warning("Boundary enforcement: DISABLED")
else:
logger.warning("DSSRuntime not available - boundary enforcement DISABLED")
if DSS_AVAILABLE:
logger.info(f"DSS Version: {dss.__version__}")
try:
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
finally:
logger.info("Server shutting down...")
# Cleanup DevTools
if devtools.connected:
await devtools_disconnect_impl()
# Cleanup Browser Automation
if browser_state.initialized:
await browser_close_impl()
if __name__ == "__main__":
asyncio.run(main())