Initial commit: Clean DSS implementation
Migrated from design-system-swarm with fresh git history.
Old project history preserved in /home/overbits/apps/design-system-swarm
Core components:
- MCP Server (Python FastAPI with mcp 1.23.1)
- Claude Plugin (agents, commands, skills, strategies, hooks, core)
- DSS Backend (dss-mvp1 - token translation, Figma sync)
- Admin UI (Node.js/React)
- Server (Node.js/Express)
- Storybook integration (dss-mvp1/.storybook)
Self-contained configuration:
- All paths relative or use DSS_BASE_PATH=/home/overbits/dss
- PYTHONPATH configured for dss-mvp1 and dss-claude-plugin
- .env file with all configuration
- Claude plugin uses ${CLAUDE_PLUGIN_ROOT} for portability
Migration completed: $(date)
🤖 Clean migration with full functionality preserved
This commit is contained in:
231
tools/api/services/sandboxed_fs.py
Normal file
231
tools/api/services/sandboxed_fs.py
Normal file
@@ -0,0 +1,231 @@
|
||||
"""
|
||||
SandboxedFS - Secure File System Operations
|
||||
|
||||
This service restricts all file operations to within a project's root directory,
|
||||
preventing path traversal attacks and ensuring AI operations are safely scoped.
|
||||
|
||||
Security Features:
|
||||
- Path resolution with escape detection
|
||||
- Symlink attack prevention
|
||||
- Read/write operation logging
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Optional
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SandboxedFS:
|
||||
"""
|
||||
File system operations restricted to a project root.
|
||||
|
||||
All paths are validated to ensure they don't escape the sandbox.
|
||||
This is critical for AI operations that may receive untrusted input.
|
||||
"""
|
||||
|
||||
def __init__(self, root_path: str):
|
||||
"""
|
||||
Initialize sandboxed file system.
|
||||
|
||||
Args:
|
||||
root_path: Absolute path to project root directory
|
||||
|
||||
Raises:
|
||||
ValueError: If root_path doesn't exist or isn't a directory
|
||||
"""
|
||||
self.root = Path(root_path).resolve()
|
||||
if not self.root.is_dir():
|
||||
raise ValueError(f"Invalid root path: {root_path}")
|
||||
logger.info(f"SandboxedFS initialized with root: {self.root}")
|
||||
|
||||
def _validate_path(self, relative_path: str) -> Path:
|
||||
"""
|
||||
Validate and resolve a path within the sandbox.
|
||||
|
||||
Args:
|
||||
relative_path: Path relative to project root
|
||||
|
||||
Returns:
|
||||
Resolved absolute Path within sandbox
|
||||
|
||||
Raises:
|
||||
PermissionError: If path escapes sandbox
|
||||
"""
|
||||
# Normalize the path
|
||||
clean_path = os.path.normpath(relative_path)
|
||||
|
||||
# Resolve full path
|
||||
full_path = (self.root / clean_path).resolve()
|
||||
|
||||
# Security check: must be within root
|
||||
try:
|
||||
full_path.relative_to(self.root)
|
||||
except ValueError:
|
||||
logger.warning(f"Path traversal attempt blocked: {relative_path}")
|
||||
raise PermissionError(f"Path escapes sandbox: {relative_path}")
|
||||
|
||||
return full_path
|
||||
|
||||
def read_file(self, relative_path: str, max_size_kb: int = 500) -> str:
|
||||
"""
|
||||
Read file content within sandbox.
|
||||
|
||||
Args:
|
||||
relative_path: Path relative to project root
|
||||
max_size_kb: Maximum file size in KB (default 500KB)
|
||||
|
||||
Returns:
|
||||
File content as string
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If file doesn't exist
|
||||
PermissionError: If path escapes sandbox
|
||||
ValueError: If file exceeds max size
|
||||
"""
|
||||
path = self._validate_path(relative_path)
|
||||
|
||||
if not path.is_file():
|
||||
raise FileNotFoundError(f"File not found: {relative_path}")
|
||||
|
||||
# Check file size
|
||||
size_kb = path.stat().st_size / 1024
|
||||
if size_kb > max_size_kb:
|
||||
raise ValueError(f"File too large: {size_kb:.1f}KB > {max_size_kb}KB limit")
|
||||
|
||||
content = path.read_text(encoding='utf-8')
|
||||
logger.debug(f"Read file: {relative_path} ({len(content)} chars)")
|
||||
return content
|
||||
|
||||
def write_file(self, relative_path: str, content: str) -> None:
|
||||
"""
|
||||
Write file content within sandbox.
|
||||
|
||||
Args:
|
||||
relative_path: Path relative to project root
|
||||
content: Content to write
|
||||
|
||||
Raises:
|
||||
PermissionError: If path escapes sandbox
|
||||
"""
|
||||
path = self._validate_path(relative_path)
|
||||
|
||||
# Create parent directories if needed
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
path.write_text(content, encoding='utf-8')
|
||||
logger.info(f"Wrote file: {relative_path} ({len(content)} chars)")
|
||||
|
||||
def delete_file(self, relative_path: str) -> None:
|
||||
"""
|
||||
Delete file within sandbox.
|
||||
|
||||
Args:
|
||||
relative_path: Path relative to project root
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If file doesn't exist
|
||||
PermissionError: If path escapes sandbox
|
||||
"""
|
||||
path = self._validate_path(relative_path)
|
||||
|
||||
if not path.is_file():
|
||||
raise FileNotFoundError(f"File not found: {relative_path}")
|
||||
|
||||
path.unlink()
|
||||
logger.info(f"Deleted file: {relative_path}")
|
||||
|
||||
def list_directory(self, relative_path: str = ".") -> List[Dict[str, any]]:
|
||||
"""
|
||||
List directory contents within sandbox.
|
||||
|
||||
Args:
|
||||
relative_path: Path relative to project root
|
||||
|
||||
Returns:
|
||||
List of dicts with name, type, and size
|
||||
|
||||
Raises:
|
||||
NotADirectoryError: If path isn't a directory
|
||||
PermissionError: If path escapes sandbox
|
||||
"""
|
||||
path = self._validate_path(relative_path)
|
||||
|
||||
if not path.is_dir():
|
||||
raise NotADirectoryError(f"Not a directory: {relative_path}")
|
||||
|
||||
result = []
|
||||
for item in sorted(path.iterdir()):
|
||||
entry = {
|
||||
"name": item.name,
|
||||
"type": "directory" if item.is_dir() else "file",
|
||||
}
|
||||
if item.is_file():
|
||||
entry["size"] = item.stat().st_size
|
||||
result.append(entry)
|
||||
|
||||
return result
|
||||
|
||||
def file_exists(self, relative_path: str) -> bool:
|
||||
"""
|
||||
Check if file exists within sandbox.
|
||||
|
||||
Args:
|
||||
relative_path: Path relative to project root
|
||||
|
||||
Returns:
|
||||
True if file exists, False otherwise
|
||||
"""
|
||||
try:
|
||||
path = self._validate_path(relative_path)
|
||||
return path.exists()
|
||||
except PermissionError:
|
||||
return False
|
||||
|
||||
def get_file_tree(self, max_depth: int = 3, include_hidden: bool = False) -> Dict:
|
||||
"""
|
||||
Get hierarchical file tree for AI context injection.
|
||||
|
||||
Args:
|
||||
max_depth: Maximum directory depth to traverse
|
||||
include_hidden: Include hidden files (starting with .)
|
||||
|
||||
Returns:
|
||||
Nested dict representing file tree with sizes
|
||||
"""
|
||||
def build_tree(path: Path, depth: int) -> Dict:
|
||||
if depth > max_depth:
|
||||
return {"...": "truncated"}
|
||||
|
||||
result = {}
|
||||
try:
|
||||
items = sorted(path.iterdir())
|
||||
except PermissionError:
|
||||
return {"error": "permission denied"}
|
||||
|
||||
for item in items:
|
||||
# Skip hidden files unless requested
|
||||
if not include_hidden and item.name.startswith('.'):
|
||||
# Always include .dss config folder
|
||||
if item.name != '.dss':
|
||||
continue
|
||||
|
||||
# Skip common non-essential directories
|
||||
if item.name in ('node_modules', '__pycache__', '.git', 'dist', 'build'):
|
||||
result[item.name + "/"] = {"...": "skipped"}
|
||||
continue
|
||||
|
||||
if item.is_dir():
|
||||
result[item.name + "/"] = build_tree(item, depth + 1)
|
||||
else:
|
||||
result[item.name] = item.stat().st_size
|
||||
|
||||
return result
|
||||
|
||||
return build_tree(self.root, 0)
|
||||
|
||||
def get_root_path(self) -> str:
|
||||
"""Get the absolute root path of this sandbox."""
|
||||
return str(self.root)
|
||||
Reference in New Issue
Block a user