#!/usr/bin/env python3
"""
DSS Pre-Commit Hook

Enforces DSS architectural guardrails before allowing commits.

Validators:
    1. Immutable file protection
    2. Temp folder discipline
    3. Schema validation
    4. Documentation checks
    5. Terminology checks

Every validator result is appended to a JSONL audit log.
"""

import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from fnmatch import fnmatch
from pathlib import Path

# Configuration
DSS_ROOT = Path("/home/overbits/dss")
IMMUTABLE_FILES = [
    ".dss/schema/*.schema.json",
    ".dss-boundaries.yaml",
    "API_SPECIFICATION_IMMUTABLE.md",
    "dss-claude-plugin/.mcp.json",
    "dss-mvp1/dss/validators/schema.py",
]
AUDIT_LOG = DSS_ROOT / ".dss/logs/git-hooks.jsonl"
TEMP_DIR = DSS_ROOT / ".dss/temp"


class Colors:
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    NC = "\033[0m"  # No Color


def log_audit(validator, status, details):
    """Append a hook event to the JSONL audit trail."""
    AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True)
    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "hook": "pre-commit",
        "validator": validator,
        "status": status,
        "details": details,
    }
    with open(AUDIT_LOG, "a") as f:
        f.write(json.dumps(log_entry) + "\n")


def get_staged_files():
    """Return the staged files (added, copied, or modified)."""
    result = subprocess.run(
        ["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
        capture_output=True,
        text=True,
        cwd=DSS_ROOT,
    )
    return [Path(f) for f in result.stdout.strip().split("\n") if f]


def check_immutable_files(staged_files):
    """Validate that immutable files are not modified."""
    violations = []
    for file_path in staged_files:
        for pattern in IMMUTABLE_FILES:
            if fnmatch(str(file_path), pattern):
                # Only block if the file exists in HEAD (a modification, not an addition)
                result = subprocess.run(
                    ["git", "ls-tree", "--name-only", "HEAD", str(file_path)],
                    capture_output=True,
                    text=True,
                    cwd=DSS_ROOT,
                )
                if result.stdout.strip():  # File exists in HEAD
                    violations.append(str(file_path))

    if violations:
        # Check for bypass via environment variable first, then commit message
        bypass = os.environ.get("DSS_IMMUTABLE_BYPASS") == "1"
        if bypass:
            log_audit(
                "immutable_files",
                "bypass",
                {"files": violations, "method": "environment_variable"},
            )
        else:
            # Best-effort: at pre-commit time COMMIT_EDITMSG may still hold the
            # previous commit's message, since the new one is not yet written.
            commit_msg_file = DSS_ROOT / ".git/COMMIT_EDITMSG"
            if commit_msg_file.exists():
                commit_msg = commit_msg_file.read_text()
                if "[IMMUTABLE-UPDATE]" in commit_msg:
                    bypass = True
                    log_audit(
                        "immutable_files",
                        "bypass",
                        {
                            "files": violations,
                            "commit_message": commit_msg.split("\n")[0],
                            "method": "commit_message",
                        },
                    )

        if not bypass:
            print(f"{Colors.RED}✗ IMMUTABLE FILE VIOLATION{Colors.NC}")
            print("\nThe following protected files cannot be modified:")
            for v in violations:
                print(f"  - {v}")
            print("\nTo update immutable files:")
            print("  1. Use commit message: [IMMUTABLE-UPDATE] Reason for change")
            print("  2. Include justification in commit body")
            print("\nProtected files:")
            for pattern in IMMUTABLE_FILES:
                print(f"  - {pattern}")
            log_audit("immutable_files", "rejected", {"files": violations})
            return False

    log_audit("immutable_files", "passed", {"files_checked": len(staged_files)})
    return True
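
# The temp-folder validator below tells contributors to use a get_temp_dir()
# helper. That helper is defined elsewhere in the repo, not in this hook; the
# sketch here is a hypothetical illustration of the assumed behaviour
# (per-session directories under .dss/temp/), not the actual implementation.
def get_temp_dir(session_id=None):
    """Illustrative sketch only: return a per-session scratch dir under .dss/temp/."""
    import uuid

    # Assumption: a session id is any short unique token when none is supplied.
    session_id = session_id or uuid.uuid4().hex[:8]
    temp_dir = TEMP_DIR / session_id
    temp_dir.mkdir(parents=True, exist_ok=True)
    return temp_dir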


def check_temp_folder(staged_files):
    """Validate that temp files live only in .dss/temp/."""
    violations = []

    # Patterns that indicate temp files
    temp_patterns = [
        r".*\.tmp$",
        r".*\.temp$",
        r".*~$",
        r".*\.swp$",
        r".*\.swo$",
        r".*\.backup$",
        r".*\.bak$",
        r"^temp/",
        r"^tmp/",
        r"^scratch/",
    ]

    for file_path in staged_files:
        file_str = str(file_path)
        # Flag files that match a temp pattern but are NOT under .dss/temp/
        if any(re.match(pattern, file_str) for pattern in temp_patterns):
            if not file_str.startswith(".dss/temp/"):
                violations.append(file_str)

    if violations:
        print(f"{Colors.RED}✗ TEMP FOLDER VIOLATION{Colors.NC}")
        print("\nTemp files must be created in .dss/temp/ only:")
        for v in violations:
            print(f"  - {v}")
        print("\nAll temporary files MUST go in: .dss/temp/[session-id]/")
        print("Use the get_temp_dir() helper function.")
        log_audit("temp_folder", "rejected", {"files": violations})
        return False

    log_audit("temp_folder", "passed", {"files_checked": len(staged_files)})
    return True


def check_schemas(staged_files):
    """Validate that staged JSON and YAML files parse cleanly."""
    violations = []

    for file_path in staged_files:
        if file_path.suffix in [".json", ".yaml", ".yml"]:
            full_path = DSS_ROOT / file_path
            if not full_path.exists():
                continue
            # Note: this reads the working-tree copy, which can differ from the
            # staged blob if the file was modified after `git add`.
            try:
                if file_path.suffix == ".json":
                    with open(full_path) as f:
                        json.load(f)
                else:
                    try:
                        import yaml

                        with open(full_path) as f:
                            yaml.safe_load(f)
                    except ImportError:
                        # PyYAML not available, skip YAML validation
                        continue
            except Exception as e:
                violations.append({"file": str(file_path), "error": str(e)})

    if violations:
        print(f"{Colors.RED}✗ SCHEMA VALIDATION FAILED{Colors.NC}")
        print("\nInvalid JSON/YAML files:")
        for v in violations:
            print(f"  - {v['file']}")
            print(f"    Error: {v['error']}")
        log_audit("schema_validation", "rejected", {"violations": violations})
        return False

    log_audit("schema_validation", "passed", {"files_checked": len(staged_files)})
    return True
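
# check_schemas() above only proves the files parse; it does not validate them
# against the JSON Schemas in .dss/schema/. A hypothetical extension using the
# third-party jsonschema package could look like the commented sketch below.
# It is left commented out to avoid a hard dependency; the instance/schema
# pairing is an assumption, not part of this hook.
#
#     import jsonschema
#
#     def validate_against_schema(instance_path, schema_path):
#         with open(schema_path) as f:
#             schema = json.load(f)
#         with open(instance_path) as f:
#             instance = json.load(f)
#         # Raises jsonschema.ValidationError on mismatch
#         jsonschema.validate(instance=instance, schema=schema)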


def check_documentation(staged_files):
    """Warn when new implementations lack documentation (non-blocking)."""
    warnings = []

    # Staged Python files whose classes/functions should carry docstrings
    python_files = [f for f in staged_files if f.suffix == ".py"]

    for file_path in python_files:
        full_path = DSS_ROOT / file_path
        if not full_path.exists():
            continue

        try:
            content = full_path.read_text()

            # Heuristic regexes: single-line signatures only, with the docstring
            # expected on the line after the colon. Classes without docstrings:
            class_pattern = r'^\s*class\s+(\w+)[^:\n]*:\s*\n(?!\s*(?:"""|\'\'\'))'
            missing_class_docs = re.findall(class_pattern, content, re.MULTILINE)

            # Public functions without docstrings (names not starting with _)
            func_pattern = r'^\s*def\s+([a-zA-Z]\w*)\s*\([^)]*\)[^:\n]*:\s*\n(?!\s*(?:"""|\'\'\'))'
            missing_func_docs = re.findall(func_pattern, content, re.MULTILINE)

            if missing_class_docs:
                warnings.append(
                    {
                        "file": str(file_path),
                        "type": "class",
                        "items": missing_class_docs[:5],  # Limit to first 5
                    }
                )
            if missing_func_docs:
                warnings.append(
                    {
                        "file": str(file_path),
                        "type": "function",
                        "items": missing_func_docs[:5],  # Limit to first 5
                    }
                )
        except Exception:
            continue

    # Check whether significant code changes come with knowledge updates
    code_extensions = [".py", ".ts", ".tsx", ".js", ".jsx"]
    code_files_changed = [f for f in staged_files if f.suffix in code_extensions]
    knowledge_files_changed = [f for f in staged_files if ".knowledge" in str(f)]

    # If many code files changed but no knowledge updates, warn
    if len(code_files_changed) > 5 and len(knowledge_files_changed) == 0:
        warnings.append(
            {
                "file": "general",
                "type": "knowledge",
                "items": [
                    f"Changed {len(code_files_changed)} code files but no .knowledge/ updates"
                ],
            }
        )

    if warnings:
        print(f"{Colors.YELLOW}⚠ DOCUMENTATION WARNING{Colors.NC}")
        print("\nMissing documentation found (non-blocking):")
        for w in warnings:
            if w["type"] == "class":
                print(f"  - {w['file']}: Classes without docstrings: {', '.join(w['items'])}")
            elif w["type"] == "function":
                print(f"  - {w['file']}: Functions without docstrings: {', '.join(w['items'])}")
            elif w["type"] == "knowledge":
                print(f"  - {w['items'][0]}")
        print("\n  Tip: Add docstrings to new classes/functions")
        print("  Tip: Update .knowledge/ files when adding major features\n")
        log_audit("documentation", "warning", {"warnings": warnings})
    else:
        log_audit("documentation", "passed", {"files_checked": len(staged_files)})

    # Always return True (warnings only) - change to False to make blocking
    return True


def check_terminology(staged_files):
    """Check for deprecated terminology (warn only)."""
    warnings = []

    deprecated_terms = {
        "swarm": "Design System Server / DSS",
        "organism": "component",
    }

    for file_path in staged_files:
        # Only check text files
        if file_path.suffix in [".py", ".js", ".ts", ".md", ".txt", ".json", ".yaml", ".yml"]:
            full_path = DSS_ROOT / file_path
            try:
                content = full_path.read_text()
                for old_term, new_term in deprecated_terms.items():
                    if re.search(rf"\b{old_term}\b", content, re.IGNORECASE):
                        warnings.append(
                            {"file": str(file_path), "term": old_term, "suggested": new_term}
                        )
            except (OSError, UnicodeDecodeError):
                # Skip binary or unreadable files
                continue

    if warnings:
        print(f"{Colors.YELLOW}⚠ TERMINOLOGY WARNING{Colors.NC}")
        print("\nDeprecated terminology found (non-blocking):")
        for w in warnings:
            print(f"  - {w['file']}: '{w['term']}' → use '{w['suggested']}'")
        print()
        log_audit("terminology", "warning", {"warnings": warnings})
    else:
        log_audit("terminology", "passed", {"files_checked": len(staged_files)})

    # Always return True (warnings only)
    return True


def main():
    """Run all validators."""
    print(f"{Colors.GREEN}Running DSS pre-commit validations...{Colors.NC}\n")

    staged_files = get_staged_files()
    if not staged_files:
        print("No files to validate.")
        return 0

    validators = [
        ("Immutable File Protection", check_immutable_files),
        ("Temp Folder Discipline", check_temp_folder),
        ("Schema Validation", check_schemas),
        ("Documentation Check", check_documentation),
        ("Terminology Check", check_terminology),
    ]

    results = []
    for name, validator in validators:
        print(f"• {name}...", end=" ")
        result = validator(staged_files)
        results.append(result)
        if result:
            print(f"{Colors.GREEN}✓{Colors.NC}")
        else:
            print(f"{Colors.RED}✗{Colors.NC}")
        print()

    if all(results):
        print(f"\n{Colors.GREEN}✓ All validations passed{Colors.NC}")
        log_audit("pre_commit", "success", {"files": len(staged_files)})
        return 0
    else:
        print(f"\n{Colors.RED}✗ Pre-commit validation failed{Colors.NC}")
        print("Fix the issues above and try again.\n")
        log_audit("pre_commit", "failed", {"files": len(staged_files)})
        return 1


if __name__ == "__main__":
    sys.exit(main())
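
# Installation (assuming the standard git hooks layout; paths are illustrative):
#
#     cp pre-commit.py /home/overbits/dss/.git/hooks/pre-commit
#     chmod +x /home/overbits/dss/.git/hooks/pre-commit
#
# Bypassing the immutable-file check, per the mechanisms implemented above:
#
#     DSS_IMMUTABLE_BYPASS=1 git commit -m "..."
#     git commit -m "[IMMUTABLE-UPDATE] Reason for change"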