""" Dashboard routes for MCP Gateway status monitoring Exposes service health, tool counts, and response metrics """ import json import time import aiohttp from datetime import datetime from typing import Dict, List, Optional # Define backend services with their details BACKEND_SERVICES = { "erpnext": { "url": "http://mcp-erpnext:32802/mcp", "display_name": "ERPNext", "description": "ERP System Integration" }, "truenas": { "url": "http://mcp-truenas:8100/mcp", "display_name": "TrueNAS", "description": "NAS Storage Management" }, "homeassistant": { "url": "http://mcp-homeassistant:8200/mcp", "display_name": "Home Assistant", "description": "Home Automation" }, "wave": { "url": "http://mcp-wave:8300/mcp", "display_name": "Wave Finance", "description": "Financial Management" }, "linkedin": { "url": "http://mcp-linkedin:3000/mcp", "display_name": "LinkedIn", "description": "LinkedIn Integration" } } SERVICE_CACHE = {} CACHE_TTL = 30 # Cache for 30 seconds async def get_service_status() -> Dict: """ Get status of all MCP services Returns health, tool count, response time, and other metrics """ services = [] for service_key, service_config in BACKEND_SERVICES.items(): service_data = { "name": service_config.get("display_name", service_key), "key": service_key, "url": service_config["url"], "status": "unknown", "toolCount": 0, "responseTime": None, "lastCheck": None, "error": None } try: start_time = time.time() async with aiohttp.ClientSession() as session: # Try to fetch tools list tools_url = f"{service_config['url']}/tools" async with session.get(tools_url, timeout=aiohttp.ClientTimeout(total=5)) as response: elapsed = (time.time() - start_time) * 1000 # Convert to ms service_data["responseTime"] = round(elapsed, 2) if response.status == 200: try: data = await response.json() tools = data.get("tools", []) service_data["toolCount"] = len(tools) service_data["status"] = "healthy" except json.JSONDecodeError: service_data["status"] = "warning" service_data["error"] = "Invalid JSON response" elif response.status == 503: service_data["status"] = "warning" service_data["error"] = "Service unavailable" else: service_data["status"] = "unhealthy" service_data["error"] = f"HTTP {response.status}" except asyncio.TimeoutError: service_data["status"] = "unhealthy" service_data["error"] = "Connection timeout" service_data["responseTime"] = 5000 except aiohttp.ClientConnectorError: service_data["status"] = "unhealthy" service_data["error"] = "Connection refused" except Exception as e: service_data["status"] = "unhealthy" service_data["error"] = str(e) service_data["lastCheck"] = datetime.utcnow().isoformat() services.append(service_data) return { "timestamp": datetime.utcnow().isoformat(), "services": services, "summary": { "total": len(services), "healthy": len([s for s in services if s["status"] == "healthy"]), "warning": len([s for s in services if s["status"] == "warning"]), "unhealthy": len([s for s in services if s["status"] == "unhealthy"]), "totalTools": sum(s["toolCount"] for s in services) } } async def setup_dashboard_routes(app): """ Register dashboard routes with the FastAPI app """ from fastapi.responses import FileResponse, JSONResponse from fastapi import APIRouter import asyncio router = APIRouter() @router.get("/dashboard") async def get_dashboard(): """Serve the dashboard HTML""" try: return FileResponse("dashboard/dashboard.html", media_type="text/html") except FileNotFoundError: return JSONResponse( {"error": "Dashboard not found"}, status_code=404 ) @router.get("/mcp/status") async def get_mcp_status(): """Get status of all MCP services""" status = await get_service_status() return JSONResponse(status) @router.get("/mcp/status/{service_name}") async def get_service_status_by_name(service_name: str): """Get status of a specific service""" status = await get_service_status() for service in status["services"]: if service["key"] == service_name or service["name"].lower() == service_name.lower(): return JSONResponse(service) return JSONResponse( {"error": f"Service '{service_name}' not found"}, status_code=404 ) return router