"""
DSS Local Analysis Cache Module.

Handles reading and writing to the local .dss/ folder for developer
workstation mode. Provides offline-capable validation using cached analysis
results.

Enterprise Architecture:
- LOCAL mode reads from .dss/cache/ for fast, offline-capable feedback
- Cache is populated by `dss-rules validate` or periodic sync
- Stale cache shows warnings but doesn't block (advisory mode)
"""

import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Cache payload files; these live in .dss/cache/
ANALYSIS_CACHE_FILE = "analysis_cache.json"
RULES_CACHE_FILE = "rules_cache.json"
VIOLATIONS_CACHE_FILE = "violations_cache.json"
# Metadata file; this one lives directly in .dss/
METADATA_FILE = "metadata.json"

# Default cache TTL in seconds (1 hour): younger than this counts as "fresh"
DEFAULT_CACHE_TTL = 3600

# Stale cache threshold (24 hours - show warning but still use)
STALE_THRESHOLD = 86400


class LocalAnalysisCache:
    """
    Manages local .dss/ folder cache for developer workstations.

    Provides:
    - Fast, offline-capable validation results
    - Cached rule definitions from @dss/rules
    - Violation history for incremental feedback

    All read/write helpers are best-effort: I/O and JSON failures are logged
    and reported through the return value (None / False) instead of raising.
    """

    def __init__(self, project_path: Optional[str] = None) -> None:
        """
        Initialize cache with project path.

        Args:
            project_path: Path to project root. Defaults to current directory.
        """
        self.project_path = Path(project_path) if project_path else Path.cwd()
        self.dss_dir = self.project_path / ".dss"
        self.cache_dir = self.dss_dir / "cache"
        self._ensure_structure()

    def _ensure_structure(self) -> None:
        """Ensure .dss/ folder structure exists (failures are only logged)."""
        try:
            self.dss_dir.mkdir(parents=True, exist_ok=True)
            self.cache_dir.mkdir(parents=True, exist_ok=True)

            # Create .gitignore if it doesn't exist
            gitignore_path = self.dss_dir / ".gitignore"
            if not gitignore_path.exists():
                gitignore_path.write_text(
                    "# DSS local cache - do not commit\n*\n!.gitignore\n",
                    encoding="utf-8",
                )
                logger.debug(f"Created .gitignore in {self.dss_dir}")
        except (OSError, ValueError) as e:
            # Construction must not fail just because the folder is
            # unwritable; later reads/writes report their own errors.
            logger.warning(f"Failed to create .dss/ structure: {e}")

    @staticmethod
    def _unusable_status(exists: bool, recommendation: str) -> Dict[str, Any]:
        """Build the status dict used when no cache age can be computed."""
        return {
            "exists": exists,
            "age_seconds": None,
            "is_fresh": False,
            "is_stale": True,
            "recommendation": recommendation,
        }

    def get_cache_status(self) -> Dict[str, Any]:
        """
        Get current cache status including freshness.

        Freshness is derived from the metadata ``last_updated`` timestamp:
        younger than DEFAULT_CACHE_TTL is fresh, older than STALE_THRESHOLD
        is stale, anything in between is usable but neither.

        Returns:
            Dict with cache status, age, and recommendation.
        """
        metadata = self._read_metadata()
        if not metadata:
            return self._unusable_status(
                False, "Run `npx dss-rules validate` to populate cache"
            )

        last_updated = metadata.get("last_updated")
        if not last_updated:
            return self._unusable_status(
                True, "Cache missing timestamp, run validation"
            )

        try:
            # fromisoformat() on older Pythons rejects a trailing "Z".
            last_dt = datetime.fromisoformat(last_updated.replace("Z", "+00:00"))
            age_seconds = (datetime.now(timezone.utc) - last_dt).total_seconds()
        except (AttributeError, TypeError, ValueError) as e:
            # AttributeError: non-string value; ValueError: unparsable text;
            # TypeError: timestamp without a UTC offset (naive vs. aware).
            logger.warning(f"Failed to parse cache timestamp: {e}")
            return self._unusable_status(
                True, "Cache timestamp invalid, run validation"
            )

        is_fresh = age_seconds < DEFAULT_CACHE_TTL
        is_stale = age_seconds > STALE_THRESHOLD

        if is_fresh:
            recommendation = "Cache is fresh"
        elif is_stale:
            recommendation = f"Cache is {int(age_seconds / 3600)}h old. Run `npx dss-rules validate` to refresh"
        else:
            recommendation = "Cache is usable but consider refreshing"

        return {
            "exists": True,
            "age_seconds": int(age_seconds),
            "is_fresh": is_fresh,
            "is_stale": is_stale,
            "last_updated": last_updated,
            "rules_version": metadata.get("rules_version"),
            "recommendation": recommendation,
        }

    def get_analysis_results(self) -> Optional[Dict[str, Any]]:
        """
        Get cached analysis results.

        Returns:
            Analysis results dict or None if not cached.
        """
        return self._read_cache_file(ANALYSIS_CACHE_FILE)

    @staticmethod
    def _normalize_path_text(path_text: str) -> str:
        """Return *path_text* with forward slashes and no leading ``./``."""
        unified = path_text.replace("\\", "/")
        while unified.startswith("./"):
            unified = unified[2:]
        return unified

    @classmethod
    def _matches_file(cls, recorded: Any, query: str, resolved: str) -> bool:
        """
        Tell whether a violation's recorded ``file`` refers to the queried path.

        Args:
            recorded: Raw ``file`` value from a violation entry.
            query: Normalized path as given by the caller.
            resolved: Normalized absolute form of the caller's path.
        """
        if not isinstance(recorded, str) or not recorded:
            return False
        candidate = cls._normalize_path_text(recorded)
        if candidate in (query, resolved):
            return True
        # Suffix match only on a path-component boundary, so "a.py" matches
        # "src/a.py" but not "src/data.py".
        if query and candidate.endswith("/" + query):
            return True
        # `resolved` is absolute, so it already starts at a boundary.
        return candidate.endswith(resolved)

    def get_violations(self, file_path: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get cached violations, optionally filtered by file.

        When filtering, a violation is kept if its ``file`` equals the given
        path, ends with it on a path-component boundary, or ends with its
        resolved absolute form. Backslashes and leading ``./`` are ignored
        for the comparison.

        Args:
            file_path: Optional file path to filter violations.

        Returns:
            List of violation dicts (empty when nothing is cached).
        """
        payload = self._read_cache_file(VIOLATIONS_CACHE_FILE)
        if not payload:
            return []

        entries = payload.get("violations", [])
        if not isinstance(entries, list):
            logger.warning("Ignoring violations cache: 'violations' is not a list")
            return []

        if not file_path:
            return entries

        query = self._normalize_path_text(file_path)
        resolved = self._normalize_path_text(str(Path(file_path).resolve()))
        return [
            entry
            for entry in entries
            if isinstance(entry, dict)
            and self._matches_file(entry.get("file"), query, resolved)
        ]

    def get_rules(self) -> Optional[Dict[str, Any]]:
        """
        Get cached rule definitions.

        Returns:
            Rules dict or None if not cached.
        """
        return self._read_cache_file(RULES_CACHE_FILE)

    def save_analysis_results(self, results: Dict[str, Any]) -> bool:
        """
        Save analysis results to cache.

        Args:
            results: Analysis results from validation.

        Returns:
            True if saved successfully.
        """
        success = self._write_cache_file(ANALYSIS_CACHE_FILE, results)
        if success:
            self._update_metadata(
                {"last_analysis": datetime.now(timezone.utc).isoformat()}
            )
        return success

    def save_violations(
        self,
        violations: List[Dict[str, Any]],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Save violations to cache.

        Args:
            violations: List of violation dicts.
            metadata: Optional metadata (rules_version, commit, etc.)

        Returns:
            True if saved successfully.
        """
        extra = metadata or {}
        timestamp = datetime.now(timezone.utc).isoformat()
        data = {
            "violations": violations,
            "count": len(violations),
            "saved_at": timestamp,
            **extra,
        }
        success = self._write_cache_file(VIOLATIONS_CACHE_FILE, data)
        if success:
            updates: Dict[str, Any] = {
                "last_updated": timestamp,
                "violation_count": len(violations),
            }
            # Only touch rules_version when the caller supplied one;
            # otherwise the version recorded by save_rules() would be
            # overwritten with None.
            rules_version = extra.get("rules_version")
            if rules_version is not None:
                updates["rules_version"] = rules_version
            self._update_metadata(updates)
        return success

    def save_rules(self, rules: Dict[str, Any], version: str) -> bool:
        """
        Save rule definitions to cache.

        Args:
            rules: Rule definitions dict.
            version: Rules package version.

        Returns:
            True if saved successfully.
        """
        data = {
            "rules": rules,
            "version": version,
            "cached_at": datetime.now(timezone.utc).isoformat(),
        }
        success = self._write_cache_file(RULES_CACHE_FILE, data)
        if success:
            self._update_metadata({"rules_version": version})
        return success

    def clear_cache(self) -> bool:
        """
        Clear all cached data.

        Removes the analysis, violations and rules cache files and resets
        the metadata file to a single ``cleared_at`` timestamp.

        Returns:
            True if cleared successfully.
        """
        try:
            for filename in (
                ANALYSIS_CACHE_FILE,
                VIOLATIONS_CACHE_FILE,
                RULES_CACHE_FILE,
            ):
                try:
                    (self.cache_dir / filename).unlink()
                except FileNotFoundError:
                    # Nothing cached under this name; already "cleared".
                    pass

            # Reset metadata
            self._write_metadata(
                {"cleared_at": datetime.now(timezone.utc).isoformat()}
            )
            logger.info("Cache cleared")
            return True
        except OSError as e:
            logger.error(f"Failed to clear cache: {e}")
            return False

    def _read_cache_file(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Read a cache file from .dss/cache/.

        Returns:
            The decoded JSON object, or None if the file is missing,
            unreadable, not valid JSON, or not a JSON object.
        """
        cache_file = self.cache_dir / filename
        try:
            payload = json.loads(cache_file.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(f"Failed to read cache file {filename}: {e}")
            return None

        if not isinstance(payload, dict):
            # Callers use dict methods on the result; reject other JSON.
            logger.warning(f"Ignoring cache file {filename}: expected a JSON object")
            return None
        return payload

    def _write_cache_file(self, filename: str, data: Dict[str, Any]) -> bool:
        """Write a cache file to .dss/cache/; return True on success."""
        cache_file = self.cache_dir / filename
        try:
            cache_file.write_text(json.dumps(data, indent=2), encoding="utf-8")
            return True
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: data is not JSON-serializable.
            logger.error(f"Failed to write cache file {filename}: {e}")
            return False

    def _read_metadata(self) -> Optional[Dict[str, Any]]:
        """
        Read metadata file from .dss/.

        Returns:
            The metadata dict, or None if the file is missing, unreadable,
            not valid JSON, or not a JSON object.
        """
        metadata_file = self.dss_dir / METADATA_FILE
        try:
            payload = json.loads(metadata_file.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            logger.warning(f"Failed to read metadata: {e}")
            return None

        if not isinstance(payload, dict):
            logger.warning("Ignoring metadata file: expected a JSON object")
            return None
        return payload

    def _write_metadata(self, data: Dict[str, Any]) -> bool:
        """Write metadata file to .dss/; return True on success."""
        metadata_file = self.dss_dir / METADATA_FILE
        try:
            metadata_file.write_text(json.dumps(data, indent=2), encoding="utf-8")
            return True
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Failed to write metadata: {e}")
            return False

    def _update_metadata(self, updates: Dict[str, Any]) -> bool:
        """Merge *updates* into the metadata file; return True on success."""
        existing = self._read_metadata() or {}
        existing.update(updates)
        return self._write_metadata(existing)


class LocalCacheValidator:
    """
    Validator that uses local cache for offline-capable feedback.

    Used in LOCAL mode to provide fast, advisory validation
    without requiring network access to the dashboard.
    """

    def __init__(self, cache: LocalAnalysisCache) -> None:
        """
        Initialize validator with cache.

        Args:
            cache: LocalAnalysisCache instance.
        """
        self.cache = cache

    @staticmethod
    def _count_severity(violations: List[Dict[str, Any]], severity: str) -> int:
        """Count violation dicts whose ``severity`` equals *severity*."""
        return sum(
            1
            for violation in violations
            if isinstance(violation, dict)
            and violation.get("severity") == severity
        )

    def validate_file(self, file_path: str) -> Dict[str, Any]:
        """
        Validate a single file using cached violations.

        Args:
            file_path: Path to file to validate.

        Returns:
            Validation result dict. A ``warning`` key carrying the cache
            recommendation is added when the cache is stale.
        """
        cache_status = self.cache.get_cache_status()
        violations = self.cache.get_violations(file_path)

        result = {
            "file": file_path,
            "violations": violations,
            "error_count": self._count_severity(violations, "error"),
            "warning_count": self._count_severity(violations, "warning"),
            "cache_status": cache_status,
            "mode": "local_cache",
        }

        if cache_status.get("is_stale"):
            result["warning"] = cache_status["recommendation"]

        return result

    def get_file_status(self, file_path: str) -> str:
        """
        Get simple status for a file.

        Returns:
            'pass', 'fail', or 'unknown'. 'unknown' means the cache holds no
            violations for the file; 'fail' means at least one cached
            violation has severity "error".
        """
        violations = self.cache.get_violations(file_path)
        if not violations:
            # No cached data for this file
            return "unknown"

        return "fail" if self._count_severity(violations, "error") else "pass"

    def get_summary(self) -> Dict[str, Any]:
        """
        Get summary of cached validation state.

        Returns:
            Summary dict with counts and status.
        """
        cache_status = self.cache.get_cache_status()
        analysis = self.cache.get_analysis_results()
        all_violations = self.cache.get_violations()

        return {
            "cache_status": cache_status,
            "total_violations": len(all_violations),
            "error_count": self._count_severity(all_violations, "error"),
            "warning_count": self._count_severity(all_violations, "warning"),
            "rules_version": cache_status.get("rules_version"),
            "last_updated": cache_status.get("last_updated"),
            "analysis": analysis,
        }


def get_project_cache(project_path: Optional[str] = None) -> LocalAnalysisCache:
    """
    Factory function to get cache for a project.

    Args:
        project_path: Path to project root.

    Returns:
        LocalAnalysisCache instance.
    """
    return LocalAnalysisCache(project_path)