Files
dss/scripts/figma-sync.py
DSS 08ce228df1
Some checks failed
DSS Project Analysis / dss-context-update (push) Has been cancelled
feat: Add DSS infrastructure, remove legacy admin-ui code
- Remove legacy admin-ui/js/ vanilla JS components
- Add .dss/ directory with core tokens, skins, themes
- Add Storybook configuration and generated stories
- Add DSS management scripts (dss-services, dss-init, dss-setup, dss-reset)
- Add MCP command definitions for DSS plugin
- Add Figma sync architecture and scripts
- Update pre-commit hooks with documentation validation
- Fix JSON trailing commas in skin files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-10 22:15:11 -03:00

1106 lines
39 KiB
Python
Executable File

#!/usr/bin/env python3
"""
DSS Intelligent Figma Sync
4-Layer Pipeline Architecture:
1. API Layer - Rate-limited Figma API access with caching
2. Validation - Design linting and contract enforcement
3. Extraction - Variables, Styles, Components extraction
4. Translation - Figma → DSS canonical format with W3C tokens
Features:
- Exponential backoff rate limiting with request queue
- lastModified caching to minimize API calls
- Figma Variables extraction (colors, spacing, breakpoints)
- Style extraction (typography, effects)
- Component variant classification (visual props vs interaction states)
- W3C Design Token format output
- Design contract validation
Usage: python3 scripts/figma-sync.py [--file-key KEY] [--force] [--verbose]
"""
import sys
import os
import json
import asyncio
import time
import hashlib
import re
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional, Any, Set
from enum import Enum
import aiohttp
# =============================================================================
# CONFIGURATION
# =============================================================================
# Filesystem layout. DSS_ROOT is the parent of the directory that holds this
# script; every other location is resolved relative to it.
DSS_ROOT = Path(__file__).parent.parent
CACHE_DIR = DSS_ROOT / ".dss/cache"
TOKENS_DIR = DSS_ROOT / ".dss/data/_system/tokens"
COMPONENTS_DIR = DSS_ROOT / ".dss/components"
SKINS_DIR = DSS_ROOT / ".dss/skins"

# Rate limiting: sliding-window size and retry/backoff bounds (seconds).
MAX_REQUESTS_PER_MINUTE = 30
INITIAL_BACKOFF_SECONDS = 2
MAX_BACKOFF_SECONDS = 120
MAX_RETRIES = 5

# Caching: a sync older than this many hours is considered stale.
CACHE_TTL_HOURS = 24

# Variant classification: which variant dimension names are treated as
# visual props, interaction states, or boolean props.
VISUAL_PROPS = {
    "Size",
    "Variant",
    "Roundness",
    "Type",
    "Icon",
    "Orientation",
    "Layout",
}
INTERACTION_STATES = {
    "State",
    "Hover",
    "Focused",
    "Pressed",
    "Active",
    "Disabled",
}
BOOLEAN_PROPS = {
    "Checked?",
    "Selected",
    "Open",
    "Expanded",
    "Loading",
    "Flip Icon",
}

# Design contract - required naming patterns.
VALID_NAME_PATTERN = re.compile(r'^[A-Z][a-zA-Z0-9 /-]*$')
# Names matching any of these look auto-generated and are rejected.
INVALID_NAME_PATTERNS = [
    re.compile(expression)
    for expression in (
        r'^Property \d+$',
        r'^Frame \d+$',
        r'^Group \d+$',
        r'^Component \d+$',
    )
]
# =============================================================================
# DATA CLASSES
# =============================================================================
class TokenType(Enum):
    """Token kinds, each paired with the type string it is written as."""
    # NOTE(review): nothing in the visible part of this file references this
    # enum; the extractors pass the type strings directly - confirm usage.
    COLOR = "color"
    SPACING = "dimension"  # spacing is expressed as a generic dimension
    TYPOGRAPHY = "typography"
    SHADOW = "shadow"
    BORDER = "border"
    OPACITY = "number"  # opacity is expressed as a plain number
    DURATION = "duration"
    CUBIC_BEZIER = "cubicBezier"
@dataclass
class W3CToken:
    """One design token in W3C Design Token form."""
    value: Any
    type: str
    description: str = ""
    extensions: Dict = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """Serialize the token using ``$``-prefixed keys.

        ``$value`` and ``$type`` are always present; ``$description`` and
        ``$extensions`` are emitted only when they are non-empty.
        """
        payload = {"$value": self.value, "$type": self.type}
        optional_fields = (
            ("$description", self.description),
            ("$extensions", self.extensions),
        )
        for key, content in optional_fields:
            if content:
                payload[key] = content
        return payload
@dataclass
class ValidationIssue:
    """Design validation issue"""
    # Severity label; the validator's report counts "error" and "warning".
    severity: str  # error, warning, info
    # Name of the component (set) the issue was raised for.
    component: str
    # Human-readable description of the problem.
    message: str
    # Optional hint on how to fix the problem.
    suggestion: str = ""
@dataclass
class SyncManifest:
    """Cache manifest for tracking sync state"""
    # Figma file key; also the key of this entry in the manifest JSON file.
    file_key: str
    # Remote `lastModified` value observed at sync time.
    last_modified: str
    # ISO-8601 timestamp of when the sync finished.
    synced_at: str
    # NOTE(review): never populated by the visible sync code - presumably
    # reserved for per-node incremental sync; confirm.
    node_hashes: Dict[str, str] = field(default_factory=dict)
    # Summary counters recorded by the last sync.
    extracted_tokens: int = 0
    extracted_components: int = 0
    validation_issues: int = 0
# =============================================================================
# RATE LIMITER
# =============================================================================
class RateLimiter:
    """Sliding-window request limiter with exponential backoff.

    Two mechanisms delay callers of :meth:`acquire`:

    * a 60-second sliding window that holds at most ``max_per_minute``
      request timestamps, and
    * a backoff deadline (``backoff_until``) that :meth:`handle_429` pushes
      into the future each time the server answers with HTTP 429.
    """

    def __init__(self, max_per_minute: int = MAX_REQUESTS_PER_MINUTE):
        """Create a limiter allowing ``max_per_minute`` requests per minute.

        Raises:
            ValueError: if ``max_per_minute`` is smaller than 1 (no request
                could ever be admitted).
        """
        if max_per_minute < 1:
            raise ValueError("max_per_minute must be at least 1")
        self.max_per_minute = max_per_minute
        self.requests: List[float] = []
        self.backoff_until: float = 0
        self.consecutive_429s: int = 0
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait for rate limit slot"""
        # The lock is held while sleeping on purpose: waiting callers are
        # admitted one at a time, in order.
        async with self._lock:
            now = time.time()
            # Check if in backoff period
            if now < self.backoff_until:
                wait_time = self.backoff_until - now
                print(f" [RATE] Backoff: waiting {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
                now = time.time()
            # Clean old requests (older than 1 minute)
            self.requests = [t for t in self.requests if now - t < 60]
            # Wait if at limit
            if len(self.requests) >= self.max_per_minute:
                oldest = self.requests[0]
                # +0.1s so the oldest timestamp has left the window on wake-up.
                wait_time = 60 - (now - oldest) + 0.1
                if wait_time > 0:
                    print(f" [RATE] Limit reached: waiting {wait_time:.1f}s")
                    await asyncio.sleep(wait_time)
                    now = time.time()
                    self.requests = [t for t in self.requests if now - t < 60]
            self.requests.append(now)

    def handle_429(self):
        """Handle rate limit response with exponential backoff.

        The first consecutive 429 backs off for ``INITIAL_BACKOFF_SECONDS``;
        each further one doubles the delay, capped at
        ``MAX_BACKOFF_SECONDS``.

        Returns:
            The backoff duration in seconds.
        """
        self.consecutive_429s += 1
        # The counter was just incremented, so the exponent is (count - 1):
        # the first 429 waits exactly the initial backoff, not twice it.
        backoff = min(
            INITIAL_BACKOFF_SECONDS * (2 ** (self.consecutive_429s - 1)),
            MAX_BACKOFF_SECONDS
        )
        self.backoff_until = time.time() + backoff
        print(f" [RATE] 429 received: backoff {backoff}s (attempt {self.consecutive_429s})")
        return backoff

    def reset_backoff(self):
        """Reset backoff after successful request"""
        self.consecutive_429s = 0
# =============================================================================
# FIGMA API CLIENT
# =============================================================================
class IntelligentFigmaClient:
    """Figma API client with caching and rate limiting.

    Must be used as an async context manager: the HTTP session is created in
    ``__aenter__`` and closed in ``__aexit__``.
    """

    def __init__(self, token: str, verbose: bool = False):
        self.token = token
        self.verbose = verbose
        self.rate_limiter = RateLimiter()
        self.base_url = "https://api.figma.com/v1"
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(
            headers={"X-Figma-Token": self.token}
        )
        return self

    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
            # Drop the closed session so a later call fails with a clear error.
            self._session = None

    async def _request(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """Make rate-limited API request with retries.

        Args:
            endpoint: Path relative to ``base_url`` (no leading slash).
            params: Optional query-string parameters.

        Returns:
            The decoded JSON response body.

        Raises:
            RuntimeError: if called outside the ``async with`` block.
            Exception: on a non-200 response, or when the rate limit is still
                hit after ``MAX_RETRIES`` attempts.
            aiohttp.ClientError: when the last attempt fails to connect.
        """
        if self._session is None:
            raise RuntimeError(
                "IntelligentFigmaClient has no open session; "
                "use it inside 'async with IntelligentFigmaClient(...)'"
            )
        url = f"{self.base_url}/{endpoint}"
        for attempt in range(MAX_RETRIES):
            await self.rate_limiter.acquire()
            try:
                if self.verbose:
                    print(f" [API] GET {endpoint}")
                async with self._session.get(url, params=params) as resp:
                    if resp.status == 429:
                        backoff = self.rate_limiter.handle_429()
                        if attempt < MAX_RETRIES - 1:
                            await asyncio.sleep(backoff)
                            continue
                        raise Exception(f"Rate limit exceeded after {MAX_RETRIES} retries")
                    self.rate_limiter.reset_backoff()
                    if resp.status != 200:
                        text = await resp.text()
                        # Only the first 200 characters of the body are reported.
                        raise Exception(f"API error {resp.status}: {text[:200]}")
                    return await resp.json()
            except aiohttp.ClientError as e:
                if attempt < MAX_RETRIES - 1:
                    wait = INITIAL_BACKOFF_SECONDS * (2 ** attempt)
                    print(f" [API] Connection error, retry in {wait}s: {e}")
                    await asyncio.sleep(wait)
                    continue
                raise
        # Only reachable when MAX_RETRIES is 0 (the loop body never runs).
        raise Exception(f"Failed after {MAX_RETRIES} attempts")

    async def get_file(self, file_key: str) -> Dict:
        """Get full Figma file"""
        return await self._request(f"files/{file_key}")

    async def get_file_meta(self, file_key: str) -> Dict:
        """Get file metadata (lightweight, for caching check)"""
        # depth=1 is sent as a query parameter to limit the returned tree.
        return await self._request(f"files/{file_key}", {"depth": 1})

    async def get_file_variables(self, file_key: str) -> Dict:
        """Get Figma variables (colors, spacing, etc.)"""
        return await self._request(f"files/{file_key}/variables/local")

    async def get_file_styles(self, file_key: str) -> Dict:
        """Get published styles"""
        return await self._request(f"files/{file_key}/styles")

    async def get_file_components(self, file_key: str) -> Dict:
        """Get components and component sets"""
        return await self._request(f"files/{file_key}/components")
# =============================================================================
# CACHE MANAGER
# =============================================================================
class CacheManager:
    """Manages sync cache and incremental updates.

    All manifests live in one JSON file (``figma-sync-manifest.json``) that
    maps a Figma file key to that file's :class:`SyncManifest` fields.
    """

    def __init__(self, cache_dir: Path = CACHE_DIR):
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.manifest_path = cache_dir / "figma-sync-manifest.json"

    def _read_all(self) -> Dict[str, Any]:
        """Return the whole manifest file as a dict.

        A missing, undecodable or wrongly shaped file (anything that is not a
        JSON object) is treated as an empty cache rather than an error.
        """
        try:
            with open(self.manifest_path, encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            return {}
        return data if isinstance(data, dict) else {}

    def load_manifest(self, file_key: str) -> Optional[SyncManifest]:
        """Load cached manifest for file.

        Returns:
            The stored manifest, or ``None`` when there is no usable entry
            for ``file_key``.
        """
        entry = self._read_all().get(file_key)
        if not isinstance(entry, dict):
            return None
        return SyncManifest(
            file_key=entry.get("file_key", file_key),
            last_modified=entry.get("last_modified", ""),
            synced_at=entry.get("synced_at", ""),
            node_hashes=entry.get("node_hashes", {}),
            extracted_tokens=entry.get("extracted_tokens", 0),
            extracted_components=entry.get("extracted_components", 0),
            validation_issues=entry.get("validation_issues", 0)
        )

    def save_manifest(self, manifest: SyncManifest):
        """Save manifest to cache, keeping the entries of other files."""
        data = self._read_all()
        data[manifest.file_key] = asdict(manifest)
        # Write to a sibling temp file and rename it into place, so an
        # interrupted run cannot leave a truncated manifest behind.
        tmp_path = self.manifest_path.with_name(self.manifest_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        tmp_path.replace(self.manifest_path)

    def should_sync(self, file_key: str, remote_modified: str) -> bool:
        """Check if sync is needed based on cache.

        A sync is needed when there is no manifest, the remote
        ``lastModified`` differs from the cached one, the cached sync is
        older than ``CACHE_TTL_HOURS``, or its timestamp cannot be parsed.
        """
        manifest = self.load_manifest(file_key)
        if not manifest:
            return True
        # Check if remote is newer
        if manifest.last_modified != remote_modified:
            return True
        # Check cache TTL
        try:
            synced = datetime.fromisoformat(manifest.synced_at.replace("Z", "+00:00"))
            # now() takes synced's tzinfo so naive and aware values never mix.
            if datetime.now(synced.tzinfo) - synced > timedelta(hours=CACHE_TTL_HOURS):
                return True
        except (ValueError, TypeError):
            return True
        return False
# =============================================================================
# DESIGN VALIDATOR
# =============================================================================
class DesignValidator:
    """Checks extracted Figma components against the DSS naming contract.

    Every issue found by :meth:`validate_component` is accumulated on
    ``self.issues`` and summarized by :meth:`get_report`.
    """

    def __init__(self):
        self.issues: List[ValidationIssue] = []

    def validate_component_name(self, name: str) -> bool:
        """Return False when ``name`` matches one of INVALID_NAME_PATTERNS."""
        return not any(rx.match(name) for rx in INVALID_NAME_PATTERNS)

    def validate_variant_props(self, component_name: str, variant_props: Dict) -> List[ValidationIssue]:
        """Check variant property names (errors) and their values (warnings).

        ``variant_props`` maps a property name to a list of values or to a
        single value. The issues are returned, not stored on the validator.
        """
        found = []
        for prop_name, values in variant_props.items():
            # An auto-generated property name is an error.
            if not self.validate_component_name(prop_name):
                found.append(ValidationIssue(
                    severity="error",
                    component=component_name,
                    message=f"Invalid variant property name: '{prop_name}'",
                    suggestion="Use PascalCase names like 'Size', 'Variant', 'State'"
                ))
            # An auto-generated value is only a warning; non-strings are skipped.
            candidates = values if isinstance(values, list) else [values]
            found.extend(
                ValidationIssue(
                    severity="warning",
                    component=component_name,
                    message=f"Invalid variant value: '{value}' in {prop_name}",
                    suggestion="Use PascalCase values like 'Large', 'Primary', 'Hover'"
                )
                for value in candidates
                if isinstance(value, str) and not self.validate_component_name(value)
            )
        return found

    def validate_component(self, component: Dict) -> List[ValidationIssue]:
        """Validate one component set and record its issues on the validator.

        Reads the ``name`` and ``variant_dimensions`` keys of ``component``.
        """
        name = component.get("name", "Unknown")
        found = []
        if not self.validate_component_name(name):
            found.append(ValidationIssue(
                severity="error",
                component=name,
                message=f"Invalid component name: '{name}'",
                suggestion="Rename to PascalCase (e.g., 'Button', 'InputField')"
            ))
        found += self.validate_variant_props(name, component.get("variant_dimensions", {}))
        self.issues.extend(found)
        return found

    def get_report(self) -> Dict:
        """Summarize all recorded issues; ``valid`` means zero errors."""
        error_count = sum(1 for issue in self.issues if issue.severity == "error")
        warning_count = sum(1 for issue in self.issues if issue.severity == "warning")
        return {
            "total_issues": len(self.issues),
            "errors": error_count,
            "warnings": warning_count,
            "issues": [asdict(issue) for issue in self.issues],
            "valid": error_count == 0
        }
# =============================================================================
# TOKEN EXTRACTORS
# =============================================================================
class VariableExtractor:
    """Extracts tokens from Figma Variables.

    Only the value of each collection's first mode is extracted. COLOR,
    FLOAT and STRING variables are supported; other resolved types are
    skipped.
    """

    # A FLOAT variable whose name contains one of these becomes a px dimension.
    _DIMENSION_HINTS = ("spacing", "size", "width", "height", "radius", "gap")

    def __init__(self, verbose: bool = False):
        self.verbose = verbose

    def extract(self, variables_data: Dict) -> Dict[str, "W3CToken"]:
        """Extract variables as W3C tokens.

        Args:
            variables_data: Variables API response; ``meta.variables`` and
                ``meta.variableCollections`` are read from it.

        Returns:
            Mapping of dotted token path (``<collection>.<name>``) to token.
        """
        tokens = {}
        meta = variables_data.get("meta", {})
        variables = meta.get("variables", {})
        collections = meta.get("variableCollections", {})
        if self.verbose:
            print(f" [VAR] Found {len(variables)} variables in {len(collections)} collections")
        for var_id, var in variables.items():
            name = var.get("name", "")
            resolved_type = var.get("resolvedType", "")
            # Get collection for namespacing
            collection = collections.get(var.get("variableCollectionId", ""), {})
            collection_name = collection.get("name", "").lower().replace(" ", "-")
            # Build token path
            token_path = self._sanitize_path(f"{collection_name}.{name}".replace("/", "."))
            # Get value (use first mode)
            values_by_mode = var.get("valuesByMode", {})
            modes = collection.get("modes", [])
            if not values_by_mode or not modes:
                continue
            value = values_by_mode.get(modes[0].get("modeId"))
            if value is None:
                continue
            # Handle different value types
            token = self._create_token(name, resolved_type, value, var_id)
            if token:
                tokens[token_path] = token
        return tokens

    def _sanitize_path(self, path: str) -> str:
        """Sanitize token path"""
        return path.lower().replace(" ", "-").replace("--", "-").strip("-.")

    def _create_token(self, name: str, resolved_type: str, value: Any, var_id: str) -> Optional["W3CToken"]:
        """Create W3C token from Figma variable.

        A dict value carrying an ``id`` key is treated as an alias to another
        variable and is preserved as a ``{var:<id>}`` reference for every
        supported type, not only for colors.

        Returns:
            The token, or ``None`` when the type is unsupported or the value
            does not fit the resolved type.
        """
        extensions = {"figma": {"variableId": var_id}}

        if resolved_type == "COLOR":
            token_type = "color"
        elif resolved_type == "FLOAT":
            # Determine if spacing/dimension based on name
            lowered = name.lower()
            is_dimension = any(hint in lowered for hint in self._DIMENSION_HINTS)
            token_type = "dimension" if is_dimension else "number"
        elif resolved_type == "STRING":
            token_type = "string"
        else:
            return None

        # Variable alias - preserve reference
        if isinstance(value, dict) and "id" in value:
            return W3CToken(
                value=f"{{var:{value['id']}}}",
                type=token_type,
                description=f"Alias to {value.get('id', '')}",
                extensions={**extensions, "alias": True}
            )

        if resolved_type == "COLOR":
            if not isinstance(value, dict):
                return None
            # Direct color value
            return W3CToken(
                value=self._rgba_to_css(value),
                type="color",
                extensions=extensions
            )
        if resolved_type == "FLOAT":
            if not isinstance(value, (int, float)):
                return None
            return W3CToken(
                value=f"{value}px" if token_type == "dimension" else value,
                type=token_type,
                extensions=extensions
            )
        return W3CToken(
            value=str(value),
            type="string",
            extensions=extensions
        )

    def _rgba_to_css(self, color: Dict) -> str:
        """Convert Figma RGBA to CSS.

        Channels are scaled to 0-255, rounded to the nearest integer and
        clamped. Truncating instead would turn e.g. 0.999 into 254.
        """
        def channel(key: str) -> int:
            return max(0, min(255, round(color.get(key, 0) * 255)))

        r, g, b = channel("r"), channel("g"), channel("b")
        a = round(color.get("a", 1), 3)
        if a == 1:
            return f"#{r:02x}{g:02x}{b:02x}"
        return f"rgba({r}, {g}, {b}, {a})"
class StyleExtractor:
    """Extracts tokens from Figma Styles.

    A style entry only carries metadata, so the actual values are read from
    the first document node (pre-order, at most 20 levels deep) that
    references the style.
    """

    # Nodes nested deeper than this below the document root are ignored.
    _MAX_DEPTH = 20

    def __init__(self, verbose: bool = False):
        self.verbose = verbose

    def extract(self, file_data: Dict) -> Dict[str, "W3CToken"]:
        """Extract styles as W3C tokens.

        Args:
            file_data: Figma file response; its ``styles`` map and
                ``document`` tree are read.

        Returns:
            Tokens keyed ``typography.<name>``, ``shadow.<name>`` or
            ``color.<name>``.
        """
        tokens = {}
        styles = file_data.get("styles", {})
        doc = file_data.get("document", {})
        by_type = {"TEXT": [], "FILL": [], "EFFECT": [], "GRID": []}
        for style_id, style in styles.items():
            st = style.get("styleType", "OTHER")
            if st in by_type:
                by_type[st].append({"id": style_id, **style})
        if self.verbose:
            print(f" [STY] TEXT: {len(by_type['TEXT'])}, FILL: {len(by_type['FILL'])}, EFFECT: {len(by_type['EFFECT'])}")
        # Walk the document once instead of once per style.
        styled_nodes = self._index_styled_nodes(doc)
        # Extract typography
        for ts in by_type["TEXT"]:
            node = styled_nodes.get(ts["id"])
            if node and node.get("style"):
                name = self._sanitize_name(ts["name"])
                style_props = node["style"]
                tokens[f"typography.{name}"] = W3CToken(
                    value={
                        "fontFamily": style_props.get("fontFamily", "Inter"),
                        "fontWeight": style_props.get("fontWeight", 400),
                        "fontSize": f"{round(style_props.get('fontSize', 16))}px",
                        "lineHeight": f"{round(style_props.get('lineHeightPx', 24))}px",
                        "letterSpacing": f"{round(style_props.get('letterSpacing', 0), 2)}px"
                    },
                    type="typography",
                    extensions={"figma": {"styleId": ts["id"]}}
                )
        # Extract effects (shadows)
        for es in by_type["EFFECT"]:
            node = styled_nodes.get(es["id"])
            if node and node.get("effects"):
                name = self._sanitize_name(es["name"])
                tokens[f"shadow.{name}"] = W3CToken(
                    value=self._effects_to_css(node["effects"]),
                    type="shadow",
                    extensions={"figma": {"styleId": es["id"]}}
                )
        # Extract fill colors (only when the first fill is a solid paint)
        for fs in by_type["FILL"]:
            node = styled_nodes.get(fs["id"])
            if node and node.get("fills"):
                name = self._sanitize_name(fs["name"])
                fills = node["fills"]
                if fills and fills[0].get("type") == "SOLID":
                    tokens[f"color.{name}"] = W3CToken(
                        value=self._rgba_to_css(fills[0].get("color", {})),
                        type="color",
                        extensions={"figma": {"styleId": fs["id"]}}
                    )
        return tokens

    def _index_styled_nodes(self, root: Dict) -> Dict[str, Dict]:
        """Map every style id used in the tree to the first node using it.

        Nodes are visited in the same pre-order, with the same depth limit,
        as :meth:`_find_styled_node`, so both return the same node.
        """
        index: Dict[str, Dict] = {}
        pending = [(root, 0)]
        while pending:
            node, depth = pending.pop()
            if depth > self._MAX_DEPTH:
                continue
            for sid in node.get("styles", {}).values():
                if isinstance(sid, str):
                    index.setdefault(sid, node)
            # Push children reversed so the leftmost child is popped first.
            pending.extend(
                (child, depth + 1) for child in reversed(node.get("children", []))
            )
        return index

    def _find_styled_node(self, node: Dict, target_style_id: str, depth: int = 0) -> Optional[Dict]:
        """Find node using a style"""
        if depth > self._MAX_DEPTH:
            return None
        for role, sid in node.get("styles", {}).items():
            if sid == target_style_id:
                return node
        for child in node.get("children", []):
            result = self._find_styled_node(child, target_style_id, depth + 1)
            if result:
                return result
        return None

    def _sanitize_name(self, name: str) -> str:
        """Sanitize style name"""
        return name.lower().replace("/", ".").replace(" ", "-").replace("--", "-")

    def _rgba_to_css(self, color: Dict) -> str:
        """Convert Figma RGBA to CSS.

        Channels are scaled to 0-255, rounded to the nearest integer and
        clamped. Truncating instead would turn e.g. 0.999 into 254.
        """
        def channel(key: str) -> int:
            return max(0, min(255, round(color.get(key, 0) * 255)))

        r, g, b = channel("r"), channel("g"), channel("b")
        a = round(color.get("a", 1), 3)
        if a == 1:
            return f"#{r:02x}{g:02x}{b:02x}"
        return f"rgba({r}, {g}, {b}, {a})"

    def _effects_to_css(self, effects: List[Dict]) -> str:
        """Convert Figma effects to CSS box-shadow.

        Invisible effects and effect types other than drop/inner shadow are
        skipped; ``"none"`` is returned when nothing remains.
        """
        shadows = []
        for effect in effects:
            if not effect.get("visible", True):
                continue
            etype = effect.get("type", "")
            if etype in ("DROP_SHADOW", "INNER_SHADOW"):
                color = self._rgba_to_css(effect.get("color", {}))
                offset = effect.get("offset", {})
                x = offset.get("x", 0)
                y = offset.get("y", 0)
                radius = effect.get("radius", 0)
                spread = effect.get("spread", 0)
                prefix = "inset " if etype == "INNER_SHADOW" else ""
                shadows.append(f"{prefix}{x}px {y}px {radius}px {spread}px {color}")
        return ", ".join(shadows) if shadows else "none"
class ComponentExtractor:
    """Builds a component registry and classifies each variant dimension.

    A dimension ends up either in ``props`` (visual or boolean props, and
    any unrecognized dimension) or in ``states`` (interaction states).
    """

    def __init__(self, validator: DesignValidator, verbose: bool = False):
        self.validator = validator
        self.verbose = verbose

    def extract(self, file_data: Dict) -> Dict:
        """Return the registry of component sets found in ``file_data``.

        Reads the ``componentSets``, ``components`` and ``name`` keys. Each
        component set is also passed through the validator.
        """
        registry = {
            "file_name": file_data.get("name", "Unknown"),
            "extracted_at": datetime.now().isoformat(),
            "component_count": 0,
            "components": {}
        }

        # One entry per component set, keyed by set id.
        set_map = {
            set_id: {
                "id": set_id,
                "name": info.get("name", "Unknown"),
                "key": info.get("key", ""),
                "description": info.get("description", ""),
                "variants": [],
                "variant_dimensions": {},
                "props": {},  # Visual props for Storybook
                "states": {}  # Interaction states (CSS only)
            }
            for set_id, info in file_data.get("componentSets", {}).items()
        }

        # Attach each component (variant) to its set; components that do not
        # belong to a known set are ignored.
        for comp_id, comp in file_data.get("components", {}).items():
            owner_id = comp.get("componentSetId")
            if not (owner_id and owner_id in set_map):
                continue
            owner = set_map[owner_id]
            variant_name = comp.get("name", "")
            variant_props = self._parse_variant_name(variant_name)
            owner["variants"].append({
                "id": comp_id,
                "name": variant_name,
                "props": variant_props
            })
            # Collect the distinct values seen for every dimension.
            for prop_name, prop_value in variant_props.items():
                owner["variant_dimensions"].setdefault(prop_name, set()).add(prop_value)

        for entry in set_map.values():
            # Replace the value sets with sorted lists.
            dimensions = {
                dim: sorted(seen) for dim, seen in entry["variant_dimensions"].items()
            }
            entry["variant_dimensions"] = dimensions
            for dim, values in dimensions.items():
                known_prop = dim in VISUAL_PROPS or dim in BOOLEAN_PROPS
                if not known_prop and dim in INTERACTION_STATES:
                    entry["states"][dim] = {
                        "values": values,
                        "css_pseudo": self._get_css_pseudo(dim)
                    }
                else:
                    # Known props, plus any unknown dimension (stored as a
                    # "select" prop; no warning is emitted for it here).
                    entry["props"][dim] = {
                        "values": values,
                        "type": "boolean" if dim in BOOLEAN_PROPS else "select",
                        "default": values[0] if values else None
                    }
            self.validator.validate_component(entry)
            registry["components"][entry["name"]] = entry
            registry["component_count"] += 1

        if self.verbose:
            print(f" [CMP] Extracted {registry['component_count']} component sets")
        return registry

    def _parse_variant_name(self, name: str) -> Dict[str, str]:
        """Parse a ``'Prop=Value, Prop2=Value2'`` name into a dict.

        Parts without an ``=`` are ignored; keys and values are stripped.
        """
        pairs = (part.split("=", 1) for part in name.split(", ") if "=" in part)
        return {key.strip(): value.strip() for key, value in pairs}

    def _get_css_pseudo(self, state_name: str) -> str:
        """Return the CSS pseudo-class for a state name, or "" if unmapped."""
        pseudo_by_state = {
            "Hover": ":hover",
            "Focused": ":focus",
            "Focus": ":focus",
            "Pressed": ":active",
            "Active": ":active",
            "Disabled": ":disabled",
            "State": ""  # Generic, needs value check
        }
        return pseudo_by_state.get(state_name, "")
# =============================================================================
# TRANSLATION LAYER
# =============================================================================
class FigmaToDSSTranslator:
    """Regroups extracted tokens under the DSS canonical categories."""

    # DSS canonical token categories
    DSS_CATEGORIES = ["color", "spacing", "typography", "shadow", "border", "radius", "motion", "opacity"]

    # First path segment (lower-cased) -> DSS canonical category.
    _CATEGORY_ALIASES = {
        "color": "color",
        "colours": "color",
        "colors": "color",
        "spacing": "spacing",
        "space": "spacing",
        "size": "spacing",
        "typography": "typography",
        "text": "typography",
        "font": "typography",
        "shadow": "shadow",
        "elevation": "shadow",
        "effect": "shadow",
        "border": "border",
        "stroke": "border",
        "radius": "radius",
        "corner": "radius",
        "motion": "motion",
        "animation": "motion",
        "duration": "motion",
        "opacity": "opacity",
    }

    def __init__(self, verbose: bool = False):
        self.verbose = verbose

    def translate(self, variable_tokens: Dict, style_tokens: Dict) -> Dict:
        """Merge both token maps and group them by DSS category.

        On a path collision the variable token replaces the style token. The
        first path segment selects the category and the rest of the path
        becomes the token name. Empty categories are left out of the result.
        """
        grouped = {category: {} for category in self.DSS_CATEGORIES}
        merged = dict(style_tokens)
        merged.update(variable_tokens)

        for path, token in merged.items():
            head, dot, tail = path.partition(".")
            # A path without a dot has no category segment of its own.
            figma_category = head if dot else "other"
            token_name = tail if dot else path
            target = self._map_category(figma_category, token.type)
            if target in grouped:
                grouped[target][token_name] = token.to_dict()

        output = {
            "$schema": "https://design-tokens.org/schema.json",
            "_meta": {
                "generator": "dss-figma-sync",
                "version": "2.0.0",
                "generated": datetime.now().isoformat()
            }
        }
        output.update(
            (category, bucket) for category, bucket in grouped.items() if bucket
        )
        return output

    def _map_category(self, figma_category: str, token_type: str) -> str:
        """Map a Figma category name to its DSS canonical category.

        Unknown names fall back to "color" for color-typed tokens and to
        "spacing" for everything else.
        """
        fallback = "color" if token_type == "color" else "spacing"
        return self._CATEGORY_ALIASES.get(figma_category.lower(), fallback)
# =============================================================================
# OUTPUT WRITERS
# =============================================================================
class OutputWriter:
    """Writes the sync results as JSON files into the DSS directories."""

    def __init__(self, verbose: bool = False):
        self.verbose = verbose

    def _dump_json(self, payload: Dict, target: Path) -> None:
        """Write ``payload`` to ``target`` as 2-space indented JSON."""
        with open(target, "w") as handle:
            json.dump(payload, handle, indent=2)

    def write_tokens(self, tokens: Dict, output_dir: Path = TOKENS_DIR):
        """Write the W3C token file and its style-dictionary counterpart."""
        output_dir.mkdir(parents=True, exist_ok=True)

        # Main tokens file
        tokens_file = output_dir / "figma-tokens.json"
        self._dump_json(tokens, tokens_file)
        print(f" [OUT] Tokens: {tokens_file}")

        # Style-dictionary compatible variant of the same tokens
        sd_file = output_dir / "tokens.json"
        self._dump_json(self._flatten_for_style_dictionary(tokens), sd_file)
        print(f" [OUT] Style-dictionary: {sd_file}")

    def write_components(self, components: Dict, output_dir: Path = COMPONENTS_DIR):
        """Write the component registry to ``figma-registry.json``."""
        output_dir.mkdir(parents=True, exist_ok=True)
        comp_file = output_dir / "figma-registry.json"
        self._dump_json(components, comp_file)
        print(f" [OUT] Components: {comp_file}")

    def write_validation_report(self, report: Dict, output_dir: Path = CACHE_DIR):
        """Write the validation report to ``figma-lint-report.json``."""
        output_dir.mkdir(parents=True, exist_ok=True)
        report_file = output_dir / "figma-lint-report.json"
        self._dump_json(report, report_file)
        print(f" [OUT] Validation: {report_file}")

    def _flatten_for_style_dictionary(self, tokens: Dict) -> Dict:
        """Convert W3C format to style-dictionary format.

        Top-level keys starting with ``$`` or ``_`` are dropped. Inside a
        category, a token dict with ``$value`` becomes ``{"value": ...}``,
        any other dict is copied as-is, and non-dict entries are skipped.
        """
        result = {}
        for category, cat_tokens in tokens.items():
            if category.startswith(("$", "_")):
                continue
            flattened = result[category] = {}
            for name, token in cat_tokens.items():
                if not isinstance(token, dict):
                    continue
                flattened[name] = {"value": token["$value"]} if "$value" in token else token
        return result
# =============================================================================
# MAIN SYNC ORCHESTRATOR
# =============================================================================
async def intelligent_sync(file_key: str, token: str, force: bool = False, verbose: bool = False) -> bool:
    """Main sync orchestration.

    Runs the five steps (cache check, fetch, extract, translate, write) and
    updates the cache manifest at the end.

    Args:
        file_key: Figma file key to sync.
        token: Figma API token.
        force: Sync even when the cache says the file is unchanged.
        verbose: Enable verbose output in the client and extractors.

    Returns:
        True when the sync completed or was skipped as unchanged; False when
        the cache check or the fetch failed.
    """
    cache_manager = CacheManager()
    validator = DesignValidator()
    writer = OutputWriter(verbose=verbose)
    async with IntelligentFigmaClient(token, verbose=verbose) as client:
        # Step 1: Check cache
        print("\n[1/5] Checking cache...")
        try:
            meta = await client.get_file_meta(file_key)
            remote_modified = meta.get("lastModified", "")
            file_name = meta.get("name", "Unknown")
            if not force and not cache_manager.should_sync(file_key, remote_modified):
                print(f" [SKIP] File unchanged since last sync")
                print(f" Use --force to sync anyway")
                return True
            print(f" [OK] File: {file_name}")
            print(f" [OK] Modified: {remote_modified}")
        except Exception as e:
            print(f" [ERROR] Failed to check file: {e}")
            return False

        # Step 2: Fetch data
        print("\n[2/5] Fetching Figma data...")
        try:
            # Run both requests concurrently. return_exceptions=True lets a
            # failed variables request be handled on its own, and ensures
            # both coroutines are always awaited.
            file_data, vars_data = await asyncio.gather(
                client.get_file(file_key),
                client.get_file_variables(file_key),
                return_exceptions=True,
            )
            if isinstance(file_data, BaseException):
                # The file itself is mandatory.
                raise file_data
            if isinstance(vars_data, Exception):
                # Variables are optional: continue with an empty set.
                print(f" [WARN] Variables API unavailable: {vars_data}")
                vars_data = {"meta": {"variables": {}, "variableCollections": {}}}
            elif isinstance(vars_data, BaseException):
                # e.g. cancellation - never swallow it.
                raise vars_data
            print(f" [OK] File fetched")
            print(f" [OK] Styles: {len(file_data.get('styles', {}))}")
            print(f" [OK] Components: {len(file_data.get('components', {}))}")
            print(f" [OK] Variables: {len(vars_data.get('meta', {}).get('variables', {}))}")
        except Exception as e:
            print(f" [ERROR] Failed to fetch: {e}")
            return False

        # Step 3: Extract tokens
        print("\n[3/5] Extracting tokens...")
        var_extractor = VariableExtractor(verbose=verbose)
        style_extractor = StyleExtractor(verbose=verbose)
        comp_extractor = ComponentExtractor(validator, verbose=verbose)
        variable_tokens = var_extractor.extract(vars_data)
        style_tokens = style_extractor.extract(file_data)
        components = comp_extractor.extract(file_data)
        print(f" [OK] Variable tokens: {len(variable_tokens)}")
        print(f" [OK] Style tokens: {len(style_tokens)}")
        print(f" [OK] Components: {components['component_count']}")

        # Step 4: Translate to DSS format
        print("\n[4/5] Translating to DSS format...")
        translator = FigmaToDSSTranslator(verbose=verbose)
        w3c_tokens = translator.translate(variable_tokens, style_tokens)
        # Count tokens only; "$schema" and "_meta" are metadata keys.
        token_count = sum(
            len(v) for k, v in w3c_tokens.items()
            if not k.startswith("$") and not k.startswith("_")
        )
        print(f" [OK] W3C tokens: {token_count}")

        # Step 5: Validate and write
        print("\n[5/5] Writing output...")
        validation_report = validator.get_report()
        writer.write_tokens(w3c_tokens)
        writer.write_components(components)
        writer.write_validation_report(validation_report)

        # Update cache manifest
        manifest = SyncManifest(
            file_key=file_key,
            last_modified=remote_modified,
            synced_at=datetime.now().isoformat(),
            extracted_tokens=token_count,
            extracted_components=components["component_count"],
            validation_issues=validation_report["total_issues"]
        )
        cache_manager.save_manifest(manifest)

        # Summary
        print("\n" + "=" * 60)
        print("SYNC COMPLETE")
        print("=" * 60)
        print(f" File: {file_name}")
        print(f" Tokens: {token_count}")
        print(f" Components: {components['component_count']}")
        print(f" Issues: {validation_report['errors']} errors, {validation_report['warnings']} warnings")
        if validation_report["errors"] > 0:
            # Validation errors are reported but do not fail the sync.
            print("\n [WARN] Design validation errors found!")
            print(" Check: .dss/cache/figma-lint-report.json")
        return True
# =============================================================================
# CLI
# =============================================================================
def load_config():
    """Return the parsed ``.dss/config/figma.json``, or ``{}`` if it is absent."""
    config_path = DSS_ROOT / ".dss/config/figma.json"
    if not config_path.exists():
        return {}
    with open(config_path) as handle:
        parsed = json.load(handle)
    return parsed
def main():
    """Command-line entry point.

    The file key comes from ``--file-key`` or from ``uikit_reference.file_key``
    in the config file. The token comes from the ``FIGMA_TOKEN`` environment
    variable or from ``token`` in the config file. Exits with status 0 on
    success and 1 on a missing setting or a failed sync.
    """
    import argparse
    parser = argparse.ArgumentParser(description="DSS Intelligent Figma Sync")
    parser.add_argument("--file-key", help="Figma file key")
    parser.add_argument("--force", action="store_true", help="Force sync even if cached")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    file_key = args.file_key
    token = os.environ.get("FIGMA_TOKEN")
    token_source = "FIGMA_TOKEN env var"

    # Read the config file at most once, and only when a setting is missing.
    config = load_config() if not file_key or not token else {}

    # Get file key
    if not file_key:
        file_key = config.get("uikit_reference", {}).get("file_key")
    if not file_key:
        print("[ERROR] No Figma file key provided")
        print(" Usage: python3 scripts/figma-sync.py --file-key KEY")
        print(" Or add to .dss/config/figma.json")
        sys.exit(1)

    # Get token
    if not token:
        token = config.get("token")
        token_source = ".dss/config/figma.json"
    if not token:
        print("[ERROR] No Figma token found")
        print(" Set FIGMA_TOKEN env var or add to .dss/config/figma.json")
        sys.exit(1)

    print("╔══════════════════════════════════════════════════════════════╗")
    print("║ DSS INTELLIGENT FIGMA SYNC v2.0 ║")
    print("╚══════════════════════════════════════════════════════════════╝")
    print(f" File: {file_key}")
    # Never echo any part of the secret; report only where it came from.
    print(f" Token: provided via {token_source}")
    print(f" Force: {args.force}")

    # Run sync
    success = asyncio.run(intelligent_sync(
        file_key=file_key,
        token=token,
        force=args.force,
        verbose=args.verbose
    ))
    if success:
        print("\n[OK] Sync successful!")
        print(" Next: Run scripts/generate-storybook.py")
        sys.exit(0)
    else:
        print("\n[ERROR] Sync failed")
        sys.exit(1)


if __name__ == "__main__":
    main()