Simplify code documentation, remove organism terminology

- Remove biological metaphors from docstrings (organism, sensory, genetic, nutrient, etc.)
- Simplify documentation to be minimal and structured for fast model parsing
- Complete SQLite to JSON storage migration (project_manager.py, json_store.py)
- Add Integrations and IntegrationHealth classes to json_store.py
- Add kill_port() function to server.py for port conflict handling
- All 33 tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-10 11:02:00 -03:00
parent 842cce133c
commit d6c25cb4db
5 changed files with 365 additions and 455 deletions

View File

@@ -1,30 +1,16 @@
#!/usr/bin/env python3
"""
🧠 DSS MCP SERVER - Design System Organism Neural Interface
DSS MCP Server
The MCP server is how AI agents interface with the DSS organism through
the Model Context Protocol. It exposes the organism's neural pathways (API)
as tools that Claude and other AI agents can use to:
MCP (Model Context Protocol) interface for DSS. Exposes design system
operations as tools for AI agents.
- Perceive the organism's current state (health checks)
- Direct the organism's sensory organs (Figma perception)
- Control token ingestion and circulation (nutrient management)
- Analyze the organism's codebase (code intelligence)
- Generate Storybook documentation (organism presentation)
- Diagnose and fix health issues (debugging and fixing)
Think of MCP tools as the organism's neural interface - the way external
intelligence (AI agents) can communicate with and direct the organism.
The organism responds to 32 different MCP tools (neural commands):
- Status & Discovery (4 tools)
- Token Ingestion (7 tools)
- Analysis (11 tools)
- Storybook Generation (5 tools)
- Utilities (5 tools)
When an AI agent calls a tool, it's sending a command through the organism's
nervous system to activate specific organs and functions.
Tool Categories (32 tools):
- Status & Discovery (4): get_status, list_projects, create_project, get_project
- Token Ingestion (7): ingest_css, ingest_scss, ingest_tailwind, ingest_json, merge, export, validate
- Analysis (11): discover_project, analyze_react, find_inline_styles, find_patterns, etc.
- Storybook (5): scan, generate_story, generate_batch, theme, coverage
- Utilities (5): extract_tokens, extract_components, sync_tokens, etc.
"""
import os
@@ -75,38 +61,22 @@ mcp = FastMCP("dss-design-system", host=MCP_HOST, port=MCP_PORT)
@mcp.tool()
async def get_status() -> str:
"""
🏥 ORGANISM STATUS CHECK - Get complete vital signs report
Returns the DSS organism's vital signs:
- Is the organism conscious and responsive?
- What is its awareness level (statistics)?
- Can the organism perceive Figma (sensory organs working)?
- What version of the organism is this?
"""
"""Get DSS server status and statistics."""
stats = get_stats()
figma_configured = bool(FIGMA_TOKEN)
return json.dumps({
"organism_status": "🟢 Alive and conscious" if FIGMA_TOKEN else "🟡 Alive but blind",
"sensory_organs": {
"figma_eyes": "👁️ Perceiving" if figma_configured else "👁️ Closed"
},
"mode": "living in reality" if figma_configured else "living in imagination (mock mode)",
"organism_statistics": stats,
"version": "0.8.0",
"message": "The organism is ready to work" if figma_configured else "Configure Figma token to unlock visual perception"
"status": "ready",
"figma": "connected" if figma_configured else "mock mode",
"mode": "live" if figma_configured else "mock",
"statistics": stats,
"version": "0.8.0"
}, indent=2)
@mcp.tool()
async def list_projects() -> str:
"""
🏥 ORGANISM CONSCIOUSNESS - View all design system organisms
Lists all living design system organisms that have been born
(created) and are under observation.
"""
"""List all registered projects."""
projects = Projects.list_all()
return json.dumps([p.to_dict() for p in projects], indent=2)
@@ -114,20 +84,15 @@ async def list_projects() -> str:
@mcp.tool()
async def create_project(name: str, description: str = "", figma_file_key: str = "") -> str:
"""
🧬 ORGANISM GENESIS - Birth of a new design system organism
Creates a new design system organism - a living, breathing instance
that will ingest tokens, circulate nutrients, and grow over time.
Create a new design system project.
Args:
name: The organism's name (identity)
description: The organism's purpose and characteristics
figma_file_key: Link to the organism's visual genetic blueprint (Figma)
Returns: The newly born organism's vital information
name: Project name
description: Project description
figma_file_key: Figma file key for design source
"""
project = Projects.create(name, description, figma_file_key)
return json.dumps({"success": True, "organism_born": True, "project": project.to_dict()}, indent=2)
return json.dumps({"success": True, "project": project.to_dict()}, indent=2)
@mcp.tool()

View File

@@ -1,23 +1,19 @@
"""
🔌 DSS NERVOUS SYSTEM - FastAPI Server
DSS API Server
The nervous system is how the DSS component communicates with the external world.
This REST API serves as the component's neural pathways - transmitting signals
between external systems and the DSS internal organs.
REST API for design system operations.
Portal Endpoints (Synapses):
- 📊 Project management (CRUD operations)
- 👁️ Figma integration (sensory perception)
- 🏥 Health checks and diagnostics
- 📝 Activity tracking (component consciousness)
- ⚙️ Runtime configuration management
- 🔍 Service discovery (companion ecosystem)
Endpoints:
- Project management (CRUD)
- Figma integration (token extraction, component sync)
- Health checks
- Activity tracking
- Configuration management
- Service discovery
Operational Modes:
- **Server Mode** 🌐 - Deployed remotely, distributes tokens to teams
- **Local Mode** 🏠 - Dev companion, local service integration
Foundation: SQLite database (❤️ Heart) stores all component experiences
Modes:
- Server: Remote deployment, team distribution
- Local: Development companion
"""
# Load environment variables from .env file FIRST (before any other imports)
@@ -66,7 +62,8 @@ from browser_logger import router as browser_log_router
from config import config
from storage.json_store import (
Projects, Components, SyncHistory, ActivityLog, Teams, Cache, get_stats,
FigmaFiles, CodeMetrics, TestResults, TokenDrift, Tokens, Styles
FigmaFiles, CodeMetrics, TestResults, TokenDrift, Tokens, Styles,
Integrations, IntegrationHealth
)
from figma.figma_tools import FigmaToolSuite
@@ -173,11 +170,8 @@ ProjectManager.ensure_schema()
class ServiceDiscovery:
"""
🔌 SENSORY ORGAN PERCEPTION - Service discovery system
The sensory organs perceive companion services (Storybook, Chromatic, dev servers)
running in the ecosystem. This discovery mechanism helps the component understand
what external tools are available for integration.
Service discovery for companion services (Storybook, Chromatic, dev servers).
Checks known ports to discover running services.
"""
KNOWN_SERVICES = {
@@ -453,10 +447,10 @@ async def health():
},
"version": "0.8.0",
"timestamp": datetime.utcnow().isoformat() + "Z",
"organs": {
"heart": "💚 Beating normally" if db_ok else "🖤 Heart failure",
"brain": "🧠 Thinking clearly" if mcp_ok else "🧠 Brain fog",
"sensory_eyes": "👁️ Perceiving Figma" if config.figma.is_configured else "👁️ Eyes closed"
"services": {
"storage": "ok" if db_ok else "error",
"mcp": "ok" if mcp_ok else "error",
"figma": "connected" if config.figma.is_configured else "not configured"
}
}
@@ -821,32 +815,21 @@ async def list_components(project_id: str):
@app.post("/api/figma/extract-variables")
async def extract_variables(request: FigmaExtractRequest, background_tasks: BackgroundTasks):
"""
🩸 NUTRIENT EXTRACTION - Extract design tokens from Figma
The sensory organs perceive Figma designs, then the digestive system
breaks them down into nutrient particles (design tokens) that the
component can absorb and circulate through its body.
"""
"""Extract design tokens from Figma variables."""
try:
result = await figma_suite.extract_variables(request.file_key, request.format)
ActivityLog.log(
action="figma_extract_variables",
entity_type="figma",
details={"file_key": request.file_key, "format": request.format, "nutrient_count": result.get("tokens_count")}
details={"file_key": request.file_key, "format": request.format, "tokens_count": result.get("tokens_count")}
)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"🛡️ IMMUNE ALERT: Nutrient extraction failed: {str(e)}")
raise HTTPException(status_code=500, detail=f"Token extraction failed: {str(e)}")
@app.post("/api/figma/extract-components")
async def extract_components(request: FigmaExtractRequest):
"""
🧬 GENETIC BLUEPRINT EXTRACTION - Extract component DNA from Figma
Components are the component's tissue structures. This extracts
the genetic blueprints (component definitions) from Figma.
"""
"""Extract component definitions from Figma."""
try:
result = await figma_suite.extract_components(request.file_key)
ActivityLog.log(
@@ -860,46 +843,30 @@ async def extract_components(request: FigmaExtractRequest):
@app.post("/api/figma/extract-styles")
async def extract_styles(request: FigmaExtractRequest):
"""
🎨 PHENOTYPE EXTRACTION - Extract visual styles from Figma
The component's appearance (styles) is extracted from Figma designs.
This feeds the skin (presentation) system.
"""
"""Extract style definitions from Figma."""
try:
result = await figma_suite.extract_styles(request.file_key)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"🛡️ IMMUNE ALERT: Style extraction failed: {str(e)}")
raise HTTPException(status_code=500, detail=f"Style extraction failed: {str(e)}")
@app.post("/api/figma/sync-tokens")
async def sync_tokens(request: FigmaSyncRequest):
"""
🩸 CIRCULATORY DISTRIBUTION - Sync tokens throughout the component
The circulatory system distributes nutrients (tokens) to all parts
of the component. This endpoint broadcasts extracted tokens to the
target output paths.
"""
"""Sync tokens from Figma to target file."""
try:
result = await figma_suite.sync_tokens(request.file_key, request.target_path, request.format)
ActivityLog.log(
action="figma_sync_tokens",
entity_type="figma",
details={"file_key": request.file_key, "target": request.target_path, "nutrients_distributed": result.get("tokens_synced")}
details={"file_key": request.file_key, "target": request.target_path, "tokens_synced": result.get("tokens_synced")}
)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"🛡️ IMMUNE ALERT: Token circulation failed: {str(e)}")
raise HTTPException(status_code=500, detail=f"Token sync failed: {str(e)}")
@app.post("/api/figma/validate")
async def validate_components(request: FigmaExtractRequest):
"""
🛡️ GENETIC INTEGRITY VALIDATION - Check component health
The immune system examines components to ensure they follow
design system rules. Invalid components are quarantined.
"""
"""Validate component definitions against design system rules."""
try:
result = await figma_suite.validate_components(request.file_key)
return result
@@ -917,17 +884,12 @@ async def generate_code(file_key: str, component_name: str, framework: str = "we
@app.get("/api/figma/health")
async def figma_health():
"""
👁️ SENSORY ORGAN HEALTH CHECK - Figma perception status
The sensory organs (eyes) perceive visual designs from Figma.
This endpoint checks if the component's eyes can see external designs.
"""
"""Check Figma connection status."""
is_live = figma_suite.mode == 'live'
return {
"status": "ok" if is_live else "degraded",
"sensory_mode": figma_suite.mode,
"message": "👁️ Eyes perceiving Figma clearly" if is_live else "👁️ Eyes closed - running with imagination (mock mode). Configure Figma token to see reality."
"mode": figma_suite.mode,
"message": "Figma connected" if is_live else "Running in mock mode. Configure FIGMA_TOKEN for live API."
}
@@ -2587,38 +2549,21 @@ class IntegrationUpdate(BaseModel):
@app.get("/api/mcp/integrations")
async def list_all_integrations():
"""List all available integration types and their health status."""
from storage.database import get_connection
health_list = IntegrationHealth.list_all()
try:
with get_connection() as conn:
health_rows = conn.execute(
"SELECT * FROM integration_health ORDER BY integration_type"
).fetchall()
integrations = []
for row in health_rows:
integrations.append({
"integration_type": row["integration_type"],
"is_healthy": bool(row["is_healthy"]),
"failure_count": row["failure_count"],
"last_failure_at": row["last_failure_at"],
"last_success_at": row["last_success_at"],
"circuit_open_until": row["circuit_open_until"]
})
return {"integrations": integrations}
except Exception as e:
# Table may not exist yet
if not health_list:
# Return defaults if no health data exists
return {
"integrations": [
{"integration_type": "figma", "is_healthy": True, "failure_count": 0},
{"integration_type": "jira", "is_healthy": True, "failure_count": 0},
{"integration_type": "confluence", "is_healthy": True, "failure_count": 0},
{"integration_type": "sequential-thinking", "is_healthy": True, "failure_count": 0}
],
"note": "Integration tables not yet initialized"
]
}
return {"integrations": health_list}
@app.get("/api/projects/{project_id}/integrations")
async def list_project_integrations(
@@ -2629,34 +2574,8 @@ async def list_project_integrations(
if not Projects.get(project_id):
raise HTTPException(status_code=404, detail="Project not found")
from storage.database import get_connection
try:
with get_connection() as conn:
if user_id:
rows = conn.execute(
"""
SELECT id, integration_type, enabled, created_at, updated_at, last_used_at
FROM project_integrations
WHERE project_id = ? AND user_id = ?
ORDER BY integration_type
""",
(project_id, user_id)
).fetchall()
else:
rows = conn.execute(
"""
SELECT id, user_id, integration_type, enabled, created_at, updated_at, last_used_at
FROM project_integrations
WHERE project_id = ?
ORDER BY integration_type
""",
(project_id,)
).fetchall()
return {"integrations": [dict(row) for row in rows]}
except Exception as e:
return {"integrations": [], "error": str(e)}
integrations = Integrations.list(project_id, user_id)
return {"integrations": integrations}
@app.post("/api/projects/{project_id}/integrations")
@@ -2669,7 +2588,6 @@ async def create_integration(
if not Projects.get(project_id):
raise HTTPException(status_code=404, detail="Project not found")
from storage.database import get_connection
from dss_mcp.config import mcp_config
# Encrypt config
@@ -2681,31 +2599,27 @@ async def create_integration(
encrypted_config = config_json # Store unencrypted if no key
try:
with get_connection() as conn:
# Upsert
conn.execute(
"""
INSERT INTO project_integrations (project_id, user_id, integration_type, config, enabled, updated_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(project_id, user_id, integration_type)
DO UPDATE SET config = excluded.config, enabled = excluded.enabled, updated_at = CURRENT_TIMESTAMP
""",
(project_id, user_id, integration.integration_type, encrypted_config, integration.enabled)
)
Integrations.upsert(
project_id=project_id,
user_id=user_id,
integration_type=integration.integration_type,
config=encrypted_config,
enabled=integration.enabled
)
ActivityLog.log(
action="integration_configured",
entity_type="integration",
entity_id=integration.integration_type,
project_id=project_id,
details={"user_id": user_id, "enabled": integration.enabled}
)
ActivityLog.log(
action="integration_configured",
entity_type="integration",
entity_id=integration.integration_type,
project_id=project_id,
details={"user_id": user_id, "enabled": integration.enabled}
)
return {
"success": True,
"integration_type": integration.integration_type,
"enabled": integration.enabled
}
return {
"success": True,
"integration_type": integration.integration_type,
"enabled": integration.enabled
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -2721,44 +2635,35 @@ async def update_integration(
if not Projects.get(project_id):
raise HTTPException(status_code=404, detail="Project not found")
from storage.database import get_connection
from dss_mcp.config import mcp_config
try:
with get_connection() as conn:
updates = []
params = []
encrypted_config = None
if update.config is not None:
config_json = json.dumps(update.config)
cipher = mcp_config.get_cipher()
if cipher:
encrypted_config = cipher.encrypt(config_json.encode()).decode()
else:
encrypted_config = config_json
if update.config is not None:
config_json = json.dumps(update.config)
cipher = mcp_config.get_cipher()
if cipher:
encrypted_config = cipher.encrypt(config_json.encode()).decode()
else:
encrypted_config = config_json
updates.append("config = ?")
params.append(encrypted_config)
if update.config is None and update.enabled is None:
return {"success": False, "message": "No updates provided"}
if update.enabled is not None:
updates.append("enabled = ?")
params.append(update.enabled)
result = Integrations.update(
project_id=project_id,
user_id=user_id,
integration_type=integration_type,
config=encrypted_config,
enabled=update.enabled
)
if not updates:
return {"success": False, "message": "No updates provided"}
if not result:
raise HTTPException(status_code=404, detail="Integration not found")
updates.append("updated_at = CURRENT_TIMESTAMP")
params.extend([project_id, user_id, integration_type])
conn.execute(
f"""
UPDATE project_integrations
SET {', '.join(updates)}
WHERE project_id = ? AND user_id = ? AND integration_type = ?
""",
params
)
return {"success": True, "integration_type": integration_type}
return {"success": True, "integration_type": integration_type}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -2773,30 +2678,21 @@ async def delete_integration(
if not Projects.get(project_id):
raise HTTPException(status_code=404, detail="Project not found")
from storage.database import get_connection
try:
with get_connection() as conn:
result = conn.execute(
"""
DELETE FROM project_integrations
WHERE project_id = ? AND user_id = ? AND integration_type = ?
""",
(project_id, user_id, integration_type)
)
deleted = Integrations.delete(project_id, user_id, integration_type)
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Integration not found")
if not deleted:
raise HTTPException(status_code=404, detail="Integration not found")
ActivityLog.log(
action="integration_deleted",
entity_type="integration",
entity_id=integration_type,
project_id=project_id,
details={"user_id": user_id}
)
ActivityLog.log(
action="integration_deleted",
entity_type="integration",
entity_id=integration_type,
project_id=project_id,
details={"user_id": user_id}
)
return {"success": True}
return {"success": True}
except HTTPException:
raise
except Exception as e:
@@ -3077,12 +2973,39 @@ if UI_DIR.exists():
app.mount("/", StaticFiles(directory=str(UI_DIR), html=True), name="ui")
def kill_port(port: int, wait: float = 0.5) -> None:
"""Kill any process using the specified port."""
import subprocess
import time
try:
# Get PIDs using the port
result = subprocess.run(
["lsof", "-ti", f":{port}"],
capture_output=True, text=True
)
pids = result.stdout.strip().split('\n')
killed = False
for pid in pids:
if pid:
subprocess.run(["kill", "-9", pid], capture_output=True)
print(f"[DSS] Killed process {pid} on port {port}")
killed = True
if killed and wait:
time.sleep(wait) # Wait for port to be released
except Exception:
pass # Port was free
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "3456"))
host = os.getenv("HOST", "0.0.0.0")
# Kill any existing process on the port (twice to handle respawning)
kill_port(port, wait=1.0)
kill_port(port, wait=0.5)
url = f"http://{host}:{port}"
print(f"""
╔═══════════════════════════════════════════════════════════════╗

View File

@@ -248,48 +248,14 @@ class ProjectManager:
def _update_root_path(self, project_id: str, root_path: str) -> None:
"""
Update root_path in database.
Uses raw SQL since the column may not be in the existing model.
Update root_path in JSON storage.
"""
from storage.database import get_connection
with get_connection() as conn:
# Ensure column exists
try:
conn.execute("""
ALTER TABLE projects ADD COLUMN root_path TEXT DEFAULT ''
""")
logger.info("Added root_path column to projects table")
except Exception:
# Column already exists
pass
# Update the value
conn.execute(
"UPDATE projects SET root_path = ? WHERE id = ?",
(root_path, project_id)
)
self.db.update(project_id, root_path=root_path)
@staticmethod
def ensure_schema():
"""
Ensure database schema has root_path column.
Call this on startup to migrate existing databases.
Legacy schema migration - no longer needed with JSON storage.
Kept for API compatibility.
"""
from storage.database import get_connection
with get_connection() as conn:
cursor = conn.cursor()
# Check if column exists
cursor.execute("PRAGMA table_info(projects)")
columns = [col[1] for col in cursor.fetchall()]
if 'root_path' not in columns:
cursor.execute("""
ALTER TABLE projects ADD COLUMN root_path TEXT DEFAULT ''
""")
logger.info("Migration: Added root_path column to projects table")
else:
logger.debug("Schema check: root_path column exists")
logger.debug("Schema check: Using JSON storage, no migration needed")