""" Design System Server (DSS) - SQLite Storage Layer High-efficiency local-first database for: - Component definitions (relational) - Sync history (time-series) - Team/User RBAC - Figma API cache (TTL-based) Design tokens stored as flat JSON files for git-friendly diffs. """ import sqlite3 import json import time import hashlib from pathlib import Path from datetime import datetime from typing import Optional, Dict, List, Any from contextlib import contextmanager from dataclasses import dataclass, asdict # Database location DB_DIR = Path(__file__).parent.parent.parent / ".dss" DB_PATH = DB_DIR / "dss.db" # Ensure directory exists DB_DIR.mkdir(parents=True, exist_ok=True) @contextmanager def get_connection(): """Context manager for database connections with WAL mode for performance.""" conn = sqlite3.connect(DB_PATH, timeout=30.0) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging for concurrency conn.execute("PRAGMA synchronous=NORMAL") # Balance safety/speed conn.execute("PRAGMA cache_size=-64000") # 64MB cache conn.execute("PRAGMA temp_store=MEMORY") # Temp tables in memory try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def init_database(): """Initialize all database tables.""" with get_connection() as conn: cursor = conn.cursor() # === Projects === cursor.execute(""" CREATE TABLE IF NOT EXISTS projects ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, figma_file_key TEXT, status TEXT DEFAULT 'active', created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """) # === Components === cursor.execute(""" CREATE TABLE IF NOT EXISTS components ( id TEXT PRIMARY KEY, project_id TEXT NOT NULL, name TEXT NOT NULL, figma_key TEXT, description TEXT, properties TEXT, -- JSON variants TEXT, -- JSON array code_generated INTEGER DEFAULT 0, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (project_id) REFERENCES projects(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_project ON components(project_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_name ON components(name)") # === Styles === cursor.execute(""" CREATE TABLE IF NOT EXISTS styles ( id TEXT PRIMARY KEY, project_id TEXT NOT NULL, name TEXT NOT NULL, type TEXT NOT NULL, -- TEXT, FILL, EFFECT, GRID figma_key TEXT, properties TEXT, -- JSON created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (project_id) REFERENCES projects(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_styles_project ON styles(project_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_styles_type ON styles(type)") # === Tokens (metadata, actual values in JSON files) === cursor.execute(""" CREATE TABLE IF NOT EXISTS token_collections ( id TEXT PRIMARY KEY, project_id TEXT NOT NULL, name TEXT NOT NULL, file_path TEXT NOT NULL, token_count INTEGER DEFAULT 0, last_synced TEXT, FOREIGN KEY (project_id) REFERENCES projects(id) ) """) # === Sync History (append-only, time-series) === cursor.execute(""" CREATE TABLE IF NOT EXISTS sync_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id TEXT NOT NULL, sync_type TEXT NOT NULL, -- tokens, components, styles, full status TEXT NOT NULL, -- success, failed, partial items_synced INTEGER DEFAULT 0, changes TEXT, -- JSON diff summary error_message TEXT, started_at TEXT NOT NULL, completed_at TEXT, duration_ms INTEGER, FOREIGN KEY (project_id) REFERENCES projects(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_sync_project ON sync_history(project_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_sync_time ON sync_history(started_at DESC)") # === Activity Log (Enhanced Audit Trail) === cursor.execute(""" CREATE TABLE IF NOT EXISTS activity_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id TEXT, user_id TEXT, user_name TEXT, -- Denormalized for faster display team_context TEXT, -- ui, ux, qa, all action TEXT NOT NULL, -- Created, Updated, Deleted, Extracted, Synced, etc. entity_type TEXT, -- project, component, token, figma_file, etc. entity_id TEXT, entity_name TEXT, -- Denormalized for faster display category TEXT, -- design_system, code, configuration, team severity TEXT DEFAULT 'info', -- info, warning, critical description TEXT, -- Human-readable description details TEXT, -- JSON with full context ip_address TEXT, -- For security audit user_agent TEXT, -- Browser/client info created_at TEXT DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_time ON activity_log(created_at DESC)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_project ON activity_log(project_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_user ON activity_log(user_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_action ON activity_log(action)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_category ON activity_log(category)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_entity ON activity_log(entity_type, entity_id)") # === Teams === cursor.execute(""" CREATE TABLE IF NOT EXISTS teams ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, settings TEXT, -- JSON created_at TEXT DEFAULT CURRENT_TIMESTAMP ) """) # === Users === cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id TEXT PRIMARY KEY, email TEXT UNIQUE NOT NULL, name TEXT, avatar_url TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, last_login TEXT ) """) # === Team Members (RBAC) === cursor.execute(""" CREATE TABLE IF NOT EXISTS team_members ( team_id TEXT NOT NULL, user_id TEXT NOT NULL, role TEXT NOT NULL, -- SUPER_ADMIN, TEAM_LEAD, DEVELOPER, VIEWER joined_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (team_id, user_id), FOREIGN KEY (team_id) REFERENCES teams(id), FOREIGN KEY (user_id) REFERENCES users(id) ) """) # === Project Team Access === cursor.execute(""" CREATE TABLE IF NOT EXISTS project_access ( project_id TEXT NOT NULL, team_id TEXT NOT NULL, access_level TEXT DEFAULT 'read', -- read, write, admin granted_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (project_id, team_id), FOREIGN KEY (project_id) REFERENCES projects(id), FOREIGN KEY (team_id) REFERENCES teams(id) ) """) # === Figma Cache (TTL-based) === cursor.execute(""" CREATE TABLE IF NOT EXISTS figma_cache ( cache_key TEXT PRIMARY KEY, value BLOB NOT NULL, created_at INTEGER NOT NULL, expires_at INTEGER NOT NULL ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_cache_expires ON figma_cache(expires_at)") # === Team Dashboard Tables (Component-Centric Architecture) === # Figma Files (UX Dashboard) - Multiple Figma files per project cursor.execute(""" CREATE TABLE IF NOT EXISTS figma_files ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id TEXT NOT NULL, figma_url TEXT NOT NULL, file_name TEXT NOT NULL, file_key TEXT NOT NULL, last_synced TEXT, sync_status TEXT DEFAULT 'pending', created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (project_id) REFERENCES projects(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_figma_files_project ON figma_files(project_id)") # Component Tokens (UX Team View) - Which tokens does each component use? cursor.execute(""" CREATE TABLE IF NOT EXISTS component_tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, component_id TEXT NOT NULL, token_name TEXT NOT NULL, token_value TEXT NOT NULL, source TEXT NOT NULL, -- figma, css, scss, tailwind, json, code source_file TEXT, figma_node_id TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (component_id) REFERENCES components(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_component_tokens_component ON component_tokens(component_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_component_tokens_name ON component_tokens(token_name)") # Code Metrics (UI Team View) - Implementation quality metrics cursor.execute(""" CREATE TABLE IF NOT EXISTS code_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, component_id TEXT NOT NULL, file_path TEXT NOT NULL, sloc INTEGER DEFAULT 0, complexity_score REAL DEFAULT 0.0, prop_count INTEGER DEFAULT 0, has_tests INTEGER DEFAULT 0, test_coverage REAL DEFAULT 0.0, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (component_id) REFERENCES components(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_code_metrics_component ON code_metrics(component_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_code_metrics_file ON code_metrics(file_path)") # Test Results (QA Team View) - Test execution results cursor.execute(""" CREATE TABLE IF NOT EXISTS test_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, component_id TEXT NOT NULL, test_type TEXT NOT NULL, -- esre, regression, visual, unit passed INTEGER NOT NULL, score REAL, failures TEXT, -- JSON array run_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (component_id) REFERENCES components(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_test_results_component ON test_results(component_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_test_results_type ON test_results(test_type)") # ESRE Definitions (QA Dashboard) - Natural language requirements cursor.execute(""" CREATE TABLE IF NOT EXISTS esre_definitions ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id TEXT NOT NULL, name TEXT NOT NULL, definition_text TEXT NOT NULL, expected_value TEXT, component_name TEXT, status TEXT DEFAULT 'pending', -- pending, validated, failed created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (project_id) REFERENCES projects(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_esre_project ON esre_definitions(project_id)") # Implementation Snapshots - Track implementation state over time cursor.execute(""" CREATE TABLE IF NOT EXISTS implementation_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, component_id TEXT NOT NULL, snapshot_data TEXT NOT NULL, -- JSON with full implementation details created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (component_id) REFERENCES components(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_component ON implementation_snapshots(component_id)") # Token Drift (UI Dashboard) - Hardcoded values that should use tokens cursor.execute(""" CREATE TABLE IF NOT EXISTS token_drift ( id INTEGER PRIMARY KEY AUTOINCREMENT, component_id TEXT NOT NULL, property_name TEXT NOT NULL, hardcoded_value TEXT NOT NULL, suggested_token TEXT, severity TEXT NOT NULL, -- info, warning, error, critical file_path TEXT NOT NULL, line_number INTEGER NOT NULL, status TEXT DEFAULT 'pending', -- pending, fixed, ignored detected_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (component_id) REFERENCES components(id) ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_token_drift_component ON token_drift(component_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_token_drift_severity ON token_drift(severity)") conn.commit() print(f"[Storage] Database initialized at {DB_PATH}") # === Cache Operations === class Cache: """TTL-based cache using SQLite.""" DEFAULT_TTL = 300 # 5 minutes @staticmethod def set(key: str, value: Any, ttl: int = DEFAULT_TTL) -> None: """Store a value with TTL.""" now = int(time.time()) expires = now + ttl data = json.dumps(value).encode() if not isinstance(value, bytes) else value with get_connection() as conn: conn.execute( "INSERT OR REPLACE INTO figma_cache (cache_key, value, created_at, expires_at) VALUES (?, ?, ?, ?)", (key, data, now, expires) ) @staticmethod def get(key: str) -> Optional[Any]: """Get a value if not expired.""" now = int(time.time()) with get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT value FROM figma_cache WHERE cache_key = ? AND expires_at > ?", (key, now) ) row = cursor.fetchone() if row: try: return json.loads(row[0]) except (json.JSONDecodeError, TypeError): return row[0] return None @staticmethod def delete(key: str) -> None: """Delete a cache entry.""" with get_connection() as conn: conn.execute("DELETE FROM figma_cache WHERE cache_key = ?", (key,)) @staticmethod def clear_expired() -> int: """Remove all expired entries. Returns count deleted.""" now = int(time.time()) with get_connection() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM figma_cache WHERE expires_at <= ?", (now,)) return cursor.rowcount @staticmethod def clear_all() -> None: """Clear entire cache.""" with get_connection() as conn: conn.execute("DELETE FROM figma_cache") # === Project Operations === class Projects: """Project CRUD operations.""" @staticmethod def create(id: str, name: str, description: str = "", figma_file_key: str = "") -> Dict: with get_connection() as conn: conn.execute( "INSERT INTO projects (id, name, description, figma_file_key) VALUES (?, ?, ?, ?)", (id, name, description, figma_file_key) ) return Projects.get(id) @staticmethod def get(id: str) -> Optional[Dict]: with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM projects WHERE id = ?", (id,)) row = cursor.fetchone() return dict(row) if row else None @staticmethod def list(status: str = None) -> List[Dict]: with get_connection() as conn: cursor = conn.cursor() if status: cursor.execute("SELECT * FROM projects WHERE status = ? ORDER BY updated_at DESC", (status,)) else: cursor.execute("SELECT * FROM projects ORDER BY updated_at DESC") return [dict(row) for row in cursor.fetchall()] @staticmethod def update(id: str, **kwargs) -> Optional[Dict]: if not kwargs: return Projects.get(id) fields = ", ".join(f"{k} = ?" for k in kwargs.keys()) values = list(kwargs.values()) + [id] with get_connection() as conn: conn.execute( f"UPDATE projects SET {fields}, updated_at = CURRENT_TIMESTAMP WHERE id = ?", values ) return Projects.get(id) @staticmethod def delete(id: str) -> bool: with get_connection() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM projects WHERE id = ?", (id,)) return cursor.rowcount > 0 # === Component Operations === class Components: """Component CRUD operations.""" @staticmethod def upsert(project_id: str, components: List[Dict]) -> int: """Bulk upsert components. Returns count.""" with get_connection() as conn: cursor = conn.cursor() count = 0 for comp in components: cursor.execute(""" INSERT OR REPLACE INTO components (id, project_id, name, figma_key, description, properties, variants, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) """, ( comp.get('id') or f"{project_id}-{comp['name']}", project_id, comp['name'], comp.get('figma_key') or comp.get('key'), comp.get('description', ''), json.dumps(comp.get('properties', {})), json.dumps(comp.get('variants', [])) )) count += 1 return count @staticmethod def list(project_id: str) -> List[Dict]: with get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM components WHERE project_id = ? ORDER BY name", (project_id,) ) results = [] for row in cursor.fetchall(): comp = dict(row) comp['properties'] = json.loads(comp['properties'] or '{}') comp['variants'] = json.loads(comp['variants'] or '[]') results.append(comp) return results @staticmethod def get(id: str) -> Optional[Dict]: with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM components WHERE id = ?", (id,)) row = cursor.fetchone() if row: comp = dict(row) comp['properties'] = json.loads(comp['properties'] or '{}') comp['variants'] = json.loads(comp['variants'] or '[]') return comp return None # === Sync History === class SyncHistory: """Append-only sync history log.""" @staticmethod def start(project_id: str, sync_type: str) -> int: """Start a sync, returns sync ID.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( "INSERT INTO sync_history (project_id, sync_type, status, started_at) VALUES (?, ?, 'running', ?)", (project_id, sync_type, datetime.utcnow().isoformat()) ) return cursor.lastrowid @staticmethod def complete(sync_id: int, status: str, items_synced: int = 0, changes: Dict = None, error: str = None): """Complete a sync with results.""" started = None with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT started_at FROM sync_history WHERE id = ?", (sync_id,)) row = cursor.fetchone() if row: started = datetime.fromisoformat(row[0]) completed = datetime.utcnow() duration_ms = int((completed - started).total_seconds() * 1000) if started else 0 with get_connection() as conn: conn.execute(""" UPDATE sync_history SET status = ?, items_synced = ?, changes = ?, error_message = ?, completed_at = ?, duration_ms = ? WHERE id = ? """, ( status, items_synced, json.dumps(changes) if changes else None, error, completed.isoformat(), duration_ms, sync_id )) @staticmethod def recent(project_id: str = None, limit: int = 20) -> List[Dict]: with get_connection() as conn: cursor = conn.cursor() if project_id: cursor.execute( "SELECT * FROM sync_history WHERE project_id = ? ORDER BY started_at DESC LIMIT ?", (project_id, limit) ) else: cursor.execute( "SELECT * FROM sync_history ORDER BY started_at DESC LIMIT ?", (limit,) ) results = [] for row in cursor.fetchall(): sync = dict(row) sync['changes'] = json.loads(sync['changes']) if sync['changes'] else None results.append(sync) return results # === Activity Log (Enhanced Audit System) === class ActivityLog: """Enhanced activity tracking for comprehensive audit trail.""" # Action categories for better organization CATEGORIES = { 'design_system': ['extract_tokens', 'extract_components', 'sync_tokens', 'validate_tokens'], 'code': ['analyze_components', 'find_inline_styles', 'generate_code', 'get_quick_wins'], 'configuration': ['config_updated', 'figma_token_updated', 'mode_changed', 'service_configured'], 'project': ['project_created', 'project_updated', 'project_deleted'], 'team': ['team_context_changed', 'project_context_changed'], 'storybook': ['scan_storybook', 'generate_story', 'generate_theme'] } @staticmethod def log(action: str, entity_type: str = None, entity_id: str = None, entity_name: str = None, project_id: str = None, user_id: str = None, user_name: str = None, team_context: str = None, description: str = None, category: str = None, severity: str = 'info', details: Dict = None, ip_address: str = None, user_agent: str = None): """ Log an activity with enhanced audit information. Args: action: Action performed (e.g., 'project_created', 'tokens_extracted') entity_type: Type of entity affected (e.g., 'project', 'component') entity_id: ID of the affected entity entity_name: Human-readable name of the entity project_id: Project context user_id: User who performed the action user_name: Human-readable user name team_context: Team context (ui, ux, qa, all) description: Human-readable description of the action category: Category (design_system, code, configuration, etc.) severity: info, warning, critical details: Additional JSON details ip_address: Client IP for security audit user_agent: Browser/client information """ # Auto-detect category if not provided if not category: for cat, actions in ActivityLog.CATEGORIES.items(): if action in actions: category = cat break if not category: category = 'other' # Generate description if not provided if not description: description = ActivityLog._generate_description(action, entity_type, entity_name, details) with get_connection() as conn: conn.execute(""" INSERT INTO activity_log ( project_id, user_id, user_name, team_context, action, entity_type, entity_id, entity_name, category, severity, description, details, ip_address, user_agent ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( project_id, user_id, user_name, team_context, action, entity_type, entity_id, entity_name, category, severity, description, json.dumps(details) if details else None, ip_address, user_agent )) @staticmethod def _generate_description(action: str, entity_type: str, entity_name: str, details: Dict) -> str: """Generate human-readable description from action data.""" entity_str = f"{entity_type} '{entity_name}'" if entity_name else (entity_type or "item") action_map = { 'project_created': f"Created project {entity_str}", 'project_updated': f"Updated {entity_str}", 'project_deleted': f"Deleted {entity_str}", 'extract_tokens': f"Extracted design tokens from Figma", 'extract_components': f"Extracted components from Figma", 'sync_tokens': f"Synced tokens to file", 'config_updated': "Updated configuration", 'figma_token_updated': "Updated Figma API token", 'team_context_changed': f"Switched to team context", 'project_context_changed': f"Switched to project {entity_name}", } return action_map.get(action, f"{action.replace('_', ' ').title()}") @staticmethod def recent(project_id: str = None, limit: int = 50, offset: int = 0) -> List[Dict]: """Get recent activity with pagination.""" with get_connection() as conn: cursor = conn.cursor() if project_id: cursor.execute( "SELECT * FROM activity_log WHERE project_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?", (project_id, limit, offset) ) else: cursor.execute( "SELECT * FROM activity_log ORDER BY created_at DESC LIMIT ? OFFSET ?", (limit, offset) ) results = [] for row in cursor.fetchall(): activity = dict(row) activity['details'] = json.loads(activity['details']) if activity['details'] else None results.append(activity) return results @staticmethod def search( project_id: str = None, user_id: str = None, action: str = None, category: str = None, entity_type: str = None, severity: str = None, start_date: str = None, end_date: str = None, limit: int = 100, offset: int = 0 ) -> List[Dict]: """Advanced search/filter for audit logs.""" conditions = [] params = [] if project_id: conditions.append("project_id = ?") params.append(project_id) if user_id: conditions.append("user_id = ?") params.append(user_id) if action: conditions.append("action = ?") params.append(action) if category: conditions.append("category = ?") params.append(category) if entity_type: conditions.append("entity_type = ?") params.append(entity_type) if severity: conditions.append("severity = ?") params.append(severity) if start_date: conditions.append("created_at >= ?") params.append(start_date) if end_date: conditions.append("created_at <= ?") params.append(end_date) where_clause = " AND ".join(conditions) if conditions else "1=1" params.extend([limit, offset]) with get_connection() as conn: cursor = conn.cursor() cursor.execute(f""" SELECT * FROM activity_log WHERE {where_clause} ORDER BY created_at DESC LIMIT ? OFFSET ? """, params) results = [] for row in cursor.fetchall(): activity = dict(row) activity['details'] = json.loads(activity['details']) if activity['details'] else None results.append(activity) return results @staticmethod def count( project_id: str = None, user_id: str = None, action: str = None, category: str = None ) -> int: """Count activities matching filters.""" conditions = [] params = [] if project_id: conditions.append("project_id = ?") params.append(project_id) if user_id: conditions.append("user_id = ?") params.append(user_id) if action: conditions.append("action = ?") params.append(action) if category: conditions.append("category = ?") params.append(category) where_clause = " AND ".join(conditions) if conditions else "1=1" with get_connection() as conn: cursor = conn.cursor() cursor.execute(f"SELECT COUNT(*) FROM activity_log WHERE {where_clause}", params) return cursor.fetchone()[0] @staticmethod def get_categories() -> List[str]: """Get list of all categories used.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT DISTINCT category FROM activity_log WHERE category IS NOT NULL ORDER BY category") return [row[0] for row in cursor.fetchall()] @staticmethod def get_actions() -> List[str]: """Get list of all actions used.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT DISTINCT action FROM activity_log ORDER BY action") return [row[0] for row in cursor.fetchall()] @staticmethod def get_stats_by_category() -> Dict[str, int]: """Get activity count by category.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT category, COUNT(*) as count FROM activity_log GROUP BY category ORDER BY count DESC """) return {row[0]: row[1] for row in cursor.fetchall()} @staticmethod def get_stats_by_user() -> Dict[str, int]: """Get activity count by user.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT COALESCE(user_name, user_id, 'Unknown') as user, COUNT(*) as count FROM activity_log GROUP BY user_name, user_id ORDER BY count DESC """) return {row[0]: row[1] for row in cursor.fetchall()} # === Team Dashboard Operations === class FigmaFiles: """Figma file management for UX Dashboard.""" @staticmethod def create(project_id: str, figma_url: str, file_name: str, file_key: str) -> Dict: """Add a Figma file to a project.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO figma_files (project_id, figma_url, file_name, file_key) VALUES (?, ?, ?, ?) """, (project_id, figma_url, file_name, file_key)) file_id = cursor.lastrowid return FigmaFiles.get(file_id) @staticmethod def get(file_id: int) -> Optional[Dict]: """Get a specific Figma file.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM figma_files WHERE id = ?", (file_id,)) row = cursor.fetchone() return dict(row) if row else None @staticmethod def list(project_id: str) -> List[Dict]: """List all Figma files for a project.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM figma_files WHERE project_id = ? ORDER BY created_at DESC", (project_id,) ) return [dict(row) for row in cursor.fetchall()] @staticmethod def update_sync_status(file_id: int, status: str, last_synced: str = None) -> Dict: """Update sync status of a Figma file.""" with get_connection() as conn: if last_synced: conn.execute( "UPDATE figma_files SET sync_status = ?, last_synced = ? WHERE id = ?", (status, last_synced, file_id) ) else: conn.execute( "UPDATE figma_files SET sync_status = ? WHERE id = ?", (status, file_id) ) return FigmaFiles.get(file_id) @staticmethod def delete(file_id: int) -> bool: """Delete a Figma file.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM figma_files WHERE id = ?", (file_id,)) return cursor.rowcount > 0 class ESREDefinitions: """ESRE (Expected System Requirements Engineering) definitions for QA Dashboard.""" @staticmethod def create(project_id: str, name: str, definition_text: str, expected_value: str = None, component_name: str = None) -> Dict: """Create a new ESRE definition.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO esre_definitions (project_id, name, definition_text, expected_value, component_name) VALUES (?, ?, ?, ?, ?) """, (project_id, name, definition_text, expected_value, component_name)) esre_id = cursor.lastrowid return ESREDefinitions.get(esre_id) @staticmethod def get(esre_id: int) -> Optional[Dict]: """Get a specific ESRE definition.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM esre_definitions WHERE id = ?", (esre_id,)) row = cursor.fetchone() return dict(row) if row else None @staticmethod def list(project_id: str) -> List[Dict]: """List all ESRE definitions for a project.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM esre_definitions WHERE project_id = ? ORDER BY created_at DESC", (project_id,) ) return [dict(row) for row in cursor.fetchall()] @staticmethod def update(esre_id: int, **kwargs) -> Optional[Dict]: """Update an ESRE definition.""" if not kwargs: return ESREDefinitions.get(esre_id) fields = ", ".join(f"{k} = ?" for k in kwargs.keys()) values = list(kwargs.values()) + [esre_id] with get_connection() as conn: conn.execute( f"UPDATE esre_definitions SET {fields}, updated_at = CURRENT_TIMESTAMP WHERE id = ?", values ) return ESREDefinitions.get(esre_id) @staticmethod def delete(esre_id: int) -> bool: """Delete an ESRE definition.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM esre_definitions WHERE id = ?", (esre_id,)) return cursor.rowcount > 0 class TokenDriftDetector: """Token drift tracking for UI Dashboard.""" @staticmethod def record_drift(component_id: str, property_name: str, hardcoded_value: str, file_path: str, line_number: int, severity: str = "warning", suggested_token: str = None) -> Dict: """Record a token drift issue.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO token_drift (component_id, property_name, hardcoded_value, suggested_token, severity, file_path, line_number) VALUES (?, ?, ?, ?, ?, ?, ?) """, (component_id, property_name, hardcoded_value, suggested_token, severity, file_path, line_number)) drift_id = cursor.lastrowid return TokenDriftDetector.get(drift_id) @staticmethod def get(drift_id: int) -> Optional[Dict]: """Get a specific drift entry.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM token_drift WHERE id = ?", (drift_id,)) row = cursor.fetchone() return dict(row) if row else None @staticmethod def list_by_component(component_id: str, status: str = None) -> List[Dict]: """List drift issues for a component.""" with get_connection() as conn: cursor = conn.cursor() if status: cursor.execute( "SELECT * FROM token_drift WHERE component_id = ? AND status = ? ORDER BY severity, detected_at DESC", (component_id, status) ) else: cursor.execute( "SELECT * FROM token_drift WHERE component_id = ? ORDER BY severity, detected_at DESC", (component_id,) ) return [dict(row) for row in cursor.fetchall()] @staticmethod def list_by_project(project_id: str, severity: str = None) -> List[Dict]: """List all drift issues for a project.""" with get_connection() as conn: cursor = conn.cursor() query = """ SELECT td.* FROM token_drift td JOIN components c ON c.id = td.component_id WHERE c.project_id = ? """ params = [project_id] if severity: query += " AND td.severity = ?" params.append(severity) query += " ORDER BY td.severity, td.detected_at DESC" cursor.execute(query, params) return [dict(row) for row in cursor.fetchall()] @staticmethod def update_status(drift_id: int, status: str) -> Dict: """Update the status of a drift issue (pending, fixed, ignored).""" with get_connection() as conn: conn.execute( "UPDATE token_drift SET status = ? WHERE id = ?", (status, drift_id) ) return TokenDriftDetector.get(drift_id) @staticmethod def get_stats(project_id: str) -> Dict: """Get drift statistics for a project.""" with get_connection() as conn: cursor = conn.cursor() # Total drift count by severity cursor.execute(""" SELECT td.severity, COUNT(*) as count FROM token_drift td JOIN components c ON c.id = td.component_id WHERE c.project_id = ? AND td.status = 'pending' GROUP BY td.severity """, (project_id,)) by_severity = {row[0]: row[1] for row in cursor.fetchall()} # Total count cursor.execute(""" SELECT COUNT(*) FROM token_drift td JOIN components c ON c.id = td.component_id WHERE c.project_id = ? AND td.status = 'pending' """, (project_id,)) total = cursor.fetchone()[0] return { "total": total, "by_severity": by_severity } class CodeMetrics: """Code metrics tracking for UI Dashboard.""" @staticmethod def record_metrics(component_id: str, file_path: str, sloc: int = 0, complexity_score: float = 0.0, prop_count: int = 0, has_tests: bool = False, test_coverage: float = 0.0) -> Dict: """Record code metrics for a component.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO code_metrics (component_id, file_path, sloc, complexity_score, prop_count, has_tests, test_coverage, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) """, (component_id, file_path, sloc, complexity_score, prop_count, 1 if has_tests else 0, test_coverage)) metric_id = cursor.lastrowid # Return the inserted/updated record with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM code_metrics WHERE id = ?", (metric_id,)) row = cursor.fetchone() return dict(row) if row else None @staticmethod def list_by_component(component_id: str) -> List[Dict]: """Get all metrics for a component.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM code_metrics WHERE component_id = ? ORDER BY updated_at DESC", (component_id,) ) return [dict(row) for row in cursor.fetchall()] @staticmethod def get_project_summary(project_id: str) -> Dict: """Get aggregated code metrics for a project.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT COUNT(DISTINCT cm.component_id) as component_count, SUM(cm.sloc) as total_sloc, AVG(cm.complexity_score) as avg_complexity, AVG(cm.test_coverage) as avg_coverage, SUM(cm.has_tests) as components_with_tests FROM code_metrics cm JOIN components c ON c.id = cm.component_id WHERE c.project_id = ? """, (project_id,)) row = cursor.fetchone() if row: return { "component_count": row[0] or 0, "total_sloc": row[1] or 0, "avg_complexity": round(row[2] or 0.0, 2), "avg_coverage": round(row[3] or 0.0, 2), "components_with_tests": row[4] or 0 } return {} class TestResults: """Test result tracking for QA Dashboard.""" @staticmethod def record_test(component_id: str, test_type: str, passed: bool, score: float = None, failures: List[str] = None) -> Dict: """Record a test result.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO test_results (component_id, test_type, passed, score, failures) VALUES (?, ?, ?, ?, ?) """, (component_id, test_type, 1 if passed else 0, score, json.dumps(failures) if failures else None)) result_id = cursor.lastrowid with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM test_results WHERE id = ?", (result_id,)) row = cursor.fetchone() if row: result = dict(row) result['failures'] = json.loads(result['failures']) if result['failures'] else [] return result return None @staticmethod def list_by_component(component_id: str, test_type: str = None) -> List[Dict]: """Get test results for a component.""" with get_connection() as conn: cursor = conn.cursor() if test_type: cursor.execute( "SELECT * FROM test_results WHERE component_id = ? AND test_type = ? ORDER BY run_at DESC", (component_id, test_type) ) else: cursor.execute( "SELECT * FROM test_results WHERE component_id = ? ORDER BY run_at DESC", (component_id,) ) results = [] for row in cursor.fetchall(): result = dict(row) result['failures'] = json.loads(result['failures']) if result['failures'] else [] results.append(result) return results @staticmethod def get_project_summary(project_id: str) -> Dict: """Get test summary for a project.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT COUNT(*) as total_tests, SUM(CASE WHEN passed = 1 THEN 1 ELSE 0 END) as passed_tests, AVG(score) as avg_score FROM test_results tr JOIN components c ON c.id = tr.component_id WHERE c.project_id = ? """, (project_id,)) row = cursor.fetchone() if row: total = row[0] or 0 passed = row[1] or 0 return { "total_tests": total, "passed_tests": passed, "failed_tests": total - passed, "pass_rate": round((passed / total * 100) if total > 0 else 0, 2), "avg_score": round(row[2] or 0.0, 2) } return {} # === Teams & RBAC === class Teams: """Team and role management.""" @staticmethod def create(id: str, name: str, description: str = "") -> Dict: with get_connection() as conn: conn.execute( "INSERT INTO teams (id, name, description) VALUES (?, ?, ?)", (id, name, description) ) return Teams.get(id) @staticmethod def get(id: str) -> Optional[Dict]: with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM teams WHERE id = ?", (id,)) row = cursor.fetchone() if row: team = dict(row) team['settings'] = json.loads(team['settings']) if team['settings'] else {} return team return None @staticmethod def list() -> List[Dict]: with get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM teams ORDER BY name") return [dict(row) for row in cursor.fetchall()] @staticmethod def add_member(team_id: str, user_id: str, role: str): with get_connection() as conn: conn.execute( "INSERT OR REPLACE INTO team_members (team_id, user_id, role) VALUES (?, ?, ?)", (team_id, user_id, role) ) @staticmethod def get_members(team_id: str) -> List[Dict]: with get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT u.*, tm.role, tm.joined_at FROM team_members tm JOIN users u ON u.id = tm.user_id WHERE tm.team_id = ? ORDER BY tm.role, u.name """, (team_id,)) return [dict(row) for row in cursor.fetchall()] @staticmethod def get_user_role(team_id: str, user_id: str) -> Optional[str]: with get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT role FROM team_members WHERE team_id = ? AND user_id = ?", (team_id, user_id) ) row = cursor.fetchone() return row[0] if row else None # === Database Stats === def get_stats() -> Dict: """Get database statistics.""" with get_connection() as conn: cursor = conn.cursor() stats = {} # Table counts tables = ['projects', 'components', 'styles', 'sync_history', 'activity_log', 'teams', 'users', 'figma_cache'] for table in tables: cursor.execute(f"SELECT COUNT(*) FROM {table}") stats[table] = cursor.fetchone()[0] # Database file size if DB_PATH.exists(): stats['db_size_mb'] = round(DB_PATH.stat().st_size / (1024 * 1024), 2) # Cache stats now = int(time.time()) cursor.execute("SELECT COUNT(*) FROM figma_cache WHERE expires_at > ?", (now,)) stats['cache_valid'] = cursor.fetchone()[0] return stats # Initialize on import init_database() # === CLI for testing === if __name__ == "__main__": import sys if len(sys.argv) > 1: cmd = sys.argv[1] if cmd == "stats": print(json.dumps(get_stats(), indent=2)) elif cmd == "init": init_database() print("Database initialized") elif cmd == "cache-test": Cache.set("test_key", {"foo": "bar"}, ttl=60) print(f"Set: test_key") print(f"Get: {Cache.get('test_key')}") elif cmd == "clear-cache": Cache.clear_all() print("Cache cleared") else: print("Usage: python database.py [stats|init|cache-test|clear-cache]") print(f"\nDatabase: {DB_PATH}") print(f"Stats: {json.dumps(get_stats(), indent=2)}")