Migrated from design-system-swarm with fresh git history.
Old project history preserved in /home/overbits/apps/design-system-swarm
Core components:
- MCP Server (Python FastAPI with mcp 1.23.1)
- Claude Plugin (agents, commands, skills, strategies, hooks, core)
- DSS Backend (dss-mvp1 - token translation, Figma sync)
- Admin UI (Node.js/React)
- Server (Node.js/Express)
- Storybook integration (dss-mvp1/.storybook)
Self-contained configuration:
- All paths relative or use DSS_BASE_PATH=/home/overbits/dss
- PYTHONPATH configured for dss-mvp1 and dss-claude-plugin
- .env file with all configuration
- Claude plugin uses ${CLAUDE_PLUGIN_ROOT} for portability
Migration completed: $(date)
🤖 Clean migration with full functionality preserved
1333 lines
50 KiB
Python
1333 lines
50 KiB
Python
"""
|
|
Design System Server (DSS) - SQLite Storage Layer
|
|
|
|
High-efficiency local-first database for:
|
|
- Component definitions (relational)
|
|
- Sync history (time-series)
|
|
- Team/User RBAC
|
|
- Figma API cache (TTL-based)
|
|
|
|
Design tokens stored as flat JSON files for git-friendly diffs.
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import time
|
|
import hashlib
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, List, Any
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass, asdict
|
|
|
|
# Database location
|
|
DB_DIR = Path(__file__).parent.parent.parent / ".dss"
|
|
DB_PATH = DB_DIR / "dss.db"
|
|
|
|
# Ensure directory exists
|
|
DB_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
@contextmanager
|
|
def get_connection():
|
|
"""Context manager for database connections with WAL mode for performance."""
|
|
conn = sqlite3.connect(DB_PATH, timeout=30.0)
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging for concurrency
|
|
conn.execute("PRAGMA synchronous=NORMAL") # Balance safety/speed
|
|
conn.execute("PRAGMA cache_size=-64000") # 64MB cache
|
|
conn.execute("PRAGMA temp_store=MEMORY") # Temp tables in memory
|
|
try:
|
|
yield conn
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def init_database():
|
|
"""Initialize all database tables."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# === Projects ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS projects (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
description TEXT,
|
|
figma_file_key TEXT,
|
|
status TEXT DEFAULT 'active',
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
# === Components ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS components (
|
|
id TEXT PRIMARY KEY,
|
|
project_id TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
figma_key TEXT,
|
|
description TEXT,
|
|
properties TEXT, -- JSON
|
|
variants TEXT, -- JSON array
|
|
code_generated INTEGER DEFAULT 0,
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (project_id) REFERENCES projects(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_project ON components(project_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_name ON components(name)")
|
|
|
|
# === Styles ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS styles (
|
|
id TEXT PRIMARY KEY,
|
|
project_id TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
type TEXT NOT NULL, -- TEXT, FILL, EFFECT, GRID
|
|
figma_key TEXT,
|
|
properties TEXT, -- JSON
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (project_id) REFERENCES projects(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_styles_project ON styles(project_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_styles_type ON styles(type)")
|
|
|
|
# === Tokens (metadata, actual values in JSON files) ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS token_collections (
|
|
id TEXT PRIMARY KEY,
|
|
project_id TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
file_path TEXT NOT NULL,
|
|
token_count INTEGER DEFAULT 0,
|
|
last_synced TEXT,
|
|
FOREIGN KEY (project_id) REFERENCES projects(id)
|
|
)
|
|
""")
|
|
|
|
# === Sync History (append-only, time-series) ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS sync_history (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
project_id TEXT NOT NULL,
|
|
sync_type TEXT NOT NULL, -- tokens, components, styles, full
|
|
status TEXT NOT NULL, -- success, failed, partial
|
|
items_synced INTEGER DEFAULT 0,
|
|
changes TEXT, -- JSON diff summary
|
|
error_message TEXT,
|
|
started_at TEXT NOT NULL,
|
|
completed_at TEXT,
|
|
duration_ms INTEGER,
|
|
FOREIGN KEY (project_id) REFERENCES projects(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sync_project ON sync_history(project_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sync_time ON sync_history(started_at DESC)")
|
|
|
|
# === Activity Log (Enhanced Audit Trail) ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS activity_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
project_id TEXT,
|
|
user_id TEXT,
|
|
user_name TEXT, -- Denormalized for faster display
|
|
team_context TEXT, -- ui, ux, qa, all
|
|
action TEXT NOT NULL, -- Created, Updated, Deleted, Extracted, Synced, etc.
|
|
entity_type TEXT, -- project, component, token, figma_file, etc.
|
|
entity_id TEXT,
|
|
entity_name TEXT, -- Denormalized for faster display
|
|
category TEXT, -- design_system, code, configuration, team
|
|
severity TEXT DEFAULT 'info', -- info, warning, critical
|
|
description TEXT, -- Human-readable description
|
|
details TEXT, -- JSON with full context
|
|
ip_address TEXT, -- For security audit
|
|
user_agent TEXT, -- Browser/client info
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_time ON activity_log(created_at DESC)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_project ON activity_log(project_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_user ON activity_log(user_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_action ON activity_log(action)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_category ON activity_log(category)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_entity ON activity_log(entity_type, entity_id)")
|
|
|
|
# === Teams ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS teams (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
description TEXT,
|
|
settings TEXT, -- JSON
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
# === Users ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id TEXT PRIMARY KEY,
|
|
email TEXT UNIQUE NOT NULL,
|
|
name TEXT,
|
|
avatar_url TEXT,
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
last_login TEXT
|
|
)
|
|
""")
|
|
|
|
# === Team Members (RBAC) ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS team_members (
|
|
team_id TEXT NOT NULL,
|
|
user_id TEXT NOT NULL,
|
|
role TEXT NOT NULL, -- SUPER_ADMIN, TEAM_LEAD, DEVELOPER, VIEWER
|
|
joined_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY (team_id, user_id),
|
|
FOREIGN KEY (team_id) REFERENCES teams(id),
|
|
FOREIGN KEY (user_id) REFERENCES users(id)
|
|
)
|
|
""")
|
|
|
|
# === Project Team Access ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS project_access (
|
|
project_id TEXT NOT NULL,
|
|
team_id TEXT NOT NULL,
|
|
access_level TEXT DEFAULT 'read', -- read, write, admin
|
|
granted_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY (project_id, team_id),
|
|
FOREIGN KEY (project_id) REFERENCES projects(id),
|
|
FOREIGN KEY (team_id) REFERENCES teams(id)
|
|
)
|
|
""")
|
|
|
|
# === Figma Cache (TTL-based) ===
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS figma_cache (
|
|
cache_key TEXT PRIMARY KEY,
|
|
value BLOB NOT NULL,
|
|
created_at INTEGER NOT NULL,
|
|
expires_at INTEGER NOT NULL
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cache_expires ON figma_cache(expires_at)")
|
|
|
|
# === Team Dashboard Tables (Component-Centric Architecture) ===
|
|
|
|
# Figma Files (UX Dashboard) - Multiple Figma files per project
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS figma_files (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
project_id TEXT NOT NULL,
|
|
figma_url TEXT NOT NULL,
|
|
file_name TEXT NOT NULL,
|
|
file_key TEXT NOT NULL,
|
|
last_synced TEXT,
|
|
sync_status TEXT DEFAULT 'pending',
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (project_id) REFERENCES projects(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_figma_files_project ON figma_files(project_id)")
|
|
|
|
# Component Tokens (UX Team View) - Which tokens does each component use?
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS component_tokens (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
component_id TEXT NOT NULL,
|
|
token_name TEXT NOT NULL,
|
|
token_value TEXT NOT NULL,
|
|
source TEXT NOT NULL, -- figma, css, scss, tailwind, json, code
|
|
source_file TEXT,
|
|
figma_node_id TEXT,
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (component_id) REFERENCES components(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_component_tokens_component ON component_tokens(component_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_component_tokens_name ON component_tokens(token_name)")
|
|
|
|
# Code Metrics (UI Team View) - Implementation quality metrics
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS code_metrics (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
component_id TEXT NOT NULL,
|
|
file_path TEXT NOT NULL,
|
|
sloc INTEGER DEFAULT 0,
|
|
complexity_score REAL DEFAULT 0.0,
|
|
prop_count INTEGER DEFAULT 0,
|
|
has_tests INTEGER DEFAULT 0,
|
|
test_coverage REAL DEFAULT 0.0,
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (component_id) REFERENCES components(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_code_metrics_component ON code_metrics(component_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_code_metrics_file ON code_metrics(file_path)")
|
|
|
|
# Test Results (QA Team View) - Test execution results
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS test_results (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
component_id TEXT NOT NULL,
|
|
test_type TEXT NOT NULL, -- esre, regression, visual, unit
|
|
passed INTEGER NOT NULL,
|
|
score REAL,
|
|
failures TEXT, -- JSON array
|
|
run_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (component_id) REFERENCES components(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_test_results_component ON test_results(component_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_test_results_type ON test_results(test_type)")
|
|
|
|
# ESRE Definitions (QA Dashboard) - Natural language requirements
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS esre_definitions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
project_id TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
definition_text TEXT NOT NULL,
|
|
expected_value TEXT,
|
|
component_name TEXT,
|
|
status TEXT DEFAULT 'pending', -- pending, validated, failed
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (project_id) REFERENCES projects(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_esre_project ON esre_definitions(project_id)")
|
|
|
|
# Implementation Snapshots - Track implementation state over time
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS implementation_snapshots (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
component_id TEXT NOT NULL,
|
|
snapshot_data TEXT NOT NULL, -- JSON with full implementation details
|
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (component_id) REFERENCES components(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_component ON implementation_snapshots(component_id)")
|
|
|
|
# Token Drift (UI Dashboard) - Hardcoded values that should use tokens
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS token_drift (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
component_id TEXT NOT NULL,
|
|
property_name TEXT NOT NULL,
|
|
hardcoded_value TEXT NOT NULL,
|
|
suggested_token TEXT,
|
|
severity TEXT NOT NULL, -- info, warning, error, critical
|
|
file_path TEXT NOT NULL,
|
|
line_number INTEGER NOT NULL,
|
|
status TEXT DEFAULT 'pending', -- pending, fixed, ignored
|
|
detected_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (component_id) REFERENCES components(id)
|
|
)
|
|
""")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_token_drift_component ON token_drift(component_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_token_drift_severity ON token_drift(severity)")
|
|
|
|
conn.commit()
|
|
print(f"[Storage] Database initialized at {DB_PATH}")
|
|
|
|
|
|
# === Cache Operations ===
|
|
|
|
class Cache:
|
|
"""TTL-based cache using SQLite."""
|
|
|
|
DEFAULT_TTL = 300 # 5 minutes
|
|
|
|
@staticmethod
|
|
def set(key: str, value: Any, ttl: int = DEFAULT_TTL) -> None:
|
|
"""Store a value with TTL."""
|
|
now = int(time.time())
|
|
expires = now + ttl
|
|
data = json.dumps(value).encode() if not isinstance(value, bytes) else value
|
|
|
|
with get_connection() as conn:
|
|
conn.execute(
|
|
"INSERT OR REPLACE INTO figma_cache (cache_key, value, created_at, expires_at) VALUES (?, ?, ?, ?)",
|
|
(key, data, now, expires)
|
|
)
|
|
|
|
@staticmethod
|
|
def get(key: str) -> Optional[Any]:
|
|
"""Get a value if not expired."""
|
|
now = int(time.time())
|
|
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT value FROM figma_cache WHERE cache_key = ? AND expires_at > ?",
|
|
(key, now)
|
|
)
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
try:
|
|
return json.loads(row[0])
|
|
except (json.JSONDecodeError, TypeError):
|
|
return row[0]
|
|
return None
|
|
|
|
@staticmethod
|
|
def delete(key: str) -> None:
|
|
"""Delete a cache entry."""
|
|
with get_connection() as conn:
|
|
conn.execute("DELETE FROM figma_cache WHERE cache_key = ?", (key,))
|
|
|
|
@staticmethod
|
|
def clear_expired() -> int:
|
|
"""Remove all expired entries. Returns count deleted."""
|
|
now = int(time.time())
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM figma_cache WHERE expires_at <= ?", (now,))
|
|
return cursor.rowcount
|
|
|
|
@staticmethod
|
|
def clear_all() -> None:
|
|
"""Clear entire cache."""
|
|
with get_connection() as conn:
|
|
conn.execute("DELETE FROM figma_cache")
|
|
|
|
|
|
# === Project Operations ===
|
|
|
|
class Projects:
|
|
"""Project CRUD operations."""
|
|
|
|
@staticmethod
|
|
def create(id: str, name: str, description: str = "", figma_file_key: str = "") -> Dict:
|
|
with get_connection() as conn:
|
|
conn.execute(
|
|
"INSERT INTO projects (id, name, description, figma_file_key) VALUES (?, ?, ?, ?)",
|
|
(id, name, description, figma_file_key)
|
|
)
|
|
return Projects.get(id)
|
|
|
|
@staticmethod
|
|
def get(id: str) -> Optional[Dict]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM projects WHERE id = ?", (id,))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
@staticmethod
|
|
def list(status: str = None) -> List[Dict]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
if status:
|
|
cursor.execute("SELECT * FROM projects WHERE status = ? ORDER BY updated_at DESC", (status,))
|
|
else:
|
|
cursor.execute("SELECT * FROM projects ORDER BY updated_at DESC")
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def update(id: str, **kwargs) -> Optional[Dict]:
|
|
if not kwargs:
|
|
return Projects.get(id)
|
|
|
|
fields = ", ".join(f"{k} = ?" for k in kwargs.keys())
|
|
values = list(kwargs.values()) + [id]
|
|
|
|
with get_connection() as conn:
|
|
conn.execute(
|
|
f"UPDATE projects SET {fields}, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
|
|
values
|
|
)
|
|
return Projects.get(id)
|
|
|
|
@staticmethod
|
|
def delete(id: str) -> bool:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM projects WHERE id = ?", (id,))
|
|
return cursor.rowcount > 0
|
|
|
|
|
|
# === Component Operations ===
|
|
|
|
class Components:
|
|
"""Component CRUD operations."""
|
|
|
|
@staticmethod
|
|
def upsert(project_id: str, components: List[Dict]) -> int:
|
|
"""Bulk upsert components. Returns count."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
count = 0
|
|
for comp in components:
|
|
cursor.execute("""
|
|
INSERT OR REPLACE INTO components
|
|
(id, project_id, name, figma_key, description, properties, variants, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
|
""", (
|
|
comp.get('id') or f"{project_id}-{comp['name']}",
|
|
project_id,
|
|
comp['name'],
|
|
comp.get('figma_key') or comp.get('key'),
|
|
comp.get('description', ''),
|
|
json.dumps(comp.get('properties', {})),
|
|
json.dumps(comp.get('variants', []))
|
|
))
|
|
count += 1
|
|
return count
|
|
|
|
@staticmethod
|
|
def list(project_id: str) -> List[Dict]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT * FROM components WHERE project_id = ? ORDER BY name",
|
|
(project_id,)
|
|
)
|
|
results = []
|
|
for row in cursor.fetchall():
|
|
comp = dict(row)
|
|
comp['properties'] = json.loads(comp['properties'] or '{}')
|
|
comp['variants'] = json.loads(comp['variants'] or '[]')
|
|
results.append(comp)
|
|
return results
|
|
|
|
@staticmethod
|
|
def get(id: str) -> Optional[Dict]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM components WHERE id = ?", (id,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
comp = dict(row)
|
|
comp['properties'] = json.loads(comp['properties'] or '{}')
|
|
comp['variants'] = json.loads(comp['variants'] or '[]')
|
|
return comp
|
|
return None
|
|
|
|
|
|
# === Sync History ===
|
|
|
|
class SyncHistory:
|
|
"""Append-only sync history log."""
|
|
|
|
@staticmethod
|
|
def start(project_id: str, sync_type: str) -> int:
|
|
"""Start a sync, returns sync ID."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO sync_history (project_id, sync_type, status, started_at) VALUES (?, ?, 'running', ?)",
|
|
(project_id, sync_type, datetime.utcnow().isoformat())
|
|
)
|
|
return cursor.lastrowid
|
|
|
|
@staticmethod
|
|
def complete(sync_id: int, status: str, items_synced: int = 0, changes: Dict = None, error: str = None):
|
|
"""Complete a sync with results."""
|
|
started = None
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT started_at FROM sync_history WHERE id = ?", (sync_id,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
started = datetime.fromisoformat(row[0])
|
|
|
|
completed = datetime.utcnow()
|
|
duration_ms = int((completed - started).total_seconds() * 1000) if started else 0
|
|
|
|
with get_connection() as conn:
|
|
conn.execute("""
|
|
UPDATE sync_history SET
|
|
status = ?, items_synced = ?, changes = ?, error_message = ?,
|
|
completed_at = ?, duration_ms = ?
|
|
WHERE id = ?
|
|
""", (
|
|
status, items_synced,
|
|
json.dumps(changes) if changes else None,
|
|
error,
|
|
completed.isoformat(), duration_ms,
|
|
sync_id
|
|
))
|
|
|
|
@staticmethod
|
|
def recent(project_id: str = None, limit: int = 20) -> List[Dict]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
if project_id:
|
|
cursor.execute(
|
|
"SELECT * FROM sync_history WHERE project_id = ? ORDER BY started_at DESC LIMIT ?",
|
|
(project_id, limit)
|
|
)
|
|
else:
|
|
cursor.execute(
|
|
"SELECT * FROM sync_history ORDER BY started_at DESC LIMIT ?",
|
|
(limit,)
|
|
)
|
|
results = []
|
|
for row in cursor.fetchall():
|
|
sync = dict(row)
|
|
sync['changes'] = json.loads(sync['changes']) if sync['changes'] else None
|
|
results.append(sync)
|
|
return results
|
|
|
|
|
|
# === Activity Log (Enhanced Audit System) ===
|
|
|
|
class ActivityLog:
|
|
"""Enhanced activity tracking for comprehensive audit trail."""
|
|
|
|
# Action categories for better organization
|
|
CATEGORIES = {
|
|
'design_system': ['extract_tokens', 'extract_components', 'sync_tokens', 'validate_tokens'],
|
|
'code': ['analyze_components', 'find_inline_styles', 'generate_code', 'get_quick_wins'],
|
|
'configuration': ['config_updated', 'figma_token_updated', 'mode_changed', 'service_configured'],
|
|
'project': ['project_created', 'project_updated', 'project_deleted'],
|
|
'team': ['team_context_changed', 'project_context_changed'],
|
|
'storybook': ['scan_storybook', 'generate_story', 'generate_theme']
|
|
}
|
|
|
|
@staticmethod
|
|
def log(action: str,
|
|
entity_type: str = None,
|
|
entity_id: str = None,
|
|
entity_name: str = None,
|
|
project_id: str = None,
|
|
user_id: str = None,
|
|
user_name: str = None,
|
|
team_context: str = None,
|
|
description: str = None,
|
|
category: str = None,
|
|
severity: str = 'info',
|
|
details: Dict = None,
|
|
ip_address: str = None,
|
|
user_agent: str = None):
|
|
"""
|
|
Log an activity with enhanced audit information.
|
|
|
|
Args:
|
|
action: Action performed (e.g., 'project_created', 'tokens_extracted')
|
|
entity_type: Type of entity affected (e.g., 'project', 'component')
|
|
entity_id: ID of the affected entity
|
|
entity_name: Human-readable name of the entity
|
|
project_id: Project context
|
|
user_id: User who performed the action
|
|
user_name: Human-readable user name
|
|
team_context: Team context (ui, ux, qa, all)
|
|
description: Human-readable description of the action
|
|
category: Category (design_system, code, configuration, etc.)
|
|
severity: info, warning, critical
|
|
details: Additional JSON details
|
|
ip_address: Client IP for security audit
|
|
user_agent: Browser/client information
|
|
"""
|
|
# Auto-detect category if not provided
|
|
if not category:
|
|
for cat, actions in ActivityLog.CATEGORIES.items():
|
|
if action in actions:
|
|
category = cat
|
|
break
|
|
if not category:
|
|
category = 'other'
|
|
|
|
# Generate description if not provided
|
|
if not description:
|
|
description = ActivityLog._generate_description(action, entity_type, entity_name, details)
|
|
|
|
with get_connection() as conn:
|
|
conn.execute("""
|
|
INSERT INTO activity_log (
|
|
project_id, user_id, user_name, team_context,
|
|
action, entity_type, entity_id, entity_name,
|
|
category, severity, description, details,
|
|
ip_address, user_agent
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
project_id, user_id, user_name, team_context,
|
|
action, entity_type, entity_id, entity_name,
|
|
category, severity, description,
|
|
json.dumps(details) if details else None,
|
|
ip_address, user_agent
|
|
))
|
|
|
|
@staticmethod
|
|
def _generate_description(action: str, entity_type: str, entity_name: str, details: Dict) -> str:
|
|
"""Generate human-readable description from action data."""
|
|
entity_str = f"{entity_type} '{entity_name}'" if entity_name else (entity_type or "item")
|
|
|
|
action_map = {
|
|
'project_created': f"Created project {entity_str}",
|
|
'project_updated': f"Updated {entity_str}",
|
|
'project_deleted': f"Deleted {entity_str}",
|
|
'extract_tokens': f"Extracted design tokens from Figma",
|
|
'extract_components': f"Extracted components from Figma",
|
|
'sync_tokens': f"Synced tokens to file",
|
|
'config_updated': "Updated configuration",
|
|
'figma_token_updated': "Updated Figma API token",
|
|
'team_context_changed': f"Switched to team context",
|
|
'project_context_changed': f"Switched to project {entity_name}",
|
|
}
|
|
|
|
return action_map.get(action, f"{action.replace('_', ' ').title()}")
|
|
|
|
@staticmethod
|
|
def recent(project_id: str = None, limit: int = 50, offset: int = 0) -> List[Dict]:
|
|
"""Get recent activity with pagination."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
if project_id:
|
|
cursor.execute(
|
|
"SELECT * FROM activity_log WHERE project_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?",
|
|
(project_id, limit, offset)
|
|
)
|
|
else:
|
|
cursor.execute(
|
|
"SELECT * FROM activity_log ORDER BY created_at DESC LIMIT ? OFFSET ?",
|
|
(limit, offset)
|
|
)
|
|
results = []
|
|
for row in cursor.fetchall():
|
|
activity = dict(row)
|
|
activity['details'] = json.loads(activity['details']) if activity['details'] else None
|
|
results.append(activity)
|
|
return results
|
|
|
|
@staticmethod
|
|
def search(
|
|
project_id: str = None,
|
|
user_id: str = None,
|
|
action: str = None,
|
|
category: str = None,
|
|
entity_type: str = None,
|
|
severity: str = None,
|
|
start_date: str = None,
|
|
end_date: str = None,
|
|
limit: int = 100,
|
|
offset: int = 0
|
|
) -> List[Dict]:
|
|
"""Advanced search/filter for audit logs."""
|
|
conditions = []
|
|
params = []
|
|
|
|
if project_id:
|
|
conditions.append("project_id = ?")
|
|
params.append(project_id)
|
|
if user_id:
|
|
conditions.append("user_id = ?")
|
|
params.append(user_id)
|
|
if action:
|
|
conditions.append("action = ?")
|
|
params.append(action)
|
|
if category:
|
|
conditions.append("category = ?")
|
|
params.append(category)
|
|
if entity_type:
|
|
conditions.append("entity_type = ?")
|
|
params.append(entity_type)
|
|
if severity:
|
|
conditions.append("severity = ?")
|
|
params.append(severity)
|
|
if start_date:
|
|
conditions.append("created_at >= ?")
|
|
params.append(start_date)
|
|
if end_date:
|
|
conditions.append("created_at <= ?")
|
|
params.append(end_date)
|
|
|
|
where_clause = " AND ".join(conditions) if conditions else "1=1"
|
|
params.extend([limit, offset])
|
|
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"""
|
|
SELECT * FROM activity_log
|
|
WHERE {where_clause}
|
|
ORDER BY created_at DESC
|
|
LIMIT ? OFFSET ?
|
|
""", params)
|
|
|
|
results = []
|
|
for row in cursor.fetchall():
|
|
activity = dict(row)
|
|
activity['details'] = json.loads(activity['details']) if activity['details'] else None
|
|
results.append(activity)
|
|
return results
|
|
|
|
@staticmethod
|
|
def count(
|
|
project_id: str = None,
|
|
user_id: str = None,
|
|
action: str = None,
|
|
category: str = None
|
|
) -> int:
|
|
"""Count activities matching filters."""
|
|
conditions = []
|
|
params = []
|
|
|
|
if project_id:
|
|
conditions.append("project_id = ?")
|
|
params.append(project_id)
|
|
if user_id:
|
|
conditions.append("user_id = ?")
|
|
params.append(user_id)
|
|
if action:
|
|
conditions.append("action = ?")
|
|
params.append(action)
|
|
if category:
|
|
conditions.append("category = ?")
|
|
params.append(category)
|
|
|
|
where_clause = " AND ".join(conditions) if conditions else "1=1"
|
|
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"SELECT COUNT(*) FROM activity_log WHERE {where_clause}", params)
|
|
return cursor.fetchone()[0]
|
|
|
|
@staticmethod
|
|
def get_categories() -> List[str]:
|
|
"""Get list of all categories used."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT DISTINCT category FROM activity_log WHERE category IS NOT NULL ORDER BY category")
|
|
return [row[0] for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def get_actions() -> List[str]:
|
|
"""Get list of all actions used."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT DISTINCT action FROM activity_log ORDER BY action")
|
|
return [row[0] for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def get_stats_by_category() -> Dict[str, int]:
|
|
"""Get activity count by category."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT category, COUNT(*) as count
|
|
FROM activity_log
|
|
GROUP BY category
|
|
ORDER BY count DESC
|
|
""")
|
|
return {row[0]: row[1] for row in cursor.fetchall()}
|
|
|
|
@staticmethod
|
|
def get_stats_by_user() -> Dict[str, int]:
|
|
"""Get activity count by user."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT COALESCE(user_name, user_id, 'Unknown') as user, COUNT(*) as count
|
|
FROM activity_log
|
|
GROUP BY user_name, user_id
|
|
ORDER BY count DESC
|
|
""")
|
|
return {row[0]: row[1] for row in cursor.fetchall()}
|
|
|
|
|
|
# === Team Dashboard Operations ===
|
|
|
|
class FigmaFiles:
|
|
"""Figma file management for UX Dashboard."""
|
|
|
|
@staticmethod
|
|
def create(project_id: str, figma_url: str, file_name: str, file_key: str) -> Dict:
|
|
"""Add a Figma file to a project."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO figma_files (project_id, figma_url, file_name, file_key)
|
|
VALUES (?, ?, ?, ?)
|
|
""", (project_id, figma_url, file_name, file_key))
|
|
file_id = cursor.lastrowid
|
|
return FigmaFiles.get(file_id)
|
|
|
|
@staticmethod
|
|
def get(file_id: int) -> Optional[Dict]:
|
|
"""Get a specific Figma file."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM figma_files WHERE id = ?", (file_id,))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
@staticmethod
|
|
def list(project_id: str) -> List[Dict]:
|
|
"""List all Figma files for a project."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT * FROM figma_files WHERE project_id = ? ORDER BY created_at DESC",
|
|
(project_id,)
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def update_sync_status(file_id: int, status: str, last_synced: str = None) -> Dict:
|
|
"""Update sync status of a Figma file."""
|
|
with get_connection() as conn:
|
|
if last_synced:
|
|
conn.execute(
|
|
"UPDATE figma_files SET sync_status = ?, last_synced = ? WHERE id = ?",
|
|
(status, last_synced, file_id)
|
|
)
|
|
else:
|
|
conn.execute(
|
|
"UPDATE figma_files SET sync_status = ? WHERE id = ?",
|
|
(status, file_id)
|
|
)
|
|
return FigmaFiles.get(file_id)
|
|
|
|
@staticmethod
|
|
def delete(file_id: int) -> bool:
|
|
"""Delete a Figma file."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM figma_files WHERE id = ?", (file_id,))
|
|
return cursor.rowcount > 0
|
|
|
|
|
|
class ESREDefinitions:
|
|
"""ESRE (Expected System Requirements Engineering) definitions for QA Dashboard."""
|
|
|
|
@staticmethod
|
|
def create(project_id: str, name: str, definition_text: str, expected_value: str = None, component_name: str = None) -> Dict:
|
|
"""Create a new ESRE definition."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO esre_definitions (project_id, name, definition_text, expected_value, component_name)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""", (project_id, name, definition_text, expected_value, component_name))
|
|
esre_id = cursor.lastrowid
|
|
return ESREDefinitions.get(esre_id)
|
|
|
|
@staticmethod
|
|
def get(esre_id: int) -> Optional[Dict]:
|
|
"""Get a specific ESRE definition."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM esre_definitions WHERE id = ?", (esre_id,))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
@staticmethod
|
|
def list(project_id: str) -> List[Dict]:
|
|
"""List all ESRE definitions for a project."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT * FROM esre_definitions WHERE project_id = ? ORDER BY created_at DESC",
|
|
(project_id,)
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def update(esre_id: int, **kwargs) -> Optional[Dict]:
|
|
"""Update an ESRE definition."""
|
|
if not kwargs:
|
|
return ESREDefinitions.get(esre_id)
|
|
|
|
fields = ", ".join(f"{k} = ?" for k in kwargs.keys())
|
|
values = list(kwargs.values()) + [esre_id]
|
|
|
|
with get_connection() as conn:
|
|
conn.execute(
|
|
f"UPDATE esre_definitions SET {fields}, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
|
|
values
|
|
)
|
|
return ESREDefinitions.get(esre_id)
|
|
|
|
@staticmethod
|
|
def delete(esre_id: int) -> bool:
|
|
"""Delete an ESRE definition."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM esre_definitions WHERE id = ?", (esre_id,))
|
|
return cursor.rowcount > 0
|
|
|
|
|
|
class TokenDriftDetector:
|
|
"""Token drift tracking for UI Dashboard."""
|
|
|
|
@staticmethod
|
|
def record_drift(component_id: str, property_name: str, hardcoded_value: str,
|
|
file_path: str, line_number: int, severity: str = "warning",
|
|
suggested_token: str = None) -> Dict:
|
|
"""Record a token drift issue."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO token_drift (component_id, property_name, hardcoded_value,
|
|
suggested_token, severity, file_path, line_number)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (component_id, property_name, hardcoded_value, suggested_token,
|
|
severity, file_path, line_number))
|
|
drift_id = cursor.lastrowid
|
|
return TokenDriftDetector.get(drift_id)
|
|
|
|
@staticmethod
|
|
def get(drift_id: int) -> Optional[Dict]:
|
|
"""Get a specific drift entry."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM token_drift WHERE id = ?", (drift_id,))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
@staticmethod
|
|
def list_by_component(component_id: str, status: str = None) -> List[Dict]:
|
|
"""List drift issues for a component."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
if status:
|
|
cursor.execute(
|
|
"SELECT * FROM token_drift WHERE component_id = ? AND status = ? ORDER BY severity, detected_at DESC",
|
|
(component_id, status)
|
|
)
|
|
else:
|
|
cursor.execute(
|
|
"SELECT * FROM token_drift WHERE component_id = ? ORDER BY severity, detected_at DESC",
|
|
(component_id,)
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def list_by_project(project_id: str, severity: str = None) -> List[Dict]:
|
|
"""List all drift issues for a project."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
query = """
|
|
SELECT td.* FROM token_drift td
|
|
JOIN components c ON c.id = td.component_id
|
|
WHERE c.project_id = ?
|
|
"""
|
|
params = [project_id]
|
|
|
|
if severity:
|
|
query += " AND td.severity = ?"
|
|
params.append(severity)
|
|
|
|
query += " ORDER BY td.severity, td.detected_at DESC"
|
|
cursor.execute(query, params)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def update_status(drift_id: int, status: str) -> Dict:
|
|
"""Update the status of a drift issue (pending, fixed, ignored)."""
|
|
with get_connection() as conn:
|
|
conn.execute(
|
|
"UPDATE token_drift SET status = ? WHERE id = ?",
|
|
(status, drift_id)
|
|
)
|
|
return TokenDriftDetector.get(drift_id)
|
|
|
|
@staticmethod
|
|
def get_stats(project_id: str) -> Dict:
|
|
"""Get drift statistics for a project."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Total drift count by severity
|
|
cursor.execute("""
|
|
SELECT td.severity, COUNT(*) as count
|
|
FROM token_drift td
|
|
JOIN components c ON c.id = td.component_id
|
|
WHERE c.project_id = ? AND td.status = 'pending'
|
|
GROUP BY td.severity
|
|
""", (project_id,))
|
|
by_severity = {row[0]: row[1] for row in cursor.fetchall()}
|
|
|
|
# Total count
|
|
cursor.execute("""
|
|
SELECT COUNT(*) FROM token_drift td
|
|
JOIN components c ON c.id = td.component_id
|
|
WHERE c.project_id = ? AND td.status = 'pending'
|
|
""", (project_id,))
|
|
total = cursor.fetchone()[0]
|
|
|
|
return {
|
|
"total": total,
|
|
"by_severity": by_severity
|
|
}
|
|
|
|
|
|
class CodeMetrics:
|
|
"""Code metrics tracking for UI Dashboard."""
|
|
|
|
@staticmethod
|
|
def record_metrics(component_id: str, file_path: str, sloc: int = 0,
|
|
complexity_score: float = 0.0, prop_count: int = 0,
|
|
has_tests: bool = False, test_coverage: float = 0.0) -> Dict:
|
|
"""Record code metrics for a component."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT OR REPLACE INTO code_metrics
|
|
(component_id, file_path, sloc, complexity_score, prop_count, has_tests, test_coverage, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
|
""", (component_id, file_path, sloc, complexity_score, prop_count,
|
|
1 if has_tests else 0, test_coverage))
|
|
metric_id = cursor.lastrowid
|
|
|
|
# Return the inserted/updated record
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM code_metrics WHERE id = ?", (metric_id,))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
@staticmethod
|
|
def list_by_component(component_id: str) -> List[Dict]:
|
|
"""Get all metrics for a component."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT * FROM code_metrics WHERE component_id = ? ORDER BY updated_at DESC",
|
|
(component_id,)
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def get_project_summary(project_id: str) -> Dict:
|
|
"""Get aggregated code metrics for a project."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT
|
|
COUNT(DISTINCT cm.component_id) as component_count,
|
|
SUM(cm.sloc) as total_sloc,
|
|
AVG(cm.complexity_score) as avg_complexity,
|
|
AVG(cm.test_coverage) as avg_coverage,
|
|
SUM(cm.has_tests) as components_with_tests
|
|
FROM code_metrics cm
|
|
JOIN components c ON c.id = cm.component_id
|
|
WHERE c.project_id = ?
|
|
""", (project_id,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return {
|
|
"component_count": row[0] or 0,
|
|
"total_sloc": row[1] or 0,
|
|
"avg_complexity": round(row[2] or 0.0, 2),
|
|
"avg_coverage": round(row[3] or 0.0, 2),
|
|
"components_with_tests": row[4] or 0
|
|
}
|
|
return {}
|
|
|
|
|
|
class TestResults:
|
|
"""Test result tracking for QA Dashboard."""
|
|
|
|
@staticmethod
|
|
def record_test(component_id: str, test_type: str, passed: bool,
|
|
score: float = None, failures: List[str] = None) -> Dict:
|
|
"""Record a test result."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO test_results (component_id, test_type, passed, score, failures)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""", (component_id, test_type, 1 if passed else 0, score,
|
|
json.dumps(failures) if failures else None))
|
|
result_id = cursor.lastrowid
|
|
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM test_results WHERE id = ?", (result_id,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
result = dict(row)
|
|
result['failures'] = json.loads(result['failures']) if result['failures'] else []
|
|
return result
|
|
return None
|
|
|
|
@staticmethod
|
|
def list_by_component(component_id: str, test_type: str = None) -> List[Dict]:
|
|
"""Get test results for a component."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
if test_type:
|
|
cursor.execute(
|
|
"SELECT * FROM test_results WHERE component_id = ? AND test_type = ? ORDER BY run_at DESC",
|
|
(component_id, test_type)
|
|
)
|
|
else:
|
|
cursor.execute(
|
|
"SELECT * FROM test_results WHERE component_id = ? ORDER BY run_at DESC",
|
|
(component_id,)
|
|
)
|
|
results = []
|
|
for row in cursor.fetchall():
|
|
result = dict(row)
|
|
result['failures'] = json.loads(result['failures']) if result['failures'] else []
|
|
results.append(result)
|
|
return results
|
|
|
|
@staticmethod
|
|
def get_project_summary(project_id: str) -> Dict:
|
|
"""Get test summary for a project."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT
|
|
COUNT(*) as total_tests,
|
|
SUM(CASE WHEN passed = 1 THEN 1 ELSE 0 END) as passed_tests,
|
|
AVG(score) as avg_score
|
|
FROM test_results tr
|
|
JOIN components c ON c.id = tr.component_id
|
|
WHERE c.project_id = ?
|
|
""", (project_id,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
total = row[0] or 0
|
|
passed = row[1] or 0
|
|
return {
|
|
"total_tests": total,
|
|
"passed_tests": passed,
|
|
"failed_tests": total - passed,
|
|
"pass_rate": round((passed / total * 100) if total > 0 else 0, 2),
|
|
"avg_score": round(row[2] or 0.0, 2)
|
|
}
|
|
return {}
|
|
|
|
|
|
# === Teams & RBAC ===
|
|
|
|
class Teams:
|
|
"""Team and role management."""
|
|
|
|
@staticmethod
|
|
def create(id: str, name: str, description: str = "") -> Dict:
|
|
with get_connection() as conn:
|
|
conn.execute(
|
|
"INSERT INTO teams (id, name, description) VALUES (?, ?, ?)",
|
|
(id, name, description)
|
|
)
|
|
return Teams.get(id)
|
|
|
|
@staticmethod
|
|
def get(id: str) -> Optional[Dict]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM teams WHERE id = ?", (id,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
team = dict(row)
|
|
team['settings'] = json.loads(team['settings']) if team['settings'] else {}
|
|
return team
|
|
return None
|
|
|
|
@staticmethod
|
|
def list() -> List[Dict]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM teams ORDER BY name")
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def add_member(team_id: str, user_id: str, role: str):
|
|
with get_connection() as conn:
|
|
conn.execute(
|
|
"INSERT OR REPLACE INTO team_members (team_id, user_id, role) VALUES (?, ?, ?)",
|
|
(team_id, user_id, role)
|
|
)
|
|
|
|
@staticmethod
|
|
def get_members(team_id: str) -> List[Dict]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT u.*, tm.role, tm.joined_at
|
|
FROM team_members tm
|
|
JOIN users u ON u.id = tm.user_id
|
|
WHERE tm.team_id = ?
|
|
ORDER BY tm.role, u.name
|
|
""", (team_id,))
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
@staticmethod
|
|
def get_user_role(team_id: str, user_id: str) -> Optional[str]:
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT role FROM team_members WHERE team_id = ? AND user_id = ?",
|
|
(team_id, user_id)
|
|
)
|
|
row = cursor.fetchone()
|
|
return row[0] if row else None
|
|
|
|
|
|
# === Database Stats ===
|
|
|
|
def get_stats() -> Dict:
|
|
"""Get database statistics."""
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
stats = {}
|
|
|
|
# Table counts
|
|
tables = ['projects', 'components', 'styles', 'sync_history', 'activity_log', 'teams', 'users', 'figma_cache']
|
|
for table in tables:
|
|
cursor.execute(f"SELECT COUNT(*) FROM {table}")
|
|
stats[table] = cursor.fetchone()[0]
|
|
|
|
# Database file size
|
|
if DB_PATH.exists():
|
|
stats['db_size_mb'] = round(DB_PATH.stat().st_size / (1024 * 1024), 2)
|
|
|
|
# Cache stats
|
|
now = int(time.time())
|
|
cursor.execute("SELECT COUNT(*) FROM figma_cache WHERE expires_at > ?", (now,))
|
|
stats['cache_valid'] = cursor.fetchone()[0]
|
|
|
|
return stats
|
|
|
|
|
|
# Initialize on import
|
|
init_database()
|
|
|
|
|
|
# === CLI for testing ===
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
if len(sys.argv) > 1:
|
|
cmd = sys.argv[1]
|
|
|
|
if cmd == "stats":
|
|
print(json.dumps(get_stats(), indent=2))
|
|
|
|
elif cmd == "init":
|
|
init_database()
|
|
print("Database initialized")
|
|
|
|
elif cmd == "cache-test":
|
|
Cache.set("test_key", {"foo": "bar"}, ttl=60)
|
|
print(f"Set: test_key")
|
|
print(f"Get: {Cache.get('test_key')}")
|
|
|
|
elif cmd == "clear-cache":
|
|
Cache.clear_all()
|
|
print("Cache cleared")
|
|
|
|
else:
|
|
print("Usage: python database.py [stats|init|cache-test|clear-cache]")
|
|
print(f"\nDatabase: {DB_PATH}")
|
|
print(f"Stats: {json.dumps(get_stats(), indent=2)}")
|