#!/usr/bin/env python3
"""
DSS Browser Log Monitor with MCP Alerts

Monitors browser console logs in real-time and sends critical errors
through MCP (Model Context Protocol) to alert developers.

Features:
- Real-time log monitoring (tail-like)
- Error/Warning/Info filtering
- MCP integration for smart alerts
- Duplicate suppression
- Severity-based routing
"""

import os
import sys
import time
import json
import re
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from collections import defaultdict
import subprocess

LOG_FILE = Path(__file__).parent / 'logs' / 'browser-logs' / 'browser.log'
POLL_INTERVAL = 2  # seconds

ERROR_KEYWORDS = ['error', 'uncaught', 'failed', 'exception', 'critical']
WARN_KEYWORDS = ['warning', 'warn', 'timeout']

# Message fragments that escalate an error to CRITICAL severity.
_CRITICAL_TERMS = ('uncaught', 'failed to load', 'not found', 'undefined')

# A warning alert is raised when a single cycle holds more warnings than this.
_WARNING_ALERT_THRESHOLD = 3

# Line format: YYYY-MM-DD HH:MM:SS,mmm [LEVEL] [SOURCE] message
# Compiled once because it is applied to every incoming log line.
_LOG_LINE_RE = re.compile(
    r'^(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}),\d+ \[(\w+)\] \[(\w+)\] (.*)$'
)


@dataclass
class LogEntry:
    """Parsed log entry from browser logs"""

    date: str       # 'YYYY-MM-DD' part of the timestamp
    time: str       # 'HH:MM:SS' part of the timestamp (milliseconds dropped)
    level: str      # text inside the first [...] group
    source: str     # text inside the second [...] group
    message: str    # remainder of the line after the two bracket groups
    full_line: str  # the line exactly as it was handed to parse()

    @classmethod
    def parse(cls, line: str) -> Optional['LogEntry']:
        """Parse log line format: YYYY-MM-DD HH:MM:SS,mmm [LEVEL] [SOURCE] message

        Returns:
            A LogEntry, or None when the line does not match the format.
        """
        match = _LOG_LINE_RE.match(line)
        if not match:
            return None

        date, clock, level, source, message = match.groups()
        return cls(
            date=date,
            time=clock,
            level=level,
            source=source,
            message=message,
            full_line=line,
        )

    def is_error(self) -> bool:
        """Check if entry is an error (by level or by keyword in the message)"""
        if self.level == 'ERROR':
            return True
        text = self.message.lower()
        return any(kw in text for kw in ERROR_KEYWORDS)

    def is_warning(self) -> bool:
        """Check if entry is a warning (by level or by keyword in the message)"""
        if self.level == 'WARNING':
            return True
        text = self.message.lower()
        return any(kw in text for kw in WARN_KEYWORDS)

    def is_critical(self) -> bool:
        """Check if entry is critical (needs immediate alert)"""
        text = self.message.lower()
        return any(term in text for term in _CRITICAL_TERMS)


class LogMonitor:
    """Monitors browser logs and sends MCP alerts"""

    def __init__(self, log_file: Optional[Path] = None,
                 poll_interval: float = POLL_INTERVAL):
        """Create a monitor.

        Args:
            log_file: File to watch. Defaults to the module-level LOG_FILE.
            poll_interval: Seconds to sleep between polls in start().
        """
        self.log_file = Path(log_file) if log_file is not None else LOG_FILE
        self.poll_interval = poll_interval
        self.last_position = 0  # byte offset of the first unread byte
        self.seen_errors = defaultdict(int)  # Track duplicate errors
        self.initialized = False
        self.errors_this_cycle = []
        self.warnings_this_cycle = []

    def init(self) -> bool:
        """Initialize monitor

        Positions the read offset at the current end of the file so only
        lines written after this call are reported.

        Returns:
            True when the log file exists and the monitor is ready.
        """
        try:
            if not self.log_file.exists():
                print(f'[LogMonitor] ⚠️ Log file not found yet: {self.log_file}')
                return False
            self.last_position = self.log_file.stat().st_size
        except OSError as e:
            print(f'[LogMonitor] ❌ Failed to initialize: {e}')
            return False

        self.initialized = True
        print(f'[LogMonitor] ✅ Initialized. Watching: {self.log_file}')
        return True

    def get_new_logs(self) -> List[str]:
        """Read new log lines since last check

        Only newline-terminated lines are consumed. A partially written
        trailing line is left in the file and picked up by a later call once
        its newline has arrived.

        Returns:
            Stripped, non-empty lines; an empty list when there is nothing
            new or the file cannot be read.
        """
        try:
            if not self.log_file.exists():
                return []

            current_size = self.log_file.stat().st_size

            # File rotated or shrunk
            if current_size < self.last_position:
                self.last_position = 0

            # No new data
            if current_size == self.last_position:
                return []

            # Binary mode: last_position is a byte offset, and seeking a
            # text-mode file to an arbitrary byte offset is not supported.
            with open(self.log_file, 'rb') as f:
                f.seek(self.last_position)
                chunk = f.read()
        except OSError as e:
            print(f'[LogMonitor] ❌ Error reading logs: {e}')
            return []

        last_newline = chunk.rfind(b'\n')
        if last_newline < 0:
            # Nothing but an unfinished line so far; retry on the next poll.
            return []

        complete = chunk[:last_newline + 1]
        # Advance by what was actually consumed rather than by the earlier
        # stat() size, so bytes appended in between are not delivered twice.
        self.last_position += len(complete)

        # Browser messages may contain arbitrary text; never let one bad
        # byte sequence abort the whole batch.
        text = complete.decode('utf-8', errors='replace')
        return [line.strip() for line in text.split('\n') if line.strip()]

    def send_mcp_alert(self, severity: str, title: str, details: str) -> bool:
        """Send alert through MCP integration

        Args:
            severity: Label printed in the alert header.
            title: Short summary line.
            details: Body text shown inside a fenced block.

        Returns:
            True when the alert was written, False when writing failed.
        """
        timestamp = datetime.now().isoformat()
        message = '\n'.join([
            f'🔴 **[DSS Admin UI] {severity} Alert**',
            '',
            f'**Title:** {title}',
            '',
            '**Details:**',
            '```',
            f'{details}',
            '```',
            '',
            f'**Time:** {timestamp}',
            '**Monitor:** Browser Console Log Monitor',
        ])

        try:
            # In production, this would connect to an MCP server
            # For now, log to stdout (which can be captured by MCP)
            print(f'\n{"=" * 70}')
            print(f'[MCP-ALERT] {severity}')
            print(message)
            print(f'{"=" * 70}\n')
        except (OSError, UnicodeError) as e:
            print(f'[LogMonitor] ❌ Failed to send MCP alert: {e}')
            return False
        return True

    def process_logs(self, logs: List[str]) -> None:
        """Process new logs and detect errors

        Resets the per-cycle error/warning lists, classifies each parseable
        line, alerts on the first occurrence of each distinct error, and
        raises at most one warning alert per cycle.
        """
        self.errors_this_cycle = []
        self.warnings_this_cycle = []

        for line in logs:
            entry = LogEntry.parse(line)
            if not entry:
                continue

            if entry.is_error():
                self.errors_this_cycle.append(entry)

                # Check for duplicates
                key = entry.message[:100]  # First 100 chars as key
                self.seen_errors[key] += 1

                # Alert on first occurrence only; repeats are suppressed
                # regardless of severity.
                if self.seen_errors[key] == 1:
                    severity = 'CRITICAL' if entry.is_critical() else 'ERROR'
                    self.send_mcp_alert(
                        severity,
                        f'Browser Error: {entry.message[:50]}...',
                        entry.full_line,
                    )
            elif entry.is_warning():
                self.warnings_this_cycle.append(entry)

        # Alert if too many warnings. Evaluated once per cycle, after the
        # loop, so a burst produces a single alert carrying the final count.
        warning_count = len(self.warnings_this_cycle)
        if warning_count > _WARNING_ALERT_THRESHOLD:
            self.send_mcp_alert(
                'WARNING',
                f'Multiple Warnings Detected ({warning_count})',
                f'Detected {warning_count} warnings in this cycle',
            )

    def poll(self) -> None:
        """Single monitoring cycle"""
        try:
            logs = self.get_new_logs()
            if not logs:
                # Nothing new: skip the summary so a previous cycle's counts
                # are not reported again.
                return

            self.process_logs(logs)

            # Log summary
            if self.errors_this_cycle:
                print(f'[LogMonitor] 🔴 {len(self.errors_this_cycle)} error(s) detected')
            if self.warnings_this_cycle:
                print(f'[LogMonitor] ⚠️ {len(self.warnings_this_cycle)} warning(s) detected')
        except Exception as e:
            # Loop boundary: report and keep the monitor alive.
            print(f'[LogMonitor] ❌ Poll error: {e}')

    def start(self) -> None:
        """Start the monitoring loop

        Waits briefly for the log file to appear, then polls until
        interrupted with Ctrl+C, at which point the process exits with
        status 0.
        """
        if not self.initialized:
            if not self.init():
                print('[LogMonitor] ⚠️ Waiting for log file...')
                for _ in range(5):
                    time.sleep(1)
                    if self.init():
                        break

        if not self.initialized:
            print('[LogMonitor] ❌ Failed to initialize log file')
            return

        print('[LogMonitor] 🚀 Monitoring started. Press Ctrl+C to stop.')
        print('')

        try:
            # Initial poll
            self.poll()

            # Continuous polling
            while True:
                time.sleep(self.poll_interval)
                self.poll()
        except KeyboardInterrupt:
            print('\n[LogMonitor] 🛑 Shutdown signal received')
            print(f'[LogMonitor] 📊 Summary: {len(self.seen_errors)} unique errors tracked')
            sys.exit(0)


def main():
    """Main entry point"""
    print('🔍 DSS Browser Log Monitor (MCP Integration)')
    print(f'📝 Log file: {LOG_FILE}')
    print(f'⏱️ Poll interval: {POLL_INTERVAL}s')
    print('')

    monitor = LogMonitor()
    monitor.start()


if __name__ == '__main__':
    main()