""" DSS API Server. REST API for design system operations. Endpoints: - Project management (CRUD) - Figma integration (token extraction, component sync) - Health checks - Activity tracking - Configuration management - Service discovery Modes: - Server: Remote deployment, team distribution - Local: Development companion """ import json import os import subprocess import sys from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from dotenv import load_dotenv from fastapi import BackgroundTasks, Depends, FastAPI, Header, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from apps.api.browser_logger import router as browser_log_router from dss import settings # Load environment variables from .env file FIRST (before any other imports) from dss.auth.atlassian_auth import get_auth from dss.figma.figma_tools import FigmaToolSuite from dss.services.config_service import ConfigService from dss.services.project_manager import ProjectManager from dss.services.sandboxed_fs import SandboxedFS from dss.storage.json_store import ( ActivityLog, Cache, CodeMetrics, Components, FigmaFiles, IntegrationHealth, Integrations, Projects, SyncHistory, Teams, TestResults, get_stats, ) # Get project root - apps/api/server.py -> apps/api -> apps -> project_root _server_file = Path(__file__).resolve() _project_root = _server_file.parent.parent.parent # /home/.../dss # Try loading from multiple possible .env locations env_paths = [ _project_root / ".env", # root .env (primary) _project_root / "storybook" / ".env", # storybook/.env _server_file.parent / ".env", # apps/api/.env ] for env_path in env_paths: if env_path.exists(): load_dotenv(env_path, override=True) break # Add project root to path for dss package sys.path.insert(0, str(Path(__file__).parent.parent.parent)) # Import browser logger router (local import from same directory) # DSS package imports - unified package # Additional DSS imports available: # from dss import DesignToken, TokenSource, ProjectScanner # from dss.ingest import CSSTokenSource, SCSSTokenSource, TailwindTokenSource # from dss.analyze import ReactAnalyzer, StyleAnalyzer, QuickWinFinder # from dss.storybook import StorybookScanner, StoryGenerator # === Legacy Config Compatibility === # Wrapper to maintain compatibility with old config.x.y references class _FigmaConfigCompat: @property def is_configured(self): return settings.figma_configured @property def token(self): return settings.FIGMA_TOKEN @property def cache_ttl(self): return settings.FIGMA_CACHE_TTL class _ServerConfigCompat: @property def env(self): return settings.SERVER_ENV @property def port(self): return settings.SERVER_PORT @property def host(self): return settings.SERVER_HOST @property def is_production(self): return settings.is_production class _ConfigCompat: figma = _FigmaConfigCompat() server = _ServerConfigCompat() def summary(self): return { "figma": { "configured": settings.figma_configured, "cache_ttl": settings.FIGMA_CACHE_TTL, }, "server": { "port": settings.SERVER_PORT, "env": settings.SERVER_ENV, "log_level": settings.LOG_LEVEL, }, "database": {"path": str(settings.DATABASE_PATH)}, } config = _ConfigCompat() # === Runtime Configuration === class RuntimeConfig: """ ⚙️ ENDOCRINE HORMONE STORAGE - Runtime configuration system. The endocrine system regulates behavior through hormones. This configuration manager stores the component's behavioral preferences and adaptation state. Persists to .dss/runtime-config.json so the component remembers its preferences even after sleep (shutdown). """ def __init__(self): self.config_path = Path(__file__).parent.parent.parent / ".dss" / "runtime-config.json" self.config_path.parent.mkdir(parents=True, exist_ok=True) self._data = self._load() def _load(self) -> dict: if self.config_path.exists(): try: return json.loads(self.config_path.read_text()) except (json.JSONDecodeError, IOError): # Config file corrupted or unreadable, use defaults pass return { "mode": "local", # "local" or "server" "figma": {"token": "", "configured": False}, "services": { "storybook": {"enabled": False, "port": 6006, "url": ""}, "chromatic": {"enabled": False, "project_token": ""}, "github": {"enabled": False, "repo": ""}, }, "features": { "visual_qa": True, "token_sync": True, "code_gen": True, "ai_advisor": False, }, } def _save(self): self.config_path.write_text(json.dumps(self._data, indent=2)) def get(self, key: str = None): if key is None: # Return safe copy without secrets safe = self._data.copy() if safe.get("figma", {}).get("token"): safe["figma"]["token"] = "***configured***" return safe return self._data.get(key) def set(self, key: str, value: Any): self._data[key] = value self._save() return self._data[key] def update(self, updates: dict): for key, value in updates.items(): if isinstance(value, dict) and isinstance(self._data.get(key), dict): self._data[key].update(value) else: self._data[key] = value self._save() return self.get() def set_figma_token(self, token: str): self._data["figma"]["token"] = token self._data["figma"]["configured"] = bool(token) self._save() # Also update the global config os.environ["FIGMA_TOKEN"] = token return {"configured": bool(token)} runtime_config = RuntimeConfig() # === MVP1 Services Initialization === # Initialize services for project configuration architecture config_service = ConfigService() project_manager = ProjectManager(Projects, config_service) # Ensure database schema is up to date (adds root_path column if missing) ProjectManager.ensure_schema() # === Service Discovery === class ServiceDiscovery: """ Service discovery for companion services (Storybook, Chromatic, dev servers). Checks known ports to discover running services. """ KNOWN_SERVICES = { "storybook": {"ports": [6006, 6007], "health": "/"}, "chromatic": {"ports": [], "health": None}, "vite": {"ports": [5173, 5174, 3000], "health": "/"}, "webpack": {"ports": [8080, 8081], "health": "/"}, "nextjs": {"ports": [3000, 3001], "health": "/"}, } @classmethod async def discover(cls) -> dict: """Discover running services by checking known ports.""" import socket discovered = {} for service, info in cls.KNOWN_SERVICES.items(): for port in info["ports"]: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(0.5) result = sock.connect_ex(("127.0.0.1", port)) sock.close() if result == 0: discovered[service] = { "running": True, "port": port, "url": f"http://localhost:{port}", } break except (OSError, socket.error): # Service not running on this port pass if service not in discovered: discovered[service] = {"running": False, "port": None, "url": None} return discovered @classmethod async def check_storybook(cls) -> dict: """Check Storybook status specifically.""" import httpx configured = runtime_config.get("services").get("storybook", {}) port = configured.get("port", 6006) url = configured.get("url") or f"http://localhost:{port}" try: async with httpx.AsyncClient(timeout=2.0) as client: resp = await client.get(url) return {"running": resp.status_code == 200, "url": url, "port": port} except (httpx.ConnectError, httpx.TimeoutException, httpx.HTTPError): # Storybook not running or unreachable return {"running": False, "url": url, "port": port} # === App Setup === app = FastAPI( title="Design System Server (DSS)", description="API for design system management and Figma integration", version="1.0.0", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Include browser logger router for console log forwarding app.include_router(browser_log_router) # Mount Admin UI static files UI_DIR = Path(__file__).parent.parent.parent / "admin-ui" if UI_DIR.exists(): app.mount("/admin-ui", StaticFiles(directory=str(UI_DIR), html=True), name="admin-ui") # Initialize Figma tools with token from runtime config figma_config = runtime_config.get("figma") figma_token_at_startup = figma_config.get("token") if figma_config else None figma_suite = FigmaToolSuite( token=figma_token_at_startup, output_dir=str(Path(__file__).parent.parent.parent / ".dss" / "output"), ) # === Request/Response Models === class ProjectCreate(BaseModel): name: str description: str = "" figma_file_key: str = "" root_path: str = "" # MVP1: Project root directory path class ProjectUpdate(BaseModel): name: Optional[str] = None description: Optional[str] = None figma_file_key: Optional[str] = None status: Optional[str] = None root_path: Optional[str] = None # MVP1: Update project root path class FigmaExtractRequest(BaseModel): file_key: str format: str = "css" class FigmaSyncRequest(BaseModel): file_key: str target_path: str format: str = "css" class TeamCreate(BaseModel): name: str description: str = "" class FigmaFileCreate(BaseModel): figma_url: str file_name: str file_key: str class ESRECreate(BaseModel): name: str definition_text: str expected_value: Optional[str] = None component_name: Optional[str] = None class TokenDriftCreate(BaseModel): component_id: str property_name: str hardcoded_value: str file_path: str line_number: int severity: str = "warning" suggested_token: Optional[str] = None # === Authentication === async def get_current_user(authorization: Optional[str] = Header(None)) -> Dict[str, Any]: """ Dependency to get current authenticated user from JWT token. Usage: user = Depends(get_current_user) """ if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Not authenticated") token = authorization.replace("Bearer ", "") auth = get_auth() user_data = auth.verify_token(token) if not user_data: raise HTTPException(status_code=401, detail="Invalid or expired token") return user_data class LoginRequest(BaseModel): url: str # Atlassian URL email: str api_token: str service: str = "jira" # "jira" or "confluence" @app.post("/api/auth/login") async def login(request: LoginRequest): """ Authenticate with Atlassian credentials. Validates credentials against Jira or Confluence API, creates/updates user in database, returns JWT token. """ try: auth = get_auth() result = await auth.login( url=request.url, email=request.email, api_token=request.api_token, service=request.service, ) return result except ValueError as e: raise HTTPException(status_code=401, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail=f"Login failed: {str(e)}") @app.get("/api/auth/me") async def get_me(user: Dict[str, Any] = Depends(get_current_user)): """Get current authenticated user info.""" auth = get_auth() user_data = await auth.get_user_by_id(user["user_id"]) if not user_data: raise HTTPException(status_code=404, detail="User not found") return user_data # === Root & Health === @app.get("/") async def root(): """Redirect to Admin UI dashboard.""" from fastapi.responses import RedirectResponse return RedirectResponse(url="/admin-ui/index.html") @app.get("/health") async def health(): """ Health check endpoint. Performs a complete health diagnostic on the DSS server. Returns 200 OK with service status. Services Checked: - Storage - Is the data directory accessible? - MCP Handler - Is the MCP handler initialized? - Figma - Is the Figma integration configured? """ import os from pathlib import Path import psutil # Check storage connectivity storage_ok = False try: from dss.storage.json_store import DATA_DIR storage_ok = DATA_DIR.exists() except Exception as e: print(f"[Health] Storage check error: {type(e).__name__}: {e}", flush=True) # Check MCP handler functionality mcp_ok = False try: import sys from pathlib import Path project_root = Path(__file__).parent.parent.parent if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) from dss.mcp_server.handler import get_mcp_handler handler = get_mcp_handler() mcp_ok = handler is not None except Exception as e: print(f"[Health] MCP handler check error: {type(e).__name__}: {e}", flush=True) # Get uptime try: process = psutil.Process(os.getpid()) uptime_seconds = int( (datetime.now() - datetime.fromtimestamp(process.create_time())).total_seconds() ) except: uptime_seconds = 0 # Overall status status = "healthy" if (storage_ok and mcp_ok) else "degraded" return { "status": status, "uptime_seconds": uptime_seconds, "version": "0.8.0", "timestamp": datetime.utcnow().isoformat() + "Z", "services": { "storage": "ok" if storage_ok else "error", "mcp": "ok" if mcp_ok else "error", "figma": "connected" if config.figma.is_configured else "not configured", }, } # === DEBUG ENDPOINTS === @app.post("/api/browser-logs") async def receive_browser_logs(logs: dict): """ 📋 BROWSER LOG COLLECTION ENDPOINT. Receives browser logs from the dashboard and stores them for debugging. Browser logger (browser-logger.js) POSTs logs here automatically or on demand. Expected payload: { "sessionId": "session-timestamp-random", "exportedAt": "ISO timestamp", "logs": [...], "diagnostic": {...} } """ import time from pathlib import Path # Create browser logs directory if doesn't exist browser_logs_dir = Path(__file__).parent.parent.parent / ".dss" / "browser-logs" browser_logs_dir.mkdir(parents=True, exist_ok=True) # Get or generate session ID session_id = logs.get("sessionId", f"session-{int(time.time())}") # Store logs as JSON file log_file = browser_logs_dir / f"{session_id}.json" log_file.write_text(json.dumps(logs, indent=2)) # Log to activity (skip if ActivityLog not available) try: with get_connection() as conn: conn.execute( """ INSERT INTO activity_log (category, action, details, metadata, created_at) VALUES (?, ?, ?, ?, ?) """, ( "debug", "browser_logs_received", f"Received browser logs for session {session_id}", json.dumps({"session_id": session_id, "log_count": len(logs.get("logs", []))}), datetime.utcnow().isoformat(), ), ) conn.commit() except: pass # Activity logging is optional # Check for errors and create notification task error_count = logs.get("diagnostic", {}).get("errorCount", 0) warn_count = logs.get("diagnostic", {}).get("warnCount", 0) if error_count > 0 or warn_count > 0: # Create task for Claude to investigate try: import httpx task_data = { "title": f"Browser errors detected in session {session_id[:20]}...", "description": f"Detected {error_count} errors and {warn_count} warnings in browser session. Use dss_get_browser_errors('{session_id}') to investigate.", "priority": 3 if error_count > 0 else 5, "project": "dss-debug", "visibility": "public", } # Create task via task-queue MCP HTTP endpoint (if available) # This runs async - don't block browser log storage import asyncio async def create_task(): try: async with httpx.AsyncClient() as client: # Task queue typically runs on same server await client.post( "http://localhost:8765/tasks", json=task_data, timeout=2.0 ) except: pass # Task creation is best-effort # Run in background asyncio.create_task(create_task()) except: pass # Task creation is optional return { "status": "stored", "sessionId": session_id, "logCount": len(logs.get("logs", [])), "storedAt": datetime.utcnow().isoformat() + "Z", "errorsDetected": error_count > 0 or warn_count > 0, } @app.get("/api/browser-logs/{session_id}") async def get_browser_logs(session_id: str): """ 📋 RETRIEVE BROWSER LOGS. Retrieves stored browser logs by session ID. """ from pathlib import Path browser_logs_dir = Path(__file__).parent.parent.parent / ".dss" / "browser-logs" log_file = browser_logs_dir / f"{session_id}.json" if not log_file.exists(): raise HTTPException(status_code=404, detail=f"Session not found: {session_id}") logs = json.loads(log_file.read_text()) return logs @app.get("/api/debug/diagnostic") async def get_debug_diagnostic(): """ 🔍 COMPREHENSIVE SYSTEM DIAGNOSTIC. Returns detailed system diagnostic including: - Health status (from /health endpoint) - Browser log session count - API uptime - Database size and stats - Memory usage - Recent errors """ import os from pathlib import Path import psutil # Get health status health_status = await health() # Get browser log sessions browser_logs_dir = Path(__file__).parent.parent.parent / ".dss" / "browser-logs" browser_logs_dir.mkdir(parents=True, exist_ok=True) browser_sessions = len(list(browser_logs_dir.glob("*.json"))) # Get database size db_path = Path(__file__).parent.parent.parent / ".dss" / "dss.db" db_size_bytes = db_path.stat().st_size if db_path.exists() else 0 # Get process stats process = psutil.Process(os.getpid()) memory_info = process.memory_info() # Get recent errors from activity log try: with get_connection() as conn: recent_errors = conn.execute( """ SELECT category, action, details, created_at FROM activity_log WHERE category = 'error' OR action LIKE '%error%' OR action LIKE '%fail%' ORDER BY created_at DESC LIMIT 10 """ ).fetchall() recent_errors = [ {"category": row[0], "action": row[1], "details": row[2], "timestamp": row[3]} for row in recent_errors ] except: recent_errors = [] return { "status": health_status["status"], "timestamp": datetime.utcnow().isoformat() + "Z", "health": health_status, "browser": {"session_count": browser_sessions, "logs_directory": str(browser_logs_dir)}, "database": { "size_bytes": db_size_bytes, "size_mb": round(db_size_bytes / 1024 / 1024, 2), "path": str(db_path), }, "process": { "pid": os.getpid(), "memory_rss_mb": round(memory_info.rss / 1024 / 1024, 2), "memory_vms_mb": round(memory_info.vms / 1024 / 1024, 2), "threads": process.num_threads(), }, "recent_errors": recent_errors, } @app.get("/api/debug/workflows") async def list_workflows(): """ 📋 LIST AVAILABLE DEBUG WORKFLOWS. Returns list of available workflows from .dss/WORKFLOWS/ directory. Each workflow is a markdown file with step-by-step debugging procedures. """ from pathlib import Path workflows_dir = Path(__file__).parent.parent.parent / ".dss" / "WORKFLOWS" if not workflows_dir.exists(): return {"workflows": [], "count": 0} workflows = [] for workflow_file in sorted(workflows_dir.glob("*.md")): if workflow_file.name == "README.md": continue # Read first few lines for metadata content = workflow_file.read_text() lines = content.split("\n") # Extract title (first # heading) title = workflow_file.stem for line in lines[:10]: if line.startswith("# "): title = line[2:].strip() break # Extract purpose purpose = "" for i, line in enumerate(lines[:20]): if line.startswith("**Purpose**:"): purpose = line.replace("**Purpose**:", "").strip() break workflows.append( { "id": workflow_file.stem, "title": title, "purpose": purpose, "file": workflow_file.name, "path": str(workflow_file), } ) return {"workflows": workflows, "count": len(workflows), "directory": str(workflows_dir)} @app.get("/api/config") async def get_config(): """ Public configuration endpoint. Returns ONLY safe, non-sensitive configuration values that are safe to expose to the client browser. SECURITY: This endpoint is the ONLY place where configuration is exposed. All other config values (secrets, API keys, etc.) must be server-only. """ # Import here to avoid circular imports try: from config import get_public_config return get_public_config() except ImportError: # Fallback for legacy deployments return { "dssHost": os.environ.get("DSS_HOST", "localhost"), "dssPort": os.environ.get("DSS_PORT", "3456"), "storybookPort": 6006, } @app.get("/api/stats") async def get_statistics(): """Get database and system statistics.""" db_stats = get_stats() return { "database": db_stats, "figma": {"mode": figma_suite.mode, "configured": config.figma.is_configured}, } # === Projects === @app.get("/api/projects") async def list_projects(status: Optional[str] = None): """List all projects.""" projects = Projects.list(status=status) return projects @app.get("/api/projects/{project_id}") async def get_project(project_id: str): """Get a specific project.""" project = Projects.get(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") return project @app.post("/api/projects") async def create_project(project: ProjectCreate): """Create a new project.""" project_id = f"proj-{int(datetime.utcnow().timestamp() * 1000)}" created = Projects.create( id=project_id, name=project.name, description=project.description, figma_file_key=project.figma_file_key, ) ActivityLog.log( action="project_created", entity_type="project", entity_id=project_id, project_id=project_id, details={"name": project.name}, ) return created @app.put("/api/projects/{project_id}") async def update_project(project_id: str, update: ProjectUpdate): """Update a project.""" existing = Projects.get(project_id) if not existing: raise HTTPException(status_code=404, detail="Project not found") update_data = {k: v for k, v in update.dict().items() if v is not None} if not update_data: return existing updated = Projects.update(project_id, **update_data) ActivityLog.log( action="project_updated", entity_type="project", entity_id=project_id, project_id=project_id, details=update_data, ) return updated @app.delete("/api/projects/{project_id}") async def delete_project(project_id: str): """Delete a project.""" if not Projects.delete(project_id): raise HTTPException(status_code=404, detail="Project not found") ActivityLog.log(action="project_deleted", entity_type="project", entity_id=project_id) return {"success": True} # === Components === @app.get("/api/projects/{project_id}/components") async def list_components(project_id: str): """List components for a project.""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") return Components.list(project_id) # === Figma Integration === @app.post("/api/figma/extract-variables") async def extract_variables(request: FigmaExtractRequest, background_tasks: BackgroundTasks): """Extract design tokens from Figma variables.""" try: result = await figma_suite.extract_variables(request.file_key, request.format) ActivityLog.log( action="figma_extract_variables", entity_type="figma", details={ "file_key": request.file_key, "format": request.format, "tokens_count": result.get("tokens_count"), }, ) return result except Exception as e: raise HTTPException(status_code=500, detail=f"Token extraction failed: {str(e)}") @app.post("/api/figma/extract-components") async def extract_components(request: FigmaExtractRequest): """Extract component definitions from Figma.""" try: result = await figma_suite.extract_components(request.file_key) ActivityLog.log( action="figma_extract_components", entity_type="figma", details={"file_key": request.file_key, "count": result.get("components_count")}, ) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/figma/extract-styles") async def extract_styles(request: FigmaExtractRequest): """Extract style definitions from Figma.""" try: result = await figma_suite.extract_styles(request.file_key) return result except Exception as e: raise HTTPException(status_code=500, detail=f"Style extraction failed: {str(e)}") @app.post("/api/figma/sync-tokens") async def sync_tokens(request: FigmaSyncRequest): """Sync tokens from Figma to target file.""" try: result = await figma_suite.sync_tokens( request.file_key, request.target_path, request.format ) ActivityLog.log( action="figma_sync_tokens", entity_type="figma", details={ "file_key": request.file_key, "target": request.target_path, "tokens_synced": result.get("tokens_synced"), }, ) return result except Exception as e: raise HTTPException(status_code=500, detail=f"Token sync failed: {str(e)}") @app.post("/api/figma/validate") async def validate_components(request: FigmaExtractRequest): """Validate component definitions against design system rules.""" try: result = await figma_suite.validate_components(request.file_key) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/figma/generate-code") async def generate_code(file_key: str, component_name: str, framework: str = "webcomponent"): """Generate component code from Figma.""" try: result = await figma_suite.generate_code(file_key, component_name, framework) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/figma/health") async def figma_health(): """Check Figma connection status.""" is_live = figma_suite.mode == "live" return { "status": "ok" if is_live else "degraded", "mode": figma_suite.mode, "message": "Figma connected" if is_live else "Running in mock mode. Configure FIGMA_TOKEN for live API.", } # === Discovery === @app.get("/api/discovery") async def run_discovery(path: str = "."): """Run project discovery.""" script_path = Path(__file__).parent.parent / "discovery" / "discover.sh" try: result = subprocess.run( [str(script_path), path], capture_output=True, text=True, timeout=30 ) if result.returncode == 0: return json.loads(result.stdout) else: return {"error": result.stderr} except subprocess.TimeoutExpired: raise HTTPException(status_code=504, detail="Discovery timed out") except json.JSONDecodeError: return {"raw_output": result.stdout} class DiscoveryScanRequest(BaseModel): path: str = "." full_scan: bool = False @app.post("/api/discovery/scan") async def scan_project(request: DiscoveryScanRequest): """Run project discovery scan.""" script_path = Path(__file__).parent.parent / "discovery" / "discover.sh" try: result = subprocess.run( [str(script_path), request.path], capture_output=True, text=True, timeout=30 ) if result.returncode == 0: data = json.loads(result.stdout) ActivityLog.log( action="discovery_scan", entity_type="project", details={"path": request.path, "full_scan": request.full_scan}, ) return data else: return {"error": result.stderr} except subprocess.TimeoutExpired: raise HTTPException(status_code=504, detail="Discovery timed out") except json.JSONDecodeError: return {"raw_output": result.stdout} @app.get("/api/discovery/stats") async def get_discovery_stats(): """Get project statistics.""" db_stats = get_stats() return { "projects": db_stats.get("projects", {}), "tokens": db_stats.get("tokens", {"total": 0}), "components": db_stats.get("components", {"total": 0}), "syncs": { "today": 0, "this_week": 0, "total": db_stats.get("syncs", {}).get("total", 0), "last_sync": None, }, "stories": {"total": 0}, } @app.get("/api/discovery/activity") async def get_discovery_activity(limit: int = Query(default=10, le=50)): """Get recent discovery activity.""" return ActivityLog.recent(limit=limit) @app.get("/api/discovery/ports") async def discover_ports(): """Discover listening ports and services.""" script_path = Path(__file__).parent.parent / "discovery" / "discover-ports.sh" try: result = subprocess.run([str(script_path)], capture_output=True, text=True, timeout=10) return json.loads(result.stdout) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/discovery/env") async def discover_env(path: str = "."): """Analyze environment configuration.""" script_path = Path(__file__).parent.parent / "discovery" / "discover-env.sh" try: result = subprocess.run( [str(script_path), path], capture_output=True, text=True, timeout=10 ) return json.loads(result.stdout) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # === Activity & Sync History === @app.get("/api/activity") async def get_activity(limit: int = Query(default=50, le=100)): """Get recent activity log.""" return ActivityLog.recent(limit=limit) @app.get("/api/sync-history") async def get_sync_history( project_id: Optional[str] = None, limit: int = Query(default=20, le=100) ): """Get sync history.""" return SyncHistory.recent(project_id=project_id, limit=limit) # === Audit Log (Enhanced) === @app.get("/api/audit") async def get_audit_log( project_id: Optional[str] = None, user_id: Optional[str] = None, action: Optional[str] = None, category: Optional[str] = None, entity_type: Optional[str] = None, severity: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = Query(default=50, le=200), offset: int = Query(default=0, ge=0), ): """ Get audit log with advanced filtering. Query parameters: - project_id: Filter by project - user_id: Filter by user - action: Filter by specific action - category: Filter by category (design_system, code, configuration, etc.) - entity_type: Filter by entity type (project, component, token, etc.) - severity: Filter by severity (info, warning, critical) - start_date: Filter from date (ISO format) - end_date: Filter to date (ISO format) - limit: Number of results (max 200) - offset: Pagination offset """ activities = ActivityLog.search( project_id=project_id, user_id=user_id, action=action, category=category, entity_type=entity_type, severity=severity, start_date=start_date, end_date=end_date, limit=limit, offset=offset, ) total = ActivityLog.count( project_id=project_id, user_id=user_id, action=action, category=category ) return { "activities": activities, "total": total, "limit": limit, "offset": offset, "has_more": (offset + limit) < total, } @app.get("/api/audit/stats") async def get_audit_stats(): """Get audit log statistics.""" return { "by_category": ActivityLog.get_stats_by_category(), "by_user": ActivityLog.get_stats_by_user(), "total_count": ActivityLog.count(), } @app.get("/api/audit/categories") async def get_audit_categories(): """Get list of all activity categories.""" return ActivityLog.get_categories() @app.get("/api/audit/actions") async def get_audit_actions(): """Get list of all activity actions.""" return ActivityLog.get_actions() class AuditLogRequest(BaseModel): action: str entity_type: Optional[str] = None entity_id: Optional[str] = None entity_name: Optional[str] = None project_id: Optional[str] = None user_id: Optional[str] = None user_name: Optional[str] = None team_context: Optional[str] = None description: Optional[str] = None category: Optional[str] = None severity: str = "info" details: Optional[Dict[str, Any]] = None @app.post("/api/audit") async def create_audit_entry(entry: AuditLogRequest, request: Any): """ Create a new audit log entry. Automatically captures IP and user agent from request. """ # Extract IP and user agent from request ip_address = request.client.host if hasattr(request, "client") else None user_agent = request.headers.get("user-agent") if hasattr(request, "headers") else None ActivityLog.log( action=entry.action, entity_type=entry.entity_type, entity_id=entry.entity_id, entity_name=entry.entity_name, project_id=entry.project_id, user_id=entry.user_id, user_name=entry.user_name, team_context=entry.team_context, description=entry.description, category=entry.category, severity=entry.severity, details=entry.details, ip_address=ip_address, user_agent=user_agent, ) return {"success": True, "message": "Audit entry created"} @app.get("/api/audit/export") async def export_audit_log( project_id: Optional[str] = None, category: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, format: str = Query(default="json", regex="^(json|csv)$"), ): """Export audit log in JSON or CSV format.""" activities = ActivityLog.search( project_id=project_id, category=category, start_date=start_date, end_date=end_date, limit=10000, # Max export limit ) if format == "csv": import csv import io from fastapi.responses import StreamingResponse output = io.StringIO() if activities: fieldnames = [ "created_at", "user_name", "action", "category", "description", "project_id", "entity_type", "entity_name", "severity", ] writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore") writer.writeheader() writer.writerows(activities) output.seek(0) return StreamingResponse( iter([output.getvalue()]), media_type="text/csv", headers={ "Content-Disposition": f"attachment; filename=audit_log_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv" }, ) else: # JSON format return { "activities": activities, "total": len(activities), "exported_at": datetime.utcnow().isoformat() + "Z", } # === Teams === @app.get("/api/teams") async def list_teams(): """List all teams.""" return Teams.list() @app.post("/api/teams") async def create_team(team: TeamCreate): """Create a new team.""" team_id = f"team-{int(datetime.utcnow().timestamp() * 1000)}" created = Teams.create(team_id, team.name, team.description) return created @app.get("/api/teams/{team_id}") async def get_team(team_id: str): """Get a specific team.""" team = Teams.get(team_id) if not team: raise HTTPException(status_code=404, detail="Team not found") return team # === Cache Management === @app.post("/api/cache/clear") async def clear_cache(): """Clear expired cache entries.""" count = Cache.clear_expired() return {"cleared": count} @app.delete("/api/cache") async def purge_cache(): """Purge all cache entries.""" Cache.clear_all() return {"success": True} # === Configuration Management === class ConfigUpdate(BaseModel): mode: Optional[str] = None figma_token: Optional[str] = None services: Optional[Dict[str, Any]] = None features: Optional[Dict[str, bool]] = None @app.get("/api/config") async def get_config(): """Get current runtime configuration (secrets masked).""" return { "config": runtime_config.get(), "env": config.summary(), "mode": runtime_config.get("mode"), } @app.put("/api/config") async def update_config(update: ConfigUpdate): """Update runtime configuration.""" updates = {} if update.mode: updates["mode"] = update.mode if update.figma_token is not None: runtime_config.set_figma_token(update.figma_token) # Reinitialize Figma tools with new token global figma_suite figma_suite = FigmaToolSuite( token=update.figma_token, output_dir=str(Path(__file__).parent.parent.parent / ".dss" / "output"), ) ActivityLog.log( action="figma_token_updated", entity_type="config", details={"configured": bool(update.figma_token)}, ) if update.services: updates["services"] = update.services if update.features: updates["features"] = update.features if updates: runtime_config.update(updates) ActivityLog.log( action="config_updated", entity_type="config", details={"keys": list(updates.keys())} ) return runtime_config.get() @app.get("/api/config/figma") async def get_figma_config(): """Get Figma configuration status.""" figma_cfg = runtime_config.get("figma") return { "configured": figma_cfg.get("configured", False), "mode": figma_suite.mode, "features": { "extract_variables": True, "extract_components": True, "extract_styles": True, "sync_tokens": True, "validate": True, "generate_code": True, }, } @app.post("/api/config/figma/test") async def test_figma_connection(): """Test Figma API connection.""" try: # Try to make a simple API call if not runtime_config.get("figma").get("configured"): return {"success": False, "error": "Figma token not configured"} # Test with a minimal API call import httpx token = runtime_config._data["figma"]["token"] async with httpx.AsyncClient() as client: resp = await client.get("https://api.figma.com/v1/me", headers={"X-Figma-Token": token}) if resp.status_code == 200: user = resp.json() return { "success": True, "user": user.get("email", "connected"), "handle": user.get("handle"), } else: return {"success": False, "error": f"API returned {resp.status_code}"} except Exception as e: return {"success": False, "error": str(e)} # === Service Discovery === @app.get("/api/services") async def list_services(): """List configured and discovered services.""" configured = runtime_config.get("services") discovered = await ServiceDiscovery.discover() return { "configured": configured, "discovered": discovered, "storybook": await ServiceDiscovery.check_storybook(), } @app.put("/api/services/{service_name}") async def configure_service(service_name: str, config_data: Dict[str, Any]): """Configure a service.""" services = runtime_config.get("services") or {} services[service_name] = {**services.get(service_name, {}), **config_data} runtime_config.set("services", services) ActivityLog.log( action="service_configured", entity_type="service", entity_id=service_name, details={"keys": list(config_data.keys())}, ) return services[service_name] @app.get("/api/services/storybook") async def get_storybook_status(): """Get Storybook service status.""" return await ServiceDiscovery.check_storybook() @app.post("/api/storybook/init") async def init_storybook(request_data: Dict[str, Any] = None): """ Initialize Storybook with design system components. Clears existing generated stories and generates new ones from the specified component source path. Request body (optional): source_path: Path to components directory (defaults to configured path) Returns: JSON with generation status and count """ import shutil import sys try: # Get paths dss_mvp1_path = Path(__file__).parent.parent.parent / "dss-mvp1" generated_dir = dss_mvp1_path / "stories" / "generated" # Default source path - can be overridden in request source_path = dss_mvp1_path / "dss" / "components" if request_data and request_data.get("source_path"): # Validate path is within allowed directories requested_path = Path(request_data["source_path"]).resolve() if not str(requested_path).startswith(str(dss_mvp1_path.resolve())): raise HTTPException(status_code=400, detail="Source path must be within dss-mvp1") source_path = requested_path # Step 1: Clear existing generated stories if generated_dir.exists(): for item in generated_dir.iterdir(): if item.name != ".gitkeep": if item.is_dir(): shutil.rmtree(item) else: item.unlink() else: generated_dir.mkdir(parents=True, exist_ok=True) # Step 2: Generate stories using StoryGenerator stories_generated = 0 errors = [] # Add dss-mvp1 to path for imports sys.path.insert(0, str(dss_mvp1_path)) try: from dss.storybook.generator import StoryGenerator, StoryTemplate generator = StoryGenerator(str(dss_mvp1_path)) # Check if source path exists and has components if source_path.exists(): results = await generator.generate_stories_for_directory( str(source_path.relative_to(dss_mvp1_path)), template=StoryTemplate.CSF3, dry_run=False, ) # Move generated stories to stories/generated/ for result in results: if "story" in result and "error" not in result: story_filename = Path(result["component"]).stem + ".stories.js" output_path = generated_dir / story_filename output_path.write_text(result["story"]) stories_generated += 1 elif "error" in result: errors.append(result) else: # No components yet - that's okay, Storybook will show welcome pass except ImportError as e: # StoryGenerator not available - log but don't fail errors.append({"error": f"StoryGenerator import failed: {str(e)}"}) finally: # Clean up path if str(dss_mvp1_path) in sys.path: sys.path.remove(str(dss_mvp1_path)) ActivityLog.log( action="storybook_initialized", entity_type="storybook", details={"stories_generated": stories_generated, "errors_count": len(errors)}, ) return { "success": True, "stories_generated": stories_generated, "message": f"Generated {stories_generated} stories" if stories_generated > 0 else "Storybook initialized (no components found)", "errors": errors if errors else None, } except HTTPException: raise except Exception as e: ActivityLog.log( action="storybook_init_failed", entity_type="storybook", details={"error": str(e)} ) raise HTTPException(status_code=500, detail=f"Storybook initialization failed: {str(e)}") @app.delete("/api/storybook/stories") async def clear_storybook_stories(): """ Clear all generated stories from Storybook. Returns Storybook to blank state (only Welcome page). """ import shutil try: dss_mvp1_path = Path(__file__).parent.parent.parent / "dss-mvp1" generated_dir = dss_mvp1_path / "stories" / "generated" cleared_count = 0 if generated_dir.exists(): for item in generated_dir.iterdir(): if item.name != ".gitkeep": if item.is_dir(): shutil.rmtree(item) else: item.unlink() cleared_count += 1 ActivityLog.log( action="storybook_cleared", entity_type="storybook", details={"cleared_count": cleared_count}, ) return { "success": True, "cleared_count": cleared_count, "message": "Storybook stories cleared", } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to clear stories: {str(e)}") # === Design System Ingestion === class IngestionRequest(BaseModel): """Request for design system ingestion via natural language.""" prompt: str project_id: Optional[str] = None class IngestionConfirmRequest(BaseModel): """Confirm ingestion of a specific design system.""" system_id: str method: str = "npm" # npm, figma, css, manual source_url: Optional[str] = None options: Optional[Dict[str, Any]] = {} @app.post("/api/ingest/parse") async def parse_ingestion_prompt(request: IngestionRequest): """ Parse a natural language ingestion prompt. Understands prompts like: - "add heroui" - "ingest material ui" - "import from figma.com/file/abc123" - "use shadcn for our design system" Returns parsed intent, detected design systems, and next steps. """ try: from ingestion_parser import parse_and_suggest result = parse_and_suggest(request.prompt) ActivityLog.log( action="ingestion_prompt_parsed", entity_type="ingestion", project_id=request.project_id, details={ "prompt": request.prompt[:100], "intent": result.get("intent"), "sources_found": len(result.get("sources", [])), }, ) return result except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to parse prompt: {str(e)}") @app.get("/api/ingest/systems") async def list_known_systems( category: Optional[str] = None, framework: Optional[str] = None, search: Optional[str] = None ): """ List known design systems from the registry. Query params: - category: Filter by category (component-library, css-framework, design-system, css-tokens) - framework: Filter by framework (react, vue, angular, html) - search: Search by name or alias """ try: from design_system_registry import ( get_all_systems, get_systems_by_category, get_systems_by_framework, search_design_systems, ) if search: systems = search_design_systems(search, limit=20) elif category: systems = get_systems_by_category(category) elif framework: systems = get_systems_by_framework(framework) else: systems = get_all_systems() return { "systems": [s.to_dict() for s in systems], "count": len(systems), "filters": {"category": category, "framework": framework, "search": search}, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/ingest/systems/{system_id}") async def get_system_info(system_id: str): """Get detailed information about a specific design system.""" try: from design_system_registry import find_design_system, get_alternative_ingestion_options system = find_design_system(system_id) if not system: raise HTTPException(status_code=404, detail=f"Design system not found: {system_id}") alternatives = get_alternative_ingestion_options(system) return {"system": system.to_dict(), "alternatives": alternatives} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/ingest/npm/search") async def search_npm_packages( query: str, limit: int = Query(default=10, le=50), design_systems_only: bool = True ): """ Search npm registry for design system packages. Filters results to likely design system packages by default. """ try: from npm_search import search_npm results = await search_npm(query, limit=limit, design_systems_only=design_systems_only) return { "packages": [r.to_dict() for r in results], "count": len(results), "query": query, "design_systems_only": design_systems_only, } except Exception as e: raise HTTPException(status_code=500, detail=f"npm search failed: {str(e)}") @app.get("/api/ingest/npm/package/{package_name:path}") async def get_npm_package_info(package_name: str): """ Get detailed information about an npm package. Package name can include scope (e.g., @heroui/react). """ try: from npm_search import get_package_info info = await get_package_info(package_name) if not info: raise HTTPException(status_code=404, detail=f"Package not found: {package_name}") return info.to_dict() except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/ingest/confirm") async def confirm_ingestion(request: IngestionConfirmRequest): """ Confirm and execute design system ingestion. After parsing a prompt and getting user confirmation, this endpoint performs the actual ingestion. Supports multiple methods: - npm: Install npm packages and extract tokens - figma: Extract tokens from Figma URL - css: Fetch and parse CSS file - manual: Process manual token definitions """ try: from design_system_registry import find_design_system system = find_design_system(request.system_id) if not system: # Try to find via npm from npm_search import get_package_info npm_info = await get_package_info(request.system_id) if not npm_info: raise HTTPException( status_code=404, detail=f"Design system not found: {request.system_id}" ) # Execute ingestion based on method result = { "success": True, "system_id": request.system_id, "method": request.method, "status": "queued", } if request.method == "npm": # Queue npm package installation and token extraction packages = system.npm_packages if system else [request.system_id] result["packages"] = packages result["message"] = f"Will install: {', '.join(packages)}" result["next_steps"] = [ "Install npm packages", "Extract design tokens", "Generate Storybook stories", "Update token configuration", ] elif request.method == "figma": if not request.source_url: raise HTTPException(status_code=400, detail="Figma URL required for figma method") result["figma_url"] = request.source_url result["message"] = "Will extract tokens from Figma" result["next_steps"] = [ "Authenticate with Figma", "Extract design tokens", "Map to CSS variables", "Generate component stories", ] elif request.method == "css": if not request.source_url: # Use CDN URL if available if system and system.css_cdn_url: request.source_url = system.css_cdn_url else: raise HTTPException(status_code=400, detail="CSS URL required for css method") result["css_url"] = request.source_url result["message"] = "Will parse CSS for design tokens" result["next_steps"] = [ "Fetch CSS file", "Parse CSS variables", "Extract color/spacing/typography tokens", "Create token collection", ] elif request.method == "manual": result["message"] = "Manual token entry mode" result["next_steps"] = [ "Enter color tokens", "Enter typography tokens", "Enter spacing tokens", "Review and confirm", ] ActivityLog.log( action="ingestion_confirmed", entity_type="ingestion", entity_id=request.system_id, details={"method": request.method, "status": "queued"}, ) return result except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/ingest/execute") async def execute_ingestion( system_id: str, method: str = "npm", source_url: Optional[str] = None, project_id: Optional[str] = None, ): """ Execute the actual ingestion process. This performs the heavy lifting: - For npm: Extracts tokens from installed packages - For figma: Calls Figma API to get design tokens - For css: Fetches and parses CSS variables """ try: from design_system_registry import find_design_system system = find_design_system(system_id) tokens_extracted = 0 if method == "npm" and system: # Import existing token ingestion tools sys.path.insert(0, str(Path(__file__).parent.parent.parent / "dss-mvp1")) try: from dss.ingest import TokenCollection # Create a token collection for this design system collection = TokenCollection(name=system.name) # Based on primary ingestion method, use appropriate source if system.primary_ingestion.value == "css_variables": if system.css_cdn_url: # Fetch CSS from CDN and parse import httpx async with httpx.AsyncClient() as client: resp = await client.get(system.css_cdn_url) if resp.status_code == 200: from dss.ingest.css import CSSTokenSource # Write temp file and parse temp_css = Path("/tmp") / f"{system.id}_tokens.css" temp_css.write_text(resp.text) source = CSSTokenSource(str(temp_css)) source.parse() collection.merge(source.tokens) tokens_extracted = len(collection.tokens) elif system.primary_ingestion.value == "tailwind_config": # For Tailwind-based systems, we'll need their config tokens_extracted = 0 # Placeholder for Tailwind parsing except ImportError: # Token ingestion module not available pass finally: if str(Path(__file__).parent.parent.parent / "dss-mvp1") in sys.path: sys.path.remove(str(Path(__file__).parent.parent.parent / "dss-mvp1")) elif method == "figma" and source_url: # Use existing Figma extraction result = await figma_suite.extract_variables(source_url.split("/")[-1], "css") tokens_extracted = result.get("tokens_count", 0) elif method == "css" and source_url: # Fetch and parse CSS import httpx sys.path.insert(0, str(Path(__file__).parent.parent.parent / "dss-mvp1")) try: async with httpx.AsyncClient() as client: resp = await client.get(source_url) if resp.status_code == 200: from dss.ingest.css import CSSTokenSource temp_css = Path("/tmp") / "ingested_tokens.css" temp_css.write_text(resp.text) source = CSSTokenSource(str(temp_css)) source.parse() tokens_extracted = len(source.tokens.tokens) finally: if str(Path(__file__).parent.parent.parent / "dss-mvp1") in sys.path: sys.path.remove(str(Path(__file__).parent.parent.parent / "dss-mvp1")) ActivityLog.log( action="ingestion_executed", entity_type="ingestion", entity_id=system_id, project_id=project_id, details={"method": method, "tokens_extracted": tokens_extracted}, ) return { "success": True, "system_id": system_id, "method": method, "tokens_extracted": tokens_extracted, "message": f"Extracted {tokens_extracted} tokens from {system.name if system else system_id}", } except Exception as e: ActivityLog.log( action="ingestion_failed", entity_type="ingestion", entity_id=system_id, details={"error": str(e)}, ) raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/ingest/alternatives") async def get_ingestion_alternatives(system_id: Optional[str] = None): """ Get alternative ingestion methods. When the primary method fails or isn't available, suggests other ways to ingest the design system. """ try: from design_system_registry import find_design_system, get_alternative_ingestion_options system = None if system_id: system = find_design_system(system_id) return get_alternative_ingestion_options(system) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # === DSS Mode === @app.get("/api/mode") async def get_mode(): """Get current DSS mode.""" mode = runtime_config.get("mode") return { "mode": mode, "description": "Local dev companion" if mode == "local" else "Remote design system server", "features": runtime_config.get("features"), } @app.put("/api/mode") async def set_mode(request_data: Dict[str, Any]): """Set DSS mode (local or server).""" mode = request_data.get("mode") if not mode or mode not in ["local", "server"]: raise HTTPException(status_code=400, detail="Mode must be 'local' or 'server'") runtime_config.set("mode", mode) ActivityLog.log(action="mode_changed", entity_type="config", details={"mode": mode}) return {"mode": mode, "success": True} # === Run Server === # === Static Files (Admin UI) === # Mount at the end so API routes take precedence # This enables portable mode: ./dss start serves everything on one port # === System Administration === @app.post("/api/system/reset") async def reset_dss(request_data: Dict[str, Any]): """ Reset DSS to fresh state by calling the reset command in dss-mvp1. Requires confirmation. """ confirm = request_data.get("confirm", "") if confirm != "RESET": raise HTTPException(status_code=400, detail="Must confirm with 'RESET'") try: # Path to dss-mvp1 directory dss_mvp1_path = Path(__file__).parent.parent.parent / "dss-mvp1" # Run the reset command result = subprocess.run( ["python3", "-m", "dss.settings", "reset", "--no-confirm"], cwd=str(dss_mvp1_path), capture_output=True, text=True, timeout=60, ) if result.returncode != 0: raise Exception(f"Reset failed: {result.stderr}") ActivityLog.log(action="dss_reset", entity_type="system", details={"status": "success"}) return { "success": True, "message": "DSS has been reset to fresh state", "output": result.stdout, } except subprocess.TimeoutExpired: raise HTTPException(status_code=504, detail="Reset operation timed out") except Exception as e: ActivityLog.log(action="dss_reset_failed", entity_type="system", details={"error": str(e)}) raise HTTPException(status_code=500, detail=str(e)) # === Team Dashboards === @app.get("/api/projects/{project_id}/dashboard/summary") async def get_dashboard_summary(project_id: str): """ Get dashboard summary for all teams (thin slice). Provides overview of UX, UI, and QA metrics. """ if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") # UX Dashboard data figma_files = FigmaFiles.list(project_id) # UI Dashboard data drift_stats = TokenDriftDetector.get_stats(project_id) code_summary = CodeMetrics.get_project_summary(project_id) # QA Dashboard data esre_list = ESREDefinitions.list(project_id) test_summary = TestResults.get_project_summary(project_id) return { "project_id": project_id, "ux": { "figma_files_count": len(figma_files), "figma_files": figma_files[:5], # Show first 5 }, "ui": {"token_drift": drift_stats, "code_metrics": code_summary}, "qa": {"esre_count": len(esre_list), "test_summary": test_summary}, } # === UX Dashboard: Figma File Management === @app.get("/api/projects/{project_id}/figma-files") async def list_figma_files(project_id: str): """List all Figma files for a project (UX Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") return FigmaFiles.list(project_id) @app.post("/api/projects/{project_id}/figma-files") async def create_figma_file(project_id: str, figma_file: FigmaFileCreate): """Add a Figma file to a project (UX Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") created = FigmaFiles.create( project_id=project_id, figma_url=figma_file.figma_url, file_name=figma_file.file_name, file_key=figma_file.file_key, ) ActivityLog.log( action="figma_file_added", entity_type="figma_file", entity_id=str(created["id"]), entity_name=figma_file.file_name, project_id=project_id, team_context="ux", details={"file_key": figma_file.file_key}, ) return created @app.put("/api/projects/{project_id}/figma-files/{file_id}/sync") async def update_figma_file_sync(project_id: str, file_id: int, status: str = "synced"): """Update Figma file sync status (UX Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") updated = FigmaFiles.update_sync_status( file_id=file_id, status=status, last_synced=datetime.utcnow().isoformat() ) if not updated: raise HTTPException(status_code=404, detail="Figma file not found") ActivityLog.log( action="figma_file_synced", entity_type="figma_file", entity_id=str(file_id), project_id=project_id, team_context="ux", ) return updated @app.delete("/api/projects/{project_id}/figma-files/{file_id}") async def delete_figma_file(project_id: str, file_id: int): """Delete a Figma file (UX Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") if not FigmaFiles.delete(file_id): raise HTTPException(status_code=404, detail="Figma file not found") ActivityLog.log( action="figma_file_deleted", entity_type="figma_file", entity_id=str(file_id), project_id=project_id, team_context="ux", ) return {"success": True} # === UI Dashboard: Token Drift Detection === @app.get("/api/projects/{project_id}/token-drift") async def list_token_drift(project_id: str, severity: Optional[str] = None): """List token drift issues for a project (UI Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") drifts = TokenDriftDetector.list_by_project(project_id, severity) stats = TokenDriftDetector.get_stats(project_id) return {"drifts": drifts, "stats": stats} @app.post("/api/projects/{project_id}/token-drift") async def record_token_drift(project_id: str, drift: TokenDriftCreate): """Record a token drift issue (UI Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") created = TokenDriftDetector.record_drift( component_id=drift.component_id, property_name=drift.property_name, hardcoded_value=drift.hardcoded_value, file_path=drift.file_path, line_number=drift.line_number, severity=drift.severity, suggested_token=drift.suggested_token, ) ActivityLog.log( action="token_drift_detected", entity_type="token_drift", entity_id=str(created["id"]), project_id=project_id, team_context="ui", details={"severity": drift.severity, "component_id": drift.component_id}, ) return created @app.put("/api/projects/{project_id}/token-drift/{drift_id}/status") async def update_drift_status(project_id: str, drift_id: int, status: str): """Update token drift status: pending, fixed, ignored (UI Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") if status not in ["pending", "fixed", "ignored"]: raise HTTPException(status_code=400, detail="Invalid status") updated = TokenDriftDetector.update_status(drift_id, status) if not updated: raise HTTPException(status_code=404, detail="Drift issue not found") ActivityLog.log( action="token_drift_status_updated", entity_type="token_drift", entity_id=str(drift_id), project_id=project_id, team_context="ui", details={"status": status}, ) return updated # === QA Dashboard: ESRE Definitions === @app.get("/api/projects/{project_id}/esre") async def list_esre_definitions(project_id: str): """List all ESRE definitions for a project (QA Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") return ESREDefinitions.list(project_id) @app.post("/api/projects/{project_id}/esre") async def create_esre_definition(project_id: str, esre: ESRECreate): """Create a new ESRE definition (QA Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") created = ESREDefinitions.create( project_id=project_id, name=esre.name, definition_text=esre.definition_text, expected_value=esre.expected_value, component_name=esre.component_name, ) ActivityLog.log( action="esre_created", entity_type="esre", entity_id=str(created["id"]), entity_name=esre.name, project_id=project_id, team_context="qa", ) return created @app.put("/api/projects/{project_id}/esre/{esre_id}") async def update_esre_definition(project_id: str, esre_id: int, updates: ESRECreate): """Update an ESRE definition (QA Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") updated = ESREDefinitions.update( esre_id=esre_id, name=updates.name, definition_text=updates.definition_text, expected_value=updates.expected_value, component_name=updates.component_name, ) if not updated: raise HTTPException(status_code=404, detail="ESRE definition not found") ActivityLog.log( action="esre_updated", entity_type="esre", entity_id=str(esre_id), entity_name=updates.name, project_id=project_id, team_context="qa", ) return updated @app.delete("/api/projects/{project_id}/esre/{esre_id}") async def delete_esre_definition(project_id: str, esre_id: int): """Delete an ESRE definition (QA Dashboard).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") if not ESREDefinitions.delete(esre_id): raise HTTPException(status_code=404, detail="ESRE definition not found") ActivityLog.log( action="esre_deleted", entity_type="esre", entity_id=str(esre_id), project_id=project_id, team_context="qa", ) return {"success": True} # === Claude Chat API with MCP Tool Integration === class ClaudeChatRequest(BaseModel): """AI chat request model (supports Claude and Gemini).""" message: str context: Optional[Dict[str, Any]] = {} history: Optional[List[Dict[str, Any]]] = [] project_id: Optional[str] = None user_id: Optional[int] = 1 enable_tools: Optional[bool] = True model: Optional[str] = "claude" # "claude" or "gemini" @app.post("/api/claude/chat") async def claude_chat(request_data: ClaudeChatRequest): """ Chat with AI (Claude or Gemini) via their APIs with MCP tool integration. AI can execute DSS tools to: - Get project information - List/search components - Get design tokens - Interact with Figma, Jira, Confluence Requires ANTHROPIC_API_KEY (for Claude) or GOOGLE_API_KEY/GEMINI_API_KEY (for Gemini). """ message = request_data.message context = request_data.context or {} history = request_data.history or [] project_id = request_data.project_id or context.get("projectId") user_id = request_data.user_id or 1 enable_tools = request_data.enable_tools model_name = request_data.model or "claude" # Log the chat request ActivityLog.log( action="ai_chat", entity_type="chat", entity_id=model_name, details={ "message_length": len(message), "tools_enabled": enable_tools, "model": model_name, }, ) try: # Import AI provider from ai_providers import get_ai_provider # Get the appropriate provider provider = get_ai_provider(model_name) if not provider.is_available(): return { "success": False, "response": f"{model_name.title()} is not available. Check API keys and SDK installation.", "model": "error", } # Import MCP handler (may fail if database not migrated) mcp_handler = None MCPContext = None try: from dss_mcp.handler import get_mcp_handler, MCPContext as _MCPContext MCPContext = _MCPContext mcp_handler = get_mcp_handler() except Exception as e: # MCP handler not available, proceed without tools enable_tools = False # Build system prompt with design system context system_prompt = """You are a design system assistant with access to DSS (Design System Server) tools. You can use tools to: - Get project summaries, health scores, and statistics - List and search components in the design system - Get design tokens (colors, typography, spacing) - Interact with Figma to extract designs - Create/search Jira issues for tracking - Access Confluence documentation RULES: - Use tools when the user asks about project data, components, or tokens - Be concise: 2-3 sentences for simple questions - When showing tool results, summarize key information - If a tool fails, explain what went wrong - Always provide actionable insights from tool data""" # Add project context if available if project_id and mcp_handler: try: project_context = await mcp_handler.get_project_context(project_id, user_id) if project_context: system_prompt += f""" CURRENT PROJECT CONTEXT: - Project: {project_context.name} (ID: {project_id}) - Components: {project_context.component_count} - Health Score: {project_context.health.get('score', 'N/A')}/100 (Grade: {project_context.health.get('grade', 'N/A')}) - Integrations: {', '.join(project_context.integrations.keys()) if project_context.integrations else 'None configured'}""" except: system_prompt += f"\n\nProject ID: {project_id} (context not loaded)" elif project_id: system_prompt += f"\n\nProject ID: {project_id}" # Add user context if context: context_parts = [] if "project" in context: context_parts.append(f"Project: {context['project']}") if "file" in context: context_parts.append(f"Current file: {context['file']}") if "component" in context: context_parts.append(f"Component: {context['component']}") if context_parts: system_prompt += "\n\nUser context:\n" + "\n".join(context_parts) # Get tools if enabled tools = None if enable_tools and project_id and mcp_handler: tools = mcp_handler.get_tools_for_claude() # Create MCP context (or None if MCP not available) mcp_context = None if MCPContext is not None: mcp_context = MCPContext( project_id=project_id, user_id=user_id ) # Call AI provider with all context result = await provider.chat( message=message, system_prompt=system_prompt, history=history, tools=tools, temperature=0.7, mcp_handler=mcp_handler, mcp_context=mcp_context, ) # Log tool usage if result.get("tools_used"): ActivityLog.log( action="ai_tools_used", entity_type="chat", entity_id=model_name, project_id=project_id, details={"tools": result["tools_used"], "model": model_name}, ) return result except Exception as e: error_msg = str(e) return { "success": False, "response": f"Error connecting to {model_name.title()}: {error_msg}\n\nMake sure your API key is valid and you have API access.", "model": "error", } # === MCP Tools Proxy === @app.post("/api/mcp/{tool_name}") async def execute_mcp_tool(tool_name: str, params: Dict[str, Any] = {}): """ Proxy MCP tool execution. Calls the MCP server running on port 3457. """ try: # Import MCP server functions from mcp_server import ( analyze_react_components, analyze_style_values, build_source_graph, check_naming_consistency, create_project, discover_project, export_tokens, extract_components, extract_tokens, find_inline_styles, find_style_patterns, find_unused_styles, generate_component_code, generate_stories_batch, generate_story, generate_storybook_theme, get_activity, get_project, get_quick_wins, get_quick_wins_report, get_status, get_story_coverage, get_sync_history, ingest_css_tokens, ingest_json_tokens, ingest_scss_tokens, ingest_tailwind_tokens, list_projects, merge_tokens, scan_storybook, sync_tokens_to_file, validate_tokens, ) # Map tool names to functions tool_map = { "get_status": get_status, "list_projects": list_projects, "create_project": create_project, "get_project": get_project, "extract_tokens": extract_tokens, "extract_components": extract_components, "generate_component_code": generate_component_code, "sync_tokens_to_file": sync_tokens_to_file, "get_sync_history": get_sync_history, "get_activity": get_activity, "ingest_css_tokens": ingest_css_tokens, "ingest_scss_tokens": ingest_scss_tokens, "ingest_tailwind_tokens": ingest_tailwind_tokens, "ingest_json_tokens": ingest_json_tokens, "merge_tokens": merge_tokens, "export_tokens": export_tokens, "validate_tokens": validate_tokens, "discover_project": discover_project, "analyze_react_components": analyze_react_components, "find_inline_styles": find_inline_styles, "find_style_patterns": find_style_patterns, "analyze_style_values": analyze_style_values, "find_unused_styles": find_unused_styles, "build_source_graph": build_source_graph, "get_quick_wins": get_quick_wins, "get_quick_wins_report": get_quick_wins_report, "check_naming_consistency": check_naming_consistency, "scan_storybook": scan_storybook, "generate_story": generate_story, "generate_stories_batch": generate_stories_batch, "generate_storybook_theme": generate_storybook_theme, "get_story_coverage": get_story_coverage, } # Get the tool function tool_func = tool_map.get(tool_name) if not tool_func: raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found") # Execute tool result = await tool_func(**params) # Log execution ActivityLog.log( action="mcp_tool_executed", entity_type="tool", entity_id=tool_name, details={"params": list(params.keys())}, ) return JSONResponse(content={"success": True, "result": result}) except Exception as e: ActivityLog.log( action="mcp_tool_failed", entity_type="tool", entity_id=tool_name, details={"error": str(e)}, ) raise HTTPException(status_code=500, detail=str(e)) # === MCP Integration Endpoints === class IntegrationCreate(BaseModel): """Create/Update integration configuration.""" integration_type: str # figma, jira, confluence, sequential-thinking config: Dict[str, Any] # Encrypted in database enabled: bool = True class IntegrationUpdate(BaseModel): """Update integration.""" config: Optional[Dict[str, Any]] = None enabled: Optional[bool] = None @app.get("/api/mcp/integrations") async def list_all_integrations(): """List all available integration types and their health status.""" health_list = IntegrationHealth.list_all() if not health_list: # Return defaults if no health data exists return { "integrations": [ {"integration_type": "figma", "is_healthy": True, "failure_count": 0}, {"integration_type": "jira", "is_healthy": True, "failure_count": 0}, {"integration_type": "confluence", "is_healthy": True, "failure_count": 0}, {"integration_type": "sequential-thinking", "is_healthy": True, "failure_count": 0}, ] } return {"integrations": health_list} @app.get("/api/projects/{project_id}/integrations") async def list_project_integrations( project_id: str, user_id: Optional[int] = Query(None, description="Filter by user ID") ): """List integrations configured for a project.""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") integrations = Integrations.list(project_id, user_id) return {"integrations": integrations} @app.post("/api/projects/{project_id}/integrations") async def create_integration( project_id: str, integration: IntegrationCreate, user_id: int = Query(..., description="User ID for user-scoped integration"), ): """Create or update integration for a project (user-scoped).""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") from dss_mcp.config import mcp_config # Encrypt config config_json = json.dumps(integration.config) cipher = mcp_config.get_cipher() if cipher: encrypted_config = cipher.encrypt(config_json.encode()).decode() else: encrypted_config = config_json # Store unencrypted if no key try: Integrations.upsert( project_id=project_id, user_id=user_id, integration_type=integration.integration_type, config=encrypted_config, enabled=integration.enabled, ) ActivityLog.log( action="integration_configured", entity_type="integration", entity_id=integration.integration_type, project_id=project_id, details={"user_id": user_id, "enabled": integration.enabled}, ) return { "success": True, "integration_type": integration.integration_type, "enabled": integration.enabled, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.put("/api/projects/{project_id}/integrations/{integration_type}") async def update_integration( project_id: str, integration_type: str, update: IntegrationUpdate, user_id: int = Query(..., description="User ID"), ): """Update an existing integration.""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") from dss_mcp.config import mcp_config try: encrypted_config = None if update.config is not None: config_json = json.dumps(update.config) cipher = mcp_config.get_cipher() if cipher: encrypted_config = cipher.encrypt(config_json.encode()).decode() else: encrypted_config = config_json if update.config is None and update.enabled is None: return {"success": False, "message": "No updates provided"} result = Integrations.update( project_id=project_id, user_id=user_id, integration_type=integration_type, config=encrypted_config, enabled=update.enabled, ) if not result: raise HTTPException(status_code=404, detail="Integration not found") return {"success": True, "integration_type": integration_type} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.delete("/api/projects/{project_id}/integrations/{integration_type}") async def delete_integration( project_id: str, integration_type: str, user_id: int = Query(..., description="User ID") ): """Delete an integration configuration.""" if not Projects.get(project_id): raise HTTPException(status_code=404, detail="Project not found") try: deleted = Integrations.delete(project_id, user_id, integration_type) if not deleted: raise HTTPException(status_code=404, detail="Integration not found") ActivityLog.log( action="integration_deleted", entity_type="integration", entity_id=integration_type, project_id=project_id, details={"user_id": user_id}, ) return {"success": True} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/mcp/tools") async def list_mcp_tools( include_details: bool = Query(False, description="Include full tool schemas"), ): """List all available MCP tools via unified handler.""" from dss_mcp.handler import get_mcp_handler handler = get_mcp_handler() return handler.list_tools(include_details=include_details) @app.get("/api/mcp/tools/{tool_name}") async def get_mcp_tool_info(tool_name: str): """Get detailed information about a specific MCP tool.""" from dss_mcp.handler import get_mcp_handler handler = get_mcp_handler() info = handler.get_tool_info(tool_name) if not info: raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}") return info class MCPToolExecuteRequest(BaseModel): """Request to execute an MCP tool.""" arguments: Dict[str, Any] project_id: str user_id: Optional[int] = 1 @app.post("/api/mcp/tools/{tool_name}/execute") async def execute_mcp_tool(tool_name: str, request: MCPToolExecuteRequest): """ Execute an MCP tool via unified handler. All tool executions go through the central MCPHandler which: - Validates tool existence - Checks integration configurations - Applies circuit breaker protection - Logs execution metrics """ from dss_mcp.handler import MCPContext, get_mcp_handler handler = get_mcp_handler() # Create execution context context = MCPContext(project_id=request.project_id, user_id=request.user_id) # Execute tool result = await handler.execute_tool( tool_name=tool_name, arguments=request.arguments, context=context ) # Log to activity ActivityLog.log( action="mcp_tool_executed", entity_type="tool", entity_id=tool_name, project_id=request.project_id, details={ "success": result.success, "duration_ms": result.duration_ms, "error": result.error, }, ) return result.to_dict() @app.get("/api/mcp/status") async def get_mcp_status(): """Get MCP server status and configuration.""" from dss_mcp.config import integration_config, mcp_config, validate_config warnings = validate_config() return { "server": { "host": mcp_config.HOST, "port": mcp_config.PORT, "encryption_enabled": bool(mcp_config.ENCRYPTION_KEY), "context_cache_ttl": mcp_config.CONTEXT_CACHE_TTL, }, "integrations": { "figma": bool(integration_config.FIGMA_TOKEN), "anthropic": bool(integration_config.ANTHROPIC_API_KEY), "jira_default": bool(integration_config.JIRA_URL), "confluence_default": bool(integration_config.CONFLUENCE_URL), }, "circuit_breaker": { "failure_threshold": mcp_config.CIRCUIT_BREAKER_FAILURE_THRESHOLD, "timeout_seconds": mcp_config.CIRCUIT_BREAKER_TIMEOUT_SECONDS, }, "warnings": warnings, } # === MVP1: Project Configuration & Sandboxed File System === @app.get("/api/projects/{project_id}/config") async def get_project_config(project_id: str): """Get project configuration from .dss/config.json.""" project = project_manager.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") root_path = project.get("root_path") if not root_path: raise HTTPException(status_code=400, detail="Project has no root_path configured") config = config_service.get_config(root_path) return config.dict() @app.put("/api/projects/{project_id}/config") async def update_project_config(project_id: str, updates: Dict[str, Any]): """Update project configuration.""" project = project_manager.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") root_path = project.get("root_path") if not root_path: raise HTTPException(status_code=400, detail="Project has no root_path configured") config = config_service.update_config(root_path, updates) return config.dict() @app.get("/api/projects/{project_id}/context") async def get_project_context(project_id: str): """ Get full project context for AI injection. Returns project info, config, file tree, and context file contents. """ project = project_manager.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") root_path = project.get("root_path") if not root_path: raise HTTPException(status_code=400, detail="Project has no root_path configured") # Get config and sandboxed FS config = config_service.get_config(root_path) fs = SandboxedFS(root_path) # Load context files specified in config context_files = {} for file_path in config.ai.context_files: try: if fs.file_exists(file_path): content = fs.read_file(file_path, max_size_kb=config.ai.max_file_size_kb) context_files[file_path] = content[:2000] # Truncate for context except Exception: pass return { "project": {"id": project["id"], "name": project["name"], "root_path": root_path}, "config": config.dict(), "file_tree": fs.get_file_tree(max_depth=2), "context_files": context_files, } @app.get("/api/projects/{project_id}/files") async def list_project_files(project_id: str, path: str = "."): """List files in project directory (sandboxed).""" project = project_manager.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") root_path = project.get("root_path") if not root_path: raise HTTPException(status_code=400, detail="Project has no root_path configured") try: fs = SandboxedFS(root_path) return fs.list_directory(path) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) except NotADirectoryError as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/api/projects/{project_id}/files/tree") async def get_project_file_tree(project_id: str, max_depth: int = 3): """Get project file tree (sandboxed).""" project = project_manager.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") root_path = project.get("root_path") if not root_path: raise HTTPException(status_code=400, detail="Project has no root_path configured") fs = SandboxedFS(root_path) return fs.get_file_tree(max_depth=min(max_depth, 5)) # Cap at 5 levels @app.get("/api/projects/{project_id}/files/read") async def read_project_file(project_id: str, path: str): """Read file content from project (sandboxed).""" project = project_manager.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") root_path = project.get("root_path") if not root_path: raise HTTPException(status_code=400, detail="Project has no root_path configured") try: fs = SandboxedFS(root_path) content = fs.read_file(path) return {"path": path, "content": content} except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) except FileNotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) class FileWriteRequest(BaseModel): path: str content: str @app.post("/api/projects/{project_id}/files/write") async def write_project_file(project_id: str, request: FileWriteRequest): """Write file content to project (sandboxed).""" project = project_manager.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") root_path = project.get("root_path") if not root_path: raise HTTPException(status_code=400, detail="Project has no root_path configured") # Check if AI write operations are allowed config = config_service.get_config(root_path) if "write" not in config.ai.allowed_operations: raise HTTPException(status_code=403, detail="Write operations not allowed for this project") try: fs = SandboxedFS(root_path) fs.write_file(request.path, request.content) ActivityLog.log( action="file_written", entity_type="file", entity_id=request.path, project_id=project_id, details={"path": request.path, "size": len(request.content)}, ) return {"status": "ok", "path": request.path} except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) UI_DIR = Path(__file__).parent.parent.parent / "admin-ui" if UI_DIR.exists(): app.mount("/", StaticFiles(directory=str(UI_DIR), html=True), name="ui") def kill_port(port: int, wait: float = 0.5) -> None: """Kill any process using the specified port.""" import subprocess import time try: # Get PIDs using the port result = subprocess.run(["lsof", "-ti", f":{port}"], capture_output=True, text=True) pids = result.stdout.strip().split("\n") killed = False for pid in pids: if pid: subprocess.run(["kill", "-9", pid], capture_output=True) print(f"[DSS] Killed process {pid} on port {port}") killed = True if killed and wait: time.sleep(wait) # Wait for port to be released except Exception: pass # Port was free if __name__ == "__main__": import uvicorn # DSS Ports: API=6220, Admin=6221, MCP=6222, Storybook=6226 port = int(os.getenv("DSS_API_PORT", "6220")) host = os.getenv("HOST", "0.0.0.0") # Kill any existing process on the port (twice to handle respawning) kill_port(port, wait=1.0) kill_port(port, wait=0.5) url = f"http://{host}:{port}" print( f""" ╔═══════════════════════════════════════════════════════════════╗ ║ Design System Server (DSS) - Portable Server ║ ╠═══════════════════════════════════════════════════════════════╣ ║ Dashboard: {url + '/':^47}║ ║ API: {url + '/api':^47}║ ║ Docs: {url + '/docs':^47}║ ║ Environment: {config.server.env:^47}║ ║ Figma Mode: {figma_suite.mode:^47}║ ╚═══════════════════════════════════════════════════════════════╝ """ ) uvicorn.run("server:app", host=host, port=port, reload=config.server.env == "development")