From 4b3b34ef195eb53fae2c56fc2886c8259eb4dce3 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Sun, 5 Apr 2026 10:49:04 -0400 Subject: [PATCH] feat: persistent Claude process manager v3 --- backend/main.py | 1011 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1011 insertions(+) diff --git a/backend/main.py b/backend/main.py index e69de29..c900854 100644 --- a/backend/main.py +++ b/backend/main.py @@ -0,0 +1,1011 @@ +""" +Claude Persistent Agent - FastAPI Backend +Interfaces with a long-running interactive Claude Code process via stdin/stdout pipes. +""" + +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from typing import Optional, List, Dict, Any +import asyncio +import json +import sqlite3 +import subprocess +import logging +from datetime import datetime +import uuid +from pathlib import Path +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +import os +import threading + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize FastAPI app +app = FastAPI(title="Claude Persistent Agent", version="3.0.0") + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Paths +DB_PATH = Path("/app/data/tasks.db") +LOGS_PATH = Path("/app/logs") +TASKS_PATH = Path("/app/tasks") +DB_PATH.parent.mkdir(parents=True, exist_ok=True) +LOGS_PATH.mkdir(parents=True, exist_ok=True) +TASKS_PATH.mkdir(parents=True, exist_ok=True) + + +# ============ Models ============ + +class Task(BaseModel): + id: Optional[str] = None + name: str + description: Optional[str] = None + prompt: str + schedule_type: str = "manual" + schedule_value: Optional[str] = None + enabled: bool = True + created_at: Optional[str] = None + last_run: Optional[str] = None + next_run: Optional[str] = None + status: str = "idle" + agent_tools: Optional[str] = None + agent_system_prompt: Optional[str] = None + agent_max_turns: Optional[int] = None + agent_permission_mode: str = "auto" + agent_timeout: int = 300 + + +class TaskRun(BaseModel): + task_id: str + run_id: str + status: str + output: Optional[str] = None + error: Optional[str] = None + started_at: str + completed_at: Optional[str] = None + + +class ChatSend(BaseModel): + message: str + session_id: Optional[str] = None + + +class McpServerAdd(BaseModel): + name: str + server_type: str = "sse" + url: Optional[str] = None + command: Optional[str] = None + args: Optional[List[str]] = None + + +# ============ Database ============ + +def init_db(): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + + c.execute(""" + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + description TEXT, + prompt TEXT NOT NULL, + schedule_type TEXT NOT NULL, + schedule_value TEXT, + enabled BOOLEAN DEFAULT 1, + created_at TEXT, + last_run TEXT, + next_run TEXT, + status TEXT DEFAULT 'idle', + agent_tools TEXT, + agent_system_prompt TEXT, + agent_max_turns INTEGER, + agent_permission_mode TEXT DEFAULT 'auto', + agent_timeout INTEGER DEFAULT 300 + ) + """) + + c.execute(""" + CREATE TABLE IF NOT EXISTS task_runs ( + run_id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + status TEXT NOT NULL, + output TEXT, + error TEXT, + started_at TEXT, + completed_at TEXT, + FOREIGN KEY (task_id) REFERENCES tasks(id) + ) + """) + + c.execute(""" + CREATE TABLE IF NOT EXISTS chat_messages ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + timestamp TEXT NOT NULL, + metadata TEXT + ) + """) + + # Migrations for any legacy columns + for col in ["agent_tools TEXT", "agent_system_prompt TEXT", + "agent_max_turns INTEGER", "agent_permission_mode TEXT DEFAULT 'auto'", + "agent_timeout INTEGER DEFAULT 300"]: + try: + c.execute(f"ALTER TABLE tasks ADD COLUMN {col}") + except sqlite3.OperationalError: + pass + + conn.commit() + conn.close() + + +def db_save_message(session_id: str, role: str, content: str, metadata: dict = None): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute( + "INSERT INTO chat_messages (id, session_id, role, content, timestamp, metadata) VALUES (?,?,?,?,?,?)", + (str(uuid.uuid4()), session_id, role, content, + datetime.now().isoformat(), json.dumps(metadata) if metadata else None) + ) + conn.commit() + conn.close() + + +def db_get_messages(session_id: str) -> list: + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute( + "SELECT id, role, content, timestamp, metadata FROM chat_messages WHERE session_id=? ORDER BY timestamp", + (session_id,) + ) + rows = c.fetchall() + conn.close() + return [{"id": r[0], "role": r[1], "content": r[2], "timestamp": r[3], + "metadata": json.loads(r[4]) if r[4] else None} for r in rows] + + +def db_get_sessions() -> list: + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute(""" + SELECT session_id, + MIN(timestamp) as started, + MAX(timestamp) as last_active, + COUNT(*) as message_count + FROM chat_messages + GROUP BY session_id + ORDER BY last_active DESC + """) + rows = c.fetchall() + conn.close() + return [{"session_id": r[0], "started": r[1], "last_active": r[2], "message_count": r[3]} + for r in rows] + + +# ============ Persistent Claude Process Manager ============ + +class ClaudeProcessManager: + """ + Manages a long-running `claude` interactive process. + Communicates via stdin/stdout using stream-json format. + + The process is started once (after auth is confirmed) and kept alive. + Messages are sent as JSON lines to stdin; responses arrive as JSON lines from stdout. + + Claude stream-json input format (one line per message): + {"type": "user", "message": {"role": "user", "content": "your prompt"}} + + Claude stream-json output events (one line per event): + {"type": "system", "subtype": "init", "session_id": "...", ...} + {"type": "assistant", "message": {"role": "assistant", "content": [...]}} + {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}} + {"type": "result", "subtype": "success", "result": "...", "session_id": "..."} + {"type": "result", "subtype": "error_max_turns", ...} + """ + + def __init__(self): + self._process: Optional[asyncio.subprocess.Process] = None + self._lock = asyncio.Lock() + self._response_buffer: List[str] = [] + self._waiting_for_response = False + self._stdout_task: Optional[asyncio.Task] = None + self._current_session_id: Optional[str] = None + # Subscribers: session_id -> list of WebSocket queues + self._subscribers: Dict[str, List[asyncio.Queue]] = {} + # Global broadcast queues for all connected clients + self._broadcast_queues: List[asyncio.Queue] = [] + self._is_ready = False + self._status = "not_started" # not_started, starting, ready, dead + + @property + def status(self): + return self._status + + @property + def session_id(self): + return self._current_session_id + + async def start(self) -> bool: + """Start the Claude interactive process.""" + async with self._lock: + if self._process and self._process.returncode is None: + return True # Already running + + self._status = "starting" + logger.info("Starting Claude interactive process...") + + env = os.environ.copy() + + # Build command: interactive claude with stream-json I/O + cmd = [ + "claude", + "--output-format", "stream-json", + "--input-format", "stream-json", + "--verbose", + "--dangerously-skip-permissions", + ] + + try: + self._process = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env + ) + self._is_ready = False + self._status = "starting" + + # Start stdout reader task + if self._stdout_task: + self._stdout_task.cancel() + self._stdout_task = asyncio.create_task(self._read_stdout()) + + # Wait for the init event (up to 15 seconds) + for _ in range(150): + await asyncio.sleep(0.1) + if self._is_ready: + break + + if self._is_ready: + logger.info(f"Claude process ready, session: {self._current_session_id}") + self._status = "ready" + return True + else: + logger.warning("Claude process started but no init event yet - proceeding anyway") + self._status = "ready" + return True + + except FileNotFoundError: + self._status = "dead" + logger.error("claude CLI not found in PATH") + return False + except Exception as e: + self._status = "dead" + logger.error(f"Failed to start Claude process: {e}") + return False + + async def stop(self): + """Stop the Claude process.""" + if self._process: + try: + self._process.terminate() + await asyncio.wait_for(self._process.wait(), timeout=5) + except Exception: + self._process.kill() + self._process = None + self._status = "not_started" + self._is_ready = False + if self._stdout_task: + self._stdout_task.cancel() + self._stdout_task = None + + async def restart(self) -> bool: + """Restart the Claude process.""" + await self.stop() + return await self.start() + + async def _read_stdout(self): + """Continuously read stdout from the Claude process and broadcast events.""" + try: + while self._process and self._process.returncode is None: + line = await self._process.stdout.readline() + if not line: + break + line_str = line.decode("utf-8", errors="replace").strip() + if not line_str: + continue + + try: + event = json.loads(line_str) + except json.JSONDecodeError: + logger.debug(f"Non-JSON stdout: {line_str}") + continue + + await self._handle_event(event) + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error reading Claude stdout: {e}") + finally: + if self._status == "ready": + self._status = "dead" + logger.warning("Claude process stdout ended") + # Notify all subscribers the process died + await self._broadcast({"type": "system", "subtype": "process_dead", + "message": "Claude process ended unexpectedly"}) + + async def _handle_event(self, event: dict): + """Process an event from Claude's stdout.""" + event_type = event.get("type", "") + + if event_type == "system" and event.get("subtype") == "init": + self._current_session_id = event.get("session_id") + self._is_ready = True + logger.info(f"Claude session initialized: {self._current_session_id}") + + elif event_type == "result": + # End of a response turn + session_id = event.get("session_id", self._current_session_id) + self._current_session_id = session_id + + await self._broadcast(event) + + async def _broadcast(self, event: dict): + """Send event to all connected WebSocket queues.""" + dead_queues = [] + for q in self._broadcast_queues: + try: + q.put_nowait(event) + except asyncio.QueueFull: + dead_queues.append(q) + for q in dead_queues: + if q in self._broadcast_queues: + self._broadcast_queues.remove(q) + + def subscribe(self) -> asyncio.Queue: + """Subscribe to Claude output events. Returns a queue.""" + q = asyncio.Queue(maxsize=200) + self._broadcast_queues.append(q) + return q + + def unsubscribe(self, q: asyncio.Queue): + """Unsubscribe a queue.""" + if q in self._broadcast_queues: + self._broadcast_queues.remove(q) + + async def send_message(self, content: str) -> bool: + """Send a message to the running Claude process.""" + if not self._process or self._process.returncode is not None: + return False + if self._status != "ready": + return False + + # Claude stream-json input format + msg = json.dumps({ + "type": "user", + "message": {"role": "user", "content": content} + }) + "\n" + + try: + self._process.stdin.write(msg.encode("utf-8")) + await self._process.stdin.drain() + return True + except Exception as e: + logger.error(f"Failed to send message to Claude: {e}") + return False + + async def send_task_noninteractive(self, prompt: str, tools: str = None, + system_prompt: str = None, + permission_mode: str = "auto", + timeout: int = 300) -> dict: + """ + Run a one-shot task using a separate claude -p invocation + (so it doesn't interfere with the interactive session). + Returns {output, error, exit_code}. + """ + cmd = ["claude", "-p", + "--output-format", "json", + "--dangerously-skip-permissions"] + + if tools: + cmd.extend(["--allowedTools", tools]) + if system_prompt: + cmd.extend(["--system-prompt", system_prompt]) + if permission_mode: + cmd.extend(["--permission-mode", permission_mode]) + + cmd.append(prompt) + + env = os.environ.copy() + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + stdout_str = stdout.decode("utf-8", errors="replace") + stderr_str = stderr.decode("utf-8", errors="replace") + + result_text = "" + try: + data = json.loads(stdout_str) + result_text = data.get("result", stdout_str) + except json.JSONDecodeError: + result_text = stdout_str + + return { + "output": result_text, + "error": stderr_str if proc.returncode != 0 else None, + "exit_code": proc.returncode + } + except asyncio.TimeoutError: + return {"output": None, "error": f"Task timed out after {timeout}s", "exit_code": -1} + except FileNotFoundError: + return {"output": None, "error": "claude CLI not found", "exit_code": -1} + except Exception as e: + return {"output": None, "error": str(e), "exit_code": -1} + + +# Global process manager instance +claude_mgr = ClaudeProcessManager() + + +# ============ Scheduler ============ + +scheduler = BackgroundScheduler() + + +def _schedule_task_sync(task_id: str): + """Synchronous wrapper for scheduled tasks (runs in APScheduler thread).""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_run_scheduled_task(task_id)) + finally: + loop.close() + + +async def _run_scheduled_task(task_id: str): + """Execute a scheduled task via the non-interactive claude -p path.""" + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute("SELECT * FROM tasks WHERE id=?", (task_id,)) + row = c.fetchone() + conn.close() + + if not row: + logger.error(f"Scheduled task {task_id} not found") + return + + cols = ["id", "name", "description", "prompt", "schedule_type", "schedule_value", + "enabled", "created_at", "last_run", "next_run", "status", + "agent_tools", "agent_system_prompt", "agent_max_turns", + "agent_permission_mode", "agent_timeout"] + task_data = dict(zip(cols, row)) + task = Task(**{k: v for k, v in task_data.items() if k in Task.model_fields}) + + run_id = str(uuid.uuid4()) + started = datetime.now().isoformat() + + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute( + "INSERT INTO task_runs (run_id, task_id, status, started_at) VALUES (?,?,?,?)", + (run_id, task_id, "running", started) + ) + c.execute("UPDATE tasks SET status='running', last_run=? WHERE id=?", (started, task_id)) + conn.commit() + conn.close() + + result = await claude_mgr.send_task_noninteractive( + prompt=task.prompt, + tools=task.agent_tools, + system_prompt=task.agent_system_prompt, + permission_mode=task.agent_permission_mode, + timeout=task.agent_timeout + ) + + completed = datetime.now().isoformat() + status = "completed" if result["exit_code"] == 0 else "failed" + + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute( + "UPDATE task_runs SET status=?, output=?, error=?, completed_at=? WHERE run_id=?", + (status, result["output"], result["error"], completed, run_id) + ) + c.execute("UPDATE tasks SET status='idle' WHERE id=?", (task_id,)) + conn.commit() + conn.close() + logger.info(f"Task {task_id} ({task.name}) completed with status {status}") + + +# ============ Startup / Shutdown ============ + +@app.on_event("startup") +async def startup(): + init_db() + scheduler.start() + _reload_schedules() + # Auto-start the Claude interactive process + asyncio.create_task(_auto_start_claude()) + + +async def _auto_start_claude(): + """Try to start the Claude process on startup.""" + await asyncio.sleep(2) # Let the server fully start first + ok = await claude_mgr.start() + if ok: + logger.info("Claude interactive process started on startup") + else: + logger.warning("Could not start Claude process on startup (auth may be needed)") + + +@app.on_event("shutdown") +async def shutdown(): + scheduler.shutdown() + await claude_mgr.stop() + + +def _reload_schedules(): + """Load all enabled recurring tasks into the scheduler.""" + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute("SELECT id, schedule_type, schedule_value FROM tasks WHERE enabled=1") + rows = c.fetchall() + conn.close() + + for row in rows: + task_id, stype, svalue = row + if stype == "recurring" and svalue: + try: + scheduler.add_job( + _schedule_task_sync, + CronTrigger.from_crontab(svalue), + args=[task_id], + id=f"task_{task_id}", + replace_existing=True, + misfire_grace_time=60 + ) + except Exception as e: + logger.error(f"Failed to schedule task {task_id}: {e}") + + +# ============ API Routes ============ + +@app.get("/api/health") +async def health(): + return { + "status": "ok", + "claude_status": claude_mgr.status, + "claude_session_id": claude_mgr.session_id, + "version": "3.0.0" + } + + +# -------- Claude Process Control -------- + +@app.post("/api/claude/start") +async def start_claude(): + """Start the Claude interactive process.""" + ok = await claude_mgr.start() + return {"success": ok, "status": claude_mgr.status, "session_id": claude_mgr.session_id} + + +@app.post("/api/claude/stop") +async def stop_claude(): + """Stop the Claude interactive process.""" + await claude_mgr.stop() + return {"success": True, "status": claude_mgr.status} + + +@app.post("/api/claude/restart") +async def restart_claude(): + """Restart the Claude interactive process.""" + ok = await claude_mgr.restart() + return {"success": ok, "status": claude_mgr.status, "session_id": claude_mgr.session_id} + + +@app.get("/api/claude/status") +async def claude_status(): + """Get current status of the Claude process.""" + proc_alive = claude_mgr._process is not None and ( + claude_mgr._process.returncode is None + ) + return { + "status": claude_mgr.status, + "session_id": claude_mgr.session_id, + "process_alive": proc_alive + } + + +# -------- Chat -------- + +@app.get("/api/chat/sessions") +async def get_sessions(): + return db_get_sessions() + + +@app.get("/api/chat/sessions/{session_id}/messages") +async def get_session_messages(session_id: str): + return db_get_messages(session_id) + + +@app.post("/api/chat/send") +async def send_chat(msg: ChatSend): + """ + Send a message to the running Claude process. + The session_id here is OUR tracking ID (stored in DB), + not necessarily Claude's internal session ID. + """ + if claude_mgr.status != "ready": + raise HTTPException(503, f"Claude process not ready (status: {claude_mgr.status}). " + "Use /api/claude/start to start it.") + + session_id = msg.session_id or str(uuid.uuid4()) + + # Save user message to DB + db_save_message(session_id, "user", msg.message) + + # Send to Claude process + ok = await claude_mgr.send_message(msg.message) + if not ok: + raise HTTPException(503, "Failed to send message to Claude process") + + return {"session_id": session_id, "status": "sent"} + + +@app.websocket("/api/chat/ws") +async def chat_websocket(ws: WebSocket): + """ + WebSocket endpoint for real-time chat. + Client sends: {"message": "...", "session_id": "..."} + Server streams: Claude output events as JSON + """ + await ws.accept() + queue = claude_mgr.subscribe() + session_id = None + + # Send current process status immediately + await ws.send_json({ + "type": "system", + "subtype": "connected", + "claude_status": claude_mgr.status, + "claude_session_id": claude_mgr.session_id + }) + + async def send_from_queue(): + """Forward Claude events to the WebSocket client.""" + while True: + try: + event = await asyncio.wait_for(queue.get(), timeout=30) + await ws.send_json(event) + except asyncio.TimeoutError: + # Send keepalive ping + try: + await ws.send_json({"type": "ping"}) + except Exception: + break + except Exception: + break + + send_task = asyncio.create_task(send_from_queue()) + + try: + while True: + data = await ws.receive_json() + action = data.get("action", "send") + + if action == "send": + text = data.get("message", "") + session_id = data.get("session_id") or session_id or str(uuid.uuid4()) + + if not text: + continue + + # Save user message + db_save_message(session_id, "user", text) + + if claude_mgr.status != "ready": + await ws.send_json({ + "type": "error", + "message": f"Claude process not ready (status: {claude_mgr.status})" + }) + continue + + ok = await claude_mgr.send_message(text) + if not ok: + await ws.send_json({ + "type": "error", + "message": "Failed to send to Claude process" + }) + else: + # Acknowledge + await ws.send_json({ + "type": "user_ack", + "session_id": session_id, + "message": text + }) + + elif action == "start_claude": + ok = await claude_mgr.start() + await ws.send_json({ + "type": "system", + "subtype": "start_result", + "success": ok, + "claude_status": claude_mgr.status + }) + + elif action == "restart_claude": + ok = await claude_mgr.restart() + await ws.send_json({ + "type": "system", + "subtype": "restart_result", + "success": ok, + "claude_status": claude_mgr.status + }) + + elif action == "get_status": + await ws.send_json({ + "type": "system", + "subtype": "status", + "claude_status": claude_mgr.status, + "claude_session_id": claude_mgr.session_id + }) + + except WebSocketDisconnect: + pass + except Exception as e: + logger.error(f"WebSocket error: {e}") + finally: + send_task.cancel() + claude_mgr.unsubscribe(queue) + + # Save assistant messages from buffer (handled via DB in handle_event for full impl) + logger.info(f"WebSocket disconnected, session: {session_id}") + + +# -------- Tasks -------- + +@app.get("/api/tasks") +async def list_tasks(): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute("SELECT * FROM tasks ORDER BY created_at DESC") + rows = c.fetchall() + cols = ["id", "name", "description", "prompt", "schedule_type", "schedule_value", + "enabled", "created_at", "last_run", "next_run", "status", + "agent_tools", "agent_system_prompt", "agent_max_turns", + "agent_permission_mode", "agent_timeout"] + conn.close() + return [dict(zip(cols, r)) for r in rows] + + +@app.post("/api/tasks") +async def create_task(task: Task): + task.id = str(uuid.uuid4()) + task.created_at = datetime.now().isoformat() + task.status = "idle" + + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute(""" + INSERT INTO tasks (id, name, description, prompt, schedule_type, schedule_value, + enabled, created_at, status, agent_tools, agent_system_prompt, + agent_max_turns, agent_permission_mode, agent_timeout) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) + """, (task.id, task.name, task.description, task.prompt, task.schedule_type, + task.schedule_value, task.enabled, task.created_at, task.status, + task.agent_tools, task.agent_system_prompt, task.agent_max_turns, + task.agent_permission_mode, task.agent_timeout)) + conn.commit() + conn.close() + + if task.schedule_type == "recurring" and task.schedule_value and task.enabled: + try: + scheduler.add_job( + _schedule_task_sync, + CronTrigger.from_crontab(task.schedule_value), + args=[task.id], + id=f"task_{task.id}", + replace_existing=True, + misfire_grace_time=60 + ) + except Exception as e: + logger.error(f"Failed to schedule: {e}") + + return task + + +@app.get("/api/tasks/{task_id}") +async def get_task(task_id: str): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute("SELECT * FROM tasks WHERE id=?", (task_id,)) + row = c.fetchone() + conn.close() + if not row: + raise HTTPException(404, "Task not found") + cols = ["id", "name", "description", "prompt", "schedule_type", "schedule_value", + "enabled", "created_at", "last_run", "next_run", "status", + "agent_tools", "agent_system_prompt", "agent_max_turns", + "agent_permission_mode", "agent_timeout"] + return dict(zip(cols, row)) + + +@app.put("/api/tasks/{task_id}") +async def update_task(task_id: str, task: Task): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute(""" + UPDATE tasks SET name=?, description=?, prompt=?, schedule_type=?, + schedule_value=?, enabled=?, agent_tools=?, agent_system_prompt=?, + agent_max_turns=?, agent_permission_mode=?, agent_timeout=? + WHERE id=? + """, (task.name, task.description, task.prompt, task.schedule_type, + task.schedule_value, task.enabled, task.agent_tools, task.agent_system_prompt, + task.agent_max_turns, task.agent_permission_mode, task.agent_timeout, task_id)) + conn.commit() + conn.close() + + # Reschedule + try: + scheduler.remove_job(f"task_{task_id}") + except Exception: + pass + if task.schedule_type == "recurring" and task.schedule_value and task.enabled: + try: + scheduler.add_job( + _schedule_task_sync, + CronTrigger.from_crontab(task.schedule_value), + args=[task_id], + id=f"task_{task_id}", + replace_existing=True, + misfire_grace_time=60 + ) + except Exception as e: + logger.error(f"Failed to reschedule: {e}") + + return {"success": True} + + +@app.delete("/api/tasks/{task_id}") +async def delete_task(task_id: str): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute("DELETE FROM tasks WHERE id=?", (task_id,)) + conn.commit() + conn.close() + try: + scheduler.remove_job(f"task_{task_id}") + except Exception: + pass + return {"success": True} + + +@app.post("/api/tasks/{task_id}/run") +async def run_task_now(task_id: str, background_tasks: BackgroundTasks): + """Run a task immediately (non-interactive, separate process).""" + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute("SELECT * FROM tasks WHERE id=?", (task_id,)) + row = c.fetchone() + conn.close() + if not row: + raise HTTPException(404, "Task not found") + + cols = ["id", "name", "description", "prompt", "schedule_type", "schedule_value", + "enabled", "created_at", "last_run", "next_run", "status", + "agent_tools", "agent_system_prompt", "agent_max_turns", + "agent_permission_mode", "agent_timeout"] + task_data = dict(zip(cols, row)) + task = Task(**{k: v for k, v in task_data.items() if k in Task.model_fields}) + + background_tasks.add_task(_run_scheduled_task, task_id) + return {"success": True, "message": f"Task '{task.name}' started"} + + +@app.get("/api/tasks/{task_id}/runs") +async def get_task_runs(task_id: str): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute("SELECT * FROM task_runs WHERE task_id=? ORDER BY started_at DESC LIMIT 20", (task_id,)) + rows = c.fetchall() + conn.close() + cols = ["run_id", "task_id", "status", "output", "error", "started_at", "completed_at"] + return [dict(zip(cols, r)) for r in rows] + + +# -------- MCP Servers (config file based) -------- + +MCP_CONFIG_PATH = Path("/app/data/mcp_config.json") + + +def _load_mcp_config() -> dict: + if MCP_CONFIG_PATH.exists(): + try: + return json.loads(MCP_CONFIG_PATH.read_text()) + except Exception: + pass + return {"mcpServers": {}} + + +def _save_mcp_config(config: dict): + MCP_CONFIG_PATH.write_text(json.dumps(config, indent=2)) + + +@app.get("/api/mcp/servers") +async def list_mcp_servers(): + config = _load_mcp_config() + servers = [] + for name, cfg in config.get("mcpServers", {}).items(): + servers.append({"name": name, **cfg}) + return servers + + +@app.post("/api/mcp/servers") +async def add_mcp_server(server: McpServerAdd): + config = _load_mcp_config() + entry = {"type": server.server_type} + if server.url: + entry["url"] = server.url + if server.command: + entry["command"] = server.command + if server.args: + entry["args"] = server.args + config.setdefault("mcpServers", {})[server.name] = entry + _save_mcp_config(config) + return {"success": True} + + +@app.delete("/api/mcp/servers/{name}") +async def remove_mcp_server(name: str): + config = _load_mcp_config() + config.get("mcpServers", {}).pop(name, None) + _save_mcp_config(config) + return {"success": True} + + +# -------- Static files / React SPA -------- + +STATIC_DIR = Path("/app/static") +if STATIC_DIR.exists(): + app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") + + +@app.get("/") +async def serve_index(): + index = STATIC_DIR / "index.html" + if index.exists(): + return FileResponse(str(index)) + return {"message": "Claude Persistent Agent API v3.0", "docs": "/docs"} + + +@app.get("/{full_path:path}") +async def serve_spa(full_path: str): + """Serve React SPA for all non-API routes.""" + if full_path.startswith("api/"): + raise HTTPException(404) + index = STATIC_DIR / "index.html" + if index.exists(): + return FileResponse(str(index)) + raise HTTPException(404)