dss/.dss/doc-sync/generators/base_generator.py
Digital Production Factory 276ed71f31 Initial commit: Clean DSS implementation
Migrated from design-system-swarm with fresh git history.
Old project history preserved in /home/overbits/apps/design-system-swarm

Core components:
- MCP Server (Python FastAPI with mcp 1.23.1)
- Claude Plugin (agents, commands, skills, strategies, hooks, core)
- DSS Backend (dss-mvp1 - token translation, Figma sync)
- Admin UI (Node.js/React)
- Server (Node.js/Express)
- Storybook integration (dss-mvp1/.storybook)

Self-contained configuration:
- All paths relative or use DSS_BASE_PATH=/home/overbits/dss
- PYTHONPATH configured for dss-mvp1 and dss-claude-plugin
- .env file with all configuration
- Claude plugin uses ${CLAUDE_PLUGIN_ROOT} for portability

Migration completed: $(date)
🤖 Clean migration with full functionality preserved
2025-12-09 18:45:48 -03:00
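
The self-contained configuration listed above means tooling is expected to derive paths from DSS_BASE_PATH (or fall back to relative paths) rather than hard-code absolute locations. A minimal sketch of that resolution, assuming only the DSS_BASE_PATH variable named in the commit message; the resolve_base_path helper and its fallback are hypothetical:

import os
from pathlib import Path


def resolve_base_path() -> Path:
    # Hypothetical helper: honour DSS_BASE_PATH when set, otherwise fall back
    # to the current working directory so relative paths keep working.
    env_value = os.environ.get("DSS_BASE_PATH")
    return Path(env_value).resolve() if env_value else Path.cwd()


BASE_PATH = resolve_base_path()
DOC_SYNC_DIR = BASE_PATH / ".dss" / "doc-sync"  # assumed layout, matching the file shown below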

#!/usr/bin/env python3
"""
Base Documentation Generator

Abstract base class for all documentation generators.
"""
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional

logger = logging.getLogger(__name__)


class DocGenerator(ABC):
"""
Abstract base class for documentation generators.
Subclasses must implement:
- extract(): Extract data from source file
- transform(): Transform extracted data to target schema
- load(): Write transformed data to target file
"""

    def __init__(self, project_root: Path):
        """
        Initialize generator.

        Args:
            project_root: Project root directory
        """
        self.project_root = Path(project_root)
        self.metadata = {
            "generator": self.__class__.__name__,
            "generated_at": None,
            "source_files": [],
            "version": "1.0.0"
        }

    @abstractmethod
    def extract(self, source_path: Path) -> Dict[str, Any]:
        """
        Extract data from source file.

        Args:
            source_path: Path to source file

        Returns:
            Extracted data dictionary
        """
        pass

    @abstractmethod
    def transform(self, extracted_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform extracted data to target schema.

        Args:
            extracted_data: Raw extracted data

        Returns:
            Transformed data matching target schema
        """
        pass

    def load(self, transformed_data: Dict[str, Any], target_path: Path) -> None:
        """
        Write transformed data to target file.

        Args:
            transformed_data: Data to write
            target_path: Target file path
        """
        target_path.parent.mkdir(parents=True, exist_ok=True)

        # Backup existing file if it exists
        if target_path.exists():
            backup_dir = self.project_root / ".dss" / "backups" / "knowledge"
            backup_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = backup_dir / f"{target_path.stem}_{timestamp}.json"
            with open(target_path, 'r') as f:
                backup_data = f.read()
            with open(backup_path, 'w') as f:
                f.write(backup_data)
            logger.info(f"Backed up {target_path} to {backup_path}")

        # Write new data
        with open(target_path, 'w') as f:
            json.dump(transformed_data, f, indent=2)
        logger.info(f"Generated documentation: {target_path}")

    def run(self, source_path: Path, target_path: Path) -> Dict[str, Any]:
        """
        Execute full ETL pipeline: Extract → Transform → Load

        Args:
            source_path: Source file to extract from
            target_path: Target file to write to

        Returns:
            Generated documentation data
        """
logger.info(f"Running {self.__class__.__name__}: {source_path}{target_path}")
        # Extract
        extracted_data = self.extract(source_path)
        self.metadata["source_files"].append(str(source_path))

        # Transform
        transformed_data = self.transform(extracted_data)

        # Add metadata
        self.metadata["generated_at"] = datetime.now().isoformat()
        transformed_data["_metadata"] = self.metadata

        # Load
        self.load(transformed_data, target_path)

        return transformed_data

    def validate_json_schema(self, data: Dict[str, Any], schema: Dict[str, Any]) -> bool:
        """
        Validate data against JSON schema.

        Args:
            data: Data to validate
            schema: JSON schema

        Returns:
            True if valid, False otherwise
        """
        try:
            import jsonschema
            jsonschema.validate(instance=data, schema=schema)
            return True
        except ImportError:
            logger.warning("jsonschema not installed, skipping validation")
            return True
        except jsonschema.ValidationError as e:
            logger.error(f"Schema validation failed: {e}")
            return False

    def read_existing_target(self, target_path: Path) -> Optional[Dict[str, Any]]:
        """
        Read existing target file if it exists.

        Args:
            target_path: Target file path

        Returns:
            Existing data or None
        """
        if not target_path.exists():
            return None
        try:
            with open(target_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to read existing target {target_path}: {e}")
            return None

    def merge_with_existing(
        self,
        new_data: Dict[str, Any],
        existing_data: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Merge new data with existing data (incremental update).

        Args:
            new_data: New extracted data
            existing_data: Existing data from target file

        Returns:
            Merged data
        """
        if not existing_data:
            return new_data

        # Default: Replace completely
        # Subclasses can override for smarter merging
        return new_data
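
Because extract() and transform() are abstract, this class is only usable through a concrete subclass. A minimal sketch of one, assuming the DocGenerator base class exactly as defined above; the ReadmeGenerator name, its heading-extraction logic, and the commented-out paths are hypothetical examples, not part of DSS:

from pathlib import Path
from typing import Any, Dict


class ReadmeGenerator(DocGenerator):
    """Hypothetical generator: indexes the Markdown headings of a README."""

    def extract(self, source_path: Path) -> Dict[str, Any]:
        # Read the raw source; real generators might parse an AST,
        # token files, or Figma exports instead.
        text = source_path.read_text(encoding="utf-8")
        return {"path": str(source_path), "lines": text.splitlines()}

    def transform(self, extracted_data: Dict[str, Any]) -> Dict[str, Any]:
        # Reduce raw lines to the target schema: a flat list of headings.
        headings = [line.lstrip("#").strip()
                    for line in extracted_data["lines"]
                    if line.startswith("#")]
        return {"source": extracted_data["path"], "sections": headings}


# Usage sketch (paths are illustrative):
# generator = ReadmeGenerator(project_root=Path("."))
# generator.run(Path("README.md"), Path(".dss/knowledge/readme.json"))

Note that run() only chains extract, transform, and load; it never consults merge_with_existing(), so a subclass that wants incremental updates has to call read_existing_target() and merge_with_existing() itself, for example inside an overridden transform() or load().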