"""
DSS MCP Security Module

Handles encryption, decryption, and secure storage of sensitive credentials.
Uses the cryptography library's Fernet recipe (AES-128 in CBC mode,
authenticated with HMAC-SHA256) with a PBKDF2-derived key and a
per-credential salt.
"""

import base64
import json
import os
import secrets
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

from .config import mcp_config
from storage.json_store import read_json, write_json, SYSTEM_DIR  # JSON storage

# File name of the SQLite database that backs the credentials table.
_CREDENTIALS_DB_FILENAME = "credentials.db"


@contextmanager
def get_connection() -> Iterator[sqlite3.Connection]:
    """
    Yield a SQLite connection to the credential database.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back when it raises; the connection is closed in both cases. Rows
    are returned as ``sqlite3.Row`` so they support both tuple unpacking and
    ``dict(row)``, which the vault methods rely on.

    NOTE(review): this module previously called ``get_connection`` without
    defining or importing it, so importing the module raised NameError. This
    local helper makes the module self-contained. If the project already has
    a shared connection helper, import that one instead and delete this.
    NOTE(review): assumes SYSTEM_DIR is a directory path (str or PathLike) --
    confirm against storage.json_store.
    """
    db_path = Path(SYSTEM_DIR) / _CREDENTIALS_DB_FILENAME
    db_path.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


class CredentialVault:
    """
    Manages encrypted credential storage.

    All credentials are encrypted using Fernet (AES-128 in CBC mode with an
    HMAC-SHA256 authentication tag) with PBKDF2-derived keys from a master
    encryption key. Each credential gets its own random salt, so every
    credential is encrypted under a different derived key.
    """

    # Master encryption key (should be set via environment variable).
    # Read once, when the class is defined; rotate_encryption_key() replaces it.
    MASTER_KEY = os.environ.get('DSS_ENCRYPTION_KEY', '').encode()

    # PBKDF2 work factor. Changing this makes every stored credential
    # undecryptable, because the derived key depends on it.
    KDF_ITERATIONS = 100000

    # Size of the per-credential random salt, in bytes (128 bits).
    SALT_BYTES = 16

    @classmethod
    def _get_cipher_suite(
        cls,
        salt: bytes,
        master_key: Optional[bytes] = None
    ) -> Fernet:
        """
        Derive encryption cipher from master key and salt.

        Args:
            salt: Per-credential salt fed to PBKDF2.
            master_key: Key material to derive from. Defaults to
                ``cls.MASTER_KEY``; key rotation passes the new key here so
                it never has to mutate class state mid-operation.

        Returns:
            A Fernet instance keyed with the derived key.

        Raises:
            ValueError: If no master key is available.
        """
        key_material = cls.MASTER_KEY if master_key is None else master_key
        if not key_material:
            raise ValueError(
                "DSS_ENCRYPTION_KEY environment variable not set. "
                "Required for credential encryption."
            )

        # Derive key from master key using PBKDF2
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=cls.KDF_ITERATIONS,
            backend=default_backend()
        )
        derived_key = kdf.derive(key_material)

        # Fernet expects its 32-byte key in URL-safe base64 form
        return Fernet(base64.urlsafe_b64encode(derived_key))

    @classmethod
    def encrypt_credential(
        cls,
        credential_type: str,
        credential_data: Dict[str, Any],
        user_id: Optional[str] = None
    ) -> str:
        """
        Encrypt and store a credential.

        Args:
            credential_type: Type of credential (figma_token, jira_token, etc.)
            credential_data: Dictionary containing credential details
                (must be JSON-serializable)
            user_id: Optional user ID for multi-tenant security

        Returns:
            Credential ID for later retrieval

        Raises:
            ValueError: If no master key is configured.
        """
        credential_id = str(uuid.uuid4())
        salt = secrets.token_bytes(cls.SALT_BYTES)

        # Serialize, then encrypt under the key derived from this salt
        json_data = json.dumps(credential_data)
        encrypted = cls._get_cipher_suite(salt).encrypt(json_data.encode())

        # Naive UTC ISO-8601 string: the same format datetime.utcnow()
        # produced, without using the deprecated call.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        # Store in database
        with get_connection() as conn:
            conn.execute("""
                INSERT INTO credentials (
                    id, credential_type, encrypted_data, salt, user_id, created_at
                ) VALUES (?, ?, ?, ?, ?, ?)
            """, (
                credential_id,
                credential_type,
                encrypted.decode(),
                base64.b64encode(salt).decode(),
                user_id,
                created_at
            ))

        return credential_id

    @classmethod
    def decrypt_credential(
        cls,
        credential_id: str
    ) -> Optional[Dict[str, Any]]:
        """
        Decrypt and retrieve a credential.

        Args:
            credential_id: Credential ID from encrypt_credential()

        Returns:
            Decrypted credential data or None if not found

        Raises:
            ValueError: If no master key is configured.
            cryptography.fernet.InvalidToken: If the stored ciphertext cannot
                be authenticated with the current master key.
        """
        with get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT encrypted_data, salt FROM credentials WHERE id = ?
            """, (credential_id,))
            row = cursor.fetchone()

            if not row:
                return None

            encrypted_data, salt_b64 = row

        salt = base64.b64decode(salt_b64)

        # Decrypt
        cipher = cls._get_cipher_suite(salt)
        decrypted = cipher.decrypt(encrypted_data.encode())

        return json.loads(decrypted.decode())

    @classmethod
    def delete_credential(cls, credential_id: str) -> bool:
        """
        Delete a credential.

        Returns:
            True if a row was deleted, False if the ID did not exist.
        """
        with get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM credentials WHERE id = ?", (credential_id,))
            return cursor.rowcount > 0

    @classmethod
    def list_credentials(
        cls,
        credential_type: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        List credentials (metadata only, not decrypted).

        Args:
            credential_type: If given, only return credentials of this type.
            user_id: If given, only return credentials owned by this user.

        Returns:
            One dict per credential with the keys id, credential_type,
            user_id and created_at.
        """
        # "WHERE 1=1" lets every optional filter be appended as " AND ...";
        # filter values are always bound as parameters.
        query = "SELECT id, credential_type, user_id, created_at FROM credentials WHERE 1=1"
        params: List[Any] = []

        if credential_type:
            query += " AND credential_type = ?"
            params.append(credential_type)

        if user_id:
            query += " AND user_id = ?"
            params.append(user_id)

        with get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]

    @classmethod
    def rotate_encryption_key(cls) -> bool:
        """
        Rotate the master encryption key.

        This re-encrypts all credentials with a new master key. Requires new
        key to be set in DSS_ENCRYPTION_KEY_NEW environment variable. Each
        credential keeps its existing salt.

        On success the new key becomes the active one for this process:
        both ``cls.MASTER_KEY`` and ``os.environ['DSS_ENCRYPTION_KEY']`` are
        updated. The deployment's persistent configuration still has to be
        updated separately.

        Returns:
            True when every credential has been re-encrypted.

        Raises:
            ValueError: If DSS_ENCRYPTION_KEY_NEW is not set.
            RuntimeError: If any step of the rotation fails; the original
                exception is attached as ``__cause__``.
        """
        new_key = os.environ.get('DSS_ENCRYPTION_KEY_NEW', '').encode()
        if not new_key:
            raise ValueError(
                "DSS_ENCRYPTION_KEY_NEW environment variable not set for key rotation"
            )

        try:
            with get_connection() as conn:
                cursor = conn.cursor()

                # Get all credentials
                cursor.execute("SELECT id, encrypted_data, salt FROM credentials")
                rows = cursor.fetchall()

                # Re-encrypt everything in memory first, so an undecryptable
                # row aborts the rotation before any row has been rewritten.
                updates = []
                for credential_id, encrypted_data, salt_b64 in rows:
                    salt = base64.b64decode(salt_b64)

                    # Decrypt with old key
                    plaintext = cls._get_cipher_suite(salt).decrypt(
                        encrypted_data.encode()
                    )

                    # Encrypt with new key
                    new_cipher = cls._get_cipher_suite(salt, master_key=new_key)
                    updates.append(
                        (new_cipher.encrypt(plaintext).decode(), credential_id)
                    )

                # Update database
                for update_params in updates:
                    conn.execute(
                        "UPDATE credentials SET encrypted_data = ? WHERE id = ?",
                        update_params
                    )
        except Exception as e:
            raise RuntimeError(f"Key rotation failed: {str(e)}") from e

        # Switch the process over to the new key only after the writes
        # succeeded. Without updating MASTER_KEY, every later decrypt in this
        # process would still use the old key and fail.
        cls.MASTER_KEY = new_key
        os.environ['DSS_ENCRYPTION_KEY'] = new_key.decode()

        return True

    @classmethod
    def ensure_credentials_table(cls):
        """Ensure credentials table exists, along with its lookup indexes."""
        with get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS credentials (
                    id TEXT PRIMARY KEY,
                    credential_type TEXT NOT NULL,
                    encrypted_data TEXT NOT NULL,
                    salt TEXT NOT NULL,
                    user_id TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_credentials_type ON credentials(credential_type)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_credentials_user ON credentials(user_id)"
            )


# Initialize table on import
CredentialVault.ensure_credentials_table()