""" Claude Persistent Agent - FastAPI Backend Interfaces with a long-running interactive Claude Code process via stdin/stdout pipes. """ from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import Optional, List, Dict, Any import asyncio import json import sqlite3 import subprocess import logging from datetime import datetime import uuid from pathlib import Path from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger import os import threading # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize FastAPI app app = FastAPI(title="Claude Persistent Agent", version="3.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Paths DB_PATH = Path("/app/data/tasks.db") LOGS_PATH = Path("/app/logs") TASKS_PATH = Path("/app/tasks") DB_PATH.parent.mkdir(parents=True, exist_ok=True) LOGS_PATH.mkdir(parents=True, exist_ok=True) TASKS_PATH.mkdir(parents=True, exist_ok=True) # ============ Models ============ class Task(BaseModel): id: Optional[str] = None name: str description: Optional[str] = None prompt: str schedule_type: str = "manual" schedule_value: Optional[str] = None enabled: bool = True created_at: Optional[str] = None last_run: Optional[str] = None next_run: Optional[str] = None status: str = "idle" agent_tools: Optional[str] = None agent_system_prompt: Optional[str] = None agent_max_turns: Optional[int] = None agent_permission_mode: str = "auto" agent_timeout: int = 300 class TaskRun(BaseModel): task_id: str run_id: str status: str output: Optional[str] = None error: Optional[str] = None started_at: str completed_at: Optional[str] = None class ChatSend(BaseModel): message: str session_id: Optional[str] = None class McpServerAdd(BaseModel): name: str server_type: str = "sse" url: Optional[str] = None command: Optional[str] = None args: Optional[List[str]] = None # ============ Database ============ def init_db(): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(""" CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, prompt TEXT NOT NULL, schedule_type TEXT NOT NULL, schedule_value TEXT, enabled BOOLEAN DEFAULT 1, created_at TEXT, last_run TEXT, next_run TEXT, status TEXT DEFAULT 'idle', agent_tools TEXT, agent_system_prompt TEXT, agent_max_turns INTEGER, agent_permission_mode TEXT DEFAULT 'auto', agent_timeout INTEGER DEFAULT 300 ) """) c.execute(""" CREATE TABLE IF NOT EXISTS task_runs ( run_id TEXT PRIMARY KEY, task_id TEXT NOT NULL, status TEXT NOT NULL, output TEXT, error TEXT, started_at TEXT, completed_at TEXT, FOREIGN KEY (task_id) REFERENCES tasks(id) ) """) c.execute(""" CREATE TABLE IF NOT EXISTS chat_messages ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp TEXT NOT NULL, metadata TEXT ) """) # Migrations for any legacy columns for col in ["agent_tools TEXT", "agent_system_prompt TEXT", "agent_max_turns INTEGER", "agent_permission_mode TEXT DEFAULT 'auto'", "agent_timeout INTEGER DEFAULT 300"]: try: c.execute(f"ALTER TABLE tasks ADD COLUMN {col}") except sqlite3.OperationalError: pass conn.commit() conn.close() def db_save_message(session_id: str, role: str, content: str, metadata: dict = None): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute( "INSERT INTO chat_messages (id, session_id, role, content, timestamp, metadata) VALUES (?,?,?,?,?,?)", (str(uuid.uuid4()), session_id, role, content, datetime.now().isoformat(), json.dumps(metadata) if metadata else None) ) conn.commit() conn.close() def db_get_messages(session_id: str) -> list: conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute( "SELECT id, role, content, timestamp, metadata FROM chat_messages WHERE session_id=? ORDER BY timestamp", (session_id,) ) rows = c.fetchall() conn.close() return [{"id": r[0], "role": r[1], "content": r[2], "timestamp": r[3], "metadata": json.loads(r[4]) if r[4] else None} for r in rows] def db_get_sessions() -> list: conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(""" SELECT session_id, MIN(timestamp) as started, MAX(timestamp) as last_active, COUNT(*) as message_count FROM chat_messages GROUP BY session_id ORDER BY last_active DESC """) rows = c.fetchall() conn.close() return [{"session_id": r[0], "started": r[1], "last_active": r[2], "message_count": r[3]} for r in rows] # ============ Persistent Claude Process Manager ============ class ClaudeProcessManager: """ Manages a long-running `claude` interactive process. Communicates via stdin/stdout using stream-json format. The process is started once (after auth is confirmed) and kept alive. Messages are sent as JSON lines to stdin; responses arrive as JSON lines from stdout. Claude stream-json input format (one line per message): {"type": "user", "message": {"role": "user", "content": "your prompt"}} Claude stream-json output events (one line per event): {"type": "system", "subtype": "init", "session_id": "...", ...} {"type": "assistant", "message": {"role": "assistant", "content": [...]}} {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}} {"type": "result", "subtype": "success", "result": "...", "session_id": "..."} {"type": "result", "subtype": "error_max_turns", ...} """ def __init__(self): self._process: Optional[asyncio.subprocess.Process] = None self._lock = asyncio.Lock() self._response_buffer: List[str] = [] self._waiting_for_response = False self._stdout_task: Optional[asyncio.Task] = None self._current_session_id: Optional[str] = None # Subscribers: session_id -> list of WebSocket queues self._subscribers: Dict[str, List[asyncio.Queue]] = {} # Global broadcast queues for all connected clients self._broadcast_queues: List[asyncio.Queue] = [] self._is_ready = False self._status = "not_started" # not_started, starting, ready, dead @property def status(self): return self._status @property def session_id(self): return self._current_session_id async def start(self) -> bool: """Start the Claude interactive process.""" async with self._lock: if self._process and self._process.returncode is None: return True # Already running self._status = "starting" logger.info("Starting Claude interactive process...") env = os.environ.copy() # Build command: interactive claude with stream-json I/O cmd = [ "claude", "--output-format", "stream-json", "--input-format", "stream-json", "--verbose", ] try: self._process = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) self._is_ready = False self._status = "starting" # Start stdout reader task if self._stdout_task: self._stdout_task.cancel() self._stdout_task = asyncio.create_task(self._read_stdout()) # Wait for the init event (up to 15 seconds) for _ in range(150): await asyncio.sleep(0.1) if self._is_ready: break if self._is_ready: logger.info(f"Claude process ready, session: {self._current_session_id}") self._status = "ready" return True else: logger.warning("Claude process started but no init event yet - proceeding anyway") self._status = "ready" return True except FileNotFoundError: self._status = "dead" logger.error("claude CLI not found in PATH") return False except Exception as e: self._status = "dead" logger.error(f"Failed to start Claude process: {e}") return False async def stop(self): """Stop the Claude process.""" if self._process: try: self._process.terminate() await asyncio.wait_for(self._process.wait(), timeout=5) except Exception: self._process.kill() self._process = None self._status = "not_started" self._is_ready = False if self._stdout_task: self._stdout_task.cancel() self._stdout_task = None async def restart(self) -> bool: """Restart the Claude process.""" await self.stop() return await self.start() async def _read_stdout(self): """Continuously read stdout from the Claude process and broadcast events.""" try: while self._process and self._process.returncode is None: line = await self._process.stdout.readline() if not line: break line_str = line.decode("utf-8", errors="replace").strip() if not line_str: continue try: event = json.loads(line_str) except json.JSONDecodeError: logger.debug(f"Non-JSON stdout: {line_str}") continue await self._handle_event(event) except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error reading Claude stdout: {e}") finally: if self._status == "ready": self._status = "dead" logger.warning("Claude process stdout ended") # Notify all subscribers the process died await self._broadcast({"type": "system", "subtype": "process_dead", "message": "Claude process ended unexpectedly"}) async def _handle_event(self, event: dict): """Process an event from Claude's stdout.""" event_type = event.get("type", "") if event_type == "system" and event.get("subtype") == "init": self._current_session_id = event.get("session_id") self._is_ready = True logger.info(f"Claude session initialized: {self._current_session_id}") elif event_type == "result": # End of a response turn session_id = event.get("session_id", self._current_session_id) self._current_session_id = session_id await self._broadcast(event) async def _broadcast(self, event: dict): """Send event to all connected WebSocket queues.""" dead_queues = [] for q in self._broadcast_queues: try: q.put_nowait(event) except asyncio.QueueFull: dead_queues.append(q) for q in dead_queues: if q in self._broadcast_queues: self._broadcast_queues.remove(q) def subscribe(self) -> asyncio.Queue: """Subscribe to Claude output events. Returns a queue.""" q = asyncio.Queue(maxsize=200) self._broadcast_queues.append(q) return q def unsubscribe(self, q: asyncio.Queue): """Unsubscribe a queue.""" if q in self._broadcast_queues: self._broadcast_queues.remove(q) async def send_message(self, content: str) -> bool: """Send a message to the running Claude process.""" if not self._process or self._process.returncode is not None: return False if self._status != "ready": return False # Claude stream-json input format msg = json.dumps({ "type": "user", "message": {"role": "user", "content": content} }) + "\n" try: self._process.stdin.write(msg.encode("utf-8")) await self._process.stdin.drain() return True except Exception as e: logger.error(f"Failed to send message to Claude: {e}") return False async def send_task_noninteractive(self, prompt: str, tools: str = None, system_prompt: str = None, permission_mode: str = "auto", timeout: int = 300) -> dict: """ Run a one-shot task using a separate claude -p invocation (so it doesn't interfere with the interactive session). Returns {output, error, exit_code}. """ cmd = ["claude", "-p", "--output-format", "json"] if tools: cmd.extend(["--allowedTools", tools]) if system_prompt: cmd.extend(["--system-prompt", system_prompt]) if permission_mode: cmd.extend(["--permission-mode", permission_mode]) cmd.append(prompt) env = os.environ.copy() try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) stdout_str = stdout.decode("utf-8", errors="replace") stderr_str = stderr.decode("utf-8", errors="replace") result_text = "" try: data = json.loads(stdout_str) result_text = data.get("result", stdout_str) except json.JSONDecodeError: result_text = stdout_str return { "output": result_text, "error": stderr_str if proc.returncode != 0 else None, "exit_code": proc.returncode } except asyncio.TimeoutError: return {"output": None, "error": f"Task timed out after {timeout}s", "exit_code": -1} except FileNotFoundError: return {"output": None, "error": "claude CLI not found", "exit_code": -1} except Exception as e: return {"output": None, "error": str(e), "exit_code": -1} # Global process manager instance claude_mgr = ClaudeProcessManager() # ============ Scheduler ============ scheduler = BackgroundScheduler() def _schedule_task_sync(task_id: str): """Synchronous wrapper for scheduled tasks (runs in APScheduler thread).""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(_run_scheduled_task(task_id)) finally: loop.close() async def _run_scheduled_task(task_id: str): """Execute a scheduled task via the non-interactive claude -p path.""" conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT * FROM tasks WHERE id=?", (task_id,)) row = c.fetchone() conn.close() if not row: logger.error(f"Scheduled task {task_id} not found") return cols = ["id", "name", "description", "prompt", "schedule_type", "schedule_value", "enabled", "created_at", "last_run", "next_run", "status", "agent_tools", "agent_system_prompt", "agent_max_turns", "agent_permission_mode", "agent_timeout"] task_data = dict(zip(cols, row)) task = Task(**{k: v for k, v in task_data.items() if k in Task.model_fields}) run_id = str(uuid.uuid4()) started = datetime.now().isoformat() conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute( "INSERT INTO task_runs (run_id, task_id, status, started_at) VALUES (?,?,?,?)", (run_id, task_id, "running", started) ) c.execute("UPDATE tasks SET status='running', last_run=? WHERE id=?", (started, task_id)) conn.commit() conn.close() result = await claude_mgr.send_task_noninteractive( prompt=task.prompt, tools=task.agent_tools, system_prompt=task.agent_system_prompt, permission_mode=task.agent_permission_mode, timeout=task.agent_timeout ) completed = datetime.now().isoformat() status = "completed" if result["exit_code"] == 0 else "failed" conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute( "UPDATE task_runs SET status=?, output=?, error=?, completed_at=? WHERE run_id=?", (status, result["output"], result["error"], completed, run_id) ) c.execute("UPDATE tasks SET status='idle' WHERE id=?", (task_id,)) conn.commit() conn.close() logger.info(f"Task {task_id} ({task.name}) completed with status {status}") # ============ Startup / Shutdown ============ @app.on_event("startup") async def startup(): init_db() scheduler.start() _reload_schedules() # Auto-start the Claude interactive process asyncio.create_task(_auto_start_claude()) async def _auto_start_claude(): """Try to start the Claude process on startup.""" await asyncio.sleep(2) # Let the server fully start first ok = await claude_mgr.start() if ok: logger.info("Claude interactive process started on startup") else: logger.warning("Could not start Claude process on startup (auth may be needed)") @app.on_event("shutdown") async def shutdown(): scheduler.shutdown() await claude_mgr.stop() def _reload_schedules(): """Load all enabled recurring tasks into the scheduler.""" conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT id, schedule_type, schedule_value FROM tasks WHERE enabled=1") rows = c.fetchall() conn.close() for row in rows: task_id, stype, svalue = row if stype == "recurring" and svalue: try: scheduler.add_job( _schedule_task_sync, CronTrigger.from_crontab(svalue), args=[task_id], id=f"task_{task_id}", replace_existing=True, misfire_grace_time=60 ) except Exception as e: logger.error(f"Failed to schedule task {task_id}: {e}") # ============ API Routes ============ @app.get("/api/health") async def health(): return { "status": "ok", "claude_status": claude_mgr.status, "claude_session_id": claude_mgr.session_id, "version": "3.0.0" } # -------- Claude Process Control -------- @app.post("/api/claude/start") async def start_claude(): """Start the Claude interactive process.""" ok = await claude_mgr.start() return {"success": ok, "status": claude_mgr.status, "session_id": claude_mgr.session_id} @app.post("/api/claude/stop") async def stop_claude(): """Stop the Claude interactive process.""" await claude_mgr.stop() return {"success": True, "status": claude_mgr.status} @app.post("/api/claude/restart") async def restart_claude(): """Restart the Claude interactive process.""" ok = await claude_mgr.restart() return {"success": ok, "status": claude_mgr.status, "session_id": claude_mgr.session_id} @app.get("/api/claude/status") async def claude_status(): """Get current status of the Claude process.""" proc_alive = claude_mgr._process is not None and ( claude_mgr._process.returncode is None ) return { "status": claude_mgr.status, "session_id": claude_mgr.session_id, "process_alive": proc_alive } # -------- Chat -------- @app.get("/api/chat/sessions") async def get_sessions(): return db_get_sessions() @app.get("/api/chat/sessions/{session_id}/messages") async def get_session_messages(session_id: str): return db_get_messages(session_id) @app.post("/api/chat/send") async def send_chat(msg: ChatSend): """ Send a message to the running Claude process. The session_id here is OUR tracking ID (stored in DB), not necessarily Claude's internal session ID. """ if claude_mgr.status != "ready": raise HTTPException(503, f"Claude process not ready (status: {claude_mgr.status}). " "Use /api/claude/start to start it.") session_id = msg.session_id or str(uuid.uuid4()) # Save user message to DB db_save_message(session_id, "user", msg.message) # Send to Claude process ok = await claude_mgr.send_message(msg.message) if not ok: raise HTTPException(503, "Failed to send message to Claude process") return {"session_id": session_id, "status": "sent"} @app.websocket("/api/chat/ws") async def chat_websocket(ws: WebSocket): """ WebSocket endpoint for real-time chat. Client sends: {"message": "...", "session_id": "..."} Server streams: Claude output events as JSON """ await ws.accept() queue = claude_mgr.subscribe() session_id = None # Send current process status immediately await ws.send_json({ "type": "system", "subtype": "connected", "claude_status": claude_mgr.status, "claude_session_id": claude_mgr.session_id }) async def send_from_queue(): """Forward Claude events to the WebSocket client.""" while True: try: event = await asyncio.wait_for(queue.get(), timeout=30) await ws.send_json(event) except asyncio.TimeoutError: # Send keepalive ping try: await ws.send_json({"type": "ping"}) except Exception: break except Exception: break send_task = asyncio.create_task(send_from_queue()) try: while True: data = await ws.receive_json() action = data.get("action", "send") if action == "send": text = data.get("message", "") session_id = data.get("session_id") or session_id or str(uuid.uuid4()) if not text: continue # Save user message db_save_message(session_id, "user", text) if claude_mgr.status != "ready": await ws.send_json({ "type": "error", "message": f"Claude process not ready (status: {claude_mgr.status})" }) continue ok = await claude_mgr.send_message(text) if not ok: await ws.send_json({ "type": "error", "message": "Failed to send to Claude process" }) else: # Acknowledge await ws.send_json({ "type": "user_ack", "session_id": session_id, "message": text }) elif action == "start_claude": ok = await claude_mgr.start() await ws.send_json({ "type": "system", "subtype": "start_result", "success": ok, "claude_status": claude_mgr.status }) elif action == "restart_claude": ok = await claude_mgr.restart() await ws.send_json({ "type": "system", "subtype": "restart_result", "success": ok, "claude_status": claude_mgr.status }) elif action == "get_status": await ws.send_json({ "type": "system", "subtype": "status", "claude_status": claude_mgr.status, "claude_session_id": claude_mgr.session_id }) except WebSocketDisconnect: pass except Exception as e: logger.error(f"WebSocket error: {e}") finally: send_task.cancel() claude_mgr.unsubscribe(queue) # Save assistant messages from buffer (handled via DB in handle_event for full impl) logger.info(f"WebSocket disconnected, session: {session_id}") # -------- Tasks -------- @app.get("/api/tasks") async def list_tasks(): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT * FROM tasks ORDER BY created_at DESC") rows = c.fetchall() cols = ["id", "name", "description", "prompt", "schedule_type", "schedule_value", "enabled", "created_at", "last_run", "next_run", "status", "agent_tools", "agent_system_prompt", "agent_max_turns", "agent_permission_mode", "agent_timeout"] conn.close() return [dict(zip(cols, r)) for r in rows] @app.post("/api/tasks") async def create_task(task: Task): task.id = str(uuid.uuid4()) task.created_at = datetime.now().isoformat() task.status = "idle" conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(""" INSERT INTO tasks (id, name, description, prompt, schedule_type, schedule_value, enabled, created_at, status, agent_tools, agent_system_prompt, agent_max_turns, agent_permission_mode, agent_timeout) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) """, (task.id, task.name, task.description, task.prompt, task.schedule_type, task.schedule_value, task.enabled, task.created_at, task.status, task.agent_tools, task.agent_system_prompt, task.agent_max_turns, task.agent_permission_mode, task.agent_timeout)) conn.commit() conn.close() if task.schedule_type == "recurring" and task.schedule_value and task.enabled: try: scheduler.add_job( _schedule_task_sync, CronTrigger.from_crontab(task.schedule_value), args=[task.id], id=f"task_{task.id}", replace_existing=True, misfire_grace_time=60 ) except Exception as e: logger.error(f"Failed to schedule: {e}") return task @app.get("/api/tasks/{task_id}") async def get_task(task_id: str): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT * FROM tasks WHERE id=?", (task_id,)) row = c.fetchone() conn.close() if not row: raise HTTPException(404, "Task not found") cols = ["id", "name", "description", "prompt", "schedule_type", "schedule_value", "enabled", "created_at", "last_run", "next_run", "status", "agent_tools", "agent_system_prompt", "agent_max_turns", "agent_permission_mode", "agent_timeout"] return dict(zip(cols, row)) @app.put("/api/tasks/{task_id}") async def update_task(task_id: str, task: Task): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(""" UPDATE tasks SET name=?, description=?, prompt=?, schedule_type=?, schedule_value=?, enabled=?, agent_tools=?, agent_system_prompt=?, agent_max_turns=?, agent_permission_mode=?, agent_timeout=? WHERE id=? """, (task.name, task.description, task.prompt, task.schedule_type, task.schedule_value, task.enabled, task.agent_tools, task.agent_system_prompt, task.agent_max_turns, task.agent_permission_mode, task.agent_timeout, task_id)) conn.commit() conn.close() # Reschedule try: scheduler.remove_job(f"task_{task_id}") except Exception: pass if task.schedule_type == "recurring" and task.schedule_value and task.enabled: try: scheduler.add_job( _schedule_task_sync, CronTrigger.from_crontab(task.schedule_value), args=[task_id], id=f"task_{task_id}", replace_existing=True, misfire_grace_time=60 ) except Exception as e: logger.error(f"Failed to reschedule: {e}") return {"success": True} @app.delete("/api/tasks/{task_id}") async def delete_task(task_id: str): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("DELETE FROM tasks WHERE id=?", (task_id,)) conn.commit() conn.close() try: scheduler.remove_job(f"task_{task_id}") except Exception: pass return {"success": True} @app.post("/api/tasks/{task_id}/run") async def run_task_now(task_id: str, background_tasks: BackgroundTasks): """Run a task immediately (non-interactive, separate process).""" conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT * FROM tasks WHERE id=?", (task_id,)) row = c.fetchone() conn.close() if not row: raise HTTPException(404, "Task not found") cols = ["id", "name", "description", "prompt", "schedule_type", "schedule_value", "enabled", "created_at", "last_run", "next_run", "status", "agent_tools", "agent_system_prompt", "agent_max_turns", "agent_permission_mode", "agent_timeout"] task_data = dict(zip(cols, row)) task = Task(**{k: v for k, v in task_data.items() if k in Task.model_fields}) background_tasks.add_task(_run_scheduled_task, task_id) return {"success": True, "message": f"Task '{task.name}' started"} @app.get("/api/tasks/{task_id}/runs") async def get_task_runs(task_id: str): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT * FROM task_runs WHERE task_id=? ORDER BY started_at DESC LIMIT 20", (task_id,)) rows = c.fetchall() conn.close() cols = ["run_id", "task_id", "status", "output", "error", "started_at", "completed_at"] return [dict(zip(cols, r)) for r in rows] # -------- MCP Servers (config file based) -------- MCP_CONFIG_PATH = Path("/app/data/mcp_config.json") def _load_mcp_config() -> dict: if MCP_CONFIG_PATH.exists(): try: return json.loads(MCP_CONFIG_PATH.read_text()) except Exception: pass return {"mcpServers": {}} def _save_mcp_config(config: dict): MCP_CONFIG_PATH.write_text(json.dumps(config, indent=2)) @app.get("/api/mcp/servers") async def list_mcp_servers(): config = _load_mcp_config() servers = [] for name, cfg in config.get("mcpServers", {}).items(): servers.append({"name": name, **cfg}) return servers @app.post("/api/mcp/servers") async def add_mcp_server(server: McpServerAdd): config = _load_mcp_config() entry = {"type": server.server_type} if server.url: entry["url"] = server.url if server.command: entry["command"] = server.command if server.args: entry["args"] = server.args config.setdefault("mcpServers", {})[server.name] = entry _save_mcp_config(config) return {"success": True} @app.delete("/api/mcp/servers/{name}") async def remove_mcp_server(name: str): config = _load_mcp_config() config.get("mcpServers", {}).pop(name, None) _save_mcp_config(config) return {"success": True} # -------- Static files / React SPA -------- # Try multiple possible frontend build output locations STATIC_DIR = Path("/app/static") if not STATIC_DIR.exists(): STATIC_DIR = Path("/app/frontend/dist") # Mount /assets for Vite-built JS/CSS bundles if STATIC_DIR.exists() and (STATIC_DIR / "assets").exists(): app.mount("/assets", StaticFiles(directory=str(STATIC_DIR / "assets")), name="assets") @app.get("/") async def serve_index(): index = STATIC_DIR / "index.html" if index.exists(): return FileResponse(str(index)) return {"message": "Claude Persistent Agent API v3.0", "docs": "/docs"} @app.get("/vite.svg") @app.get("/favicon.ico") async def serve_static_root_files(): """Serve known static root files.""" from starlette.requests import Request return FileResponse(str(STATIC_DIR / "vite.svg")) # SPA fallback via middleware — this avoids the catch-all route problem # that breaks WebSocket routing in some Starlette versions from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request as StarletteRequest from starlette.responses import Response class SPAFallbackMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: StarletteRequest, call_next): response = await call_next(request) # If a non-API GET request returned 404, serve index.html for SPA if (response.status_code == 404 and request.method == "GET" and not request.url.path.startswith("/api") and not request.url.path.startswith("/docs") and not request.url.path.startswith("/openapi") and not request.url.path.startswith("/assets") and "websocket" not in request.headers.get("upgrade", "").lower()): index = STATIC_DIR / "index.html" if index.exists(): return FileResponse(str(index)) return response app.add_middleware(SPAFallbackMiddleware)