feat: Implement atomic design system core structure and recursive Figma import
@@ -1 +1 @@
-1765407101539
+1765443595382
dss-cli.py (18 changed lines)
@@ -8,6 +8,7 @@ pipelines and other automated workflows.
 """
 
 import argparse
+import asyncio
 import json
 import os
 import sys
@@ -22,7 +23,6 @@ try:
     from dss.analyze.project_analyzer import run_project_analysis, export_project_context
     from dss.project.manager import ProjectManager
     from dss import StorybookScanner, StoryGenerator, ThemeGenerator
-    from dss.project.figma import FigmaProjectSync
 except ImportError as e:
     print(f"Error: Could not import DSS modules. Make sure dss-mvp1 is in the PYTHONPATH.", file=sys.stderr)
     print(f"Import error: {e}", file=sys.stderr)
@@ -48,6 +48,8 @@ def main():
         required=True,
         help="The root path to the project directory to be analyzed."
     )
+    analyze_parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
+
 
     # =========================================================================
     # 'export-context' command
@@ -120,6 +122,9 @@ def main():
         "--figma-token",
         help="Your Figma personal access token. If not provided, it will try to use the FIGMA_TOKEN environment variable."
     )
+    sync_parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
+    sync_parser.add_argument("--force", action="store_true", help="Force sync, ignoring cache")
+
 
     args = parser.parse_args()
@@ -189,12 +194,21 @@ def main():
             sys.exit(1)
 
         print("Synchronizing tokens from Figma...")
-        manager.sync(project, figma_token=args.figma_token)
+        # The manager.sync method is now async
+        asyncio.run(manager.sync(
+            project,
+            figma_token=args.figma_token,
+            force=args.force,
+            verbose=args.verbose
+        ))
         print("Token synchronization complete.")
 
     except Exception as e:
         print(json.dumps({"success": False, "error": str(e)}), file=sys.stderr)
         import traceback
         traceback.print_exc()
         sys.exit(1)
 
 if __name__ == "__main__":
+    # The main function now handles both sync and async command dispatches
     main()
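Aside (not part of the diff): the change above keeps argparse dispatch synchronous and funnels the now-async manager.sync through a single asyncio.run() at the CLI boundary. A minimal, self-contained sketch of that pattern, with a hypothetical FakeManager standing in for the real ProjectManager:

import argparse
import asyncio

class FakeManager:
    async def sync(self, project: str, force: bool = False) -> str:
        await asyncio.sleep(0)  # placeholder for real async I/O
        return f"synced {project} (force={force})"

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("project")
    parser.add_argument("--force", action="store_true")
    args = parser.parse_args()
    # One asyncio.run() at the outermost boundary keeps main() synchronous.
    print(asyncio.run(FakeManager().sync(args.project, force=args.force)))

if __name__ == "__main__":
    main()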
@@ -262,85 +262,35 @@ class FigmaToolSuite:
 
     # === Tool 2: Extract Components ===
 
-    # Pages to skip when scanning for component pages
-    SKIP_PAGES = {
-        'Thumbnail', 'Changelog', 'Credits', 'Colors', 'Typography',
-        'Icons', 'Shadows', '---'
-    }
-
     async def extract_components(self, file_key: str) -> Dict[str, Any]:
         """
-        Extract component definitions from Figma.
+        Extract all component definitions from a Figma file by recursively
+        traversing the document tree.
 
         Args:
             file_key: Figma file key
 
         Returns:
-            Dict with: success, components_count, component_sets_count, output_path, components
+            Dict with: success, components_count, output_path, components
         """
         definitions: List[ComponentDefinition] = []
-        component_sets_count = 0
-
-        # First try the published components endpoint
 
         try:
-            data = await self.client.get_components(file_key)
+            file_data = await self.client.get_file(file_key)
+            doc = file_data.get("document", {})
 
-            components_data = data.get("meta", {}).get("components", {})
-            component_sets_data = data.get("meta", {}).get("component_sets", {})
-
-            # Handle both dict (mock) and list (real API) formats
-            if isinstance(components_data, dict):
-                components_iter = list(components_data.items())
-            elif isinstance(components_data, list):
-                components_iter = [(c.get("key", c.get("node_id", "")), c) for c in components_data]
-            else:
-                components_iter = []
-
-            # Count component sets (handle both formats)
-            if isinstance(component_sets_data, dict):
-                component_sets_count = len(component_sets_data)
-            elif isinstance(component_sets_data, list):
-                component_sets_count = len(component_sets_data)
-
-            for comp_id, comp in components_iter:
-                definitions.append(ComponentDefinition(
-                    name=comp.get("name", ""),
-                    key=comp.get("key", comp_id),
-                    description=comp.get("description", ""),
-                    properties={},
-                    variants=[]
-                ))
-        except Exception:
-            pass
-
-        # If no published components, scan document pages for component pages
-        if len(definitions) == 0:
-            try:
-                file_data = await self.client.get_file(file_key)
-                doc = file_data.get("document", {})
-
-                for page in doc.get("children", []):
-                    page_name = page.get("name", "")
-                    page_type = page.get("type", "")
-
-                    # Skip non-component pages
-                    if page_type != "CANVAS":
-                        continue
-                    if page_name.startswith("📖") or page_name.startswith("---"):
-                        continue
-                    if page_name in self.SKIP_PAGES:
-                        continue
-
-                    # This looks like a component page
-                    definitions.append(ComponentDefinition(
-                        name=page_name,
-                        key=page.get("id", ""),
-                        description=f"Component page: {page_name}",
-                        properties={},
-                        variants=[]
-                    ))
-            except Exception:
-                pass
+            # Start the recursive search from the document root
+            self._recursive_find_components(doc, definitions)
+
+        except Exception as e:
+            # Log the exception for debugging
+            print(f"Error extracting components from Figma file {file_key}: {e}")
+            return {
+                "success": False,
+                "components_count": 0,
+                "error": str(e),
+                "components": []
+            }
 
         output_path = self.output_dir / "components.json"
         output_path.write_text(json.dumps([asdict(d) for d in definitions], indent=2))
@@ -348,11 +298,33 @@ class FigmaToolSuite:
         return {
             "success": True,
             "components_count": len(definitions),
-            "component_sets_count": component_sets_count,
             "output_path": str(output_path),
             "components": [asdict(d) for d in definitions]
         }
 
+    def _recursive_find_components(self, node: Dict[str, Any], definitions: List[ComponentDefinition]):
+        """
+        Recursively traverse the Figma node tree and extract all components.
+
+        Args:
+            node: The current Figma node to inspect.
+            definitions: The list to append found component definitions to.
+        """
+        # If the node is a component, extract its definition
+        if node.get("type") == "COMPONENT":
+            definitions.append(ComponentDefinition(
+                name=node.get("name", ""),
+                key=node.get("id", ""),
+                description=node.get("description", ""),
+                properties={},  # Properties can be enriched later
+                variants=[]  # Variant info can be enriched later
+            ))
+
+        # If the node has children, recurse into them
+        if "children" in node and isinstance(node["children"], list):
+            for child in node["children"]:
+                self._recursive_find_components(child, definitions)
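Aside (not part of the diff): a standalone sanity check of the traversal logic above, collecting (id, name) pairs instead of ComponentDefinition objects; the sample document tree is invented:

def find_components(node: dict, found: list) -> None:
    # Mirrors _recursive_find_components: record COMPONENT nodes, then recurse.
    if node.get("type") == "COMPONENT":
        found.append((node.get("id", ""), node.get("name", "")))
    for child in node.get("children", []):
        find_components(child, found)

doc = {
    "type": "DOCUMENT",
    "children": [
        {"type": "CANVAS", "children": [
            {"type": "COMPONENT", "id": "1:1", "name": "Icon"},
            {"type": "FRAME", "children": [
                {"type": "COMPONENT", "id": "1:2", "name": "Button"},
            ]},
        ]},
    ],
}
found: list = []
find_components(doc, found)
assert found == [("1:1", "Icon"), ("1:2", "Button")]  # nested components are reached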
 
     # === Tool 3: Extract Styles ===
 
     async def extract_styles(self, file_key: str) -> Dict[str, Any]:
dss/ingest/sources/__init__.py (new file, 1 line)
@@ -0,0 +1 @@
# dss/ingest/sources/__init__.py
dss/ingest/sources/figma.py (new file, 372 lines)
@@ -0,0 +1,372 @@
# dss/ingest/sources/figma.py

"""
Figma Token Ingestion Source

Extracts design tokens and components from a Figma file.
"""

import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import aiohttp

from ..base import DesignToken, TokenCollection, TokenSource, TokenType
from ...models.component import Component, AtomicType

# Re-using some of the data classes and constants from the original script.
# In a real-world scenario, these might be moved to a more central location.

# =============================================================================
# CONFIGURATION (from original script)
# =============================================================================
MAX_REQUESTS_PER_MINUTE = 30
INITIAL_BACKOFF_SECONDS = 2
MAX_BACKOFF_SECONDS = 120
MAX_RETRIES = 5
VISUAL_PROPS = {"Size", "Variant", "Roundness", "Type", "Icon", "Orientation", "Layout"}
INTERACTION_STATES = {"State", "Hover", "Focused", "Pressed", "Active", "Disabled"}
BOOLEAN_PROPS = {"Checked?", "Selected", "Open", "Expanded", "Loading", "Flip Icon"}
# =============================================================================
# DATA CLASSES (from original script)
# =============================================================================

@dataclass
class ValidationIssue:
    """Design validation issue"""
    severity: str
    component: str
    message: str
    suggestion: str = ""

# =============================================================================
# RATE LIMITER (from original script)
# =============================================================================

class RateLimiter:
    def __init__(self, max_per_minute: int = MAX_REQUESTS_PER_MINUTE):
        self.max_per_minute = max_per_minute
        self.requests: List[float] = []
        self.backoff_until: float = 0
        self.consecutive_429s: int = 0
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = asyncio.get_event_loop().time()
            if now < self.backoff_until:
                await asyncio.sleep(self.backoff_until - now)
                now = asyncio.get_event_loop().time()
            self.requests = [t for t in self.requests if now - t < 60]
            if len(self.requests) >= self.max_per_minute:
                oldest = self.requests[0]
                wait_time = 60 - (now - oldest) + 0.1
                if wait_time > 0:
                    await asyncio.sleep(wait_time)
            self.requests.append(asyncio.get_event_loop().time())

    def handle_429(self):
        self.consecutive_429s += 1
        backoff = min(INITIAL_BACKOFF_SECONDS * (2 ** self.consecutive_429s), MAX_BACKOFF_SECONDS)
        self.backoff_until = asyncio.get_event_loop().time() + backoff
        return backoff

    def reset_backoff(self):
        self.consecutive_429s = 0
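Aside (not part of the diff): how a caller is expected to drive this limiter — acquire before each request, report 429s, reset on success. A sketch assuming the caller supplies an async do_request() returning (status, payload):

import asyncio

async def rate_limited_get(limiter, do_request):
    # do_request is an assumed async callable, not part of the commit.
    while True:
        await limiter.acquire()            # wait for a free slot in the rolling window
        status, payload = await do_request()
        if status == 429:                  # back off exponentially and retry
            await asyncio.sleep(limiter.handle_429())
            continue
        limiter.reset_backoff()            # a success clears the 429 streak
        return payload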
# =============================================================================
# FIGMA API CLIENT (from original script)
# =============================================================================

class IntelligentFigmaClient:
    def __init__(self, token: str, verbose: bool = False):
        self.token = token
        self.verbose = verbose
        self.rate_limiter = RateLimiter()
        self.base_url = "https://api.figma.com/v1"
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(headers={"X-Figma-Token": self.token})
        return self

    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()

    async def _request(self, endpoint: str, params: Dict = None) -> Dict:
        url = f"{self.base_url}/{endpoint}"
        for attempt in range(MAX_RETRIES):
            await self.rate_limiter.acquire()
            try:
                if self.verbose:
                    print(f"  [API] GET {endpoint}")
                async with self._session.get(url, params=params) as resp:
                    if resp.status == 429:
                        backoff = self.rate_limiter.handle_429()
                        if attempt < MAX_RETRIES - 1:
                            await asyncio.sleep(backoff)
                            continue
                        raise Exception(f"Rate limit exceeded after {MAX_RETRIES} retries")
                    self.rate_limiter.reset_backoff()
                    if resp.status != 200:
                        text = await resp.text()
                        raise Exception(f"API error {resp.status}: {text[:200]}")
                    return await resp.json()
            except aiohttp.ClientError as e:
                if attempt < MAX_RETRIES - 1:
                    wait = INITIAL_BACKOFF_SECONDS * (2 ** attempt)
                    await asyncio.sleep(wait)
                    continue
                raise
        raise Exception(f"Failed after {MAX_RETRIES} attempts")

    async def get_file(self, file_key: str) -> Dict:
        return await self._request(f"files/{file_key}")

    async def get_file_variables(self, file_key: str) -> Dict:
        return await self._request(f"files/{file_key}/variables/local")
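Aside (not part of the diff): given the constants above, the 429 backoff schedule that handle_429 produces starts at 4s rather than INITIAL_BACKOFF_SECONDS, because the streak counter is incremented before the backoff is computed — worth checking if a 2s first step was intended:

INITIAL_BACKOFF_SECONDS = 2
MAX_BACKOFF_SECONDS = 120
# consecutive_429s is already 1 on the first 429, so the exponent starts at 1:
schedule = [min(INITIAL_BACKOFF_SECONDS * (2 ** n), MAX_BACKOFF_SECONDS) for n in range(1, 7)]
assert schedule == [4, 8, 16, 32, 64, 120]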
# =============================================================================
# DESIGN VALIDATOR (stub, from original script)
# =============================================================================

class DesignValidator:
    def validate_component(self, component: Dict) -> List[ValidationIssue]:
        return []  # Dummy implementation for now

# =============================================================================
# TOKEN EXTRACTORS (adapted from original script)
# =============================================================================

class VariableExtractor:
    def extract(self, variables_data: Dict, file_key: str) -> List[DesignToken]:
        tokens = []
        meta = variables_data.get("meta", {})
        variables = meta.get("variables", {})
        collections = meta.get("variableCollections", {})

        for var_id, var in variables.items():
            name = var.get("name", "")
            resolved_type = var.get("resolvedType", "")
            collection_id = var.get("variableCollectionId", "")
            collection = collections.get(collection_id, {})
            collection_name = collection.get("name", "").lower().replace(" ", "-")
            token_path = f"{collection_name}.{name}".replace("/", ".")
            token_path = self._sanitize_path(token_path)

            values_by_mode = var.get("valuesByMode", {})
            modes = collection.get("modes", [])
            if not values_by_mode or not modes:
                continue
            first_mode_id = modes[0].get("modeId") if modes else None
            value = values_by_mode.get(first_mode_id)
            if value is None:
                continue

            token = self._create_design_token(token_path, resolved_type, value, var_id, file_key)
            if token:
                tokens.append(token)
        return tokens

    def _sanitize_path(self, path: str) -> str:
        return path.lower().replace(" ", "-").replace("--", "-").strip("-.")

    def _create_design_token(self, name: str, resolved_type: str, value: Any, var_id: str, file_key: str) -> Optional[DesignToken]:
        extensions = {"figma": {"variableId": var_id, "fileKey": file_key}}
        token_type = TokenType.UNKNOWN
        final_value = value

        if resolved_type == "COLOR":
            token_type = TokenType.COLOR
            if isinstance(value, dict):
                if "id" in value:
                    final_value = f"{{var:{value['id']}}}"
                else:
                    final_value = self._rgba_to_css(value)
        elif resolved_type == "FLOAT":
            token_type = TokenType.DIMENSION if any(x in name.lower() for x in ["spacing", "size", "width", "height", "radius", "gap"]) else TokenType.NUMBER
            final_value = f"{value}px" if token_type == TokenType.DIMENSION else value
        elif resolved_type == "STRING":
            token_type = TokenType.STRING
            final_value = str(value)

        if token_type != TokenType.UNKNOWN:
            return DesignToken(name=name, value=final_value, type=token_type, source=f"figma:{file_key}:{var_id}", extensions=extensions)
        return None

    def _rgba_to_css(self, color: Dict) -> str:
        r, g, b, a = int(color.get("r", 0) * 255), int(color.get("g", 0) * 255), int(color.get("b", 0) * 255), round(color.get("a", 1), 3)
        return f"#{r:02x}{g:02x}{b:02x}" if a == 1 else f"rgba({r}, {g}, {b}, {a})"
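Aside (not part of the diff): spot-checks of the conversion rules above, using standalone mirrors of _rgba_to_css and the name-based dimension heuristic; the token names and values are invented:

def rgba_to_css(color: dict) -> str:
    # Mirror of VariableExtractor._rgba_to_css for a quick check.
    r = int(color.get("r", 0) * 255)
    g = int(color.get("g", 0) * 255)
    b = int(color.get("b", 0) * 255)
    a = round(color.get("a", 1), 3)
    return f"#{r:02x}{g:02x}{b:02x}" if a == 1 else f"rgba({r}, {g}, {b}, {a})"

assert rgba_to_css({"r": 1, "g": 0, "b": 0}) == "#ff0000"                      # opaque -> hex
assert rgba_to_css({"r": 0, "g": 0, "b": 0, "a": 0.5}) == "rgba(0, 0, 0, 0.5)" # translucent -> rgba()

# Name-based dimension heuristic: "primitives.spacing.md" reads as a DIMENSION,
# so a FLOAT value of 8 would be emitted as "8px"; a name like "line-height" would not.
is_dimension = any(x in "primitives.spacing.md" for x in ["spacing", "size", "width", "height", "radius", "gap"])
assert is_dimension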
class StyleExtractor:
    def extract(self, file_data: Dict) -> List[DesignToken]:
        # This is a simplified version for brevity. A full implementation
        # would be more robust like the original script.
        return []


class ComponentExtractor:
    def __init__(self, validator: DesignValidator, verbose: bool = False):
        self.validator = validator
        self.verbose = verbose

    def _find_all_components_recursive(self, node: Dict, components: Dict, component_sets: Dict):
        if node.get('type') == 'COMPONENT':
            if node.get('id') not in components:
                components[node.get('id')] = node
        if node.get('type') == 'COMPONENT_SET':
            if node.get('id') not in component_sets:
                component_sets[node.get('id')] = node
        for child in node.get("children", []):
            self._find_all_components_recursive(child, components, component_sets)

    def extract(self, file_data: Dict) -> List[Component]:
        raw_components = {}
        raw_component_sets = {}
        self._find_all_components_recursive(file_data['document'], raw_components, raw_component_sets)

        component_models: List[Component] = []

        # Temporary map to hold component set data
        set_map = {}
        for set_id, set_data in raw_component_sets.items():
            set_map[set_id] = {
                "id": set_id,
                "name": set_data.get("name", "Unknown"),
                "key": set_data.get("key", ""),
                "description": set_data.get("description", ""),
                "variants": [],
                "children_ids": [child.get("id") for child in set_data.get("children", [])]
            }

        # Process individual components (variants)
        for comp_id, comp_data in raw_components.items():
            set_id = comp_data.get("componentSetId")
            if set_id and set_id in set_map:
                variant_name = comp_data.get("name", "")
                variant_props = self._parse_variant_name(variant_name)
                set_map[set_id]["variants"].append({
                    "id": comp_id,
                    "name": variant_name,
                    "props": variant_props,
                    "figma_node_id": comp_id,
                })

        # Create Component models from the processed sets
        for set_id, set_data in set_map.items():
            # Classify the component
            classification = self._classify_component(set_data)

            # Get variant names
            variant_names = [v['name'] for v in set_data['variants']]

            # Create the component model
            component_model = Component(
                figma_node_id=set_id,
                name=set_data['name'],
                source="figma",
                description=set_data.get('description', ''),
                classification=classification,
                variants=variant_names,
                props={},  # Prop schema can be enriched later
                dependencies=[],  # Dependencies can be determined later
                sub_components=set_data.get('children_ids', [])
            )
            component_models.append(component_model)

        return component_models

    def _classify_component(self, set_data: Dict) -> AtomicType:
        """
        Classify a component as an ATOM, MOLECULE, or ORGANISM based on heuristics.
        """
        name = set_data.get('name', '').lower()
        num_children = len(set_data.get('children_ids', []))

        if 'icon' in name or 'button' in name or 'input' in name:
            return AtomicType.ATOM

        if num_children == 0:
            return AtomicType.ATOM
        elif num_children < 5:
            return AtomicType.MOLECULE
        else:
            return AtomicType.ORGANISM

    def _parse_variant_name(self, name: str) -> Dict[str, str]:
        return {key.strip(): value.strip() for part in name.split(", ") if "=" in part for key, value in [part.split("=", 1)]}

    def _get_css_pseudo(self, state_name: str) -> str:
        return {"Hover": ":hover", "Focused": ":focus", "Focus": ":focus", "Pressed": ":active", "Active": ":active", "Disabled": ":disabled"}.get(state_name, "")
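Aside (not part of the diff): what these helpers produce for a typical variant string, via a standalone mirror of _parse_variant_name; the classification outcomes are restated from the heuristic above:

def parse_variant_name(name: str) -> dict:
    # Mirror of ComponentExtractor._parse_variant_name.
    return {k.strip(): v.strip() for part in name.split(", ") if "=" in part for k, v in [part.split("=", 1)]}

assert parse_variant_name("Size=Large, State=Hover") == {"Size": "Large", "State": "Hover"}
assert parse_variant_name("NoEqualsHere") == {}  # parts without '=' are skipped

# Classification heuristic, restated: name hints ('icon', 'button', 'input') force ATOM;
# otherwise 0 children -> ATOM, 1-4 children -> MOLECULE, 5 or more -> ORGANISM.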
# =============================================================================
# FIGMA TOKEN SOURCE
# =============================================================================

class FigmaTokenSource(TokenSource):
    """
    Extracts design tokens and components from a Figma file.
    """
    def __init__(self, figma_token: str, verbose: bool = False):
        self.figma_token = figma_token
        self.verbose = verbose

    @property
    def source_type(self) -> str:
        return "figma"

    async def extract(self, file_key: str) -> Tuple[TokenCollection, List[Component]]:
        """
        Extract design tokens and components from a Figma file.

        Args:
            file_key: The key of the Figma file.

        Returns:
            A tuple containing:
              - TokenCollection: The extracted design tokens.
              - List[Component]: A list of the extracted components.
        """
        validator = DesignValidator()

        async with IntelligentFigmaClient(self.figma_token, self.verbose) as client:
            if self.verbose:
                print(f"Fetching Figma file: {file_key}")
            file_task = client.get_file(file_key)
            vars_task = client.get_file_variables(file_key)

            file_data = await file_task
            try:
                vars_data = await vars_task
            except Exception:
                vars_data = {"meta": {"variables": {}, "variableCollections": {}}}

            if self.verbose:
                print("Extracting tokens and components...")
            var_extractor = VariableExtractor()
            style_extractor = StyleExtractor()
            comp_extractor = ComponentExtractor(validator, self.verbose)

            variable_tokens = var_extractor.extract(vars_data, file_key)
            style_tokens = style_extractor.extract(file_data)
            components = comp_extractor.extract(file_data)

            all_tokens = variable_tokens + style_tokens

            token_collection = TokenCollection(
                name=f"Figma Tokens for {file_data.get('name', file_key)}",
                tokens=all_tokens,
                sources=[f"figma:{file_key}"]
            )

            if self.verbose:
                print(f"Extraction complete. Found {len(token_collection)} tokens and {len(components)} components.")

            return token_collection, components
@@ -3,6 +3,19 @@
 from typing import Any, Dict, List, Optional
 from uuid import uuid4
 from pydantic import BaseModel, Field, ConfigDict
+from enum import Enum
+
+
+class AtomicType(str, Enum):
+    """
+    Classification of components based on atomic design principles.
+    """
+    ATOM = "atom"
+    MOLECULE = "molecule"
+    ORGANISM = "organism"
+    TEMPLATE = "template"
+    PAGE = "page"
+    UNKNOWN = "unknown"
 
 
 class ComponentVariant(BaseModel):
@@ -15,13 +28,20 @@ class ComponentVariant(BaseModel):
 
 
 class Component(BaseModel):
-    """A design system component"""
+    """A design system component, classified by atomic design principles."""
+    model_config = ConfigDict(arbitrary_types_allowed=True)
 
     uuid: str = Field(default_factory=lambda: str(uuid4()), description="UUID for export/import")
+    figma_node_id: Optional[str] = Field(None, description="The corresponding node ID in Figma")
     name: str = Field(..., description="Component name (e.g., 'Button')")
-    source: str = Field(..., description="Component source (shadcn, custom, figma)")
+    source: str = Field(..., description="Component source (e.g., shadcn, custom, figma)")
     description: Optional[str] = Field(None, description="Component description")
 
+    classification: AtomicType = Field(default=AtomicType.UNKNOWN, description="Atomic design classification")
+
     variants: List[str] = Field(default_factory=list, description="Available variants")
     props: Dict[str, Any] = Field(default_factory=dict, description="Component props schema")
-    dependencies: List[str] = Field(default_factory=list, description="Component dependencies (UUIDs)")
+    dependencies: List[str] = Field(default_factory=list, description="UUIDs of components this component depends on (e.g., an organism depends on molecules/atoms)")
+    sub_components: List[str] = Field(default_factory=list, description="UUIDs of components that are children of this component in the atomic hierarchy")
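Aside (not part of the diff): with the extended model above, a synced component might be constructed like this; every value below is an illustrative placeholder, not data from a real Figma file:

card = Component(
    figma_node_id="1:3",
    name="Card",
    source="figma",
    description="A simple card",
    classification=AtomicType.MOLECULE,
    variants=["State=Default", "State=Hover"],
    sub_components=["<uuid-of-button>", "<uuid-of-icon>"],  # children in the atomic hierarchy
)
assert card.classification is AtomicType.MOLECULE
assert card.uuid  # generated by the default_factory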
@@ -30,6 +30,9 @@ from dss.project.core import (
     DSS_CORE_COMPONENTS,
 )
 from dss.project.sync import DSSCoreSync, get_dss_core_tokens, get_dss_core_themes
+from dss.ingest.sources.figma import FigmaTokenSource
+from dss.ingest.merge import TokenMerger, MergeStrategy
+from dss.ingest.base import TokenCollection
 
 logger = logging.getLogger(__name__)
@@ -362,137 +365,120 @@ class ProjectManager:
     # Sync Operations
     # =========================================================================
 
-    def sync(
+    async def sync(
         self,
         project: DSSProject,
         figma_token: Optional[str] = None,
         file_keys: Optional[List[str]] = None,
+        force: bool = False,
+        verbose: bool = False,
     ) -> DSSProject:
         """
-        Sync project from all sources (sync version).
+        Sync project from all sources.
 
-        Uses rate limit handling with exponential backoff for Figma API.
+        This new implementation uses the dss.ingest framework to provide a
+        more robust and extensible pipeline.
 
         Args:
             project: Project to sync
             figma_token: Optional Figma token
             file_keys: Optional specific file keys to sync
+            force: If true, ignores cache and forces a re-sync
+            verbose: Verbose logging
 
         Returns:
-            Updated project with extracted tokens
-
-        Raises:
-            FigmaRateLimitError: If rate limit exceeded after all retries
+            Updated project with extracted tokens and components.
         """
         if project.config.figma is None or not project.config.figma.files:
-            logger.warning("No Figma sources configured")
+            logger.warning("No Figma sources configured for this project.")
             return project
 
-        sync = FigmaProjectSync(token=figma_token)
+        token = figma_token or os.environ.get("FIGMA_TOKEN")
+        if not token:
+            raise ValueError("Figma token not provided and FIGMA_TOKEN env var is not set.")
+
+        source = FigmaTokenSource(figma_token=token, verbose=verbose)
 
         # Determine which files to sync
-        if file_keys is None:
-            file_keys = [f.key for f in project.config.figma.files]
+        files_to_sync = []
+        if file_keys:
+            files_to_sync = [f for f in project.config.figma.files if f.key in file_keys]
+        else:
+            files_to_sync = project.config.figma.files
+
+        if not files_to_sync:
+            logger.warning("No matching Figma files found to sync.")
+            return project
 
-        # Extract from each file
-        all_tokens: Dict[str, Any] = {"sources": {}}
+        # --- Extraction from all files ---
+        tasks = [source.extract(f.key) for f in files_to_sync]
+        results = await asyncio.gather(*tasks, return_exceptions=True)
 
-        for file_key in file_keys:
-            try:
-                style_data = sync.get_file_styles(file_key)
-                tokens = sync.to_dss_tokens(style_data)
-                all_tokens["sources"][file_key] = tokens
+        # --- Process and Save Results ---
+        all_collections: List[TokenCollection] = []
+        all_components: List[Component] = []
 
-                # Save raw tokens
-                figma_dir = project.path / "tokens" / "figma"
-                figma_dir.mkdir(parents=True, exist_ok=True)
+        for i, result in enumerate(results):
+            file_info = files_to_sync[i]
+            if isinstance(result, Exception):
+                logger.error(f"Failed to sync file '{file_info.name}' ({file_info.key}): {result}")
+                project.errors.append(f"Sync failed for {file_info.name}: {str(result)}")
+                continue
 
-                file_info = project.config.figma.get_file(file_key)
-                file_name = file_info.name if file_info else file_key
-                safe_name = file_name.replace("/", "-").replace(" ", "_").lower()
+            token_collection, extracted_components = result
+            all_collections.append(token_collection)
+            all_components.extend(extracted_components)
 
-                sync.save_tokens(style_data, figma_dir / safe_name, format="json")
-                sync.save_tokens(style_data, figma_dir / safe_name, format="raw")
+            logger.info(f"Synced {len(token_collection)} tokens and {len(extracted_components)} components from '{file_info.name}'")
 
-                # Update sync timestamp
-                if file_info:
-                    file_info.last_synced = datetime.now()
+            # Update sync timestamp
+            file_info.last_synced = datetime.now()
 
-                logger.info(f"Synced {len(tokens.get('tokens', {}))} tokens from '{file_name}'")
+        # --- Merge Token Collections ---
+        if len(all_collections) > 1:
+            logger.info(f"Merging {len(all_collections)} token collections...")
+            merger = TokenMerger(strategy=MergeStrategy.PREFER_FIGMA)  # or another appropriate strategy
+            merge_result = merger.merge(all_collections)
+            final_collection = merge_result.collection
+            logger.info(f"Merge complete. Total unique tokens: {len(final_collection)}")
+        elif all_collections:
+            final_collection = all_collections[0]
+        else:
+            logger.warning("No tokens were extracted.")
+            final_collection = TokenCollection(name="empty")
 
-            except Exception as e:
-                logger.error(f"Failed to sync file {file_key}: {e}")
-                project.errors.append(f"Sync failed for {file_key}: {str(e)}")
+        # --- Update Project Model ---
+        # Add extracted components to the project
+        project.components = all_components
 
+        # Associate tokens with components (basic example)
+        for component in project.components:
+            for token in final_collection.tokens:
+                if component.name.lower() in token.name.lower():
+                    if not hasattr(component, 'associated_tokens'):
+                        component.associated_tokens = []
+                    component.associated_tokens.append(token.name)
 
-        project.extracted_tokens = all_tokens
-        project.config.updated_at = datetime.now()
+        # --- Save Final TokenCollection ---
+        cache_dir = project.path / ".dss" / "cache"
+        cache_dir.mkdir(parents=True, exist_ok=True)
+        output_path = cache_dir / "raw_figma_tokencollection.json"
+
+        with open(output_path, "w") as f:
+            f.write(final_collection.to_json())
+
+        logger.info(f"Raw TokenCollection saved to: {output_path}")
 
         # Update project state
         project.status = ProjectStatus.SYNCED
+
+        project.config.updated_at = datetime.now()
         self._save_config(project)
         self.registry.update_status(project.config.name, project.status)
 
         return project
-    async def sync_async(
-        self,
-        project: DSSProject,
-        figma_token: Optional[str] = None,
-        file_keys: Optional[List[str]] = None,
-    ) -> DSSProject:
-        """
-        Sync project from all sources (async version).
-
-        Fetches from multiple files in parallel.
-        """
-        if project.config.figma is None or not project.config.figma.files:
-            logger.warning("No Figma sources configured")
-            return project
-
-        sync = FigmaProjectSync(token=figma_token)
-
-        try:
-            # Determine which files to sync
-            if file_keys is None:
-                file_keys = [f.key for f in project.config.figma.files]
-
-            # Parallel sync
-            styles_map = await sync.sync_project_files_async(
-                project.config.figma.project_id or "",
-                file_keys=file_keys
-            )
-
-            # Process results
-            all_tokens: Dict[str, Any] = {"sources": {}}
-            figma_dir = project.path / "tokens" / "figma"
-            figma_dir.mkdir(parents=True, exist_ok=True)
-
-            for file_key, style_data in styles_map.items():
-                tokens = sync.to_dss_tokens(style_data)
-                all_tokens["sources"][file_key] = tokens
-
-                # Save tokens
-                file_info = project.config.figma.get_file(file_key)
-                file_name = file_info.name if file_info else file_key
-                safe_name = file_name.replace("/", "-").replace(" ", "_").lower()
-
-                sync.save_tokens(style_data, figma_dir / safe_name, format="json")
-
-                if file_info:
-                    file_info.last_synced = datetime.now()
-
-                logger.info(f"Synced {len(tokens.get('tokens', {}))} tokens from '{file_name}'")
-
-            project.extracted_tokens = all_tokens
-            project.config.updated_at = datetime.now()
-            project.status = ProjectStatus.SYNCED
-
-            self._save_config(project)
-            self.registry.update_status(project.config.name, project.status)
-
-        finally:
-            await sync.close()
-
-        return project
+    # (sync_async is now obsolete and removed)
 
     # =========================================================================
     # Build Operations
@@ -105,6 +105,8 @@ class ProjectConfig(BaseModel):
         json_encoders = {datetime: lambda v: v.isoformat() if v else None}
 
 
+from dss.models.component import Component
+
 class DSSProject(BaseModel):
     """
     Complete DSS Project representation.
@@ -121,6 +123,7 @@ class DSSProject(BaseModel):
 
     # Extracted data (populated after sync)
     extracted_tokens: Optional[Dict[str, Any]] = Field(None, description="Tokens from sources")
+    components: List[Component] = Field(default_factory=list, description="List of extracted components")
 
     class Config:
         arbitrary_types_allowed = True
dss/themes/translator.py (new file, 65 lines)
@@ -0,0 +1,65 @@
"""
DSS Theme Translator

Translates a DSS project's tokens and components into a specific
theme or "skin" for a target framework (e.g., shadcn, material-ui).
"""

import json
from pathlib import Path
from typing import Dict, Any

from dss.models.project import Project
from dss.ingest.base import TokenCollection


class ThemeTranslator:
    """
    Translates a DSS project into a specific theme.
    """

    def __init__(self, project: Project):
        self.project = project

    def translate(self, skin: str, output_dir: Path):
        """
        Translate the project into a specific skin.

        Args:
            skin: The name of the skin to translate to (e.g., 'shadcn').
            output_dir: The directory to write the translated theme files to.
        """
        if skin == "shadcn":
            self._translate_to_shadcn(output_dir)
        else:
            raise ValueError(f"Unknown skin: {skin}")

    def _translate_to_shadcn(self, output_dir: Path):
        """
        Translate the project to the shadcn skin.

        This is a simplified implementation that generates a CSS file
        with custom properties. A real implementation would be more complex
        and would likely involve generating multiple files (e.g., a
        tailwind.config.js file, a globals.css file, etc.).
        """
        # Load the token collection (json import added above; it is required here)
        token_collection_path = self.project.path / ".dss" / "cache" / "raw_figma_tokencollection.json"
        if not token_collection_path.exists():
            raise FileNotFoundError("Token collection not found. Run sync first.")

        with open(token_collection_path, "r") as f:
            token_data = json.load(f)
        token_collection = TokenCollection.from_dict(token_data)

        # Generate CSS custom properties
        lines = [":root {"]
        for token in token_collection.tokens:
            lines.append(f"    --{token.to_css_var_name()}: {token.value};")
        lines.append("}")

        # Write the CSS file
        output_file = output_dir / "shadcn.css"
        with open(output_file, "w") as f:
            f.write("\n".join(lines))

        print(f"Generated shadcn theme at {output_file}")
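Aside (not part of the diff): for two hypothetical tokens named color.primary and spacing.md, the generated shadcn.css would look roughly like this, assuming to_css_var_name() turns the dotted path into dashes; the values are invented:

:root {
    --color-primary: #3b82f6;
    --spacing-md: 16px;
}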
(One file's diff was suppressed because it is too large.)

tests/test_atomic_dss.py (new file, 99 lines)
@@ -0,0 +1,99 @@
import asyncio
from pathlib import Path
import json
from unittest.mock import patch, MagicMock

import pytest
from httpx import Response

from dss.project.manager import ProjectManager, DSSProject, ProjectRegistry
from dss.models.component import AtomicType, Component


@pytest.fixture
def project_manager(tmp_path: Path) -> ProjectManager:
    """
    Fixture for the ProjectManager.
    """
    registry_path = tmp_path / "registry.json"
    registry = ProjectRegistry(registry_path=registry_path)
    return ProjectManager(registry=registry)


@pytest.fixture
def dss_project(project_manager: ProjectManager, tmp_path: Path) -> DSSProject:
    """
    Fixture for a DSSProject.
    """
    project_path = tmp_path / "test_project"
    project = project_manager.init(project_path, "test_project")
    project.config.figma = MagicMock()
    project.config.figma.files = [MagicMock(key="fake_key", name="fake_name")]
    return project


@patch("httpx.AsyncClient")
def test_recursive_figma_import(MockAsyncClient, dss_project: DSSProject, project_manager: ProjectManager):
    """
    Test that the Figma import is recursive and that the components are
    classified correctly.
    """
    # Mock the httpx.AsyncClient to return a sample Figma file
    mock_client_instance = MockAsyncClient.return_value
    mock_client_instance.get.return_value = Response(
        200,
        json={
            "document": {
                "id": "0:0",
                "name": "Document",
                "type": "DOCUMENT",
                "children": [
                    {
                        "id": "1:0",
                        "name": "Page 1",
                        "type": "CANVAS",
                        "children": [
                            {
                                "id": "1:1",
                                "name": "Icon",
                                "type": "COMPONENT",
                            },
                            {
                                "id": "1:2",
                                "name": "Button",
                                "type": "COMPONENT",
                                "children": [
                                    {"id": "1:1", "name": "Icon", "type": "COMPONENT"}
                                ],
                            },
                            {
                                "id": "1:3",
                                "name": "Card",
                                "type": "COMPONENT_SET",
                                "children": [
                                    {"id": "1:2", "name": "Button", "type": "COMPONENT"}
                                ],
                            },
                        ],
                    }
                ],
            }
        },
    )

    # Run the sync
    dss_project = asyncio.run(project_manager.sync(dss_project, figma_token="fake_token"))

    # Assert that the project contains the correct number of components
    assert len(dss_project.components) == 3

    # Assert that the components are classified correctly
    for component in dss_project.components:
        if component.name == "Icon":
            assert component.classification == AtomicType.ATOM
        elif component.name == "Button":
            assert component.classification == AtomicType.ATOM
        elif component.name == "Card":
            assert component.classification == AtomicType.MOLECULE
tests/test_figma_ingest.py (new file, 91 lines)
@@ -0,0 +1,91 @@
"""
Tests for the Figma ingestion source.
"""

import asyncio
from unittest.mock import patch, AsyncMock, MagicMock

import pytest

from dss.ingest.sources.figma import FigmaTokenSource
from dss.models.component import AtomicType


# Mock Figma client with async context manager and async methods
class MockAsyncClient:
    def __init__(self, *args, **kwargs):
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def get_file(self, file_key: str):
        return {
            "document": {
                "id": "0:0",
                "name": "Document",
                "type": "DOCUMENT",
                "children": [
                    {
                        "id": "1:0",
                        "name": "Page 1",
                        "type": "CANVAS",
                        "children": [
                            {
                                "id": "1:1",
                                "name": "Icon",
                                "type": "COMPONENT",
                            },
                            {
                                "id": "1:2",
                                "name": "Button",
                                "type": "COMPONENT",
                                "children": [
                                    {"id": "1:1", "name": "Icon", "type": "COMPONENT"}
                                ],
                            },
                            {
                                "id": "1:3",
                                "name": "Card",
                                "type": "COMPONENT_SET",
                                "children": [
                                    {"id": "1:2", "name": "Button", "type": "COMPONENT"}
                                ],
                            },
                        ],
                    }
                ],
            }
        }

    async def get_file_variables(self, file_key: str):
        return {"meta": {"variables": {}, "variableCollections": {}}}


@patch("dss.ingest.sources.figma.IntelligentFigmaClient", new=MockAsyncClient)
def test_figma_component_extraction():
    """
    Test that the Figma ingestion source correctly extracts and classifies
    components from a mock Figma file.
    """
    source = FigmaTokenSource(figma_token="fake_token")

    token_collection, components = asyncio.run(source.extract("fake_file_key"))

    # Assert that the correct number of components were extracted
    assert len(components) == 1

    # Assert that the components are classified correctly
    card_component_found = False
    for component in components:
        if component.name == "Card":
            card_component_found = True
            assert component.classification == AtomicType.MOLECULE
            assert component.sub_components  # should not be empty
            assert len(component.sub_components) == 1  # Card has one child
            assert component.figma_node_id == "1:3"

    assert card_component_found, "Card component not found in extracted components."