fix: Address high-severity bandit issues
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
"""
|
||||
DSS Runtime - Dependency Injection & Boundary Enforcement
|
||||
DSS Runtime - Dependency Injection & Boundary Enforcement.
|
||||
|
||||
This module provides a bounded runtime environment for DSS MCP tools.
|
||||
All external API access (Figma, Browser, HTTP) MUST go through this runtime.
|
||||
@@ -16,20 +16,24 @@ Usage:
|
||||
browser = runtime.get_browser() # Sandboxed
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, List
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import yaml
|
||||
|
||||
# Setup logging
|
||||
logger = logging.getLogger("dss.runtime")
|
||||
|
||||
|
||||
class BoundaryViolationError(Exception):
|
||||
"""Raised when an operation violates DSS boundaries"""
|
||||
"""Raised when an operation violates DSS boundaries."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class DSSRuntime:
|
||||
"""
|
||||
Bounded runtime environment for DSS operations.
|
||||
@@ -52,7 +56,11 @@ class DSSRuntime:
|
||||
self.config = self._load_config()
|
||||
self.enforcement_mode = self.config.get("enforcement", {}).get("mode", "strict")
|
||||
self.log_violations = self.config.get("enforcement", {}).get("log_violations", True)
|
||||
self.violation_log_path = Path(self.config.get("enforcement", {}).get("violation_log", ".dss/logs/boundary-violations.jsonl"))
|
||||
self.violation_log_path = Path(
|
||||
self.config.get("enforcement", {}).get(
|
||||
"violation_log", ".dss/logs/boundary-violations.jsonl"
|
||||
)
|
||||
)
|
||||
|
||||
# Client caches (lazy initialization)
|
||||
self._figma_client = None
|
||||
@@ -62,7 +70,7 @@ class DSSRuntime:
|
||||
logger.info(f"DSSRuntime initialized with enforcement mode: {self.enforcement_mode}")
|
||||
|
||||
def _load_config(self) -> Dict[str, Any]:
|
||||
"""Load boundary configuration from YAML"""
|
||||
"""Load boundary configuration from YAML."""
|
||||
if not self.config_path.exists():
|
||||
logger.warning(f"Boundary config not found: {self.config_path}, using defaults")
|
||||
return self._default_config()
|
||||
@@ -75,7 +83,7 @@ class DSSRuntime:
|
||||
return self._default_config()
|
||||
|
||||
def _default_config(self) -> Dict[str, Any]:
|
||||
"""Default boundary configuration (strict)"""
|
||||
"""Default boundary configuration (strict)."""
|
||||
return {
|
||||
"version": "1.0",
|
||||
"blocked_external_apis": ["api.figma.com"],
|
||||
@@ -83,12 +91,12 @@ class DSSRuntime:
|
||||
"enforcement": {
|
||||
"mode": "strict",
|
||||
"log_violations": True,
|
||||
"violation_log": ".dss/logs/boundary-violations.jsonl"
|
||||
}
|
||||
"violation_log": ".dss/logs/boundary-violations.jsonl",
|
||||
},
|
||||
}
|
||||
|
||||
def _log_violation(self, operation: str, details: Dict[str, Any]):
|
||||
"""Log boundary violation to audit trail"""
|
||||
"""Log boundary violation to audit trail."""
|
||||
if not self.log_violations:
|
||||
return
|
||||
|
||||
@@ -99,7 +107,7 @@ class DSSRuntime:
|
||||
"type": "boundary_violation",
|
||||
"operation": operation,
|
||||
"enforcement_mode": self.enforcement_mode,
|
||||
"details": details
|
||||
"details": details,
|
||||
}
|
||||
|
||||
with open(self.violation_log_path, "a") as f:
|
||||
@@ -108,7 +116,7 @@ class DSSRuntime:
|
||||
logger.warning(f"Boundary violation: {operation} - {details}")
|
||||
|
||||
def _log_access(self, operation: str, allowed: bool, details: Dict[str, Any]):
|
||||
"""Log successful access for audit trail"""
|
||||
"""Log successful access for audit trail."""
|
||||
access_log_path = Path(".dss/logs/runtime-access.jsonl")
|
||||
access_log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@@ -117,7 +125,7 @@ class DSSRuntime:
|
||||
"type": "runtime_access",
|
||||
"operation": operation,
|
||||
"allowed": allowed,
|
||||
"details": details
|
||||
"details": details,
|
||||
}
|
||||
|
||||
with open(access_log_path, "a") as f:
|
||||
@@ -139,11 +147,7 @@ class DSSRuntime:
|
||||
# Check if operation requires going through DSS tools
|
||||
for category, tools in required_tools.items():
|
||||
if operation in category:
|
||||
details = {
|
||||
"operation": operation,
|
||||
"context": context,
|
||||
"required_tools": tools
|
||||
}
|
||||
details = {"operation": operation, "context": context, "required_tools": tools}
|
||||
|
||||
self._log_violation(operation, details)
|
||||
|
||||
@@ -173,8 +177,8 @@ class DSSRuntime:
|
||||
|
||||
self._figma_client = SafeFigmaClient(
|
||||
token=token,
|
||||
allow_write=False, # Read-only by default
|
||||
runtime=self
|
||||
allow_write=False,
|
||||
runtime=self, # Read-only by default
|
||||
)
|
||||
|
||||
logger.info("Figma client initialized (read-only mode)")
|
||||
@@ -195,6 +199,7 @@ class DSSRuntime:
|
||||
if strategy == "local":
|
||||
try:
|
||||
from strategies.local.browser import LocalBrowserStrategy
|
||||
|
||||
self._browser_strategy = LocalBrowserStrategy(runtime=self)
|
||||
logger.info("Local browser strategy initialized")
|
||||
except ImportError:
|
||||
@@ -204,6 +209,7 @@ class DSSRuntime:
|
||||
elif strategy == "remote":
|
||||
try:
|
||||
from strategies.remote.browser import RemoteBrowserStrategy
|
||||
|
||||
self._browser_strategy = RemoteBrowserStrategy(runtime=self)
|
||||
logger.info("Remote browser strategy initialized")
|
||||
except ImportError:
|
||||
@@ -224,8 +230,7 @@ class DSSRuntime:
|
||||
from core.safe_http_client import SafeHTTPClient
|
||||
|
||||
self._http_client = SafeHTTPClient(
|
||||
blocked_domains=self.config.get("blocked_external_apis", []),
|
||||
runtime=self
|
||||
blocked_domains=self.config.get("blocked_external_apis", []), runtime=self
|
||||
)
|
||||
|
||||
logger.info("HTTP client initialized with URL validation")
|
||||
@@ -245,10 +250,7 @@ class DSSRuntime:
|
||||
blocked = self.config.get("blocked_imports", [])
|
||||
|
||||
if module_name in blocked:
|
||||
details = {
|
||||
"module": module_name,
|
||||
"blocked_imports": blocked
|
||||
}
|
||||
details = {"module": module_name, "blocked_imports": blocked}
|
||||
|
||||
self._log_violation(f"direct_import:{module_name}", details)
|
||||
|
||||
@@ -292,14 +294,16 @@ class DSSRuntime:
|
||||
"browser": self._browser_strategy is not None,
|
||||
"http": self._http_client is not None,
|
||||
},
|
||||
"config_version": self.config.get("version", "unknown")
|
||||
"config_version": self.config.get("version", "unknown"),
|
||||
}
|
||||
|
||||
|
||||
# Global runtime instance (singleton pattern)
|
||||
_runtime_instance: Optional[DSSRuntime] = None
|
||||
|
||||
|
||||
def get_runtime() -> DSSRuntime:
|
||||
"""Get the global DSSRuntime instance (singleton)"""
|
||||
"""Get the global DSSRuntime instance (singleton)."""
|
||||
global _runtime_instance
|
||||
|
||||
if _runtime_instance is None:
|
||||
|
||||
Reference in New Issue
Block a user