Files
dss/dss/mcp/context/project_context.py
Bruno Sarlo 41fba59bf7 Major refactor: Consolidate DSS into unified package structure
- Create new dss/ Python package at project root
- Move MCP core from tools/dss_mcp/ to dss/mcp/
- Move storage layer from tools/storage/ to dss/storage/
- Move domain logic from dss-mvp1/dss/ to dss/
- Move services from tools/api/services/ to dss/services/
- Move API server to apps/api/
- Move CLI to apps/cli/
- Move Storybook assets to storybook/
- Create unified dss/__init__.py with comprehensive exports
- Merge configuration into dss/settings.py (Pydantic-based)
- Create pyproject.toml for proper package management
- Update startup scripts for new paths
- Remove old tools/ and dss-mvp1/ directories

Architecture changes:
- DSS is now MCP-first with 40+ tools for Claude Code
- Clean imports: from dss import Projects, Components, FigmaToolSuite (see the sketch after this list)
- No more sys.path.insert() hacking
- apps/ contains thin application wrappers (API, CLI)
- Single unified Python package for all DSS logic
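
For example, a consumer now imports everything from the package root (a sketch;
the constructor call below is illustrative, not the confirmed API):

    from dss import Projects, Components, FigmaToolSuite

    figma = FigmaToolSuite()  # hypothetical construction; no sys.path setup required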

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 12:46:43 -03:00

444 lines
14 KiB
Python

"""
Project Context Manager
Provides cached, project-isolated context for Claude MCP sessions.
Loads all relevant project data (components, tokens, config, health, etc.)
and caches it for performance.
"""
import json
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional, List
from pathlib import Path
# Import from existing DSS modules
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from storage.json_store import Projects, Components, Tokens
from analyze.scanner import ProjectScanner
from ..config import mcp_config
@dataclass
class ProjectContext:
    """Complete project context for MCP sessions"""
    project_id: str
    name: str
    description: Optional[str]
    path: Optional[Path]

    # Component data
    components: List[Dict[str, Any]]
    component_count: int

    # Token/Style data
    tokens: Dict[str, Any]
    styles: List[Dict[str, Any]]

    # Project configuration
    config: Dict[str, Any]

    # User's enabled integrations (user-scoped)
    integrations: Dict[str, Any]

    # Project health & metrics
    health: Dict[str, Any]
    stats: Dict[str, Any]

    # Discovery/scan results
    discovery: Dict[str, Any]

    # Metadata
    loaded_at: datetime
    cache_expires_at: datetime

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization"""
        data = asdict(self)
        data['loaded_at'] = self.loaded_at.isoformat()
        data['cache_expires_at'] = self.cache_expires_at.isoformat()
        if self.path:
            data['path'] = str(self.path)
        return data

    def is_expired(self) -> bool:
        """Check if cache has expired"""
        return datetime.now() >= self.cache_expires_at
class ProjectContextManager:
    """
    Manages project contexts with TTL-based caching.

    Provides fast access to project data for MCP tools while ensuring
    data freshness and project isolation.
    """

    def __init__(self):
        self._cache: Dict[str, ProjectContext] = {}
        self._cache_ttl = timedelta(seconds=mcp_config.CONTEXT_CACHE_TTL)

    async def get_context(
        self,
        project_id: str,
        user_id: Optional[int] = None,
        force_refresh: bool = False
    ) -> Optional[ProjectContext]:
        """
        Get project context, using cache if available.

        Args:
            project_id: Project ID
            user_id: User ID for loading user-scoped integrations
            force_refresh: Force cache refresh

        Returns:
            ProjectContext or None if project not found
        """
        # Check cache first
        cache_key = f"{project_id}:{user_id or 'anonymous'}"
        if not force_refresh and cache_key in self._cache:
            ctx = self._cache[cache_key]
            if not ctx.is_expired():
                return ctx

        # Load fresh context
        context = await self._load_context(project_id, user_id)
        if context:
            self._cache[cache_key] = context
        return context
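
    # Cache-key sketch (hypothetical IDs): contexts are keyed by project AND user,
    # so user-scoped integrations never leak across users:
    #
    #   await manager.get_context("proj-1", user_id=1)  # cached as "proj-1:1"
    #   await manager.get_context("proj-1", user_id=2)  # cached as "proj-1:2"
    #   await manager.get_context("proj-1")             # cached as "proj-1:anonymous"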
    async def _load_context(
        self,
        project_id: str,
        user_id: Optional[int] = None
    ) -> Optional[ProjectContext]:
        """Load complete project context from database and filesystem"""
        # Run database queries in a thread pool to avoid blocking the event loop
        loop = asyncio.get_running_loop()

        # Load project metadata
        project = await loop.run_in_executor(None, self._load_project, project_id)
        if not project:
            return None

        # Load components, styles, stats, and integrations in parallel
        components_task = loop.run_in_executor(None, self._load_components, project_id)
        styles_task = loop.run_in_executor(None, self._load_styles, project_id)
        stats_task = loop.run_in_executor(None, self._load_stats, project_id)
        integrations_task = loop.run_in_executor(None, self._load_integrations, project_id, user_id)

        components = await components_task
        styles = await styles_task
        stats = await stats_task
        integrations = await integrations_task

        # Load tokens from filesystem if project has a path
        tokens = {}
        project_path = None
        if project.get('figma_file_key'):
            # Try to find project path based on naming convention
            # (This can be enhanced based on actual project structure)
            project_path = Path.cwd()
            tokens = await loop.run_in_executor(None, self._load_tokens, project_path)

        # Load discovery/scan data
        discovery = await loop.run_in_executor(None, self._load_discovery, project_path)

        # Compute health score
        health = self._compute_health(components, tokens, stats)

        # Build context
        now = datetime.now()
        context = ProjectContext(
            project_id=project_id,
            name=project['name'],
            description=project.get('description'),
            path=project_path,
            components=components,
            component_count=len(components),
            tokens=tokens,
            styles=styles,
            config={
                'figma_file_key': project.get('figma_file_key'),
                'status': project.get('status', 'active')
            },
            integrations=integrations,
            health=health,
            stats=stats,
            discovery=discovery,
            loaded_at=now,
            cache_expires_at=now + self._cache_ttl
        )
        return context
    def _load_project(self, project_id: str) -> Optional[Dict[str, Any]]:
        """Load project metadata from database"""
        try:
            with get_connection() as conn:
                row = conn.execute(
                    "SELECT * FROM projects WHERE id = ?",
                    (project_id,)
                ).fetchone()
                if row:
                    return dict(row)
                return None
        except Exception as e:
            print(f"Error loading project: {e}")
            return None

    def _load_components(self, project_id: str) -> List[Dict[str, Any]]:
        """Load all components for project"""
        try:
            with get_connection() as conn:
                rows = conn.execute(
                    """
                    SELECT id, name, figma_key, description,
                           properties, variants, code_generated,
                           created_at, updated_at
                    FROM components
                    WHERE project_id = ?
                    ORDER BY name
                    """,
                    (project_id,)
                ).fetchall()

                components = []
                for row in rows:
                    comp = dict(row)
                    # Parse JSON fields
                    if comp.get('properties'):
                        comp['properties'] = json.loads(comp['properties'])
                    if comp.get('variants'):
                        comp['variants'] = json.loads(comp['variants'])
                    components.append(comp)
                return components
        except Exception as e:
            print(f"Error loading components: {e}")
            return []
    def _load_styles(self, project_id: str) -> List[Dict[str, Any]]:
        """Load all styles for project"""
        try:
            with get_connection() as conn:
                rows = conn.execute(
                    """
                    SELECT id, name, type, figma_key, properties, created_at
                    FROM styles
                    WHERE project_id = ?
                    ORDER BY type, name
                    """,
                    (project_id,)
                ).fetchall()

                styles = []
                for row in rows:
                    style = dict(row)
                    if style.get('properties'):
                        style['properties'] = json.loads(style['properties'])
                    styles.append(style)
                return styles
        except Exception as e:
            print(f"Error loading styles: {e}")
            return []

    def _load_stats(self, project_id: str) -> Dict[str, Any]:
        """Load project statistics"""
        try:
            with get_connection() as conn:
                # Component totals (overall and code-generated)
                component_stats = conn.execute(
                    """
                    SELECT COUNT(*) as total,
                           SUM(CASE WHEN code_generated = 1 THEN 1 ELSE 0 END) as generated
                    FROM components
                    WHERE project_id = ?
                    """,
                    (project_id,)
                ).fetchone()

                # Style count by type
                style_stats = conn.execute(
                    """
                    SELECT type, COUNT(*) as count
                    FROM styles
                    WHERE project_id = ?
                    GROUP BY type
                    """,
                    (project_id,)
                ).fetchall()

                return {
                    'components': dict(component_stats) if component_stats else {'total': 0, 'generated': 0},
                    'styles': {row['type']: row['count'] for row in style_stats}
                }
        except Exception as e:
            print(f"Error loading stats: {e}")
            return {'components': {'total': 0, 'generated': 0}, 'styles': {}}
    def _load_integrations(self, project_id: str, user_id: Optional[int]) -> Dict[str, Any]:
        """Load user's enabled integrations for this project"""
        if not user_id:
            return {}
        try:
            with get_connection() as conn:
                rows = conn.execute(
                    """
                    SELECT integration_type, config, enabled, last_used_at
                    FROM project_integrations
                    WHERE project_id = ? AND user_id = ? AND enabled = 1
                    """,
                    (project_id, user_id)
                ).fetchall()

                # Return decrypted config for each integration
                integrations = {}
                cipher = mcp_config.get_cipher()
                for row in rows:
                    integration_type = row['integration_type']
                    encrypted_config = row['config']

                    # Decrypt config
                    if cipher:
                        try:
                            decrypted_config = cipher.decrypt(encrypted_config.encode()).decode()
                            config = json.loads(decrypted_config)
                        except Exception as e:
                            print(f"Error decrypting integration config: {e}")
                            config = {}
                    else:
                        # No encryption key; try to parse as plain JSON
                        try:
                            config = json.loads(encrypted_config)
                        except Exception:
                            config = {}

                    integrations[integration_type] = {
                        'enabled': True,
                        'config': config,
                        'last_used_at': row['last_used_at']
                    }
                return integrations
        except Exception as e:
            print(f"Error loading integrations: {e}")
            return {}
    def _load_tokens(self, project_path: Optional[Path]) -> Dict[str, Any]:
        """Load design tokens from filesystem"""
        if not project_path:
            return {}

        tokens = {}
        token_files = ['tokens.json', 'design-tokens.json', 'variables.json']
        for token_file in token_files:
            token_path = project_path / token_file
            if token_path.exists():
                try:
                    with open(token_path) as f:
                        tokens = json.load(f)
                    break
                except Exception as e:
                    print(f"Error loading tokens from {token_path}: {e}")
        return tokens
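
    # Shape sketch: _load_tokens returns the token file verbatim. A hypothetical
    # tokens.json such as
    #
    #   {"color": {"primary": {"value": "#0066FF"}},
    #    "spacing": {"sm": {"value": "4px"}}}
    #
    # comes back as the same nested dict; the first matching file wins.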
    def _load_discovery(self, project_path: Optional[Path]) -> Dict[str, Any]:
        """Load project discovery data"""
        if not project_path:
            return {}
        try:
            scanner = ProjectScanner(str(project_path))
            discovery = scanner.scan()
            return discovery
        except Exception as e:
            print(f"Error running discovery scan: {e}")
            return {}
    def _compute_health(
        self,
        components: List[Dict],
        tokens: Dict,
        stats: Dict
    ) -> Dict[str, Any]:
        """Compute project health score"""
        score = 100
        issues = []

        # Deduct points for missing components
        if stats['components']['total'] == 0:
            score -= 30
            issues.append("No components defined")

        # Deduct points for no tokens
        if not tokens:
            score -= 20
            issues.append("No design tokens defined")

        # Deduct points for ungenerated components
        total = stats['components']['total']
        generated = stats['components']['generated']
        if total > 0 and generated < total:
            percentage = (generated / total) * 100
            if percentage < 50:
                score -= 20
                issues.append(f"Low code generation: {percentage:.1f}%")
            elif percentage < 80:
                score -= 10
                issues.append(f"Medium code generation: {percentage:.1f}%")

        # Compute grade
        if score >= 90:
            grade = 'A'
        elif score >= 80:
            grade = 'B'
        elif score >= 70:
            grade = 'C'
        elif score >= 60:
            grade = 'D'
        else:
            grade = 'F'

        return {
            'score': max(0, score),
            'grade': grade,
            'issues': issues
        }
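
    # Worked example: a project with tokens present, 10 components, and 4 of them
    # generated is at 40.0% generation (< 50%), so the score is 100 - 20 = 80 and
    # the grade is 'B', with one issue: "Low code generation: 40.0%".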
    def clear_cache(self, project_id: Optional[str] = None):
        """Clear cache for a specific project or for all projects"""
        if project_id:
            # Clear all cache entries for this project
            keys_to_remove = [k for k in self._cache.keys() if k.startswith(f"{project_id}:")]
            for key in keys_to_remove:
                del self._cache[key]
        else:
            # Clear all cache
            self._cache.clear()


# Singleton instance
_context_manager = None


def get_context_manager() -> ProjectContextManager:
    """Get singleton context manager instance"""
    global _context_manager
    if _context_manager is None:
        _context_manager = ProjectContextManager()
    return _context_manager
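

# A minimal usage sketch (hypothetical project/user IDs; assumes a populated
# database reachable through get_connection):
if __name__ == "__main__":
    async def _demo() -> None:
        manager = get_context_manager()
        # First call loads from DB/filesystem; repeats within the TTL hit the cache
        ctx = await manager.get_context("proj-1", user_id=1)
        if ctx:
            print(ctx.name, ctx.component_count, ctx.health['grade'])
        # Drop every cached entry for the project (all user scopes)
        manager.clear_cache("proj-1")

    asyncio.run(_demo())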