Files
dss/.dss/log-monitor-mcp.py
Digital Production Factory 276ed71f31 Initial commit: Clean DSS implementation
Migrated from design-system-swarm with fresh git history.
Old project history preserved in /home/overbits/apps/design-system-swarm

Core components:
- MCP Server (Python FastAPI with mcp 1.23.1)
- Claude Plugin (agents, commands, skills, strategies, hooks, core)
- DSS Backend (dss-mvp1 - token translation, Figma sync)
- Admin UI (Node.js/React)
- Server (Node.js/Express)
- Storybook integration (dss-mvp1/.storybook)

Self-contained configuration:
- All paths relative or use DSS_BASE_PATH=/home/overbits/dss
- PYTHONPATH configured for dss-mvp1 and dss-claude-plugin
- .env file with all configuration
- Claude plugin uses ${CLAUDE_PLUGIN_ROOT} for portability

Migration completed: 2025-12-09
🤖 Clean migration with full functionality preserved
2025-12-09 18:45:48 -03:00

266 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
DSS Browser Log Monitor with MCP Alerts
Monitors browser console logs in real-time and sends critical errors
through MCP (Model Context Protocol) to alert developers.
Features:
- Real-time log monitoring (tail-like)
- Error/Warning/Info filtering
- MCP integration for smart alerts
- Duplicate suppression
- Severity-based routing
"""
import os
import sys
import time
import json
import re
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from collections import defaultdict
import subprocess
# Browser console log to watch, resolved relative to this script's directory.
LOG_FILE = Path(__file__).parent.joinpath('logs', 'browser-logs', 'browser.log')

# Pause between two polling cycles, in seconds.
POLL_INTERVAL = 2

# Lower-case substrings that classify a message as an error.
ERROR_KEYWORDS = [
    'error',
    'uncaught',
    'failed',
    'exception',
    'critical',
]

# Lower-case substrings that classify a message as a warning.
WARN_KEYWORDS = [
    'warning',
    'warn',
    'timeout',
]
@dataclass
class LogEntry:
    """A single browser log line broken into its components.

    Every field is the raw text captured from the line; ``full_line`` keeps
    the unmodified input so alerts can quote it verbatim.
    """

    date: str
    time: str
    level: str
    source: str
    message: str
    full_line: str

    @classmethod
    def parse(cls, line: str) -> Optional['LogEntry']:
        """Build an entry from ``YYYY-MM-DD HH:MM:SS,mmm [LEVEL] [SOURCE] message``.

        Returns ``None`` when *line* does not follow that layout. The
        millisecond part must be present but is not stored.
        """
        found = re.match(
            r'^(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}),\d+ \[(\w+)\] \[(\w+)\] (.*)$',
            line,
        )
        if found is None:
            return None

        day, clock, level, source, text = found.groups()
        return cls(
            date=day,
            time=clock,
            level=level,
            source=source,
            message=text,
            full_line=line,
        )

    def is_error(self) -> bool:
        """Tell whether the level is ERROR or the message contains an error keyword."""
        if self.level == 'ERROR':
            return True
        lowered = self.message.lower()
        return any(keyword in lowered for keyword in ERROR_KEYWORDS)

    def is_warning(self) -> bool:
        """Tell whether the level is WARNING or the message contains a warning keyword."""
        if self.level == 'WARNING':
            return True
        lowered = self.message.lower()
        return any(keyword in lowered for keyword in WARN_KEYWORDS)

    def is_critical(self) -> bool:
        """Tell whether the message contains a phrase that warrants an immediate alert."""
        lowered = self.message.lower()
        for phrase in ('uncaught', 'failed to load', 'not found', 'undefined'):
            if phrase in lowered:
                return True
        return False
class LogMonitor:
    """Tail the browser log file and raise alerts for errors and warnings.

    The monitor remembers a byte offset into the log file, reads whatever
    complete lines were appended since the previous poll, classifies them
    with ``LogEntry`` and prints an alert block to stdout for every error
    message it has not seen before.
    """

    def __init__(self, log_file: Optional[Path] = None):
        """Create a monitor.

        Args:
            log_file: File to watch. Defaults to the module-level ``LOG_FILE``.
        """
        # The default is resolved only when no path is given.
        self.log_file = Path(log_file) if log_file is not None else LOG_FILE
        self.last_position = 0  # Byte offset of the first unread byte
        self.seen_errors = defaultdict(int)  # Occurrence count per error key
        self.initialized = False
        self.errors_this_cycle = []
        self.warnings_this_cycle = []

    def init(self) -> bool:
        """Position the monitor at the current end of the log file.

        Content already in the file is skipped; only lines appended later
        are reported.

        Returns:
            True when the file exists and the monitor is ready, else False.
        """
        try:
            if not self.log_file.exists():
                print(f'[LogMonitor] ⚠️ Log file not found yet: {self.log_file}')
                return False
            self.last_position = self.log_file.stat().st_size
            self.initialized = True
            print(f'[LogMonitor] ✅ Initialized. Watching: {self.log_file}')
            return True
        except Exception as e:
            print(f'[LogMonitor] ❌ Failed to initialize: {e}')
            return False

    def get_new_logs(self) -> List[str]:
        """Return the complete, non-blank lines appended since the last call.

        Only newline-terminated lines are consumed. A trailing fragment that
        the writer has not finished yet stays in the file and is returned,
        whole, by a later call once its newline arrives. The stored offset
        advances by exactly the number of bytes consumed, so data appended
        while the read is in progress is neither skipped nor delivered twice.

        Returns:
            Stripped lines in file order; an empty list when there is
            nothing new, the file is missing, or reading failed.
        """
        try:
            try:
                current_size = self.log_file.stat().st_size
            except FileNotFoundError:
                return []

            # File rotated or truncated: start over from the beginning.
            if current_size < self.last_position:
                self.last_position = 0

            # No new data
            if current_size == self.last_position:
                return []

            # Binary mode makes the offset a plain byte count and keeps
            # decoding problems from aborting the read.
            with open(self.log_file, 'rb') as f:
                f.seek(self.last_position)
                chunk = f.read()

            # Keep everything up to and including the last newline.
            consumed = chunk.rfind(b'\n') + 1
            if consumed == 0:
                return []
            self.last_position += consumed

            # Cutting at a newline never splits a multi-byte character, and
            # 'replace' keeps one malformed byte from discarding the batch.
            text = chunk[:consumed].decode('utf-8', errors='replace')
            stripped = (line.strip() for line in text.split('\n'))
            return [line for line in stripped if line]
        except Exception as e:
            print(f'[LogMonitor] ❌ Error reading logs: {e}')
            return []

    def send_mcp_alert(self, severity: str, title: str, details: str) -> bool:
        """Emit an alert block on stdout.

        Args:
            severity: Label shown in the alert header (e.g. ``'ERROR'``).
            title: Short one-line summary.
            details: Text placed inside the fenced details section.

        Returns:
            True when the alert was printed, False when printing failed.
        """
        try:
            timestamp = datetime.now().isoformat()
            message = '\n'.join([
                f'🔴 **[DSS Admin UI] {severity} Alert**',
                f'**Title:** {title}',
                '**Details:**',
                '```',
                f'{details}',
                '```',
                f'**Time:** {timestamp}',
                '**Monitor:** Browser Console Log Monitor',
            ]).strip()
            # In production, this would connect to an MCP server
            # For now, log to stdout (which can be captured by MCP)
            print(f'\n{"=" * 70}')
            print(f'[MCP-ALERT] {severity}')
            print(message)
            print(f'{"=" * 70}\n')
            return True
        except Exception as e:
            print(f'[LogMonitor] ❌ Failed to send MCP alert: {e}')
            return False

    def process_logs(self, logs: List[str]) -> None:
        """Classify a batch of raw lines and send the resulting alerts.

        Lines that ``LogEntry.parse`` rejects are ignored. Each error is
        counted under the first 100 characters of its message, and an alert
        is sent only the first time a given key appears. Warnings are
        collected, and one alert is sent when a batch holds more than three.

        Args:
            logs: Raw log lines, typically the result of ``get_new_logs``.
        """
        self.errors_this_cycle = []
        self.warnings_this_cycle = []

        for line in logs:
            entry = LogEntry.parse(line)
            if entry is None:
                continue

            if entry.is_error():
                self.errors_this_cycle.append(entry)
                # The message prefix is the duplicate-suppression key.
                key = entry.message[:100]
                self.seen_errors[key] += 1
                if self.seen_errors[key] == 1:
                    severity = 'CRITICAL' if entry.is_critical() else 'ERROR'
                    self.send_mcp_alert(
                        severity,
                        f'Browser Error: {entry.message[:50]}...',
                        entry.full_line,
                    )
            elif entry.is_warning():
                self.warnings_this_cycle.append(entry)

        # Alert if too many warnings
        warning_count = len(self.warnings_this_cycle)
        if warning_count > 3:
            self.send_mcp_alert(
                'WARNING',
                f'Multiple Warnings Detected ({warning_count})',
                f'Detected {warning_count} warnings in this cycle',
            )

    def poll(self) -> None:
        """Run one monitoring cycle: read new lines, process them, print a summary.

        Any exception is reported on stdout and swallowed so that a single
        bad cycle does not stop the monitoring loop.
        """
        try:
            logs = self.get_new_logs()
            if logs:
                self.process_logs(logs)
                # Log summary
                if self.errors_this_cycle:
                    print(f'[LogMonitor] 🔴 {len(self.errors_this_cycle)} error(s) detected')
                if self.warnings_this_cycle:
                    print(f'[LogMonitor] ⚠️ {len(self.warnings_this_cycle)} warning(s) detected')
        except Exception as e:
            print(f'[LogMonitor] ❌ Poll error: {e}')

    def start(self) -> None:
        """Run the monitoring loop until interrupted.

        If the log file is not there yet, initialization is retried once per
        second for five seconds before giving up. On Ctrl+C a summary is
        printed and the process exits with status 0.
        """
        if not self.initialized and not self.init():
            print('[LogMonitor] ⚠️ Waiting for log file...')
            for _ in range(5):
                time.sleep(1)
                if self.init():
                    break

        if not self.initialized:
            print('[LogMonitor] ❌ Failed to initialize log file')
            return

        print('[LogMonitor] 🚀 Monitoring started. Press Ctrl+C to stop.')
        print('')
        try:
            # Initial poll
            self.poll()
            # Continuous polling
            while True:
                time.sleep(POLL_INTERVAL)
                self.poll()
        except KeyboardInterrupt:
            print('\n[LogMonitor] 🛑 Shutdown signal received')
            print(f'[LogMonitor] 📊 Summary: {len(self.seen_errors)} unique errors tracked')
            sys.exit(0)
def main():
    """Print the startup banner, then run a ``LogMonitor`` until it stops."""
    banner = (
        '🔍 DSS Browser Log Monitor (MCP Integration)',
        f'📝 Log file: {LOG_FILE}',
        f'⏱️ Poll interval: {POLL_INTERVAL}s',
        '',
    )
    for banner_line in banner:
        print(banner_line)

    LogMonitor().start()


# Start monitoring only when run as a script, not when imported.
if __name__ == '__main__':
    main()