"""Smart merge strategy for .dss imports with conflict detection"""

import hashlib
import json
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Tuple, Literal
from dataclasses import dataclass
from enum import Enum
from uuid import uuid4

from .security import TimestampConflictResolver
from ..models.project import Project
from ..models.theme import DesignToken
from ..models.component import Component
from dss.storage.json_store import Projects, Components, Tokens

MergeStrategy = Literal["overwrite", "keep_local", "fork", "skip"]


class ConflictResolutionMode(str, Enum):
    """How to handle conflicts during merge"""

    OVERWRITE = "overwrite"  # Import wins
    KEEP_LOCAL = "keep_local"  # Local wins
    FORK = "fork"  # Create duplicate with new UUID
    MANUAL = "manual"  # Require user decision


@dataclass
class ConflictItem:
    """Detected conflict between the local and imported version of one entity"""

    uuid: str
    entity_type: str  # token, component, theme
    entity_name: str
    local_updated_at: datetime
    imported_updated_at: datetime
    local_hash: str
    imported_hash: str
    is_modified_both: bool  # True if changed in both places

    @property
    def local_is_newer(self) -> bool:
        """Is local version newer?"""
        return self.local_updated_at > self.imported_updated_at

    @property
    def imported_is_newer(self) -> bool:
        """Is imported version newer?"""
        return self.imported_updated_at > self.local_updated_at

    @property
    def is_identical(self) -> bool:
        """Are both versions identical?"""
        return self.local_hash == self.imported_hash

    def get_safe_recommendation(
        self, allow_drift_detection: bool = True
    ) -> Tuple[str, Optional[str]]:
        """Get safe conflict resolution recommendation with clock skew detection.

        Uses TimestampConflictResolver to safely determine winner,
        accounting for possible clock drift between systems.

        Args:
            allow_drift_detection: If True, warn about possible clock skew

        Returns:
            Tuple of (recommended_winner: 'local'|'imported'|'unknown', warning: str|None)
        """
        resolver = TimestampConflictResolver()
        return resolver.resolve_conflict(
            self.local_updated_at,
            self.imported_updated_at,
            allow_drift_detection=allow_drift_detection,
        )


@dataclass
class MergeAnalysis:
    """Analysis of merge operation"""

    new_items: Dict[str, List[str]]  # type -> [names]
    updated_items: Dict[str, List[str]]  # type -> [names]
    conflicted_items: List[ConflictItem]
    total_changes: int

    @property
    def has_conflicts(self) -> bool:
        """Are there conflicts?"""
        return len(self.conflicted_items) > 0


class UUIDHashMap:
    """Maps UUIDs to content hashes for detecting changes"""

    def __init__(self):
        self.hashes: Dict[str, str] = {}

    @staticmethod
    def hash_token(token: DesignToken) -> str:
        """Generate stable hash of token content (excludes UUID, timestamps)"""
        # The field order and ':' separator are part of the hash format; changing
        # them would make new hashes incomparable with previously computed ones.
        content = f"{token.name}:{token.value}:{token.type}:{token.category}:{token.description}:{token.source}:{token.deprecated}"
        return hashlib.sha256(content.encode()).hexdigest()

    @staticmethod
    def hash_component(component: Component) -> str:
        """Generate stable hash of component content"""
        content = json.dumps(
            {
                "name": component.name,
                "source": component.source,
                "description": component.description,
                "variants": component.variants,
                "props": component.props,
                # Sorted so that dependency order does not affect the hash.
                "dependencies": sorted(component.dependencies),
            },
            sort_keys=True,
        )
        return hashlib.sha256(content.encode()).hexdigest()

    def add_token(self, token: DesignToken):
        """Add token to hash map"""
        self.hashes[token.uuid] = self.hash_token(token)

    def add_component(self, component: Component):
        """Add component to hash map"""
        self.hashes[component.uuid] = self.hash_component(component)

    def get(self, uuid: str) -> Optional[str]:
        """Get hash for UUID"""
        return self.hashes.get(uuid)


class SmartMerger:
    """Intelligent merge strategy for archives

    Entities are matched between the local and the imported project by UUID.
    An entity whose hashed content differs on both sides is a conflict and is
    only changed by the selected ConflictResolutionMode.
    """

    def __init__(self, local_project: Project, imported_project: Project):
        self.local_project = local_project
        self.imported_project = imported_project

    def analyze_merge(self) -> MergeAnalysis:
        """
        Analyze what would happen in a merge without modifying anything

        Returns:
            MergeAnalysis with new, updated, and conflicted items
        """
        new_items: Dict[str, List[str]] = {
            "tokens": [],
            "components": [],
            "themes": [],
        }
        updated_items: Dict[str, List[str]] = {
            "tokens": [],
            "components": [],
            "themes": [],
        }
        conflicts: List[ConflictItem] = []

        # Build local UUID maps
        local_token_uuids = {t.uuid: t for t in self.local_project.theme.tokens.values()}
        local_component_uuids = {c.uuid: c for c in self.local_project.components}

        # Check imported tokens
        for token_name, imported_token in self.imported_project.theme.tokens.items():
            local_token = local_token_uuids.get(imported_token.uuid)
            if local_token is None:
                new_items["tokens"].append(token_name)
                continue
            if local_token == imported_token:
                continue
            conflict = self._detect_token_conflict(
                imported_token.uuid,
                local_token,
                imported_token,
            )
            if conflict:
                conflicts.append(conflict)
            else:
                # Models differ only in fields outside the content hash.
                updated_items["tokens"].append(token_name)

        # Check imported components
        for imported_comp in self.imported_project.components:
            local_comp = local_component_uuids.get(imported_comp.uuid)
            if local_comp is None:
                new_items["components"].append(imported_comp.name)
                continue
            if local_comp == imported_comp:
                continue
            conflict = self._detect_component_conflict(
                imported_comp.uuid,
                local_comp,
                imported_comp,
            )
            if conflict:
                conflicts.append(conflict)
            else:
                updated_items["components"].append(imported_comp.name)

        total_changes = (
            len(new_items["tokens"])
            + len(new_items["components"])
            + len(updated_items["tokens"])
            + len(updated_items["components"])
            + len(conflicts)
        )

        return MergeAnalysis(
            new_items=new_items,
            updated_items=updated_items,
            conflicted_items=conflicts,
            total_changes=total_changes,
        )

    def merge_with_strategy(
        self,
        conflict_handler: ConflictResolutionMode = ConflictResolutionMode.OVERWRITE,
    ) -> Project:
        """
        Perform merge with specified conflict strategy

        New and non-conflicting entities are always taken from the import.
        Conflicting entities are left at their local value until the strategy
        is applied, so OVERWRITE, KEEP_LOCAL and FORK each start from the same
        state; with MANUAL the local versions stay in place and the caller is
        expected to resolve the items reported by analyze_merge().

        Args:
            conflict_handler: How to handle conflicts

        Returns:
            Merged project (a deep copy; neither input project is modified)
        """
        analysis = self.analyze_merge()

        # Create copy of local project
        merged_project = self.local_project.model_copy(deep=True)
        imported_tokens = self.imported_project.theme.tokens

        # Apply new tokens. NOTE: a new token whose key matches an existing
        # local key replaces that entry.
        for token_name in analysis.new_items["tokens"]:
            if token_name in imported_tokens:
                merged_project.theme.tokens[token_name] = imported_tokens[token_name].model_copy()

        # Apply updated tokens
        for token_name in analysis.updated_items["tokens"]:
            if token_name in imported_tokens:
                self._store_token(
                    merged_project,
                    token_name,
                    imported_tokens[token_name].model_copy(),
                )

        # Conflicted components must not be touched before the strategy runs,
        # otherwise FORK/MANUAL would lose the local version.
        conflicted_component_uuids = {
            item.uuid
            for item in analysis.conflicted_items
            if item.entity_type == "component"
        }

        # First position of every UUID already present in the merged project.
        position_by_uuid: Dict[str, int] = {}
        for position, existing in enumerate(merged_project.components):
            position_by_uuid.setdefault(existing.uuid, position)

        # Apply new and updated components in a single pass
        for comp in self.imported_project.components:
            position = position_by_uuid.get(comp.uuid)
            if position is None:
                position_by_uuid[comp.uuid] = len(merged_project.components)
                merged_project.components.append(comp.model_copy())
            elif comp.uuid not in conflicted_component_uuids:
                merged_project.components[position] = comp.model_copy()

        # Handle conflicts based on strategy
        for conflict in analysis.conflicted_items:
            self._resolve_conflict(
                merged_project,
                conflict,
                conflict_handler,
            )

        return merged_project

    def _detect_token_conflict(
        self,
        token_uuid: str,
        local_token: DesignToken,
        imported_token: DesignToken,
    ) -> Optional[ConflictItem]:
        """Check if token versions conflict

        Returns:
            None when both content hashes match, otherwise a ConflictItem.
        """
        local_hash = UUIDHashMap.hash_token(local_token)
        imported_hash = UUIDHashMap.hash_token(imported_token)

        # No conflict if identical
        if local_hash == imported_hash:
            return None

        # Conflict detected. Without a common ancestor we cannot tell which
        # side changed, so the item is always flagged as modified on both.
        return ConflictItem(
            uuid=token_uuid,
            entity_type="token",
            entity_name=local_token.name,
            local_updated_at=local_token.updated_at,
            imported_updated_at=imported_token.updated_at,
            local_hash=local_hash,
            imported_hash=imported_hash,
            is_modified_both=True,
        )

    def _detect_component_conflict(
        self,
        comp_uuid: str,
        local_comp: Component,
        imported_comp: Component,
    ) -> Optional[ConflictItem]:
        """Check if component versions conflict

        Returns:
            None when both content hashes match, otherwise a ConflictItem.
        """
        local_hash = UUIDHashMap.hash_component(local_comp)
        imported_hash = UUIDHashMap.hash_component(imported_comp)

        # No conflict if identical
        if local_hash == imported_hash:
            return None

        # One shared fallback for components without `updated_at`: two separate
        # "now" calls would differ by microseconds and make the imported side
        # look newer. Kept naive-UTC, like the previous utcnow() fallback.
        fallback_timestamp = datetime.now(timezone.utc).replace(tzinfo=None)

        # Conflict detected
        return ConflictItem(
            uuid=comp_uuid,
            entity_type="component",
            entity_name=local_comp.name,
            local_updated_at=getattr(local_comp, "updated_at", fallback_timestamp),
            imported_updated_at=getattr(imported_comp, "updated_at", fallback_timestamp),
            local_hash=local_hash,
            imported_hash=imported_hash,
            is_modified_both=True,
        )

    @staticmethod
    def _find_token(project: Project, token_uuid: str) -> Optional[Tuple[str, DesignToken]]:
        """Return the (key, token) pair with the given UUID, or None."""
        return next(
            (
                (key, token)
                for key, token in project.theme.tokens.items()
                if token.uuid == token_uuid
            ),
            None,
        )

    @staticmethod
    def _find_component(project: Project, comp_uuid: str) -> Optional[Component]:
        """Return the first component with the given UUID, or None."""
        return next((c for c in project.components if c.uuid == comp_uuid), None)

    @staticmethod
    def _store_token(project: Project, token_key: str, token: DesignToken) -> None:
        """Store token under token_key, dropping other keys holding the same UUID."""
        tokens = project.theme.tokens
        stale_keys = [
            key
            for key, existing in tokens.items()
            if existing.uuid == token.uuid and key != token_key
        ]
        for key in stale_keys:
            del tokens[key]
        tokens[token_key] = token

    @staticmethod
    def _store_component(project: Project, component: Component) -> None:
        """Replace the first component with the same UUID, or append if absent."""
        for position, existing in enumerate(project.components):
            if existing.uuid == component.uuid:
                project.components[position] = component
                return
        project.components.append(component)

    def _resolve_conflict(
        self,
        project: Project,
        conflict: ConflictItem,
        strategy: ConflictResolutionMode,
    ):
        """Apply conflict resolution strategy

        Args:
            project: Project being built; modified in place
            conflict: Conflict to resolve
            strategy: OVERWRITE writes the imported version, KEEP_LOCAL writes
                the local version, FORK adds the imported version under a new
                UUID with an "_imported" name suffix. Any other mode (MANUAL)
                leaves the project untouched.
        """
        if strategy == ConflictResolutionMode.OVERWRITE:
            # Import wins - replace the entity with the same UUID
            if conflict.entity_type == "token":
                found = self._find_token(self.imported_project, conflict.uuid)
                if found:
                    token_key, imported_token = found
                    self._store_token(project, token_key, imported_token.model_copy())
            elif conflict.entity_type == "component":
                imported_comp = self._find_component(self.imported_project, conflict.uuid)
                if imported_comp:
                    self._store_component(project, imported_comp.model_copy())

        elif strategy == ConflictResolutionMode.KEEP_LOCAL:
            # Local wins - (re)write the local version so the result is the
            # same even if the import was already applied to `project`.
            if conflict.entity_type == "token":
                found = self._find_token(self.local_project, conflict.uuid)
                if found:
                    token_key, local_token = found
                    self._store_token(project, token_key, local_token.model_copy())
            elif conflict.entity_type == "component":
                local_comp = self._find_component(self.local_project, conflict.uuid)
                if local_comp:
                    self._store_component(project, local_comp.model_copy())

        elif strategy == ConflictResolutionMode.FORK:
            # Keep local and add the imported version with a new UUID
            if conflict.entity_type == "token":
                found = self._find_token(self.imported_project, conflict.uuid)
                if found:
                    _, imported_token = found
                    forked_name = f"{imported_token.name}_imported"
                    forked = imported_token.model_copy()
                    forked.uuid = str(uuid4())
                    # Keep the name attribute in step with the storage key,
                    # as the component branch does.
                    forked.name = forked_name
                    project.theme.tokens[forked_name] = forked
            elif conflict.entity_type == "component":
                imported_comp = self._find_component(self.imported_project, conflict.uuid)
                if imported_comp:
                    forked = imported_comp.model_copy()
                    forked.uuid = str(uuid4())
                    forked.name = f"{imported_comp.name}_imported"
                    project.components.append(forked)


# Export
__all__ = [
    "SmartMerger",
    "ConflictResolutionMode",
    "ConflictItem",
    "MergeAnalysis",
    "UUIDHashMap",
]