""" Atlassian-based Authentication Validates users by verifying their Atlassian (Jira/Confluence) credentials. On successful login, creates a JWT token for subsequent requests. """ import os import jwt import hashlib from datetime import datetime, timedelta from typing import Optional, Dict, Any from atlassian import Jira, Confluence from storage.json_store import read_json, write_json, SYSTEM_DIR class AtlassianAuth: """ Authentication using Atlassian API credentials. Users provide: - Atlassian URL (Jira or Confluence) - Email - API Token On successful validation, we: 1. Verify credentials against Atlassian API 2. Store user in database 3. Generate JWT token """ def __init__(self): self.jwt_secret = os.getenv("JWT_SECRET", "change-me-in-production") self.jwt_algorithm = "HS256" self.jwt_expiry_hours = int(os.getenv("JWT_EXPIRY_HOURS", "24")) async def verify_atlassian_credentials( self, url: str, email: str, api_token: str, service: str = "jira" ) -> Dict[str, Any]: """ Verify Atlassian credentials by making a test API call. Args: url: Atlassian URL (e.g., https://yourcompany.atlassian.net) email: User email api_token: Atlassian API token (use "1234" for mock mode) service: "jira" or "confluence" Returns: User info dict if valid, raises exception if invalid """ # Mock mode for development/testing if api_token == "1234": return { "email": email, "display_name": email.split("@")[0].title().replace(".", " ") + " (Mock)", "account_id": "mock_" + hashlib.md5(email.encode()).hexdigest()[:8], "atlassian_url": url or "https://mock.atlassian.net", "service": service, "verified": True, "mock_mode": True } try: if service == "jira": client = Jira(url=url, username=email, password=api_token) # Test API call - get current user user_info = client.myself() else: # confluence client = Confluence(url=url, username=email, password=api_token) # Test API call - get current user user_info = client.get_current_user() return { "email": email, "display_name": user_info.get("displayName", email), "account_id": user_info.get("accountId"), "atlassian_url": url, "service": service, "verified": True, "mock_mode": False } except Exception as e: raise ValueError(f"Invalid Atlassian credentials: {str(e)}") def hash_api_token(self, api_token: str) -> str: """Hash API token for storage (we don't store plain tokens)""" return hashlib.sha256(api_token.encode()).hexdigest() async def login( self, url: str, email: str, api_token: str, service: str = "jira" ) -> Dict[str, Any]: """ Authenticate user with Atlassian credentials. Returns: { "token": "jwt_token", "user": {...}, "expires_at": "iso_timestamp" } """ # Verify credentials against Atlassian user_info = await self.verify_atlassian_credentials( url, email, api_token, service ) # Hash the API token token_hash = self.hash_api_token(api_token) # Store or update user in database with get_connection() as conn: # Check if user exists existing = conn.execute( "SELECT id, email FROM users WHERE email = ?", (email,) ).fetchone() if existing: # Update existing user user_id = existing["id"] conn.execute( """ UPDATE users SET display_name = ?, atlassian_url = ?, atlassian_service = ?, api_token_hash = ?, last_login = ? WHERE id = ? """, ( user_info["display_name"], url, service, token_hash, datetime.utcnow().isoformat(), user_id ) ) else: # Create new user cursor = conn.execute( """ INSERT INTO users ( email, display_name, atlassian_url, atlassian_service, api_token_hash, created_at, last_login ) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( email, user_info["display_name"], url, service, token_hash, datetime.utcnow().isoformat(), datetime.utcnow().isoformat() ) ) user_id = cursor.lastrowid # Generate JWT token expires_at = datetime.utcnow() + timedelta(hours=self.jwt_expiry_hours) token_payload = { "user_id": user_id, "email": email, "display_name": user_info["display_name"], "exp": expires_at, "iat": datetime.utcnow() } jwt_token = jwt.encode( token_payload, self.jwt_secret, algorithm=self.jwt_algorithm ) return { "token": jwt_token, "user": { "id": user_id, "email": email, "display_name": user_info["display_name"], "atlassian_url": url, "service": service }, "expires_at": expires_at.isoformat() } def verify_token(self, token: str) -> Optional[Dict[str, Any]]: """ Verify JWT token and return user info. Returns: User dict if valid, None if invalid/expired """ try: payload = jwt.decode( token, self.jwt_secret, algorithms=[self.jwt_algorithm] ) return payload except jwt.ExpiredSignatureError: return None except jwt.InvalidTokenError: return None async def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]: """Get user information by ID""" with get_connection() as conn: user = conn.execute( """ SELECT id, email, display_name, atlassian_url, atlassian_service, created_at, last_login FROM users WHERE id = ? """, (user_id,) ).fetchone() if user: return dict(user) return None # Singleton instance _auth_instance: Optional[AtlassianAuth] = None def get_auth() -> AtlassianAuth: """Get singleton auth instance""" global _auth_instance if _auth_instance is None: _auth_instance = AtlassianAuth() return _auth_instance