From 069f5482d86a850534b6e7411e62e2c72fc2b9c4 Mon Sep 17 00:00:00 2001 From: Bruno Sarlo Date: Wed, 10 Dec 2025 08:21:14 -0300 Subject: [PATCH] Replace SQLite with JSON file storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove database.py (SQLite) from tools/storage/ and dss-mvp1/ - Add json_store.py with full JSON-based storage layer - Update 16 files to use new json_store imports - Storage now mirrors DSS canonical structure: .dss/data/ ├── _system/ (config, cache, activity) ├── projects/ (per-project: tokens, components, styles) └── teams/ (team definitions) - Remove Docker files (not needed) - Update DSS_CORE.json to v1.1.0 Philosophy: "Eat our own food" - storage structure matches DSS design 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .dockerignore | 62 - .knowledge/DSS_CORE.json | 25 +- Dockerfile | 57 - cli/python/api/server.py | 2 +- docker-compose.yml | 54 - dss-mvp1/dss/export_import/merger.py | 2 +- dss-mvp1/dss/export_import/service.py | 2 +- dss-mvp1/dss/status/dashboard.py | 2 +- dss-mvp1/dss/storage/database.py | 848 -------------- tools/api/mcp_server.py | 2 +- tools/api/server.py | 14 +- tools/auth/atlassian_auth.py | 2 +- tools/dss_mcp/audit.py | 2 +- tools/dss_mcp/context/project_context.py | 2 +- tools/dss_mcp/handler.py | 2 +- tools/dss_mcp/integrations/base.py | 2 +- tools/dss_mcp/operations.py | 2 +- tools/dss_mcp/security.py | 2 +- tools/dss_mcp/tools/project_tools.py | 2 +- tools/figma/figma_tools.py | 2 +- tools/storage/database.py | 1332 ---------------------- tools/storage/json_store.py | 1026 +++++++++++++++++ 22 files changed, 1064 insertions(+), 2382 deletions(-) delete mode 100644 .dockerignore delete mode 100644 Dockerfile delete mode 100644 docker-compose.yml delete mode 100644 dss-mvp1/dss/storage/database.py delete mode 100644 tools/storage/database.py create mode 100644 tools/storage/json_store.py diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index 2bde791..0000000 --- a/.dockerignore +++ /dev/null @@ -1,62 +0,0 @@ -# Python -__pycache__/ -*.py[cod] -*$py.class -*.so -.Python -env/ -venv/ -ENV/ -*.egg-info/ -dist/ -build/ - -# DSS data -.dss/ -*.db -*.db-wal -*.db-shm - -# Environment -.env -.env.local -.env.production - -# IDE -.vscode/ -.idea/ -*.swp -*.swo -*~ - -# Git -.git/ -.gitignore - -# Documentation -docs/ -*.md -!README.md - -# Tests -tests/ -pytest_cache/ -.coverage -htmlcov/ - -# Node -node_modules/ -npm-debug.log - -# OS -.DS_Store -Thumbs.db - -# Backups -backups/ -*.backup -*.bak - -# Logs -*.log -logs/ diff --git a/.knowledge/DSS_CORE.json b/.knowledge/DSS_CORE.json index 0b0413f..b918530 100644 --- a/.knowledge/DSS_CORE.json +++ b/.knowledge/DSS_CORE.json @@ -1,6 +1,6 @@ { "$schema": "dss-core-v1", - "version": "1.0.0", + "version": "1.1.0", "last_updated": "2025-12-10", "purpose": "Single source of truth for AI agents working with DSS", @@ -53,7 +53,7 @@ "layers": { "router": "MCP Server (36 tools), REST API (34 endpoints), CLI", "messaging": "Circuit breaker, Activity log, Event emitter", - "workflows": "Figma client, Token ingestion, Storybook generator, Analysis engine, Context compiler, Storage (SQLite)" + "workflows": "Figma client, Token ingestion, Storybook generator, Analysis engine, Context compiler, Storage (JSON files)" }, "ports": { "rest_api": 3456, @@ -62,8 +62,17 @@ "dependencies": { "python": ">=3.10", "node": ">=18", - "db": "sqlite3", - "services": ["figma-api", "storybook", "nginx"] + 
"services": ["figma-api", "storybook"] + }, + "storage": { + "type": "JSON files", + "location": ".dss/data/", + "structure": { + "_system": "config, cache, activity logs", + "projects/{id}": "manifest, tokens/, components/, styles/, figma/, metrics/", + "teams/{id}": "manifest, members, access" + }, + "philosophy": "Eat our own food - storage mirrors DSS canonical structure" } }, @@ -98,7 +107,8 @@ "rest_api": "tools/api/server.py", "token_parsers": "tools/ingest/", "analysis": "tools/analyze/", - "database": ".dss/dss.db", + "storage": "tools/storage/json_store.py", + "data": ".dss/data/", "schemas": ".dss/schema/", "admin_ui": "admin-ui/", "skills": "dss-claude-plugin/skills/", @@ -201,8 +211,8 @@ "debounce_ms": 250 }, "storage": { - "db": ".dss/dss.db", - "cache": ".dss/cache" + "data": ".dss/data/", + "cache": ".dss/data/_system/cache/" } }, @@ -213,6 +223,7 @@ }, "changelog": [ + {"version": "1.1.0", "date": "2025-12-10", "notes": "Migrate from SQLite to JSON file storage"}, {"version": "1.0.0", "date": "2025-12-10", "notes": "Initial core definition"} ] } diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 85e17ff..0000000 --- a/Dockerfile +++ /dev/null @@ -1,57 +0,0 @@ -# Design System Server (DSS) - Docker Image -# Version: 0.8.0 - -FROM python:3.11-slim - -LABEL maintainer="DSS Team" -LABEL version="0.8.0" -LABEL description="Design System Server with MCP integration" - -# Set environment variables -ENV PYTHONUNBUFFERED=1 \ - PYTHONDONTWRITEBYTECODE=1 \ - PIP_NO_CACHE_DIR=1 \ - PIP_DISABLE_PIP_VERSION_CHECK=1 - -# Install system dependencies -RUN apt-get update && apt-get install -y \ - sqlite3 \ - curl \ - && rm -rf /var/lib/apt/lists/* - -# Create app user -RUN useradd -m -u 1000 dss && \ - mkdir -p /app && \ - chown -R dss:dss /app - -# Set working directory -WORKDIR /app - -# Copy requirements -COPY requirements.txt . - -# Install Python dependencies -RUN pip install --no-cache-dir -r requirements.txt - -# Copy application code -COPY --chown=dss:dss . . - -# Create data directories -RUN mkdir -p /app/.dss/cache && \ - chown -R dss:dss /app/.dss - -# Switch to app user -USER dss - -# Expose port -EXPOSE 3456 - -# Health check -HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ - CMD curl -f http://localhost:3456/health || exit 1 - -# Set working directory to tools/api -WORKDIR /app/tools/api - -# Run server -CMD ["python3", "-m", "uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3456"] diff --git a/cli/python/api/server.py b/cli/python/api/server.py index 64810a8..c13bffb 100644 --- a/cli/python/api/server.py +++ b/cli/python/api/server.py @@ -34,7 +34,7 @@ import sys sys.path.insert(0, str(Path(__file__).parent.parent)) from config import config -from storage.database import ( +from storage.json_store import ( Projects, Components, SyncHistory, ActivityLog, Teams, Cache, get_stats ) from figma.figma_tools import FigmaToolSuite diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index fb5ead7..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,54 +0,0 @@ -version: '3.8' - -services: - dss: - build: . 
- container_name: dss-server - restart: unless-stopped - ports: - - "3456:3456" - env_file: - - .env - environment: - - NODE_ENV=production - - HOST=0.0.0.0 - - PORT=3456 - - DATABASE_PATH=/app/.dss/dss.db - - PYTHONPATH=/app/tools - volumes: - # Persistent data - - dss-data:/app/.dss - # Optional: Mount custom config - # - ./custom.env:/app/.env:ro - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:3456/health"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 40s - logging: - driver: "json-file" - options: - max-size: "10m" - max-file: "3" - - # Optional: Redis for caching (if using Celery) - # redis: - # image: redis:7-alpine - # container_name: dss-redis - # restart: unless-stopped - # ports: - # - "127.0.0.1:6379:6379" - # volumes: - # - redis-data:/data - # command: redis-server --appendonly yes - -volumes: - dss-data: - driver: local - # redis-data: - # driver: local - -networks: - default: - name: dss-network diff --git a/dss-mvp1/dss/export_import/merger.py b/dss-mvp1/dss/export_import/merger.py index 8e23f9e..b2ea9e8 100644 --- a/dss-mvp1/dss/export_import/merger.py +++ b/dss-mvp1/dss/export_import/merger.py @@ -10,7 +10,7 @@ from .security import TimestampConflictResolver from ..models.project import Project from ..models.theme import DesignToken from ..models.component import Component -from ..storage.database import get_connection +from storage.json_store import Projects, Components, Tokens MergeStrategy = Literal["overwrite", "keep_local", "fork", "skip"] diff --git a/dss-mvp1/dss/export_import/service.py b/dss-mvp1/dss/export_import/service.py index 660b3cb..6412794 100644 --- a/dss-mvp1/dss/export_import/service.py +++ b/dss-mvp1/dss/export_import/service.py @@ -20,7 +20,7 @@ from .importer import DSSArchiveImporter, ImportAnalysis from .merger import SmartMerger, ConflictResolutionMode, MergeAnalysis from .security import DatabaseLockingStrategy, MemoryLimitManager from ..models.project import Project -from ..storage.database import get_connection +from storage.json_store import Projects, ActivityLog @dataclass diff --git a/dss-mvp1/dss/status/dashboard.py b/dss-mvp1/dss/status/dashboard.py index 1504cc4..48d3b9b 100644 --- a/dss-mvp1/dss/status/dashboard.py +++ b/dss-mvp1/dss/status/dashboard.py @@ -190,7 +190,7 @@ class StatusDashboard: # Database stats try: - from dss.storage.database import get_stats, ActivityLog, SyncHistory, Projects, Components + from storage.json_store import get_stats, ActivityLog, SyncHistory, Projects, Components stats = get_stats() data.projects_count = stats.get("projects", 0) diff --git a/dss-mvp1/dss/storage/database.py b/dss-mvp1/dss/storage/database.py deleted file mode 100644 index 0cd7a98..0000000 --- a/dss-mvp1/dss/storage/database.py +++ /dev/null @@ -1,848 +0,0 @@ -""" -Design System Server (DSS) - SQLite Storage Layer - -High-efficiency local-first database for: -- Component definitions (relational) -- Sync history (time-series) -- Team/User RBAC -- Figma API cache (TTL-based) - -Design tokens stored as flat JSON files for git-friendly diffs. 
-""" - -import sqlite3 -import json -import time -import hashlib -from pathlib import Path -from datetime import datetime -from typing import Optional, Dict, List, Any -from contextlib import contextmanager -from dataclasses import dataclass, asdict - -# Database location -DB_DIR = Path(__file__).parent.parent.parent / ".dss" -DB_PATH = DB_DIR / "dss.db" - -# Ensure directory exists -DB_DIR.mkdir(parents=True, exist_ok=True) - - -@contextmanager -def get_connection(): - """Context manager for database connections with WAL mode for performance.""" - conn = sqlite3.connect(DB_PATH, timeout=30.0) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging for concurrency - conn.execute("PRAGMA synchronous=NORMAL") # Balance safety/speed - conn.execute("PRAGMA cache_size=-64000") # 64MB cache - conn.execute("PRAGMA temp_store=MEMORY") # Temp tables in memory - try: - yield conn - conn.commit() - except Exception: - conn.rollback() - raise - finally: - conn.close() - - -def init_database(): - """Initialize all database tables.""" - with get_connection() as conn: - cursor = conn.cursor() - - # === Projects === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS projects ( - id TEXT PRIMARY KEY, - uuid TEXT UNIQUE, - name TEXT NOT NULL, - description TEXT, - figma_file_key TEXT, - status TEXT DEFAULT 'active', - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - updated_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - - # === Components === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS components ( - id TEXT PRIMARY KEY, - uuid TEXT UNIQUE, - project_id TEXT NOT NULL, - name TEXT NOT NULL, - figma_key TEXT, - description TEXT, - properties TEXT, -- JSON - variants TEXT, -- JSON array - code_generated INTEGER DEFAULT 0, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - updated_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_project ON components(project_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_name ON components(name)") - - # === Styles === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS styles ( - id TEXT PRIMARY KEY, - project_id TEXT NOT NULL, - name TEXT NOT NULL, - type TEXT NOT NULL, -- TEXT, FILL, EFFECT, GRID - figma_key TEXT, - properties TEXT, -- JSON - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_styles_project ON styles(project_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_styles_type ON styles(type)") - - # === Tokens (metadata, actual values in JSON files) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS token_collections ( - id TEXT PRIMARY KEY, - project_id TEXT NOT NULL, - name TEXT NOT NULL, - file_path TEXT NOT NULL, - token_count INTEGER DEFAULT 0, - last_synced TEXT, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - - # === Sync History (append-only, time-series) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS sync_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - project_id TEXT NOT NULL, - sync_type TEXT NOT NULL, -- tokens, components, styles, full - status TEXT NOT NULL, -- success, failed, partial - items_synced INTEGER DEFAULT 0, - changes TEXT, -- JSON diff summary - error_message TEXT, - started_at TEXT NOT NULL, - completed_at TEXT, - duration_ms INTEGER, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS 
idx_sync_project ON sync_history(project_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_sync_time ON sync_history(started_at DESC)") - - # === Activity Log (Enhanced Audit Trail) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS activity_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - project_id TEXT, - user_id TEXT, - user_name TEXT, -- Denormalized for faster display - team_context TEXT, -- ui, ux, qa, all - action TEXT NOT NULL, -- Created, Updated, Deleted, Extracted, Synced, etc. - entity_type TEXT, -- project, component, token, figma_file, etc. - entity_id TEXT, - entity_name TEXT, -- Denormalized for faster display - category TEXT, -- design_system, code, configuration, team - severity TEXT DEFAULT 'info', -- info, warning, critical - description TEXT, -- Human-readable description - details TEXT, -- JSON with full context - ip_address TEXT, -- For security audit - user_agent TEXT, -- Browser/client info - created_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_time ON activity_log(created_at DESC)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_project ON activity_log(project_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_user ON activity_log(user_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_action ON activity_log(action)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_category ON activity_log(category)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_entity ON activity_log(entity_type, entity_id)") - - # === Teams === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS teams ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL, - description TEXT, - settings TEXT, -- JSON - created_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - - # === Users === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS users ( - id TEXT PRIMARY KEY, - email TEXT UNIQUE NOT NULL, - name TEXT, - avatar_url TEXT, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - last_login TEXT - ) - """) - - # === Team Members (RBAC) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS team_members ( - team_id TEXT NOT NULL, - user_id TEXT NOT NULL, - role TEXT NOT NULL, -- SUPER_ADMIN, TEAM_LEAD, DEVELOPER, VIEWER - joined_at TEXT DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (team_id, user_id), - FOREIGN KEY (team_id) REFERENCES teams(id), - FOREIGN KEY (user_id) REFERENCES users(id) - ) - """) - - # === Project Team Access === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS project_access ( - project_id TEXT NOT NULL, - team_id TEXT NOT NULL, - access_level TEXT DEFAULT 'read', -- read, write, admin - granted_at TEXT DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (project_id, team_id), - FOREIGN KEY (project_id) REFERENCES projects(id), - FOREIGN KEY (team_id) REFERENCES teams(id) - ) - """) - - # === Figma Cache (TTL-based) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS figma_cache ( - cache_key TEXT PRIMARY KEY, - value BLOB NOT NULL, - created_at INTEGER NOT NULL, - expires_at INTEGER NOT NULL - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_cache_expires ON figma_cache(expires_at)") - - conn.commit() - print(f"[Storage] Database initialized at {DB_PATH}") - - -# === Cache Operations === - -class Cache: - """TTL-based cache using SQLite.""" - - DEFAULT_TTL = 300 # 5 minutes - - @staticmethod - def set(key: str, value: Any, ttl: int = DEFAULT_TTL) -> None: - """Store a value with TTL.""" - now = int(time.time()) - expires = now + ttl - data = json.dumps(value).encode() if not 
isinstance(value, bytes) else value - - with get_connection() as conn: - conn.execute( - "INSERT OR REPLACE INTO figma_cache (cache_key, value, created_at, expires_at) VALUES (?, ?, ?, ?)", - (key, data, now, expires) - ) - - @staticmethod - def get(key: str) -> Optional[Any]: - """Get a value if not expired.""" - now = int(time.time()) - - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT value FROM figma_cache WHERE cache_key = ? AND expires_at > ?", - (key, now) - ) - row = cursor.fetchone() - - if row: - try: - return json.loads(row[0]) - except (json.JSONDecodeError, TypeError): - return row[0] - return None - - @staticmethod - def delete(key: str) -> None: - """Delete a cache entry.""" - with get_connection() as conn: - conn.execute("DELETE FROM figma_cache WHERE cache_key = ?", (key,)) - - @staticmethod - def clear_expired() -> int: - """Remove all expired entries. Returns count deleted.""" - now = int(time.time()) - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM figma_cache WHERE expires_at <= ?", (now,)) - return cursor.rowcount - - @staticmethod - def clear_all() -> None: - """Clear entire cache.""" - with get_connection() as conn: - conn.execute("DELETE FROM figma_cache") - - -# === Project Operations === - -class Projects: - """Project CRUD operations.""" - - @staticmethod - def create(id: str, name: str, description: str = "", figma_file_key: str = "") -> Dict: - with get_connection() as conn: - conn.execute( - "INSERT INTO projects (id, name, description, figma_file_key) VALUES (?, ?, ?, ?)", - (id, name, description, figma_file_key) - ) - return Projects.get(id) - - @staticmethod - def get(id: str) -> Optional[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM projects WHERE id = ?", (id,)) - row = cursor.fetchone() - return dict(row) if row else None - - @staticmethod - def list(status: str = None) -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - if status: - cursor.execute("SELECT * FROM projects WHERE status = ? ORDER BY updated_at DESC", (status,)) - else: - cursor.execute("SELECT * FROM projects ORDER BY updated_at DESC") - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def update(id: str, **kwargs) -> Optional[Dict]: - if not kwargs: - return Projects.get(id) - - fields = ", ".join(f"{k} = ?" for k in kwargs.keys()) - values = list(kwargs.values()) + [id] - - with get_connection() as conn: - conn.execute( - f"UPDATE projects SET {fields}, updated_at = CURRENT_TIMESTAMP WHERE id = ?", - values - ) - return Projects.get(id) - - @staticmethod - def delete(id: str) -> bool: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM projects WHERE id = ?", (id,)) - return cursor.rowcount > 0 - - -# === Component Operations === - -class Components: - """Component CRUD operations.""" - - @staticmethod - def upsert(project_id: str, components: List[Dict]) -> int: - """Bulk upsert components. 
Returns count.""" - with get_connection() as conn: - cursor = conn.cursor() - count = 0 - for comp in components: - cursor.execute(""" - INSERT OR REPLACE INTO components - (id, project_id, name, figma_key, description, properties, variants, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) - """, ( - comp.get('id') or f"{project_id}-{comp['name']}", - project_id, - comp['name'], - comp.get('figma_key') or comp.get('key'), - comp.get('description', ''), - json.dumps(comp.get('properties', {})), - json.dumps(comp.get('variants', [])) - )) - count += 1 - return count - - @staticmethod - def list(project_id: str) -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM components WHERE project_id = ? ORDER BY name", - (project_id,) - ) - results = [] - for row in cursor.fetchall(): - comp = dict(row) - comp['properties'] = json.loads(comp['properties'] or '{}') - comp['variants'] = json.loads(comp['variants'] or '[]') - results.append(comp) - return results - - @staticmethod - def get(id: str) -> Optional[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM components WHERE id = ?", (id,)) - row = cursor.fetchone() - if row: - comp = dict(row) - comp['properties'] = json.loads(comp['properties'] or '{}') - comp['variants'] = json.loads(comp['variants'] or '[]') - return comp - return None - - -# === Sync History === - -class SyncHistory: - """Append-only sync history log.""" - - @staticmethod - def start(project_id: str, sync_type: str) -> int: - """Start a sync, returns sync ID.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "INSERT INTO sync_history (project_id, sync_type, status, started_at) VALUES (?, ?, 'running', ?)", - (project_id, sync_type, datetime.utcnow().isoformat()) - ) - return cursor.lastrowid - - @staticmethod - def complete(sync_id: int, status: str, items_synced: int = 0, changes: Dict = None, error: str = None): - """Complete a sync with results.""" - started = None - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT started_at FROM sync_history WHERE id = ?", (sync_id,)) - row = cursor.fetchone() - if row: - started = datetime.fromisoformat(row[0]) - - completed = datetime.utcnow() - duration_ms = int((completed - started).total_seconds() * 1000) if started else 0 - - with get_connection() as conn: - conn.execute(""" - UPDATE sync_history SET - status = ?, items_synced = ?, changes = ?, error_message = ?, - completed_at = ?, duration_ms = ? - WHERE id = ? - """, ( - status, items_synced, - json.dumps(changes) if changes else None, - error, - completed.isoformat(), duration_ms, - sync_id - )) - - @staticmethod - def recent(project_id: str = None, limit: int = 20) -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - if project_id: - cursor.execute( - "SELECT * FROM sync_history WHERE project_id = ? 
ORDER BY started_at DESC LIMIT ?", - (project_id, limit) - ) - else: - cursor.execute( - "SELECT * FROM sync_history ORDER BY started_at DESC LIMIT ?", - (limit,) - ) - results = [] - for row in cursor.fetchall(): - sync = dict(row) - sync['changes'] = json.loads(sync['changes']) if sync['changes'] else None - results.append(sync) - return results - - -# === Activity Log (Enhanced Audit System) === - -class ActivityLog: - """Enhanced activity tracking for comprehensive audit trail.""" - - # Action categories for better organization - CATEGORIES = { - 'design_system': ['extract_tokens', 'extract_components', 'sync_tokens', 'validate_tokens'], - 'code': ['analyze_components', 'find_inline_styles', 'generate_code', 'get_quick_wins'], - 'configuration': ['config_updated', 'figma_token_updated', 'mode_changed', 'service_configured'], - 'project': ['project_created', 'project_updated', 'project_deleted'], - 'team': ['team_context_changed', 'project_context_changed'], - 'storybook': ['scan_storybook', 'generate_story', 'generate_theme'] - } - - @staticmethod - def log(action: str, - entity_type: str = None, - entity_id: str = None, - entity_name: str = None, - project_id: str = None, - user_id: str = None, - user_name: str = None, - team_context: str = None, - description: str = None, - category: str = None, - severity: str = 'info', - details: Dict = None, - ip_address: str = None, - user_agent: str = None): - """ - Log an activity with enhanced audit information. - - Args: - action: Action performed (e.g., 'project_created', 'tokens_extracted') - entity_type: Type of entity affected (e.g., 'project', 'component') - entity_id: ID of the affected entity - entity_name: Human-readable name of the entity - project_id: Project context - user_id: User who performed the action - user_name: Human-readable user name - team_context: Team context (ui, ux, qa, all) - description: Human-readable description of the action - category: Category (design_system, code, configuration, etc.) - severity: info, warning, critical - details: Additional JSON details - ip_address: Client IP for security audit - user_agent: Browser/client information - """ - # Auto-detect category if not provided - if not category: - for cat, actions in ActivityLog.CATEGORIES.items(): - if action in actions: - category = cat - break - if not category: - category = 'other' - - # Generate description if not provided - if not description: - description = ActivityLog._generate_description(action, entity_type, entity_name, details) - - with get_connection() as conn: - conn.execute(""" - INSERT INTO activity_log ( - project_id, user_id, user_name, team_context, - action, entity_type, entity_id, entity_name, - category, severity, description, details, - ip_address, user_agent - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, ( - project_id, user_id, user_name, team_context, - action, entity_type, entity_id, entity_name, - category, severity, description, - json.dumps(details) if details else None, - ip_address, user_agent - )) - - @staticmethod - def _generate_description(action: str, entity_type: str, entity_name: str, details: Dict) -> str: - """Generate human-readable description from action data.""" - entity_str = f"{entity_type} '{entity_name}'" if entity_name else (entity_type or "item") - - action_map = { - 'project_created': f"Created project {entity_str}", - 'project_updated': f"Updated {entity_str}", - 'project_deleted': f"Deleted {entity_str}", - 'extract_tokens': f"Extracted design tokens from Figma", - 'extract_components': f"Extracted components from Figma", - 'sync_tokens': f"Synced tokens to file", - 'config_updated': "Updated configuration", - 'figma_token_updated': "Updated Figma API token", - 'team_context_changed': f"Switched to team context", - 'project_context_changed': f"Switched to project {entity_name}", - } - - return action_map.get(action, f"{action.replace('_', ' ').title()}") - - @staticmethod - def recent(project_id: str = None, limit: int = 50, offset: int = 0) -> List[Dict]: - """Get recent activity with pagination.""" - with get_connection() as conn: - cursor = conn.cursor() - if project_id: - cursor.execute( - "SELECT * FROM activity_log WHERE project_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?", - (project_id, limit, offset) - ) - else: - cursor.execute( - "SELECT * FROM activity_log ORDER BY created_at DESC LIMIT ? OFFSET ?", - (limit, offset) - ) - results = [] - for row in cursor.fetchall(): - activity = dict(row) - activity['details'] = json.loads(activity['details']) if activity['details'] else None - results.append(activity) - return results - - @staticmethod - def search( - project_id: str = None, - user_id: str = None, - action: str = None, - category: str = None, - entity_type: str = None, - severity: str = None, - start_date: str = None, - end_date: str = None, - limit: int = 100, - offset: int = 0 - ) -> List[Dict]: - """Advanced search/filter for audit logs.""" - conditions = [] - params = [] - - if project_id: - conditions.append("project_id = ?") - params.append(project_id) - if user_id: - conditions.append("user_id = ?") - params.append(user_id) - if action: - conditions.append("action = ?") - params.append(action) - if category: - conditions.append("category = ?") - params.append(category) - if entity_type: - conditions.append("entity_type = ?") - params.append(entity_type) - if severity: - conditions.append("severity = ?") - params.append(severity) - if start_date: - conditions.append("created_at >= ?") - params.append(start_date) - if end_date: - conditions.append("created_at <= ?") - params.append(end_date) - - where_clause = " AND ".join(conditions) if conditions else "1=1" - params.extend([limit, offset]) - - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(f""" - SELECT * FROM activity_log - WHERE {where_clause} - ORDER BY created_at DESC - LIMIT ? OFFSET ? 
- """, params) - - results = [] - for row in cursor.fetchall(): - activity = dict(row) - activity['details'] = json.loads(activity['details']) if activity['details'] else None - results.append(activity) - return results - - @staticmethod - def count( - project_id: str = None, - user_id: str = None, - action: str = None, - category: str = None - ) -> int: - """Count activities matching filters.""" - conditions = [] - params = [] - - if project_id: - conditions.append("project_id = ?") - params.append(project_id) - if user_id: - conditions.append("user_id = ?") - params.append(user_id) - if action: - conditions.append("action = ?") - params.append(action) - if category: - conditions.append("category = ?") - params.append(category) - - where_clause = " AND ".join(conditions) if conditions else "1=1" - - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(f"SELECT COUNT(*) FROM activity_log WHERE {where_clause}", params) - return cursor.fetchone()[0] - - @staticmethod - def get_categories() -> List[str]: - """Get list of all categories used.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT DISTINCT category FROM activity_log WHERE category IS NOT NULL ORDER BY category") - return [row[0] for row in cursor.fetchall()] - - @staticmethod - def get_actions() -> List[str]: - """Get list of all actions used.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT DISTINCT action FROM activity_log ORDER BY action") - return [row[0] for row in cursor.fetchall()] - - @staticmethod - def get_stats_by_category() -> Dict[str, int]: - """Get activity count by category.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT category, COUNT(*) as count - FROM activity_log - GROUP BY category - ORDER BY count DESC - """) - return {row[0]: row[1] for row in cursor.fetchall()} - - @staticmethod - def get_stats_by_user() -> Dict[str, int]: - """Get activity count by user.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT COALESCE(user_name, user_id, 'Unknown') as user, COUNT(*) as count - FROM activity_log - GROUP BY user_name, user_id - ORDER BY count DESC - """) - return {row[0]: row[1] for row in cursor.fetchall()} - - -# === Teams & RBAC === - -class Teams: - """Team and role management.""" - - @staticmethod - def create(id: str, name: str, description: str = "") -> Dict: - with get_connection() as conn: - conn.execute( - "INSERT INTO teams (id, name, description) VALUES (?, ?, ?)", - (id, name, description) - ) - return Teams.get(id) - - @staticmethod - def get(id: str) -> Optional[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM teams WHERE id = ?", (id,)) - row = cursor.fetchone() - if row: - team = dict(row) - team['settings'] = json.loads(team['settings']) if team['settings'] else {} - return team - return None - - @staticmethod - def list() -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM teams ORDER BY name") - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def add_member(team_id: str, user_id: str, role: str): - with get_connection() as conn: - conn.execute( - "INSERT OR REPLACE INTO team_members (team_id, user_id, role) VALUES (?, ?, ?)", - (team_id, user_id, role) - ) - - @staticmethod - def get_members(team_id: str) -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - 
SELECT u.*, tm.role, tm.joined_at - FROM team_members tm - JOIN users u ON u.id = tm.user_id - WHERE tm.team_id = ? - ORDER BY tm.role, u.name - """, (team_id,)) - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def get_user_role(team_id: str, user_id: str) -> Optional[str]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT role FROM team_members WHERE team_id = ? AND user_id = ?", - (team_id, user_id) - ) - row = cursor.fetchone() - return row[0] if row else None - - -# === Database Stats === - -def get_stats() -> Dict: - """Get database statistics.""" - with get_connection() as conn: - cursor = conn.cursor() - - stats = {} - - # Table counts - tables = ['projects', 'components', 'styles', 'sync_history', 'activity_log', 'teams', 'users', 'figma_cache'] - for table in tables: - cursor.execute(f"SELECT COUNT(*) FROM {table}") - stats[table] = cursor.fetchone()[0] - - # Database file size - if DB_PATH.exists(): - stats['db_size_mb'] = round(DB_PATH.stat().st_size / (1024 * 1024), 2) - - # Cache stats - now = int(time.time()) - cursor.execute("SELECT COUNT(*) FROM figma_cache WHERE expires_at > ?", (now,)) - stats['cache_valid'] = cursor.fetchone()[0] - - return stats - - -# Initialize on import -init_database() - - -# === CLI for testing === -if __name__ == "__main__": - import sys - - if len(sys.argv) > 1: - cmd = sys.argv[1] - - if cmd == "stats": - print(json.dumps(get_stats(), indent=2)) - - elif cmd == "init": - init_database() - print("Database initialized") - - elif cmd == "cache-test": - Cache.set("test_key", {"foo": "bar"}, ttl=60) - print(f"Set: test_key") - print(f"Get: {Cache.get('test_key')}") - - elif cmd == "clear-cache": - Cache.clear_all() - print("Cache cleared") - - else: - print("Usage: python database.py [stats|init|cache-test|clear-cache]") - print(f"\nDatabase: {DB_PATH}") - print(f"Stats: {json.dumps(get_stats(), indent=2)}") diff --git a/tools/api/mcp_server.py b/tools/api/mcp_server.py index 533f321..bbfc084 100644 --- a/tools/api/mcp_server.py +++ b/tools/api/mcp_server.py @@ -39,7 +39,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from mcp.server.fastmcp import FastMCP from config import config -from storage.database import Projects, Components, SyncHistory, ActivityLog, get_stats +from storage.json_store import Projects, Components, SyncHistory, ActivityLog, get_stats from figma.figma_tools import FigmaToolSuite # Import new ingestion modules diff --git a/tools/api/server.py b/tools/api/server.py index 8352368..65e796a 100644 --- a/tools/api/server.py +++ b/tools/api/server.py @@ -64,10 +64,9 @@ from browser_logger import router as browser_log_router # Legacy imports (will gradually migrate these) from config import config -from storage.database import ( +from storage.json_store import ( Projects, Components, SyncHistory, ActivityLog, Teams, Cache, get_stats, - FigmaFiles, ESREDefinitions, TokenDriftDetector, CodeMetrics, TestResults, - get_connection + FigmaFiles, CodeMetrics, TestResults, TokenDrift, Tokens, Styles ) from figma.figma_tools import FigmaToolSuite @@ -405,16 +404,15 @@ async def health(): import psutil from pathlib import Path - # ❤️ Check Heart (database) connectivity + # ❤️ Check Heart (storage) connectivity db_ok = False try: - with get_connection() as conn: - conn.execute("SELECT 1").fetchone() - db_ok = True + from storage.json_store import DATA_DIR + db_ok = DATA_DIR.exists() except Exception as e: import traceback error_trace = traceback.format_exc() - print(f"🏥 VITAL 
SIGN: Heart (database) error: {type(e).__name__}: {e}", flush=True) + print(f"🏥 VITAL SIGN: Heart (storage) error: {type(e).__name__}: {e}", flush=True) print(f" Traceback:\n{error_trace}", flush=True) pass diff --git a/tools/auth/atlassian_auth.py b/tools/auth/atlassian_auth.py index 975b611..d837d0e 100644 --- a/tools/auth/atlassian_auth.py +++ b/tools/auth/atlassian_auth.py @@ -12,7 +12,7 @@ from datetime import datetime, timedelta from typing import Optional, Dict, Any from atlassian import Jira, Confluence -from storage.database import get_connection +from storage.json_store import read_json, write_json, SYSTEM_DIR class AtlassianAuth: diff --git a/tools/dss_mcp/audit.py b/tools/dss_mcp/audit.py index 3bea95f..0e15bc8 100644 --- a/tools/dss_mcp/audit.py +++ b/tools/dss_mcp/audit.py @@ -11,7 +11,7 @@ from typing import Optional, Dict, Any from datetime import datetime from enum import Enum -from storage.database import get_connection # Use absolute import (tools/ is in sys.path) +from storage.json_store import ActivityLog, append_jsonl, read_jsonl, SYSTEM_DIR # JSON storage class AuditEventType(Enum): diff --git a/tools/dss_mcp/context/project_context.py b/tools/dss_mcp/context/project_context.py index 1b87f94..9c236ac 100644 --- a/tools/dss_mcp/context/project_context.py +++ b/tools/dss_mcp/context/project_context.py @@ -17,7 +17,7 @@ from pathlib import Path import sys sys.path.insert(0, str(Path(__file__).parent.parent.parent)) -from storage.database import get_connection, Projects +from storage.json_store import Projects, Components, Tokens from analyze.scanner import ProjectScanner from ..config import mcp_config diff --git a/tools/dss_mcp/handler.py b/tools/dss_mcp/handler.py index 241b053..8a98d1b 100644 --- a/tools/dss_mcp/handler.py +++ b/tools/dss_mcp/handler.py @@ -22,7 +22,7 @@ from pathlib import Path # Note: sys.path is set up by the importing module (server.py) # Do NOT modify sys.path here as it causes relative import issues -from storage.database import get_connection +from storage.json_store import Projects, ActivityLog from .config import mcp_config, integration_config from .context.project_context import get_context_manager, ProjectContext from .tools.project_tools import PROJECT_TOOLS, ProjectTools diff --git a/tools/dss_mcp/integrations/base.py b/tools/dss_mcp/integrations/base.py index e7fc2e1..6bb9f7e 100644 --- a/tools/dss_mcp/integrations/base.py +++ b/tools/dss_mcp/integrations/base.py @@ -12,7 +12,7 @@ from datetime import datetime, timedelta from enum import Enum from ..config import mcp_config -from storage.database import get_connection +from storage.json_store import Cache, read_json, write_json, SYSTEM_DIR class CircuitState(Enum): diff --git a/tools/dss_mcp/operations.py b/tools/dss_mcp/operations.py index 4b4b453..77cbc04 100644 --- a/tools/dss_mcp/operations.py +++ b/tools/dss_mcp/operations.py @@ -13,7 +13,7 @@ from datetime import datetime from enum import Enum from .config import mcp_config -from storage.database import get_connection # Use absolute import (tools/ is in sys.path) +from storage.json_store import ActivityLog, read_json, write_json, DATA_DIR # JSON storage class OperationStatus(Enum): diff --git a/tools/dss_mcp/security.py b/tools/dss_mcp/security.py index 9ad3936..7f3fed4 100644 --- a/tools/dss_mcp/security.py +++ b/tools/dss_mcp/security.py @@ -16,7 +16,7 @@ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.backends import default_backend from .config import mcp_config -from storage.database 
import get_connection # Use absolute import (tools/ is in sys.path) +from storage.json_store import read_json, write_json, SYSTEM_DIR # JSON storage class CredentialVault: diff --git a/tools/dss_mcp/tools/project_tools.py b/tools/dss_mcp/tools/project_tools.py index 6bed31f..e4283ba 100644 --- a/tools/dss_mcp/tools/project_tools.py +++ b/tools/dss_mcp/tools/project_tools.py @@ -20,7 +20,7 @@ from mcp import types from ..context.project_context import get_context_manager from ..security import CredentialVault from ..audit import AuditLog, AuditEventType -from storage.database import get_connection # Use absolute import (tools/ is in sys.path) +from storage.json_store import Projects, Components, Tokens, ActivityLog # JSON storage # Tool definitions (metadata for Claude) diff --git a/tools/figma/figma_tools.py b/tools/figma/figma_tools.py index d8e7668..fe88008 100644 --- a/tools/figma/figma_tools.py +++ b/tools/figma/figma_tools.py @@ -38,7 +38,7 @@ import httpx sys.path.insert(0, str(Path(__file__).parent.parent)) from config import config -from storage.database import Cache, ActivityLog +from storage.json_store import Cache, ActivityLog @dataclass class DesignToken: diff --git a/tools/storage/database.py b/tools/storage/database.py deleted file mode 100644 index 617d22d..0000000 --- a/tools/storage/database.py +++ /dev/null @@ -1,1332 +0,0 @@ -""" -Design System Server (DSS) - SQLite Storage Layer - -High-efficiency local-first database for: -- Component definitions (relational) -- Sync history (time-series) -- Team/User RBAC -- Figma API cache (TTL-based) - -Design tokens stored as flat JSON files for git-friendly diffs. -""" - -import sqlite3 -import json -import time -import hashlib -from pathlib import Path -from datetime import datetime -from typing import Optional, Dict, List, Any -from contextlib import contextmanager -from dataclasses import dataclass, asdict - -# Database location -DB_DIR = Path(__file__).parent.parent.parent / ".dss" -DB_PATH = DB_DIR / "dss.db" - -# Ensure directory exists -DB_DIR.mkdir(parents=True, exist_ok=True) - - -@contextmanager -def get_connection(): - """Context manager for database connections with WAL mode for performance.""" - conn = sqlite3.connect(DB_PATH, timeout=30.0) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging for concurrency - conn.execute("PRAGMA synchronous=NORMAL") # Balance safety/speed - conn.execute("PRAGMA cache_size=-64000") # 64MB cache - conn.execute("PRAGMA temp_store=MEMORY") # Temp tables in memory - try: - yield conn - conn.commit() - except Exception: - conn.rollback() - raise - finally: - conn.close() - - -def init_database(): - """Initialize all database tables.""" - with get_connection() as conn: - cursor = conn.cursor() - - # === Projects === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS projects ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL, - description TEXT, - figma_file_key TEXT, - status TEXT DEFAULT 'active', - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - updated_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - - # === Components === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS components ( - id TEXT PRIMARY KEY, - project_id TEXT NOT NULL, - name TEXT NOT NULL, - figma_key TEXT, - description TEXT, - properties TEXT, -- JSON - variants TEXT, -- JSON array - code_generated INTEGER DEFAULT 0, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - updated_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - 
cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_project ON components(project_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_name ON components(name)") - - # === Styles === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS styles ( - id TEXT PRIMARY KEY, - project_id TEXT NOT NULL, - name TEXT NOT NULL, - type TEXT NOT NULL, -- TEXT, FILL, EFFECT, GRID - figma_key TEXT, - properties TEXT, -- JSON - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_styles_project ON styles(project_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_styles_type ON styles(type)") - - # === Tokens (metadata, actual values in JSON files) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS token_collections ( - id TEXT PRIMARY KEY, - project_id TEXT NOT NULL, - name TEXT NOT NULL, - file_path TEXT NOT NULL, - token_count INTEGER DEFAULT 0, - last_synced TEXT, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - - # === Sync History (append-only, time-series) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS sync_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - project_id TEXT NOT NULL, - sync_type TEXT NOT NULL, -- tokens, components, styles, full - status TEXT NOT NULL, -- success, failed, partial - items_synced INTEGER DEFAULT 0, - changes TEXT, -- JSON diff summary - error_message TEXT, - started_at TEXT NOT NULL, - completed_at TEXT, - duration_ms INTEGER, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_sync_project ON sync_history(project_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_sync_time ON sync_history(started_at DESC)") - - # === Activity Log (Enhanced Audit Trail) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS activity_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - project_id TEXT, - user_id TEXT, - user_name TEXT, -- Denormalized for faster display - team_context TEXT, -- ui, ux, qa, all - action TEXT NOT NULL, -- Created, Updated, Deleted, Extracted, Synced, etc. - entity_type TEXT, -- project, component, token, figma_file, etc. 
- entity_id TEXT, - entity_name TEXT, -- Denormalized for faster display - category TEXT, -- design_system, code, configuration, team - severity TEXT DEFAULT 'info', -- info, warning, critical - description TEXT, -- Human-readable description - details TEXT, -- JSON with full context - ip_address TEXT, -- For security audit - user_agent TEXT, -- Browser/client info - created_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_time ON activity_log(created_at DESC)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_project ON activity_log(project_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_user ON activity_log(user_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_action ON activity_log(action)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_category ON activity_log(category)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_activity_entity ON activity_log(entity_type, entity_id)") - - # === Teams === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS teams ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL, - description TEXT, - settings TEXT, -- JSON - created_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - - # === Users === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS users ( - id TEXT PRIMARY KEY, - email TEXT UNIQUE NOT NULL, - name TEXT, - avatar_url TEXT, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - last_login TEXT - ) - """) - - # === Team Members (RBAC) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS team_members ( - team_id TEXT NOT NULL, - user_id TEXT NOT NULL, - role TEXT NOT NULL, -- SUPER_ADMIN, TEAM_LEAD, DEVELOPER, VIEWER - joined_at TEXT DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (team_id, user_id), - FOREIGN KEY (team_id) REFERENCES teams(id), - FOREIGN KEY (user_id) REFERENCES users(id) - ) - """) - - # === Project Team Access === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS project_access ( - project_id TEXT NOT NULL, - team_id TEXT NOT NULL, - access_level TEXT DEFAULT 'read', -- read, write, admin - granted_at TEXT DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (project_id, team_id), - FOREIGN KEY (project_id) REFERENCES projects(id), - FOREIGN KEY (team_id) REFERENCES teams(id) - ) - """) - - # === Figma Cache (TTL-based) === - cursor.execute(""" - CREATE TABLE IF NOT EXISTS figma_cache ( - cache_key TEXT PRIMARY KEY, - value BLOB NOT NULL, - created_at INTEGER NOT NULL, - expires_at INTEGER NOT NULL - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_cache_expires ON figma_cache(expires_at)") - - # === Team Dashboard Tables (Component-Centric Architecture) === - - # Figma Files (UX Dashboard) - Multiple Figma files per project - cursor.execute(""" - CREATE TABLE IF NOT EXISTS figma_files ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - project_id TEXT NOT NULL, - figma_url TEXT NOT NULL, - file_name TEXT NOT NULL, - file_key TEXT NOT NULL, - last_synced TEXT, - sync_status TEXT DEFAULT 'pending', - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_figma_files_project ON figma_files(project_id)") - - # Component Tokens (UX Team View) - Which tokens does each component use? 
- cursor.execute(""" - CREATE TABLE IF NOT EXISTS component_tokens ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - component_id TEXT NOT NULL, - token_name TEXT NOT NULL, - token_value TEXT NOT NULL, - source TEXT NOT NULL, -- figma, css, scss, tailwind, json, code - source_file TEXT, - figma_node_id TEXT, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - updated_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (component_id) REFERENCES components(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_component_tokens_component ON component_tokens(component_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_component_tokens_name ON component_tokens(token_name)") - - # Code Metrics (UI Team View) - Implementation quality metrics - cursor.execute(""" - CREATE TABLE IF NOT EXISTS code_metrics ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - component_id TEXT NOT NULL, - file_path TEXT NOT NULL, - sloc INTEGER DEFAULT 0, - complexity_score REAL DEFAULT 0.0, - prop_count INTEGER DEFAULT 0, - has_tests INTEGER DEFAULT 0, - test_coverage REAL DEFAULT 0.0, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - updated_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (component_id) REFERENCES components(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_code_metrics_component ON code_metrics(component_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_code_metrics_file ON code_metrics(file_path)") - - # Test Results (QA Team View) - Test execution results - cursor.execute(""" - CREATE TABLE IF NOT EXISTS test_results ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - component_id TEXT NOT NULL, - test_type TEXT NOT NULL, -- esre, regression, visual, unit - passed INTEGER NOT NULL, - score REAL, - failures TEXT, -- JSON array - run_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (component_id) REFERENCES components(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_test_results_component ON test_results(component_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_test_results_type ON test_results(test_type)") - - # ESRE Definitions (QA Dashboard) - Natural language requirements - cursor.execute(""" - CREATE TABLE IF NOT EXISTS esre_definitions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - project_id TEXT NOT NULL, - name TEXT NOT NULL, - definition_text TEXT NOT NULL, - expected_value TEXT, - component_name TEXT, - status TEXT DEFAULT 'pending', -- pending, validated, failed - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - updated_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (project_id) REFERENCES projects(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_esre_project ON esre_definitions(project_id)") - - # Implementation Snapshots - Track implementation state over time - cursor.execute(""" - CREATE TABLE IF NOT EXISTS implementation_snapshots ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - component_id TEXT NOT NULL, - snapshot_data TEXT NOT NULL, -- JSON with full implementation details - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (component_id) REFERENCES components(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_component ON implementation_snapshots(component_id)") - - # Token Drift (UI Dashboard) - Hardcoded values that should use tokens - cursor.execute(""" - CREATE TABLE IF NOT EXISTS token_drift ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - component_id TEXT NOT NULL, - property_name TEXT NOT NULL, - hardcoded_value TEXT NOT NULL, - suggested_token TEXT, - severity TEXT NOT NULL, -- info, warning, error, critical - 
file_path TEXT NOT NULL, - line_number INTEGER NOT NULL, - status TEXT DEFAULT 'pending', -- pending, fixed, ignored - detected_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (component_id) REFERENCES components(id) - ) - """) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_token_drift_component ON token_drift(component_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_token_drift_severity ON token_drift(severity)") - - conn.commit() - print(f"[Storage] Database initialized at {DB_PATH}") - - -# === Cache Operations === - -class Cache: - """TTL-based cache using SQLite.""" - - DEFAULT_TTL = 300 # 5 minutes - - @staticmethod - def set(key: str, value: Any, ttl: int = DEFAULT_TTL) -> None: - """Store a value with TTL.""" - now = int(time.time()) - expires = now + ttl - data = json.dumps(value).encode() if not isinstance(value, bytes) else value - - with get_connection() as conn: - conn.execute( - "INSERT OR REPLACE INTO figma_cache (cache_key, value, created_at, expires_at) VALUES (?, ?, ?, ?)", - (key, data, now, expires) - ) - - @staticmethod - def get(key: str) -> Optional[Any]: - """Get a value if not expired.""" - now = int(time.time()) - - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT value FROM figma_cache WHERE cache_key = ? AND expires_at > ?", - (key, now) - ) - row = cursor.fetchone() - - if row: - try: - return json.loads(row[0]) - except (json.JSONDecodeError, TypeError): - return row[0] - return None - - @staticmethod - def delete(key: str) -> None: - """Delete a cache entry.""" - with get_connection() as conn: - conn.execute("DELETE FROM figma_cache WHERE cache_key = ?", (key,)) - - @staticmethod - def clear_expired() -> int: - """Remove all expired entries. Returns count deleted.""" - now = int(time.time()) - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM figma_cache WHERE expires_at <= ?", (now,)) - return cursor.rowcount - - @staticmethod - def clear_all() -> None: - """Clear entire cache.""" - with get_connection() as conn: - conn.execute("DELETE FROM figma_cache") - - -# === Project Operations === - -class Projects: - """Project CRUD operations.""" - - @staticmethod - def create(id: str, name: str, description: str = "", figma_file_key: str = "") -> Dict: - with get_connection() as conn: - conn.execute( - "INSERT INTO projects (id, name, description, figma_file_key) VALUES (?, ?, ?, ?)", - (id, name, description, figma_file_key) - ) - return Projects.get(id) - - @staticmethod - def get(id: str) -> Optional[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM projects WHERE id = ?", (id,)) - row = cursor.fetchone() - return dict(row) if row else None - - @staticmethod - def list(status: str = None) -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - if status: - cursor.execute("SELECT * FROM projects WHERE status = ? ORDER BY updated_at DESC", (status,)) - else: - cursor.execute("SELECT * FROM projects ORDER BY updated_at DESC") - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def update(id: str, **kwargs) -> Optional[Dict]: - if not kwargs: - return Projects.get(id) - - fields = ", ".join(f"{k} = ?" 
for k in kwargs.keys()) - values = list(kwargs.values()) + [id] - - with get_connection() as conn: - conn.execute( - f"UPDATE projects SET {fields}, updated_at = CURRENT_TIMESTAMP WHERE id = ?", - values - ) - return Projects.get(id) - - @staticmethod - def delete(id: str) -> bool: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM projects WHERE id = ?", (id,)) - return cursor.rowcount > 0 - - -# === Component Operations === - -class Components: - """Component CRUD operations.""" - - @staticmethod - def upsert(project_id: str, components: List[Dict]) -> int: - """Bulk upsert components. Returns count.""" - with get_connection() as conn: - cursor = conn.cursor() - count = 0 - for comp in components: - cursor.execute(""" - INSERT OR REPLACE INTO components - (id, project_id, name, figma_key, description, properties, variants, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) - """, ( - comp.get('id') or f"{project_id}-{comp['name']}", - project_id, - comp['name'], - comp.get('figma_key') or comp.get('key'), - comp.get('description', ''), - json.dumps(comp.get('properties', {})), - json.dumps(comp.get('variants', [])) - )) - count += 1 - return count - - @staticmethod - def list(project_id: str) -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM components WHERE project_id = ? ORDER BY name", - (project_id,) - ) - results = [] - for row in cursor.fetchall(): - comp = dict(row) - comp['properties'] = json.loads(comp['properties'] or '{}') - comp['variants'] = json.loads(comp['variants'] or '[]') - results.append(comp) - return results - - @staticmethod - def get(id: str) -> Optional[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM components WHERE id = ?", (id,)) - row = cursor.fetchone() - if row: - comp = dict(row) - comp['properties'] = json.loads(comp['properties'] or '{}') - comp['variants'] = json.loads(comp['variants'] or '[]') - return comp - return None - - -# === Sync History === - -class SyncHistory: - """Append-only sync history log.""" - - @staticmethod - def start(project_id: str, sync_type: str) -> int: - """Start a sync, returns sync ID.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "INSERT INTO sync_history (project_id, sync_type, status, started_at) VALUES (?, ?, 'running', ?)", - (project_id, sync_type, datetime.utcnow().isoformat()) - ) - return cursor.lastrowid - - @staticmethod - def complete(sync_id: int, status: str, items_synced: int = 0, changes: Dict = None, error: str = None): - """Complete a sync with results.""" - started = None - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT started_at FROM sync_history WHERE id = ?", (sync_id,)) - row = cursor.fetchone() - if row: - started = datetime.fromisoformat(row[0]) - - completed = datetime.utcnow() - duration_ms = int((completed - started).total_seconds() * 1000) if started else 0 - - with get_connection() as conn: - conn.execute(""" - UPDATE sync_history SET - status = ?, items_synced = ?, changes = ?, error_message = ?, - completed_at = ?, duration_ms = ? - WHERE id = ? 
- """, ( - status, items_synced, - json.dumps(changes) if changes else None, - error, - completed.isoformat(), duration_ms, - sync_id - )) - - @staticmethod - def recent(project_id: str = None, limit: int = 20) -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - if project_id: - cursor.execute( - "SELECT * FROM sync_history WHERE project_id = ? ORDER BY started_at DESC LIMIT ?", - (project_id, limit) - ) - else: - cursor.execute( - "SELECT * FROM sync_history ORDER BY started_at DESC LIMIT ?", - (limit,) - ) - results = [] - for row in cursor.fetchall(): - sync = dict(row) - sync['changes'] = json.loads(sync['changes']) if sync['changes'] else None - results.append(sync) - return results - - -# === Activity Log (Enhanced Audit System) === - -class ActivityLog: - """Enhanced activity tracking for comprehensive audit trail.""" - - # Action categories for better organization - CATEGORIES = { - 'design_system': ['extract_tokens', 'extract_components', 'sync_tokens', 'validate_tokens'], - 'code': ['analyze_components', 'find_inline_styles', 'generate_code', 'get_quick_wins'], - 'configuration': ['config_updated', 'figma_token_updated', 'mode_changed', 'service_configured'], - 'project': ['project_created', 'project_updated', 'project_deleted'], - 'team': ['team_context_changed', 'project_context_changed'], - 'storybook': ['scan_storybook', 'generate_story', 'generate_theme'] - } - - @staticmethod - def log(action: str, - entity_type: str = None, - entity_id: str = None, - entity_name: str = None, - project_id: str = None, - user_id: str = None, - user_name: str = None, - team_context: str = None, - description: str = None, - category: str = None, - severity: str = 'info', - details: Dict = None, - ip_address: str = None, - user_agent: str = None): - """ - Log an activity with enhanced audit information. - - Args: - action: Action performed (e.g., 'project_created', 'tokens_extracted') - entity_type: Type of entity affected (e.g., 'project', 'component') - entity_id: ID of the affected entity - entity_name: Human-readable name of the entity - project_id: Project context - user_id: User who performed the action - user_name: Human-readable user name - team_context: Team context (ui, ux, qa, all) - description: Human-readable description of the action - category: Category (design_system, code, configuration, etc.) - severity: info, warning, critical - details: Additional JSON details - ip_address: Client IP for security audit - user_agent: Browser/client information - """ - # Auto-detect category if not provided - if not category: - for cat, actions in ActivityLog.CATEGORIES.items(): - if action in actions: - category = cat - break - if not category: - category = 'other' - - # Generate description if not provided - if not description: - description = ActivityLog._generate_description(action, entity_type, entity_name, details) - - with get_connection() as conn: - conn.execute(""" - INSERT INTO activity_log ( - project_id, user_id, user_name, team_context, - action, entity_type, entity_id, entity_name, - category, severity, description, details, - ip_address, user_agent - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, ( - project_id, user_id, user_name, team_context, - action, entity_type, entity_id, entity_name, - category, severity, description, - json.dumps(details) if details else None, - ip_address, user_agent - )) - - @staticmethod - def _generate_description(action: str, entity_type: str, entity_name: str, details: Dict) -> str: - """Generate human-readable description from action data.""" - entity_str = f"{entity_type} '{entity_name}'" if entity_name else (entity_type or "item") - - action_map = { - 'project_created': f"Created project {entity_str}", - 'project_updated': f"Updated {entity_str}", - 'project_deleted': f"Deleted {entity_str}", - 'extract_tokens': f"Extracted design tokens from Figma", - 'extract_components': f"Extracted components from Figma", - 'sync_tokens': f"Synced tokens to file", - 'config_updated': "Updated configuration", - 'figma_token_updated': "Updated Figma API token", - 'team_context_changed': f"Switched to team context", - 'project_context_changed': f"Switched to project {entity_name}", - } - - return action_map.get(action, f"{action.replace('_', ' ').title()}") - - @staticmethod - def recent(project_id: str = None, limit: int = 50, offset: int = 0) -> List[Dict]: - """Get recent activity with pagination.""" - with get_connection() as conn: - cursor = conn.cursor() - if project_id: - cursor.execute( - "SELECT * FROM activity_log WHERE project_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?", - (project_id, limit, offset) - ) - else: - cursor.execute( - "SELECT * FROM activity_log ORDER BY created_at DESC LIMIT ? OFFSET ?", - (limit, offset) - ) - results = [] - for row in cursor.fetchall(): - activity = dict(row) - activity['details'] = json.loads(activity['details']) if activity['details'] else None - results.append(activity) - return results - - @staticmethod - def search( - project_id: str = None, - user_id: str = None, - action: str = None, - category: str = None, - entity_type: str = None, - severity: str = None, - start_date: str = None, - end_date: str = None, - limit: int = 100, - offset: int = 0 - ) -> List[Dict]: - """Advanced search/filter for audit logs.""" - conditions = [] - params = [] - - if project_id: - conditions.append("project_id = ?") - params.append(project_id) - if user_id: - conditions.append("user_id = ?") - params.append(user_id) - if action: - conditions.append("action = ?") - params.append(action) - if category: - conditions.append("category = ?") - params.append(category) - if entity_type: - conditions.append("entity_type = ?") - params.append(entity_type) - if severity: - conditions.append("severity = ?") - params.append(severity) - if start_date: - conditions.append("created_at >= ?") - params.append(start_date) - if end_date: - conditions.append("created_at <= ?") - params.append(end_date) - - where_clause = " AND ".join(conditions) if conditions else "1=1" - params.extend([limit, offset]) - - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(f""" - SELECT * FROM activity_log - WHERE {where_clause} - ORDER BY created_at DESC - LIMIT ? OFFSET ? 
- """, params) - - results = [] - for row in cursor.fetchall(): - activity = dict(row) - activity['details'] = json.loads(activity['details']) if activity['details'] else None - results.append(activity) - return results - - @staticmethod - def count( - project_id: str = None, - user_id: str = None, - action: str = None, - category: str = None - ) -> int: - """Count activities matching filters.""" - conditions = [] - params = [] - - if project_id: - conditions.append("project_id = ?") - params.append(project_id) - if user_id: - conditions.append("user_id = ?") - params.append(user_id) - if action: - conditions.append("action = ?") - params.append(action) - if category: - conditions.append("category = ?") - params.append(category) - - where_clause = " AND ".join(conditions) if conditions else "1=1" - - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(f"SELECT COUNT(*) FROM activity_log WHERE {where_clause}", params) - return cursor.fetchone()[0] - - @staticmethod - def get_categories() -> List[str]: - """Get list of all categories used.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT DISTINCT category FROM activity_log WHERE category IS NOT NULL ORDER BY category") - return [row[0] for row in cursor.fetchall()] - - @staticmethod - def get_actions() -> List[str]: - """Get list of all actions used.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT DISTINCT action FROM activity_log ORDER BY action") - return [row[0] for row in cursor.fetchall()] - - @staticmethod - def get_stats_by_category() -> Dict[str, int]: - """Get activity count by category.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT category, COUNT(*) as count - FROM activity_log - GROUP BY category - ORDER BY count DESC - """) - return {row[0]: row[1] for row in cursor.fetchall()} - - @staticmethod - def get_stats_by_user() -> Dict[str, int]: - """Get activity count by user.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT COALESCE(user_name, user_id, 'Unknown') as user, COUNT(*) as count - FROM activity_log - GROUP BY user_name, user_id - ORDER BY count DESC - """) - return {row[0]: row[1] for row in cursor.fetchall()} - - -# === Team Dashboard Operations === - -class FigmaFiles: - """Figma file management for UX Dashboard.""" - - @staticmethod - def create(project_id: str, figma_url: str, file_name: str, file_key: str) -> Dict: - """Add a Figma file to a project.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - INSERT INTO figma_files (project_id, figma_url, file_name, file_key) - VALUES (?, ?, ?, ?) - """, (project_id, figma_url, file_name, file_key)) - file_id = cursor.lastrowid - return FigmaFiles.get(file_id) - - @staticmethod - def get(file_id: int) -> Optional[Dict]: - """Get a specific Figma file.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM figma_files WHERE id = ?", (file_id,)) - row = cursor.fetchone() - return dict(row) if row else None - - @staticmethod - def list(project_id: str) -> List[Dict]: - """List all Figma files for a project.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM figma_files WHERE project_id = ? 
ORDER BY created_at DESC", - (project_id,) - ) - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def update_sync_status(file_id: int, status: str, last_synced: str = None) -> Dict: - """Update sync status of a Figma file.""" - with get_connection() as conn: - if last_synced: - conn.execute( - "UPDATE figma_files SET sync_status = ?, last_synced = ? WHERE id = ?", - (status, last_synced, file_id) - ) - else: - conn.execute( - "UPDATE figma_files SET sync_status = ? WHERE id = ?", - (status, file_id) - ) - return FigmaFiles.get(file_id) - - @staticmethod - def delete(file_id: int) -> bool: - """Delete a Figma file.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM figma_files WHERE id = ?", (file_id,)) - return cursor.rowcount > 0 - - -class ESREDefinitions: - """ESRE (Expected System Requirements Engineering) definitions for QA Dashboard.""" - - @staticmethod - def create(project_id: str, name: str, definition_text: str, expected_value: str = None, component_name: str = None) -> Dict: - """Create a new ESRE definition.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - INSERT INTO esre_definitions (project_id, name, definition_text, expected_value, component_name) - VALUES (?, ?, ?, ?, ?) - """, (project_id, name, definition_text, expected_value, component_name)) - esre_id = cursor.lastrowid - return ESREDefinitions.get(esre_id) - - @staticmethod - def get(esre_id: int) -> Optional[Dict]: - """Get a specific ESRE definition.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM esre_definitions WHERE id = ?", (esre_id,)) - row = cursor.fetchone() - return dict(row) if row else None - - @staticmethod - def list(project_id: str) -> List[Dict]: - """List all ESRE definitions for a project.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM esre_definitions WHERE project_id = ? ORDER BY created_at DESC", - (project_id,) - ) - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def update(esre_id: int, **kwargs) -> Optional[Dict]: - """Update an ESRE definition.""" - if not kwargs: - return ESREDefinitions.get(esre_id) - - fields = ", ".join(f"{k} = ?" for k in kwargs.keys()) - values = list(kwargs.values()) + [esre_id] - - with get_connection() as conn: - conn.execute( - f"UPDATE esre_definitions SET {fields}, updated_at = CURRENT_TIMESTAMP WHERE id = ?", - values - ) - return ESREDefinitions.get(esre_id) - - @staticmethod - def delete(esre_id: int) -> bool: - """Delete an ESRE definition.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM esre_definitions WHERE id = ?", (esre_id,)) - return cursor.rowcount > 0 - - -class TokenDriftDetector: - """Token drift tracking for UI Dashboard.""" - - @staticmethod - def record_drift(component_id: str, property_name: str, hardcoded_value: str, - file_path: str, line_number: int, severity: str = "warning", - suggested_token: str = None) -> Dict: - """Record a token drift issue.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - INSERT INTO token_drift (component_id, property_name, hardcoded_value, - suggested_token, severity, file_path, line_number) - VALUES (?, ?, ?, ?, ?, ?, ?) 
- """, (component_id, property_name, hardcoded_value, suggested_token, - severity, file_path, line_number)) - drift_id = cursor.lastrowid - return TokenDriftDetector.get(drift_id) - - @staticmethod - def get(drift_id: int) -> Optional[Dict]: - """Get a specific drift entry.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM token_drift WHERE id = ?", (drift_id,)) - row = cursor.fetchone() - return dict(row) if row else None - - @staticmethod - def list_by_component(component_id: str, status: str = None) -> List[Dict]: - """List drift issues for a component.""" - with get_connection() as conn: - cursor = conn.cursor() - if status: - cursor.execute( - "SELECT * FROM token_drift WHERE component_id = ? AND status = ? ORDER BY severity, detected_at DESC", - (component_id, status) - ) - else: - cursor.execute( - "SELECT * FROM token_drift WHERE component_id = ? ORDER BY severity, detected_at DESC", - (component_id,) - ) - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def list_by_project(project_id: str, severity: str = None) -> List[Dict]: - """List all drift issues for a project.""" - with get_connection() as conn: - cursor = conn.cursor() - query = """ - SELECT td.* FROM token_drift td - JOIN components c ON c.id = td.component_id - WHERE c.project_id = ? - """ - params = [project_id] - - if severity: - query += " AND td.severity = ?" - params.append(severity) - - query += " ORDER BY td.severity, td.detected_at DESC" - cursor.execute(query, params) - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def update_status(drift_id: int, status: str) -> Dict: - """Update the status of a drift issue (pending, fixed, ignored).""" - with get_connection() as conn: - conn.execute( - "UPDATE token_drift SET status = ? WHERE id = ?", - (status, drift_id) - ) - return TokenDriftDetector.get(drift_id) - - @staticmethod - def get_stats(project_id: str) -> Dict: - """Get drift statistics for a project.""" - with get_connection() as conn: - cursor = conn.cursor() - - # Total drift count by severity - cursor.execute(""" - SELECT td.severity, COUNT(*) as count - FROM token_drift td - JOIN components c ON c.id = td.component_id - WHERE c.project_id = ? AND td.status = 'pending' - GROUP BY td.severity - """, (project_id,)) - by_severity = {row[0]: row[1] for row in cursor.fetchall()} - - # Total count - cursor.execute(""" - SELECT COUNT(*) FROM token_drift td - JOIN components c ON c.id = td.component_id - WHERE c.project_id = ? 
AND td.status = 'pending' - """, (project_id,)) - total = cursor.fetchone()[0] - - return { - "total": total, - "by_severity": by_severity - } - - -class CodeMetrics: - """Code metrics tracking for UI Dashboard.""" - - @staticmethod - def record_metrics(component_id: str, file_path: str, sloc: int = 0, - complexity_score: float = 0.0, prop_count: int = 0, - has_tests: bool = False, test_coverage: float = 0.0) -> Dict: - """Record code metrics for a component.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - INSERT OR REPLACE INTO code_metrics - (component_id, file_path, sloc, complexity_score, prop_count, has_tests, test_coverage, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) - """, (component_id, file_path, sloc, complexity_score, prop_count, - 1 if has_tests else 0, test_coverage)) - metric_id = cursor.lastrowid - - # Return the inserted/updated record - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM code_metrics WHERE id = ?", (metric_id,)) - row = cursor.fetchone() - return dict(row) if row else None - - @staticmethod - def list_by_component(component_id: str) -> List[Dict]: - """Get all metrics for a component.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM code_metrics WHERE component_id = ? ORDER BY updated_at DESC", - (component_id,) - ) - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def get_project_summary(project_id: str) -> Dict: - """Get aggregated code metrics for a project.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT - COUNT(DISTINCT cm.component_id) as component_count, - SUM(cm.sloc) as total_sloc, - AVG(cm.complexity_score) as avg_complexity, - AVG(cm.test_coverage) as avg_coverage, - SUM(cm.has_tests) as components_with_tests - FROM code_metrics cm - JOIN components c ON c.id = cm.component_id - WHERE c.project_id = ? - """, (project_id,)) - row = cursor.fetchone() - if row: - return { - "component_count": row[0] or 0, - "total_sloc": row[1] or 0, - "avg_complexity": round(row[2] or 0.0, 2), - "avg_coverage": round(row[3] or 0.0, 2), - "components_with_tests": row[4] or 0 - } - return {} - - -class TestResults: - """Test result tracking for QA Dashboard.""" - - @staticmethod - def record_test(component_id: str, test_type: str, passed: bool, - score: float = None, failures: List[str] = None) -> Dict: - """Record a test result.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - INSERT INTO test_results (component_id, test_type, passed, score, failures) - VALUES (?, ?, ?, ?, ?) - """, (component_id, test_type, 1 if passed else 0, score, - json.dumps(failures) if failures else None)) - result_id = cursor.lastrowid - - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM test_results WHERE id = ?", (result_id,)) - row = cursor.fetchone() - if row: - result = dict(row) - result['failures'] = json.loads(result['failures']) if result['failures'] else [] - return result - return None - - @staticmethod - def list_by_component(component_id: str, test_type: str = None) -> List[Dict]: - """Get test results for a component.""" - with get_connection() as conn: - cursor = conn.cursor() - if test_type: - cursor.execute( - "SELECT * FROM test_results WHERE component_id = ? AND test_type = ? 
ORDER BY run_at DESC", - (component_id, test_type) - ) - else: - cursor.execute( - "SELECT * FROM test_results WHERE component_id = ? ORDER BY run_at DESC", - (component_id,) - ) - results = [] - for row in cursor.fetchall(): - result = dict(row) - result['failures'] = json.loads(result['failures']) if result['failures'] else [] - results.append(result) - return results - - @staticmethod - def get_project_summary(project_id: str) -> Dict: - """Get test summary for a project.""" - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT - COUNT(*) as total_tests, - SUM(CASE WHEN passed = 1 THEN 1 ELSE 0 END) as passed_tests, - AVG(score) as avg_score - FROM test_results tr - JOIN components c ON c.id = tr.component_id - WHERE c.project_id = ? - """, (project_id,)) - row = cursor.fetchone() - if row: - total = row[0] or 0 - passed = row[1] or 0 - return { - "total_tests": total, - "passed_tests": passed, - "failed_tests": total - passed, - "pass_rate": round((passed / total * 100) if total > 0 else 0, 2), - "avg_score": round(row[2] or 0.0, 2) - } - return {} - - -# === Teams & RBAC === - -class Teams: - """Team and role management.""" - - @staticmethod - def create(id: str, name: str, description: str = "") -> Dict: - with get_connection() as conn: - conn.execute( - "INSERT INTO teams (id, name, description) VALUES (?, ?, ?)", - (id, name, description) - ) - return Teams.get(id) - - @staticmethod - def get(id: str) -> Optional[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM teams WHERE id = ?", (id,)) - row = cursor.fetchone() - if row: - team = dict(row) - team['settings'] = json.loads(team['settings']) if team['settings'] else {} - return team - return None - - @staticmethod - def list() -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM teams ORDER BY name") - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def add_member(team_id: str, user_id: str, role: str): - with get_connection() as conn: - conn.execute( - "INSERT OR REPLACE INTO team_members (team_id, user_id, role) VALUES (?, ?, ?)", - (team_id, user_id, role) - ) - - @staticmethod - def get_members(team_id: str) -> List[Dict]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT u.*, tm.role, tm.joined_at - FROM team_members tm - JOIN users u ON u.id = tm.user_id - WHERE tm.team_id = ? - ORDER BY tm.role, u.name - """, (team_id,)) - return [dict(row) for row in cursor.fetchall()] - - @staticmethod - def get_user_role(team_id: str, user_id: str) -> Optional[str]: - with get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT role FROM team_members WHERE team_id = ? 
AND user_id = ?", - (team_id, user_id) - ) - row = cursor.fetchone() - return row[0] if row else None - - -# === Database Stats === - -def get_stats() -> Dict: - """Get database statistics.""" - with get_connection() as conn: - cursor = conn.cursor() - - stats = {} - - # Table counts - tables = ['projects', 'components', 'styles', 'sync_history', 'activity_log', 'teams', 'users', 'figma_cache'] - for table in tables: - cursor.execute(f"SELECT COUNT(*) FROM {table}") - stats[table] = cursor.fetchone()[0] - - # Database file size - if DB_PATH.exists(): - stats['db_size_mb'] = round(DB_PATH.stat().st_size / (1024 * 1024), 2) - - # Cache stats - now = int(time.time()) - cursor.execute("SELECT COUNT(*) FROM figma_cache WHERE expires_at > ?", (now,)) - stats['cache_valid'] = cursor.fetchone()[0] - - return stats - - -# Initialize on import -init_database() - - -# === CLI for testing === -if __name__ == "__main__": - import sys - - if len(sys.argv) > 1: - cmd = sys.argv[1] - - if cmd == "stats": - print(json.dumps(get_stats(), indent=2)) - - elif cmd == "init": - init_database() - print("Database initialized") - - elif cmd == "cache-test": - Cache.set("test_key", {"foo": "bar"}, ttl=60) - print(f"Set: test_key") - print(f"Get: {Cache.get('test_key')}") - - elif cmd == "clear-cache": - Cache.clear_all() - print("Cache cleared") - - else: - print("Usage: python database.py [stats|init|cache-test|clear-cache]") - print(f"\nDatabase: {DB_PATH}") - print(f"Stats: {json.dumps(get_stats(), indent=2)}") diff --git a/tools/storage/json_store.py b/tools/storage/json_store.py new file mode 100644 index 0000000..9281f98 --- /dev/null +++ b/tools/storage/json_store.py @@ -0,0 +1,1026 @@ +""" +DSS JSON Storage Layer + +Pure JSON file-based storage following DSS canonical structure. +No SQLite - everything is JSON for git-friendly diffs. + +Structure: +.dss/data/ +├── _system/ # DSS internal (config, cache, activity) +├── projects/ # Per-project data (tokens, components, etc.) 
+└── teams/           # Team definitions
+"""
+
+import json
+import time
+import hashlib
+import fcntl  # POSIX-only advisory locks; this module assumes a Unix-like host
+from pathlib import Path
+from datetime import datetime, date, timedelta
+from typing import Optional, Dict, List, Any, Union
+from contextlib import contextmanager
+import uuid
+
+# Base paths
+DATA_DIR = Path(__file__).parent.parent.parent / ".dss" / "data"
+SYSTEM_DIR = DATA_DIR / "_system"
+PROJECTS_DIR = DATA_DIR / "projects"
+TEAMS_DIR = DATA_DIR / "teams"
+
+# Ensure directories exist
+for d in [DATA_DIR, SYSTEM_DIR, SYSTEM_DIR / "cache", SYSTEM_DIR / "activity", PROJECTS_DIR, TEAMS_DIR]:
+    d.mkdir(parents=True, exist_ok=True)
+
+
+# === File Locking Utilities ===
+
+@contextmanager
+def file_lock(path: Path, exclusive: bool = True):
+    """Context manager for file locking."""
+    # The .lock file is left in place after use; only the flock is released.
+    lock_path = path.with_suffix(path.suffix + ".lock")
+    lock_path.parent.mkdir(parents=True, exist_ok=True)
+
+    with open(lock_path, 'w') as lock_file:
+        try:
+            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH)
+            yield
+        finally:
+            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+
+
+def read_json(path: Path, default: Any = None) -> Any:
+    """Read JSON file with locking."""
+    if not path.exists():
+        return default
+
+    with file_lock(path, exclusive=False):
+        try:
+            return json.loads(path.read_text())
+        except (json.JSONDecodeError, IOError):
+            return default
+
+
+def write_json(path: Path, data: Any, indent: int = 2) -> None:
+    """Write JSON file with locking."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+
+    with file_lock(path, exclusive=True):
+        path.write_text(json.dumps(data, indent=indent, default=str))
+
+
+def append_jsonl(path: Path, record: Dict) -> None:
+    """Append to JSON Lines file."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+
+    with file_lock(path, exclusive=True):
+        with open(path, 'a') as f:
+            f.write(json.dumps(record, default=str) + '\n')
+
+
+def read_jsonl(path: Path, limit: int = None, offset: int = 0) -> List[Dict]:
+    """Read JSON Lines file with pagination, newest records first."""
+    if not path.exists():
+        return []
+
+    records = []
+    with file_lock(path, exclusive=False):
+        with open(path, 'r') as f:
+            lines = f.readlines()
+
+    # Reverse for newest first
+    lines = list(reversed(lines))
+
+    for i, line in enumerate(lines):
+        if i < offset:
+            continue
+        if limit and len(records) >= limit:
+            break
+        try:
+            records.append(json.loads(line.strip()))
+        except json.JSONDecodeError:
+            continue
+
+    return records
+
+
+# === Cache (TTL-based) ===
+
+class Cache:
+    """TTL-based cache using JSON files."""
+
+    CACHE_DIR = SYSTEM_DIR / "cache"
+    DEFAULT_TTL = 300  # 5 minutes
+
+    @staticmethod
+    def _key_to_path(key: str) -> Path:
+        """Convert cache key to file path."""
+        key_hash = hashlib.md5(key.encode()).hexdigest()
+        return Cache.CACHE_DIR / f"{key_hash}.json"
+
+    @staticmethod
+    def set(key: str, value: Any, ttl: int = None) -> None:
+        """Store a value with TTL."""
+        ttl = ttl or Cache.DEFAULT_TTL
+        data = {
+            "key": key,
+            "value": value,
+            "created_at": int(time.time()),
+            "expires_at": int(time.time()) + ttl
+        }
+        write_json(Cache._key_to_path(key), data)
+
+    @staticmethod
+    def get(key: str) -> Optional[Any]:
+        """Get a value if not expired."""
+        path = Cache._key_to_path(key)
+        data = read_json(path)
+
+        if not data:
+            return None
+
+        if data.get("expires_at", 0) <= int(time.time()):
+            path.unlink(missing_ok=True)
+            return None
+
+        return data.get("value")
+
+    @staticmethod
+    def delete(key: str) -> None:
+        """Delete a cache 
entry.""" + Cache._key_to_path(key).unlink(missing_ok=True) + + @staticmethod + def clear_expired() -> int: + """Remove all expired entries.""" + count = 0 + now = int(time.time()) + + for path in Cache.CACHE_DIR.glob("*.json"): + data = read_json(path) + if data and data.get("expires_at", 0) <= now: + path.unlink(missing_ok=True) + count += 1 + + return count + + @staticmethod + def clear_all() -> None: + """Clear entire cache.""" + for path in Cache.CACHE_DIR.glob("*.json"): + path.unlink(missing_ok=True) + + +# === Projects === + +class Projects: + """Project CRUD operations using JSON files.""" + + @staticmethod + def _project_dir(project_id: str) -> Path: + return PROJECTS_DIR / project_id + + @staticmethod + def _manifest_path(project_id: str) -> Path: + return Projects._project_dir(project_id) / "manifest.json" + + @staticmethod + def _init_project_structure(project_id: str) -> None: + """Initialize project folder structure.""" + base = Projects._project_dir(project_id) + for subdir in ["tokens", "components", "styles", "figma", "metrics"]: + (base / subdir).mkdir(parents=True, exist_ok=True) + + @staticmethod + def create(id: str, name: str, description: str = "", figma_file_key: str = "") -> Dict: + """Create a new project.""" + Projects._init_project_structure(id) + + now = datetime.utcnow().isoformat() + manifest = { + "id": id, + "name": name, + "description": description, + "figma_file_key": figma_file_key, + "status": "active", + "created_at": now, + "updated_at": now + } + + write_json(Projects._manifest_path(id), manifest) + + # Initialize empty token files for canonical structure + for token_type in ["colors", "spacing", "typography", "borders", "shadows", "motion"]: + token_path = Projects._project_dir(id) / "tokens" / f"{token_type}.json" + if not token_path.exists(): + write_json(token_path, {"$type": token_type, "tokens": {}}) + + return manifest + + @staticmethod + def get(id: str) -> Optional[Dict]: + """Get project by ID.""" + return read_json(Projects._manifest_path(id)) + + @staticmethod + def list(status: str = None) -> List[Dict]: + """List all projects.""" + projects = [] + + for project_dir in PROJECTS_DIR.iterdir(): + if project_dir.is_dir() and not project_dir.name.startswith("_"): + manifest = read_json(project_dir / "manifest.json") + if manifest: + if status is None or manifest.get("status") == status: + projects.append(manifest) + + # Sort by updated_at descending + projects.sort(key=lambda p: p.get("updated_at", ""), reverse=True) + return projects + + @staticmethod + def update(id: str, **kwargs) -> Optional[Dict]: + """Update project fields.""" + manifest = Projects.get(id) + if not manifest: + return None + + manifest.update(kwargs) + manifest["updated_at"] = datetime.utcnow().isoformat() + + write_json(Projects._manifest_path(id), manifest) + return manifest + + @staticmethod + def delete(id: str) -> bool: + """Delete a project (moves to _archived).""" + project_dir = Projects._project_dir(id) + if not project_dir.exists(): + return False + + # Move to archived instead of hard delete + archived_dir = PROJECTS_DIR / "_archived" + archived_dir.mkdir(exist_ok=True) + + import shutil + shutil.move(str(project_dir), str(archived_dir / f"{id}_{int(time.time())}")) + return True + + +# === Components === + +class Components: + """Component operations using JSON files.""" + + @staticmethod + def _components_dir(project_id: str) -> Path: + return PROJECTS_DIR / project_id / "components" + + @staticmethod + def _component_path(project_id: str, component_id: 
str) -> Path:
+        # Sanitize component name for filesystem
+        safe_name = component_id.replace("/", "_").replace("\\", "_")
+        return Components._components_dir(project_id) / f"{safe_name}.json"
+
+    @staticmethod
+    def upsert(project_id: str, components: List[Dict]) -> int:
+        """Bulk upsert components. Returns count."""
+        count = 0
+        now = datetime.utcnow().isoformat()
+
+        for comp in components:
+            comp_id = comp.get("id") or f"{project_id}-{comp['name']}"
+
+            existing = Components.get(comp_id, project_id)
+
+            component_data = {
+                "id": comp_id,
+                "project_id": project_id,
+                "name": comp["name"],
+                "figma_key": comp.get("figma_key") or comp.get("key"),
+                "description": comp.get("description", ""),
+                "properties": comp.get("properties", {}),
+                "variants": comp.get("variants", []),
+                "code_generated": comp.get("code_generated", False),
+                "created_at": existing.get("created_at", now) if existing else now,
+                "updated_at": now
+            }
+
+            write_json(Components._component_path(project_id, comp_id), component_data)
+            count += 1
+
+        return count
+
+    @staticmethod
+    def list(project_id: str) -> List[Dict]:
+        """List all components for a project."""
+        components = []
+        comp_dir = Components._components_dir(project_id)
+
+        if not comp_dir.exists():
+            return []
+
+        for path in comp_dir.glob("*.json"):
+            comp = read_json(path)
+            if comp:
+                components.append(comp)
+
+        components.sort(key=lambda c: c.get("name", ""))
+        return components
+
+    @staticmethod
+    def get(id: str, project_id: str = None) -> Optional[Dict]:
+        """Get component by ID."""
+        if project_id:
+            return read_json(Components._component_path(project_id, id))
+
+        # Search all projects (skip internal dirs such as _archived)
+        for project_dir in PROJECTS_DIR.iterdir():
+            if project_dir.is_dir() and not project_dir.name.startswith("_"):
+                comp = read_json(Components._component_path(project_dir.name, id))
+                if comp:
+                    return comp
+
+        return None
+
+
+# === Tokens ===
+
+class Tokens:
+    """Token operations following DSS canonical structure."""
+
+    CANONICAL_TYPES = ["colors", "spacing", "typography", "borders", "shadows", "motion"]
+
+    @staticmethod
+    def _tokens_dir(project_id: str) -> Path:
+        return PROJECTS_DIR / project_id / "tokens"
+
+    @staticmethod
+    def get_all(project_id: str) -> Dict[str, Dict]:
+        """Get all tokens for a project, organized by type."""
+        tokens = {}
+        tokens_dir = Tokens._tokens_dir(project_id)
+
+        for token_type in Tokens.CANONICAL_TYPES:
+            path = tokens_dir / f"{token_type}.json"
+            data = read_json(path, {"$type": token_type, "tokens": {}})
+            tokens[token_type] = data.get("tokens", {})
+
+        return tokens
+
+    @staticmethod
+    def get_by_type(project_id: str, token_type: str) -> Dict:
+        """Get tokens of a specific type."""
+        path = Tokens._tokens_dir(project_id) / f"{token_type}.json"
+        data = read_json(path, {"$type": token_type, "tokens": {}})
+        return data.get("tokens", {})
+
+    @staticmethod
+    def set_by_type(project_id: str, token_type: str, tokens: Dict) -> None:
+        """Set tokens of a specific type."""
+        path = Tokens._tokens_dir(project_id) / f"{token_type}.json"
+        write_json(path, {
+            "$type": token_type,
+            "updated_at": datetime.utcnow().isoformat(),
+            "tokens": tokens
+        })
+
+    @staticmethod
+    def merge(project_id: str, token_type: str, new_tokens: Dict, strategy: str = "LAST") -> Dict:
+        """Merge tokens with strategy."""
+        existing = Tokens.get_by_type(project_id, token_type)
+
+        if strategy == "FIRST":
+            # Keep existing, only add new
+            merged = {**new_tokens, **existing}
+        elif strategy == "LAST":
+            # Override with new
+            merged = {**existing, **new_tokens}
+        elif strategy == "MERGE_METADATA":
+            # Deep merge
+            
merged = existing.copy()
+            for key, value in new_tokens.items():
+                if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
+                    merged[key] = {**merged[key], **value}
+                else:
+                    merged[key] = value
+        else:
+            merged = {**existing, **new_tokens}
+
+        Tokens.set_by_type(project_id, token_type, merged)
+        return merged
+
+
+# === Styles ===
+
+class Styles:
+    """Style operations."""
+
+    STYLE_TYPES = ["TEXT", "FILL", "EFFECT", "GRID"]
+
+    @staticmethod
+    def _styles_dir(project_id: str) -> Path:
+        return PROJECTS_DIR / project_id / "styles"
+
+    @staticmethod
+    def upsert(project_id: str, style_type: str, styles: List[Dict]) -> int:
+        """Upsert styles of a given type."""
+        path = Styles._styles_dir(project_id) / f"{style_type.lower()}.json"
+
+        existing_data = read_json(path, {"$type": style_type, "styles": []})
+        existing_styles = {s["id"]: s for s in existing_data.get("styles", [])}
+
+        now = datetime.utcnow().isoformat()
+        for style in styles:
+            style["updated_at"] = now
+            if style["id"] not in existing_styles:
+                style["created_at"] = now
+            existing_styles[style["id"]] = style
+
+        write_json(path, {
+            "$type": style_type,
+            "updated_at": now,
+            "styles": list(existing_styles.values())
+        })
+
+        return len(styles)
+
+    @staticmethod
+    def list(project_id: str, style_type: str = None) -> List[Dict]:
+        """List styles, optionally filtered by type."""
+        styles = []
+        styles_dir = Styles._styles_dir(project_id)
+
+        if not styles_dir.exists():
+            return []
+
+        types_to_check = [style_type.lower()] if style_type else [t.lower() for t in Styles.STYLE_TYPES]
+
+        for st in types_to_check:
+            path = styles_dir / f"{st}.json"
+            data = read_json(path)
+            if data:
+                styles.extend(data.get("styles", []))
+
+        return styles
+
+
+# === Sync History ===
+
+class SyncHistory:
+    """Sync history using JSON Lines."""
+
+    @staticmethod
+    def _history_path(project_id: str) -> Path:
+        return PROJECTS_DIR / project_id / "figma" / "sync-history.jsonl"
+
+    @staticmethod
+    def start(project_id: str, sync_type: str) -> str:
+        """Start a sync, returns sync ID."""
+        sync_id = str(uuid.uuid4())[:8]
+
+        record = {
+            "id": sync_id,
+            "project_id": project_id,
+            "sync_type": sync_type,
+            "status": "running",
+            "started_at": datetime.utcnow().isoformat(),
+            "completed_at": None,
+            "items_synced": 0,
+            "changes": None,
+            "error_message": None,
+            "duration_ms": None
+        }
+
+        append_jsonl(SyncHistory._history_path(project_id), record)
+        return sync_id
+
+    @staticmethod
+    def complete(project_id: str, sync_id: str, status: str, items_synced: int = 0,
+                 changes: Dict = None, error: str = None) -> None:
+        """Complete a sync."""
+        path = SyncHistory._history_path(project_id)
+        records = read_jsonl(path, limit=1000)
+
+        # Find the matching "running" record; completion is appended as a new
+        # record (the JSONL log stays append-only), so readers see the newest
+        # entry for a given sync ID first.
+        completed = datetime.utcnow()
+
+        for record in records:
+            if record.get("id") == sync_id:
+                started = datetime.fromisoformat(record["started_at"])
+                duration_ms = int((completed - started).total_seconds() * 1000)
+
+                # Append completion record
+                completion = {
+                    "id": sync_id,
+                    "project_id": project_id,
+                    "sync_type": record.get("sync_type"),
+                    "status": status,
+                    "started_at": record["started_at"],
+                    "completed_at": completed.isoformat(),
+                    "items_synced": items_synced,
+                    "changes": changes,
+                    "error_message": error,
+                    "duration_ms": duration_ms
+                }
+                append_jsonl(path, completion)
+                break
+
+    @staticmethod
+    def recent(project_id: str = None, limit: int = 20) -> List[Dict]:
+        """Get recent sync history."""
+        if project_id:
+            return read_jsonl(SyncHistory._history_path(project_id), limit=limit)
+
+        # Aggregate from all projects
+        all_records = []
+        for project_dir in PROJECTS_DIR.iterdir():
+            if project_dir.is_dir() and not project_dir.name.startswith("_"):
+                records = read_jsonl(project_dir / "figma" / "sync-history.jsonl", limit=limit)
+                all_records.extend(records)
+
+        # Sort by started_at descending
+        all_records.sort(key=lambda r: r.get("started_at", ""), reverse=True)
+        return all_records[:limit]
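+
+# Usage sketch for the append-only sync log (illustrative only; the
+# project ID "demo" and the item count are hypothetical examples):
+#
+#   sync_id = SyncHistory.start("demo", "tokens")
+#   ... run the Figma sync ...
+#   SyncHistory.complete("demo", sync_id, "success", items_synced=42)
+#   latest = SyncHistory.recent("demo", limit=5)  # newest records first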
+
+
+# === Activity Log ===
+
+class ActivityLog:
+    """Activity logging using daily JSON Lines files."""
+
+    CATEGORIES = {
+        'design_system': ['extract_tokens', 'extract_components', 'sync_tokens', 'validate_tokens'],
+        'code': ['analyze_components', 'find_inline_styles', 'generate_code', 'get_quick_wins'],
+        'configuration': ['config_updated', 'figma_token_updated', 'mode_changed', 'service_configured'],
+        'project': ['project_created', 'project_updated', 'project_deleted'],
+        'team': ['team_context_changed', 'project_context_changed'],
+        'storybook': ['scan_storybook', 'generate_story', 'generate_theme']
+    }
+
+    @staticmethod
+    def _log_path(day: date = None) -> Path:
+        day = day or date.today()
+        return SYSTEM_DIR / "activity" / f"{day.isoformat()}.jsonl"
+
+    @staticmethod
+    def log(action: str,
+            entity_type: str = None,
+            entity_id: str = None,
+            entity_name: str = None,
+            project_id: str = None,
+            user_id: str = None,
+            user_name: str = None,
+            team_context: str = None,
+            description: str = None,
+            category: str = None,
+            severity: str = 'info',
+            details: Dict = None,
+            ip_address: str = None,
+            user_agent: str = None) -> None:
+        """Log an activity."""
+        # Auto-detect category
+        if not category:
+            for cat, actions in ActivityLog.CATEGORIES.items():
+                if action in actions:
+                    category = cat
+                    break
+            category = category or 'other'
+
+        # Generate description if not provided
+        if not description:
+            entity_str = f"{entity_type} '{entity_name}'" if entity_name else (entity_type or "item")
+            description = f"{action.replace('_', ' ').title()} {entity_str}"
+
+        record = {
+            "id": str(uuid.uuid4())[:12],
+            "timestamp": datetime.utcnow().isoformat(),
+            "action": action,
+            "entity_type": entity_type,
+            "entity_id": entity_id,
+            "entity_name": entity_name,
+            "project_id": project_id,
+            "user_id": user_id,
+            "user_name": user_name,
+            "team_context": team_context,
+            "category": category,
+            "severity": severity,
+            "description": description,
+            "details": details,
+            "ip_address": ip_address,
+            "user_agent": user_agent
+        }
+
+        append_jsonl(ActivityLog._log_path(), record)
+
+    @staticmethod
+    def recent(project_id: str = None, limit: int = 50, offset: int = 0, days: int = 7) -> List[Dict]:
+        """Get recent activity."""
+        all_records = []
+
+        # Read from recent days
+        for i in range(days):
+            day = date.today() - timedelta(days=i)
+            records = read_jsonl(ActivityLog._log_path(day), limit=limit * 2)
+
+            if project_id:
+                records = [r for r in records if r.get("project_id") == project_id]
+
+            all_records.extend(records)
+
+        # Sort by timestamp descending
+        all_records.sort(key=lambda r: r.get("timestamp", ""), reverse=True)
+
+        return all_records[offset:offset + limit]
+
+    @staticmethod
+    def search(project_id: str = None, user_id: str = None, action: str = None,
+               category: str = None, severity: str = None, days: int = 30,
+               limit: int = 100, offset: int = 0) -> List[Dict]:
+        """Search activity logs."""
+        all_records = []
+
+        for i in range(days):
+            day = date.today() - timedelta(days=i)
+            
records = read_jsonl(ActivityLog._log_path(day)) + + for r in records: + if project_id and r.get("project_id") != project_id: + continue + if user_id and r.get("user_id") != user_id: + continue + if action and r.get("action") != action: + continue + if category and r.get("category") != category: + continue + if severity and r.get("severity") != severity: + continue + all_records.append(r) + + all_records.sort(key=lambda r: r.get("timestamp", ""), reverse=True) + return all_records[offset:offset + limit] + + +# === Teams === + +class Teams: + """Team management using JSON files.""" + + @staticmethod + def _team_dir(team_id: str) -> Path: + return TEAMS_DIR / team_id + + @staticmethod + def _manifest_path(team_id: str) -> Path: + return Teams._team_dir(team_id) / "manifest.json" + + @staticmethod + def create(id: str, name: str, description: str = "") -> Dict: + """Create a team.""" + team_dir = Teams._team_dir(id) + team_dir.mkdir(parents=True, exist_ok=True) + + now = datetime.utcnow().isoformat() + manifest = { + "id": id, + "name": name, + "description": description, + "settings": {}, + "created_at": now + } + + write_json(Teams._manifest_path(id), manifest) + write_json(team_dir / "members.json", {"members": []}) + write_json(team_dir / "access.json", {"projects": {}}) + + return manifest + + @staticmethod + def get(id: str) -> Optional[Dict]: + """Get team by ID.""" + return read_json(Teams._manifest_path(id)) + + @staticmethod + def list() -> List[Dict]: + """List all teams.""" + teams = [] + + for team_dir in TEAMS_DIR.iterdir(): + if team_dir.is_dir(): + manifest = read_json(team_dir / "manifest.json") + if manifest: + teams.append(manifest) + + teams.sort(key=lambda t: t.get("name", "")) + return teams + + @staticmethod + def add_member(team_id: str, user_id: str, role: str) -> None: + """Add or update team member.""" + path = Teams._team_dir(team_id) / "members.json" + data = read_json(path, {"members": []}) + + # Update or add + members = data.get("members", []) + for m in members: + if m.get("user_id") == user_id: + m["role"] = role + m["updated_at"] = datetime.utcnow().isoformat() + break + else: + members.append({ + "user_id": user_id, + "role": role, + "joined_at": datetime.utcnow().isoformat() + }) + + data["members"] = members + write_json(path, data) + + @staticmethod + def get_members(team_id: str) -> List[Dict]: + """Get team members.""" + path = Teams._team_dir(team_id) / "members.json" + data = read_json(path, {"members": []}) + return data.get("members", []) + + @staticmethod + def get_user_role(team_id: str, user_id: str) -> Optional[str]: + """Get user's role in team.""" + members = Teams.get_members(team_id) + for m in members: + if m.get("user_id") == user_id: + return m.get("role") + return None + + @staticmethod + def set_project_access(team_id: str, project_id: str, access_level: str) -> None: + """Set team's access level to a project.""" + path = Teams._team_dir(team_id) / "access.json" + data = read_json(path, {"projects": {}}) + + data["projects"][project_id] = { + "access_level": access_level, + "granted_at": datetime.utcnow().isoformat() + } + + write_json(path, data) + + +# === Figma Files === + +class FigmaFiles: + """Figma file management.""" + + @staticmethod + def _files_path(project_id: str) -> Path: + return PROJECTS_DIR / project_id / "figma" / "files.json" + + @staticmethod + def create(project_id: str, figma_url: str, file_name: str, file_key: str) -> Dict: + """Add a Figma file to project.""" + path = FigmaFiles._files_path(project_id) + data = 
read_json(path, {"files": []}) + + file_id = str(uuid.uuid4())[:8] + now = datetime.utcnow().isoformat() + + new_file = { + "id": file_id, + "project_id": project_id, + "figma_url": figma_url, + "file_name": file_name, + "file_key": file_key, + "sync_status": "pending", + "last_synced": None, + "created_at": now + } + + data["files"].append(new_file) + write_json(path, data) + + return new_file + + @staticmethod + def list(project_id: str) -> List[Dict]: + """List Figma files for project.""" + data = read_json(FigmaFiles._files_path(project_id), {"files": []}) + return data.get("files", []) + + @staticmethod + def update_sync_status(project_id: str, file_id: str, status: str) -> Optional[Dict]: + """Update sync status.""" + path = FigmaFiles._files_path(project_id) + data = read_json(path, {"files": []}) + + for f in data.get("files", []): + if f.get("id") == file_id: + f["sync_status"] = status + if status == "synced": + f["last_synced"] = datetime.utcnow().isoformat() + write_json(path, data) + return f + + return None + + +# === Metrics === + +class CodeMetrics: + """Code metrics storage.""" + + @staticmethod + def _metrics_path(project_id: str) -> Path: + return PROJECTS_DIR / project_id / "metrics" / "code.json" + + @staticmethod + def record(project_id: str, component_id: str, metrics: Dict) -> None: + """Record code metrics for component.""" + path = CodeMetrics._metrics_path(project_id) + data = read_json(path, {"components": {}}) + + metrics["updated_at"] = datetime.utcnow().isoformat() + data["components"][component_id] = metrics + + write_json(path, data) + + @staticmethod + def get(project_id: str, component_id: str = None) -> Union[Dict, List[Dict]]: + """Get metrics.""" + data = read_json(CodeMetrics._metrics_path(project_id), {"components": {}}) + + if component_id: + return data["components"].get(component_id) + return data["components"] + + +class TestResults: + """Test results storage.""" + + @staticmethod + def _results_path(project_id: str) -> Path: + return PROJECTS_DIR / project_id / "metrics" / "tests.json" + + @staticmethod + def record(project_id: str, component_id: str, test_type: str, + passed: bool, score: float = None, failures: List[str] = None) -> Dict: + """Record test result.""" + path = TestResults._results_path(project_id) + data = read_json(path, {"results": []}) + + result = { + "id": str(uuid.uuid4())[:8], + "component_id": component_id, + "test_type": test_type, + "passed": passed, + "score": score, + "failures": failures or [], + "run_at": datetime.utcnow().isoformat() + } + + data["results"].append(result) + write_json(path, data) + + return result + + @staticmethod + def list(project_id: str, component_id: str = None, test_type: str = None) -> List[Dict]: + """List test results.""" + data = read_json(TestResults._results_path(project_id), {"results": []}) + results = data.get("results", []) + + if component_id: + results = [r for r in results if r.get("component_id") == component_id] + if test_type: + results = [r for r in results if r.get("test_type") == test_type] + + results.sort(key=lambda r: r.get("run_at", ""), reverse=True) + return results + + +class TokenDrift: + """Token drift tracking.""" + + @staticmethod + def _drift_path(project_id: str) -> Path: + return PROJECTS_DIR / project_id / "metrics" / "drift.json" + + @staticmethod + def record(project_id: str, component_id: str, property_name: str, + hardcoded_value: str, file_path: str, line_number: int, + severity: str = "warning", suggested_token: str = None) -> Dict: + """Record token 
drift.""" + path = TokenDrift._drift_path(project_id) + data = read_json(path, {"drift": []}) + + drift = { + "id": str(uuid.uuid4())[:8], + "component_id": component_id, + "property_name": property_name, + "hardcoded_value": hardcoded_value, + "suggested_token": suggested_token, + "severity": severity, + "file_path": file_path, + "line_number": line_number, + "status": "pending", + "detected_at": datetime.utcnow().isoformat() + } + + data["drift"].append(drift) + write_json(path, data) + + return drift + + @staticmethod + def list(project_id: str, status: str = None, severity: str = None) -> List[Dict]: + """List drift issues.""" + data = read_json(TokenDrift._drift_path(project_id), {"drift": []}) + drift = data.get("drift", []) + + if status: + drift = [d for d in drift if d.get("status") == status] + if severity: + drift = [d for d in drift if d.get("severity") == severity] + + return drift + + @staticmethod + def update_status(project_id: str, drift_id: str, status: str) -> Optional[Dict]: + """Update drift status.""" + path = TokenDrift._drift_path(project_id) + data = read_json(path, {"drift": []}) + + for d in data.get("drift", []): + if d.get("id") == drift_id: + d["status"] = status + write_json(path, data) + return d + + return None + + +# === Stats === + +def get_stats() -> Dict: + """Get storage statistics.""" + stats = { + "projects": len(list(PROJECTS_DIR.iterdir())) - 1 if PROJECTS_DIR.exists() else 0, # -1 for _archived + "teams": len(list(TEAMS_DIR.iterdir())) if TEAMS_DIR.exists() else 0, + "cache_files": len(list((SYSTEM_DIR / "cache").glob("*.json"))) if (SYSTEM_DIR / "cache").exists() else 0, + "activity_days": len(list((SYSTEM_DIR / "activity").glob("*.jsonl"))) if (SYSTEM_DIR / "activity").exists() else 0, + } + + # Calculate total size + total_size = 0 + for path in DATA_DIR.rglob("*"): + if path.is_file(): + total_size += path.stat().st_size + + stats["total_size_mb"] = round(total_size / (1024 * 1024), 2) + + return stats + + +# === Initialization === + +def init_storage() -> None: + """Initialize storage directories.""" + for d in [DATA_DIR, SYSTEM_DIR, SYSTEM_DIR / "cache", SYSTEM_DIR / "activity", PROJECTS_DIR, TEAMS_DIR]: + d.mkdir(parents=True, exist_ok=True) + + print(f"[Storage] JSON storage initialized at {DATA_DIR}") + + +# Initialize on import +init_storage() + + +# === CLI === + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + cmd = sys.argv[1] + + if cmd == "stats": + print(json.dumps(get_stats(), indent=2)) + + elif cmd == "init": + init_storage() + print("Storage initialized") + + elif cmd == "cache-test": + Cache.set("test_key", {"foo": "bar"}, ttl=60) + print(f"Set: test_key") + print(f"Get: {Cache.get('test_key')}") + + elif cmd == "clear-cache": + Cache.clear_all() + print("Cache cleared") + + else: + print("Usage: python json_store.py [stats|init|cache-test|clear-cache]") + print(f"\nData directory: {DATA_DIR}") + print(f"Stats: {json.dumps(get_stats(), indent=2)}")