"""
Claude Persistent Agent - FastAPI Backend

Main application with task scheduling, live chat, and agent orchestration.
"""
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Dict, Any, Iterator
import asyncio
import json
import sqlite3
import subprocess
import logging
from contextlib import contextmanager
from datetime import datetime, timedelta
import uuid
from pathlib import Path
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import os

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

APP_VERSION = "2.0.0"
# Wall-clock time at module import; used to report uptime.
_STARTED_AT = datetime.now()

# Initialize FastAPI app
app = FastAPI(title="Claude Persistent Agent", version=APP_VERSION)

# Allowed CORS origins, comma-separated; defaults to "*" (the previous
# hard-coded value). NOTE: the routes in this file have no authentication and
# can make Claude run Bash/Write tools, so a wildcard lets any web page the
# operator visits call them from a browser. Set CORS_ALLOW_ORIGINS in production.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Filesystem locations
DB_PATH = Path("/app/data/tasks.db")
LOGS_PATH = Path("/app/logs")
TASKS_PATH = Path("/app/tasks")
TOKEN_FILE = Path("/app/data/.claude_token")
CHAT_HISTORY_PATH = Path("/app/data/chat_history.json")

# Ensure directories exist
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
LOGS_PATH.mkdir(parents=True, exist_ok=True)
TASKS_PATH.mkdir(parents=True, exist_ok=True)

# Saved token "type" -> environment variable the Claude CLI reads it from.
_TOKEN_ENV_VARS = {
    "oauth_token": "CLAUDE_CODE_OAUTH_TOKEN",
    "api_key": "ANTHROPIC_API_KEY",
}


# ============ Models ============

class Task(BaseModel):
    """A stored prompt plus its schedule and Claude agent configuration."""
    id: Optional[str] = None
    name: str
    description: Optional[str] = None
    prompt: str
    schedule_type: str = "manual"  # "once", "recurring", "manual"
    schedule_value: Optional[str] = None  # crontab string when schedule_type == "recurring"
    enabled: bool = True
    created_at: Optional[str] = None
    last_run: Optional[str] = None
    next_run: Optional[str] = None
    status: str = "idle"
    # Agent orchestration fields
    agent_model: Optional[str] = None  # "sonnet", "opus", "haiku"
    agent_tools: Optional[str] = None  # comma-separated: "Bash,Read,Write,Edit"
    agent_mcp_servers: Optional[str] = None  # comma-separated MCP server names (stored only; not passed to the CLI)
    agent_system_prompt: Optional[str] = None  # custom system prompt
    agent_max_turns: Optional[int] = None  # max conversation turns
    agent_permission_mode: str = "auto"  # "auto", "acceptEdits", "plan"
    agent_timeout: int = 300  # seconds


class TaskRun(BaseModel):
    """One finished execution of a Task."""
    task_id: str
    run_id: str
    status: str
    output: Optional[str] = None
    error: Optional[str] = None
    started_at: str
    completed_at: Optional[str] = None


class ChatMessage(BaseModel):
    """Request body for a non-streaming chat turn."""
    message: str
    model: Optional[str] = None
    system_prompt: Optional[str] = None
    tools: Optional[str] = None
    session_id: Optional[str] = None


class TokenSubmit(BaseModel):
    """Request body carrying a Claude OAuth token or API key."""
    token: str
    token_type: str = "oauth_token"


class McpServerAdd(BaseModel):
    """Request body describing an MCP server to register with the Claude CLI."""
    name: str
    server_type: str = "sse"
    url: Optional[str] = None
    command: Optional[str] = None
    args: Optional[List[str]] = None


# ============ Database ============

@contextmanager
def _connect(row_factory=None) -> Iterator[sqlite3.Connection]:
    """Open a connection to DB_PATH, commit if the body succeeds, always close.

    Args:
        row_factory: optional sqlite3 row factory (e.g. ``sqlite3.Row``).
    """
    conn = sqlite3.connect(DB_PATH)
    if row_factory is not None:
        conn.row_factory = row_factory
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()


# Columns added to `tasks` after its first version; init_db adds any that an
# older database file is missing.
_TASK_MIGRATION_COLUMNS = (
    ("agent_model", "TEXT"),
    ("agent_tools", "TEXT"),
    ("agent_mcp_servers", "TEXT"),
    ("agent_system_prompt", "TEXT"),
    ("agent_max_turns", "INTEGER"),
    ("agent_permission_mode", "TEXT DEFAULT 'auto'"),
    ("agent_timeout", "INTEGER DEFAULT 300"),
)


def init_db():
    """Create the SQLite tables if needed and add any missing task columns."""
    with _connect() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                description TEXT,
                prompt TEXT NOT NULL,
                schedule_type TEXT NOT NULL,
                schedule_value TEXT,
                enabled BOOLEAN DEFAULT 1,
                created_at TEXT,
                last_run TEXT,
                next_run TEXT,
                status TEXT DEFAULT 'idle',
                agent_model TEXT,
                agent_tools TEXT,
                agent_mcp_servers TEXT,
                agent_system_prompt TEXT,
                agent_max_turns INTEGER,
                agent_permission_mode TEXT DEFAULT 'auto',
                agent_timeout INTEGER DEFAULT 300
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS task_runs (
                run_id TEXT PRIMARY KEY,
                task_id TEXT NOT NULL,
                status TEXT NOT NULL,
                output TEXT,
                error TEXT,
                started_at TEXT,
                completed_at TEXT,
                FOREIGN KEY (task_id) REFERENCES tasks(id)
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                model TEXT,
                metadata TEXT
            )
        """)
        # Migration: inspect the live schema instead of relying on ALTER failures.
        existing = {row[1] for row in cursor.execute("PRAGMA table_info(tasks)")}
        for column, ddl in _TASK_MIGRATION_COLUMNS:
            if column not in existing:
                cursor.execute(f"ALTER TABLE tasks ADD COLUMN {column} {ddl}")


# ============ Auth Helpers ============

def _load_saved_token() -> Optional[Dict[str, Any]]:
    """Return the saved credential dict ({"type", "token", ...}) or None.

    Returns None when TOKEN_FILE is missing, unreadable, or has an unknown type.
    """
    if not TOKEN_FILE.exists():
        return None
    try:
        saved = json.loads(TOKEN_FILE.read_text())
    except Exception as e:
        logger.error(f"Failed to load saved token: {e}")
        return None
    if isinstance(saved, dict) and saved.get("token") and saved.get("type") in _TOKEN_ENV_VARS:
        return saved
    return None


def _get_claude_env():
    """Get environment with auth tokens set for Claude CLI.

    The saved credential (if any) wins: its variable is set and the other
    credential variable is removed so the CLI never sees two competing tokens.
    """
    env = os.environ.copy()
    saved = _load_saved_token()
    if saved:
        for token_type, var in _TOKEN_ENV_VARS.items():
            if token_type == saved["type"]:
                env[var] = saved["token"]
            else:
                env.pop(var, None)
    return env


def _build_claude_cmd(prompt: str, model: str = None, tools: str = None,
                      system_prompt: str = None, permission_mode: str = "auto",
                      resume_session_id: str = None,
                      max_turns: Optional[int] = None) -> list:
    """Build a claude CLI command with agent configuration.

    Note: --session-id requires a valid UUID and creates a persistent session.
    For chat continuity, use resume_session_id with --resume.
    For one-shot tasks, omit resume_session_id.

    Args:
        prompt: user prompt, appended as the final argument.
        model: optional model alias passed via --model.
        tools: comma-separated allowed tools; defaults to "Bash,Read,Write,Edit".
        system_prompt: optional --system-prompt value.
        permission_mode: --permission-mode value; omitted when empty.
        resume_session_id: Claude session to continue with --resume.
        max_turns: positive turn limit passed via --max-turns; ignored otherwise.

    Returns:
        The argv list, starting with ["claude", "-p"].
    """
    cmd = ["claude", "-p"]
    if model:
        cmd += ["--model", model]
    cmd += ["--allowedTools", tools or "Bash,Read,Write,Edit"]
    if permission_mode:
        cmd += ["--permission-mode", permission_mode]
    if system_prompt:
        cmd += ["--system-prompt", system_prompt]
    if max_turns is not None and max_turns > 0:
        cmd += ["--max-turns", str(max_turns)]
    # Resume an existing Claude session for multi-turn chat
    if resume_session_id:
        cmd += ["--resume", resume_session_id]
    # NOTE(review): a prompt starting with "-" may be parsed by the CLI as an
    # option — TODO confirm and consider passing the prompt via stdin.
    cmd.append(prompt)
    return cmd


# ============ Task Execution ============

async def _kill_process(process) -> None:
    """Kill ``process`` if it is still running and wait for it to exit."""
    if process is None or process.returncode is not None:
        return
    try:
        process.kill()
    except ProcessLookupError:
        return  # exited between the returncode check and kill()
    await process.wait()


async def run_claude_task(task: Task, run_id: str):
    """Execute a Claude Code task as an agent and record the result.

    Writes one task_runs row ("completed" or "failed") and updates the task's
    status/last_run. The CLI process is killed if it exceeds the task timeout.
    """
    started = datetime.now().isoformat()
    timeout = task.agent_timeout or 300
    status = "failed"
    output = None
    error = None
    process = None
    try:
        env = _get_claude_env()
        cmd = _build_claude_cmd(
            prompt=task.prompt,
            model=task.agent_model,
            tools=task.agent_tools,
            system_prompt=task.agent_system_prompt,
            permission_mode=task.agent_permission_mode,
            max_turns=task.agent_max_turns,
        )
        logger.info(f"Running task {task.name}: {' '.join(cmd)}")
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,  # Fix stdin warning
            env=env,
        )
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
        output = stdout.decode(errors="replace") if stdout else ""
        raw_error = stderr.decode(errors="replace") if stderr else ""
        status = "completed" if process.returncode == 0 else "failed"
        if status == "failed":
            # Filter out non-critical warnings, but keep raw stderr rather than
            # recording an empty error when everything got filtered.
            filtered_error = "\n".join(
                line for line in raw_error.split("\n")
                if line.strip() and "stdin" not in line.lower() and "warning" not in line.lower()
            )
            error = filtered_error or raw_error.strip() or None
    except asyncio.TimeoutError:
        error = f"Task timeout (>{timeout}s)"
    except Exception as e:
        logger.exception(f"Task {task.name} failed to run")
        error = str(e)
    finally:
        # After a timeout/error the CLI may still be running; don't orphan it.
        await _kill_process(process)

    save_task_run(TaskRun(
        task_id=task.id,
        run_id=run_id,
        status=status,
        output=output,
        error=error,
        started_at=started,
        completed_at=datetime.now().isoformat(),
    ))
    update_task_status(task.id, status)


# ============ DB Operations ============

def save_task(task: Task):
    """Insert or update a task and return it as stored in the database.

    Assigns an id/created_at when missing. Updating an existing row changes
    only the user-editable columns: created_at, status, last_run and next_run
    are runtime state and are preserved.
    """
    if not task.id:
        task.id = str(uuid.uuid4())
    if not task.created_at:
        task.created_at = datetime.now().isoformat()
    with _connect() as conn:
        conn.execute("""
            INSERT INTO tasks
            (id, name, description, prompt, schedule_type, schedule_value, enabled, created_at, status,
             agent_model, agent_tools, agent_mcp_servers, agent_system_prompt, agent_max_turns,
             agent_permission_mode, agent_timeout)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                name = excluded.name,
                description = excluded.description,
                prompt = excluded.prompt,
                schedule_type = excluded.schedule_type,
                schedule_value = excluded.schedule_value,
                enabled = excluded.enabled,
                agent_model = excluded.agent_model,
                agent_tools = excluded.agent_tools,
                agent_mcp_servers = excluded.agent_mcp_servers,
                agent_system_prompt = excluded.agent_system_prompt,
                agent_max_turns = excluded.agent_max_turns,
                agent_permission_mode = excluded.agent_permission_mode,
                agent_timeout = excluded.agent_timeout
        """, (
            task.id, task.name, task.description, task.prompt,
            task.schedule_type, task.schedule_value, task.enabled,
            task.created_at, task.status,
            task.agent_model, task.agent_tools, task.agent_mcp_servers,
            task.agent_system_prompt, task.agent_max_turns,
            task.agent_permission_mode, task.agent_timeout,
        ))
    return get_task(task.id) or task


def save_task_run(run: TaskRun):
    """Insert a finished run into task_runs."""
    with _connect() as conn:
        conn.execute("""
            INSERT INTO task_runs (run_id, task_id, status, output, error, started_at, completed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (run.run_id, run.task_id, run.status, run.output, run.error,
              run.started_at, run.completed_at))


def update_task_status(task_id: str, status: str):
    """Set a task's status and stamp last_run with the current time."""
    with _connect() as conn:
        conn.execute(
            "UPDATE tasks SET status = ?, last_run = ? WHERE id = ?",
            (status, datetime.now().isoformat(), task_id),
        )


def get_task(task_id: str) -> Optional[Task]:
    """Load one task by id, or None if it does not exist."""
    with _connect(sqlite3.Row) as conn:
        row = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
    return Task(**dict(row)) if row else None


def get_all_tasks() -> List[Task]:
    """Load all tasks, newest first."""
    with _connect(sqlite3.Row) as conn:
        rows = conn.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
    return [Task(**dict(row)) for row in rows]


# ============ Scheduler ============

scheduler = BackgroundScheduler()


def _unschedule(task_id: Optional[str]) -> None:
    """Remove the scheduler job registered for ``task_id``, if there is one."""
    if not task_id:
        return
    try:
        if scheduler.get_job(task_id):
            scheduler.remove_job(task_id)
    except Exception as e:  # best effort: the job may vanish concurrently
        logger.warning(f"Failed to unschedule task {task_id}: {e}")


def schedule_task(task: Task):
    """(Re)register the cron job for ``task`` to match its current settings.

    Any existing job is removed first, so disabling a task or switching it to
    "manual" actually stops it. Only enabled "recurring" tasks with a crontab
    in schedule_value are scheduled.
    NOTE(review): schedule_type "once" is accepted by the model but never
    scheduled here — TODO confirm intended behaviour.
    """
    _unschedule(task.id)
    if not task.enabled or task.schedule_type != "recurring" or not task.schedule_value:
        return
    try:
        scheduler.add_job(
            _sync_run_task,
            CronTrigger.from_crontab(task.schedule_value),
            id=task.id,
            args=[task],
            replace_existing=True,
        )
        logger.info(f"Scheduled task {task.name} with cron: {task.schedule_value}")
    except Exception as e:
        logger.error(f"Failed to schedule task {task.name}: {e}")


def _sync_run_task(task: Task):
    """Sync wrapper for scheduled tasks (APScheduler doesn't support async natively).

    asyncio.run creates a fresh event loop and always closes it, even when the
    run raises.
    """
    run_id = str(uuid.uuid4())
    update_task_status(task.id, "running")
    asyncio.run(run_claude_task(task, run_id))


# ============ Chat Sessions ============

# Active chat sessions: our_session_id -> process
_chat_sessions: Dict[str, asyncio.subprocess.Process] = {}
# Mapping: our_session_id -> claude_session_id (for --resume)
_claude_session_map: Dict[str, str] = {}


def save_chat_message(session_id: str, role: str, content: str, model: str = None, metadata: dict = None):
    """Save a chat message to the database (metadata is stored as JSON text)."""
    with _connect() as conn:
        conn.execute("""
            INSERT INTO chat_messages (id, session_id, role, content, timestamp, model, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (str(uuid.uuid4()), session_id, role, content, datetime.now().isoformat(),
              model, json.dumps(metadata) if metadata else None))


def _get_claude_session_id(our_session_id: str) -> Optional[str]:
    """Get the Claude CLI session ID mapped to our session ID.

    Checks the in-memory map first, then the newest chat_messages metadata
    that actually carries a non-empty claude_session_id (rows written with a
    null id are skipped instead of ending the search).
    """
    cached = _claude_session_map.get(our_session_id)
    if cached:
        return cached
    with _connect() as conn:
        rows = conn.execute("""
            SELECT metadata FROM chat_messages
            WHERE session_id = ? AND metadata IS NOT NULL
            ORDER BY timestamp DESC
        """, (our_session_id,))
        for (raw_meta,) in rows:
            try:
                meta = json.loads(raw_meta)
            except (TypeError, ValueError):
                continue
            claude_sid = meta.get("claude_session_id") if isinstance(meta, dict) else None
            if claude_sid:
                _claude_session_map[our_session_id] = claude_sid
                return claude_sid
    return None


def get_chat_history(session_id: str, limit: int = 50) -> List[Dict]:
    """Get the most recent ``limit`` messages of a session, oldest first."""
    with _connect(sqlite3.Row) as conn:
        rows = conn.execute("""
            SELECT * FROM (
                SELECT * FROM chat_messages
                WHERE session_id = ?
                ORDER BY timestamp DESC
                LIMIT ?
            ) ORDER BY timestamp ASC
        """, (session_id, limit)).fetchall()
    return [dict(row) for row in rows]


def list_chat_sessions() -> List[Dict]:
    """List the 20 most recently active chat sessions with their first user message."""
    with _connect(sqlite3.Row) as conn:
        rows = conn.execute("""
            SELECT session_id,
                   COUNT(*) as message_count,
                   MIN(timestamp) as started_at,
                   MAX(timestamp) as last_message,
                   (SELECT content FROM chat_messages cm2
                    WHERE cm2.session_id = cm.session_id AND cm2.role = 'user'
                    ORDER BY timestamp ASC LIMIT 1) as first_message
            FROM chat_messages cm
            GROUP BY session_id
            ORDER BY MAX(timestamp) DESC
            LIMIT 20
        """).fetchall()
    return [dict(row) for row in rows]


# ============ API Routes ============

@app.on_event("startup")
async def startup():
    """Initialise the DB, start the scheduler, load saved auth, schedule tasks."""
    init_db()
    scheduler.start()
    saved = _load_saved_token()
    if saved:
        os.environ[_TOKEN_ENV_VARS[saved["type"]]] = saved["token"]
        if saved["type"] == "oauth_token":
            logger.info("Loaded saved OAuth token")
        else:
            logger.info("Loaded saved API key")
    for task in get_all_tasks():
        if task.enabled:
            schedule_task(task)
    logger.info("Claude Persistent Agent v2.0 started")


@app.on_event("shutdown")
async def shutdown():
    """Stop the scheduler and kill any chat subprocesses still running."""
    if scheduler.running:
        scheduler.shutdown()
    for proc in list(_chat_sessions.values()):
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # already exited


@app.get("/health")
async def health():
    return {"status": "healthy", "timestamp": datetime.now().isoformat(), "version": APP_VERSION}


# ---- Task CRUD ----

@app.post("/api/tasks")
async def create_task(task: Task) -> Task:
    saved_task = save_task(task)
    schedule_task(saved_task)
    return saved_task


@app.get("/api/tasks")
async def list_tasks() -> List[Task]:
    return get_all_tasks()


@app.get("/api/tasks/{task_id}")
async def get_task_endpoint(task_id: str) -> Task:
    task = get_task(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


@app.put("/api/tasks/{task_id}")
async def update_task_endpoint(task_id: str, task: Task) -> Task:
    """Update (or create) the task with this id and reschedule it."""
    task.id = task_id
    saved_task = save_task(task)
    schedule_task(saved_task)
    return saved_task


@app.delete("/api/tasks/{task_id}")
async def delete_task_endpoint(task_id: str):
    """Delete a task row and its scheduler job (its task_runs rows are kept)."""
    with _connect() as conn:
        conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
    _unschedule(task_id)
    return {"status": "deleted"}


@app.post("/api/tasks/{task_id}/run")
async def run_task_manual(task_id: str, background_tasks: BackgroundTasks):
    """Start a task immediately in the background and return its run id."""
    task = get_task(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    run_id = str(uuid.uuid4())
    update_task_status(task_id, "running")
    background_tasks.add_task(run_claude_task, task, run_id)
    return {"run_id": run_id, "status": "started"}


@app.get("/api/tasks/{task_id}/runs")
async def get_task_runs(task_id: str) -> List[Dict]:
    """Return the 50 most recent runs of a task."""
    with _connect(sqlite3.Row) as conn:
        rows = conn.execute(
            "SELECT * FROM task_runs WHERE task_id = ? ORDER BY started_at DESC LIMIT 50",
            (task_id,),
        ).fetchall()
    return [dict(row) for row in rows]


# ---- System Info ----

@app.get("/api/system/info")
async def system_info() -> Dict:
    """Return app metadata plus task and run counters."""
    tasks = get_all_tasks()
    with _connect() as conn:
        total_runs, completed_runs, failed_runs = conn.execute("""
            SELECT COUNT(*),
                   COALESCE(SUM(status = 'completed'), 0),
                   COALESCE(SUM(status = 'failed'), 0)
            FROM task_runs
        """).fetchone()
        # task_runs rows are only written once a run finishes, so in-progress
        # work is counted from the tasks table instead.
        running_runs = conn.execute(
            "SELECT COUNT(*) FROM tasks WHERE status = 'running'"
        ).fetchone()[0]
    now = datetime.now()
    return {
        "app_name": "Claude Persistent Agent",
        "version": APP_VERSION,
        # Kept for API compatibility: this is the current server time, not a duration.
        "uptime": now.isoformat(),
        "started_at": _STARTED_AT.isoformat(),
        "uptime_seconds": int((now - _STARTED_AT).total_seconds()),
        "scheduler_running": scheduler.running,
        "task_count": len(tasks),
        "total_runs": total_runs,
        "completed_runs": completed_runs,
        "failed_runs": failed_runs,
        "running_runs": running_runs,
        "active_chat_sessions": len(_chat_sessions),
    }


@app.get("/api/system/usage")
async def usage_stats() -> Dict:
    """Return rough usage information from ~/.claude files and the task_runs table."""
    usage = {
        "models_used": [],
        "session_count": 0,
        "last_reset": None,
        "next_reset": None,
        "note": "Usage data sourced from ~/.claude session cache"
    }
    try:
        claude_dir = Path("/root/.claude")
        sessions = list(claude_dir.glob("**/session*.json")) + list(claude_dir.glob("**/*.jsonl"))
        usage["session_count"] = len(sessions)
        now = datetime.now()
        # Assumes usage resets at the start of each calendar month.
        # Day 1 + 32 days always lands inside the following month.
        next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1)
        reset = next_month.replace(hour=0, minute=0, second=0, microsecond=0)
        usage["next_reset"] = reset.isoformat()
        usage["days_until_reset"] = (reset - now).days
    except Exception as e:
        usage["error"] = str(e)

    with _connect() as conn:
        total, first_run, last_run = conn.execute(
            "SELECT COUNT(*), MIN(started_at), MAX(started_at) FROM task_runs"
        ).fetchone()
    usage["claude_runs_total"] = total
    usage["first_run"] = first_run
    usage["last_run"] = last_run
    return usage
# ---- Auth ----

async def _kill_if_running(proc) -> None:
    """Kill a CLI child process that is still running and wait for it to exit.

    Used after timeouts/errors so abandoned ``claude`` processes do not keep
    running in the background.
    """
    if proc is None or proc.returncode is not None:
        return
    try:
        proc.kill()
    except ProcessLookupError:
        return  # exited between the returncode check and kill()
    await proc.wait()


async def _communicate_or_kill(proc, timeout: float):
    """Return ``proc.communicate()``, killing the child if it exceeds ``timeout``.

    Raises:
        asyncio.TimeoutError: the process did not finish in time (it has been
            killed by the time this propagates).
    """
    try:
        return await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        await _kill_if_running(proc)
        raise


def _clear_token_env() -> None:
    """Remove both Claude credential variables from this process's environment."""
    os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
    os.environ.pop("ANTHROPIC_API_KEY", None)


def _save_token(token_type: str, token: str) -> None:
    """Persist a credential to TOKEN_FILE and restrict it to the owner."""
    TOKEN_FILE.write_text(json.dumps({
        "type": token_type,
        "token": token,
        "saved_at": datetime.now().isoformat()
    }))
    try:
        # The file holds a live secret; don't leave it world-readable.
        TOKEN_FILE.chmod(0o600)
    except OSError as e:
        logger.warning(f"Could not restrict permissions on {TOKEN_FILE}: {e}")


@app.get("/api/auth/status")
async def auth_status():
    """Report the CLI login state (via `claude auth status --json`) and saved token type."""
    account = None
    auth_method = None
    status = "logged_out"
    has_saved_token = TOKEN_FILE.exists()
    token_type = None
    if has_saved_token:
        try:
            token_type = json.loads(TOKEN_FILE.read_text()).get("type")
        except (OSError, ValueError, AttributeError):
            pass  # unreadable/corrupt file: report it exists, type unknown

    try:
        env = _get_claude_env()
        proc = await asyncio.create_subprocess_exec(
            "claude", "auth", "status", "--json",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
            env=env
        )
        stdout, _ = await _communicate_or_kill(proc, timeout=10)
        data = json.loads(stdout.decode(errors="replace").strip())
        if data.get("loggedIn"):
            status = "logged_in"
            account = data.get("email") or (data.get("account") or {}).get("emailAddress")
            auth_method = data.get("authMethod")
    except Exception as e:
        logger.error(f"Auth status check failed: {e}")

    return {
        "status": status,
        "account": account,
        "auth_method": auth_method,
        "has_saved_token": has_saved_token,
        "token_type": token_type
    }


@app.post("/api/auth/token")
async def auth_set_token(payload: TokenSubmit):
    """Verify a Claude token with a tiny real request and save it if accepted.

    The token type is detected from its prefix (payload.token_type is ignored).
    """
    token = payload.token.strip()
    if not token:
        return {"status": "error", "message": "Token cannot be empty"}

    # Auto-detect token type from prefix
    if token.startswith("sk-ant-oat"):
        token_type, env_var = "oauth_token", "CLAUDE_CODE_OAUTH_TOKEN"
    elif token.startswith("sk-ant-api"):
        token_type, env_var = "api_key", "ANTHROPIC_API_KEY"
    else:
        return {
            "status": "error",
            "message": f"Invalid token format. Expected 'sk-ant-oat01-...' (setup token) or 'sk-ant-api03-...' (API key). Got: '{token[:12]}...'"
        }

    # Set env vars BEFORE saving so we can test
    _clear_token_env()
    os.environ[env_var] = token

    # Verify with a real API call. Build the env from os.environ directly:
    # _get_claude_env() would re-inject the PREVIOUSLY saved token from
    # TOKEN_FILE and we would end up testing the old credential.
    env = os.environ.copy()
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "-p", "--output-format", "json",
            "--no-session-persistence", "--max-budget-usd", "0.01",
            "Reply with just the word VERIFIED",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
            env=env
        )
        stdout, stderr = await _communicate_or_kill(proc, timeout=30)
        output = stdout.decode(errors="replace").strip()
        try:
            data = json.loads(output)
        except json.JSONDecodeError:
            data = None

        if isinstance(data, dict):
            if data.get("is_error", False):
                # Token was rejected — don't save it
                _clear_token_env()
                return {
                    "status": "error",
                    "message": f"Token rejected by Claude API: {data.get('result', '')}"
                }
            _save_token(token_type, token)
            return {
                "status": "logged_in",
                "message": "Token verified and saved! Claude is ready.",
                "auth_method": token_type
            }

        # Couldn't parse a JSON object; fall back to the exit code.
        if proc.returncode == 0:
            _save_token(token_type, token)
            return {
                "status": "logged_in",
                "message": "Token appears to work. Saved!",
                "auth_method": token_type
            }
        _clear_token_env()
        return {
            "status": "error",
            "message": f"Token verification failed: {output or stderr.decode(errors='replace')}"
        }
    except asyncio.TimeoutError:
        # If it timed out, it probably got through auth at least
        _save_token(token_type, token)
        return {"status": "token_saved", "message": "Token saved but verification timed out. Try sending a chat message to confirm."}
    except Exception as e:
        _clear_token_env()
        return {"status": "error", "message": f"Token verification error: {str(e)}"}


@app.post("/api/auth/logout")
async def auth_logout():
    """Delete the saved token, clear credential env vars and log the CLI out."""
    TOKEN_FILE.unlink(missing_ok=True)
    _clear_token_env()
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "auth", "logout",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL
        )
        await _communicate_or_kill(proc, timeout=10)
    except Exception as e:
        # Best effort: the saved credential is already gone.
        logger.warning(f"claude auth logout failed: {e}")
    return {"status": "logged_out"}


# ---- Chat ----

def _with_output_format(cmd: List[str], fmt: str) -> List[str]:
    """Return ``cmd`` with ``--output-format fmt`` inserted right after ``-p``.

    For "stream-json", ``--verbose`` is added as well: the Claude CLI requires
    it for stream-json in print mode. If ``-p`` is absent, ``cmd`` is returned
    unchanged.
    """
    try:
        insert_at = cmd.index("-p") + 1
    except ValueError:
        return cmd
    extra = ["--output-format", fmt]
    if fmt == "stream-json":
        extra.append("--verbose")
    return cmd[:insert_at] + extra + cmd[insert_at:]


def _filter_stderr(text: str, drop_warnings: bool = True) -> str:
    """Drop blank lines and stdin (and optionally warning) noise from CLI stderr."""
    return "\n".join(
        line for line in text.split("\n")
        if line.strip()
        and "stdin" not in line.lower()
        and not (drop_warnings and "warning" in line.lower())
    )


@app.post("/api/chat/send")
async def chat_send(msg: ChatMessage):
    """Send a message to Claude and get a response (non-streaming).

    Uses --output-format json to capture the Claude session_id for
    conversation continuity via --resume on subsequent messages.
    """
    session_id = msg.session_id or str(uuid.uuid4())
    save_chat_message(session_id, "user", msg.message, model=msg.model)

    env = _get_claude_env()
    # Check if we have an existing Claude session to resume
    claude_sid = _get_claude_session_id(session_id)
    cmd = _with_output_format(_build_claude_cmd(
        prompt=msg.message,
        model=msg.model,
        tools=msg.tools,
        system_prompt=msg.system_prompt,
        resume_session_id=claude_sid  # None for first message, session_id for follow-ups
    ), "json")

    try:
        logger.info(f"Chat send cmd: {' '.join(cmd[:6])}...")
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
            env=env
        )
        stdout, stderr = await _communicate_or_kill(process, timeout=120)
        raw_output = stdout.decode(errors="replace").strip() if stdout else ""
        error = stderr.decode(errors="replace").strip() if stderr else ""
        filtered_error = _filter_stderr(error)

        # Try to parse JSON output for session_id and result
        output = raw_output
        try:
            data = json.loads(raw_output)
        except (json.JSONDecodeError, TypeError):
            data = None
        if isinstance(data, dict):
            output = data.get("result", raw_output)
            # Capture Claude's session ID for future --resume calls
            new_claude_sid = data.get("session_id")
            if new_claude_sid:
                _claude_session_map[session_id] = new_claude_sid
                logger.info(f"Mapped session {session_id[:8]}... -> Claude {new_claude_sid[:8]}...")
            if data.get("is_error"):
                error_msg = output or "Claude returned an error"
                # Only store metadata with a real id: a null id would mask an
                # older valid one when resolving --resume later.
                save_chat_message(session_id, "error", error_msg,
                                  metadata={"claude_session_id": new_claude_sid} if new_claude_sid else None)
                return {"session_id": session_id, "response": error_msg, "status": "error"}

        if process.returncode == 0 and output:
            sid = _claude_session_map.get(session_id)
            save_chat_message(session_id, "assistant", output, model=msg.model,
                              metadata={"claude_session_id": sid} if sid else None)
            return {"session_id": session_id, "response": output, "status": "ok"}

        error_msg = filtered_error or output or "No response from Claude"
        save_chat_message(session_id, "error", error_msg)
        return {"session_id": session_id, "response": error_msg, "status": "error"}

    except asyncio.TimeoutError:
        save_chat_message(session_id, "error", "Response timeout (>120s)")
        return {"session_id": session_id, "response": "Response timeout (>120s)", "status": "error"}
    except Exception as e:
        save_chat_message(session_id, "error", str(e))
        return {"session_id": session_id, "response": str(e), "status": "error"}


# A single stream-json line can exceed asyncio's default 64 KiB StreamReader
# limit (presumably large tool results); readline() would then raise ValueError.
_CHAT_STREAM_LIMIT = 16 * 1024 * 1024


async def _stream_claude_reply(websocket: WebSocket, session_id: str, cmd: List[str],
                               env: Dict[str, str], model: Optional[str]) -> None:
    """Run one Claude CLI turn and relay its stream-json output over the socket.

    Sends "chunk" messages as text arrives, then a terminal "done" or "error"
    message, and records the reply in chat history. The CLI process is
    always killed if it is still running when this returns or raises.
    """
    process = None
    stderr_task = None
    try:
        logger.info(f"Chat WS cmd: {' '.join(cmd[:6])}...")
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
            env=env,
            limit=_CHAT_STREAM_LIMIT,
        )
        _chat_sessions[session_id] = process
        # Drain stderr concurrently: reading it only after stdout hits EOF can
        # deadlock if the child fills the stderr pipe first.
        stderr_task = asyncio.ensure_future(process.stderr.read())

        # Stream stdout line by line (stream-json emits one JSON object per line)
        full_output = ""
        error_reported = False
        while True:
            try:
                line = await asyncio.wait_for(process.stdout.readline(), timeout=180)
            except asyncio.TimeoutError:
                await _kill_if_running(process)
                save_chat_message(session_id, "error", "Response timeout (>180s)")
                await websocket.send_json({"type": "error", "content": "Response timeout (>180s)"})
                return
            if not line:
                break
            text = line.decode(errors="replace").strip()
            if not text:
                continue

            try:
                event = json.loads(text)
            except json.JSONDecodeError:
                # Not JSON — treat as raw text output
                full_output += text + "\n"
                await websocket.send_json({"type": "chunk", "content": text + "\n"})
                continue
            if not isinstance(event, dict):
                continue

            event_type = event.get("type", "")
            if event_type == "assistant":
                # Assistant message with content blocks
                for block in event.get("message", {}).get("content", []):
                    if block.get("type") == "text":
                        chunk = block.get("text", "")
                        full_output += chunk
                        await websocket.send_json({"type": "chunk", "content": chunk})
            elif event_type == "content_block_delta":
                delta = event.get("delta", {})
                if delta.get("type") == "text_delta":
                    chunk = delta.get("text", "")
                    full_output += chunk
                    await websocket.send_json({"type": "chunk", "content": chunk})
            elif event_type == "result":
                # Final result — capture session_id for --resume
                result_text = event.get("result", "")
                new_claude_sid = event.get("session_id")
                if new_claude_sid:
                    _claude_session_map[session_id] = new_claude_sid
                    logger.info(f"WS mapped {session_id[:8]}... -> Claude {new_claude_sid[:8]}...")
                if event.get("is_error", False):
                    error_content = result_text or full_output or "Claude returned an error"
                    save_chat_message(session_id, "error", error_content,
                                      metadata={"claude_session_id": new_claude_sid} if new_claude_sid else None)
                    await websocket.send_json({"type": "error", "content": error_content})
                    # This error is the terminal message; keep reading so the
                    # CLI can exit normally, but send nothing else afterwards.
                    error_reported = True
                elif result_text and not full_output:
                    # If we got no streaming chunks but have a result
                    full_output = result_text
                    await websocket.send_json({"type": "chunk", "content": result_text})
            # Other event types are ignored.

        await process.wait()
        stderr_data = await stderr_task
        stderr_text = stderr_data.decode(errors="replace") if stderr_data else ""
        if error_reported:
            return

        reply = full_output.strip()
        if reply:
            if process.returncode != 0:
                logger.warning(f"Claude exited with code {process.returncode} for WS session {session_id}")
            sid = _claude_session_map.get(session_id)
            save_chat_message(session_id, "assistant", reply, model=model,
                              metadata={"claude_session_id": sid} if sid else None)
            await websocket.send_json({"type": "done", "content": reply})
        else:
            # No output at all
            clean_error = _filter_stderr(stderr_text, drop_warnings=False) or "No response from Claude"
            save_chat_message(session_id, "error", clean_error)
            await websocket.send_json({"type": "error", "content": clean_error})
    except WebSocketDisconnect:
        raise
    except Exception as e:
        logger.error(f"Chat WS error: {e}")
        await websocket.send_json({"type": "error", "content": str(e)})
    finally:
        _chat_sessions.pop(session_id, None)
        if stderr_task is not None and not stderr_task.done():
            stderr_task.cancel()
        # Covers timeouts, client disconnects and send failures mid-stream.
        await _kill_if_running(process)


@app.websocket("/api/chat/ws/{session_id}")
async def chat_websocket(websocket: WebSocket, session_id: str):
    """WebSocket endpoint for streaming chat.

    Uses --output-format stream-json for streaming, and captures the Claude
    session_id from the final 'result' message for --resume on subsequent
    messages in the same chat session. Each client frame must be a JSON object
    with "message" and optional "model", "tools", "system_prompt".
    """
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                msg = None
            if not isinstance(msg, dict):
                await websocket.send_json({"type": "error", "content": "Invalid message: expected a JSON object"})
                continue

            user_message = msg.get("message", "")
            model = msg.get("model")
            tools = msg.get("tools")
            system_prompt = msg.get("system_prompt")
            if not user_message:
                await websocket.send_json({"type": "error", "content": "Empty message"})
                continue

            save_chat_message(session_id, "user", user_message, model=model)
            env = _get_claude_env()
            # Check for existing Claude session to resume
            claude_sid = _get_claude_session_id(session_id)
            cmd = _with_output_format(_build_claude_cmd(
                prompt=user_message,
                model=model,
                tools=tools,
                system_prompt=system_prompt,
                resume_session_id=claude_sid
            ), "stream-json")

            await websocket.send_json({"type": "status", "content": "thinking"})
            await _stream_claude_reply(websocket, session_id, cmd, env, model)
    except WebSocketDisconnect:
        logger.info(f"Chat WebSocket disconnected: {session_id}")
        await _kill_if_running(_chat_sessions.pop(session_id, None))


@app.get("/api/chat/sessions")
async def get_chat_sessions():
    """List all chat sessions"""
    return list_chat_sessions()


@app.get("/api/chat/history/{session_id}")
async def get_chat_session_history(session_id: str, limit: int = 50):
    """Get chat history for a session"""
    return get_chat_history(session_id, limit)


@app.delete("/api/chat/sessions/{session_id}")
async def delete_chat_session(session_id: str):
    """Delete a chat session's messages and forget its Claude session mapping."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("DELETE FROM chat_messages WHERE session_id = ?", (session_id,))
        conn.commit()
    finally:
        conn.close()
    # Otherwise a new chat reusing this id would --resume the deleted conversation.
    _claude_session_map.pop(session_id, None)
    return {"status": "deleted"}


# ---- MCP Servers ----

@app.get("/api/mcp/servers")
async def list_mcp_servers():
    """List MCP servers reported by `claude mcp list` (JSON or plain-text table)."""
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "mcp", "list",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL
        )
        stdout, _ = await _communicate_or_kill(proc, timeout=10)
        output = stdout.decode(errors="replace").strip()
        if "No MCP servers configured" in output:
            return {"servers": [], "raw": output}

        servers = []
        try:
            data = json.loads(output)
        except ValueError:
            # Plain-text output: one server per line, name in the first column.
            for line in output.split("\n"):
                line = line.strip()
                if line and not line.startswith("-") and not line.startswith("Name"):
                    parts = line.split()
                    servers.append({"name": parts[0], "details": " ".join(parts[1:])})
        else:
            if isinstance(data, list):
                servers = data
            elif isinstance(data, dict):
                servers = list(data.values())
        return {"servers": servers, "raw": output}
    except Exception as e:
        return {"servers": [], "error": str(e)}


@app.post("/api/mcp/servers")
async def add_mcp_server(server: McpServerAdd):
    """Register an MCP server via `claude mcp add`.

    "sse"/"http" servers need ``url``; "stdio" servers need ``command``
    (plus optional ``args``).
    """
    cmd = ["claude", "mcp", "add", server.name]
    if server.server_type in ("sse", "http") and server.url:
        cmd += ["--transport", server.server_type, server.url]
    elif server.server_type == "stdio" and server.command:
        # "--" stops the claude CLI from parsing the server's own flags (e.g. "-y").
        cmd += ["--transport", "stdio", "--", server.command, *(server.args or [])]
    else:
        return {
            "status": "error",
            "message": "Invalid MCP server: 'sse'/'http' require a url, 'stdio' requires a command",
            "name": server.name,
        }
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL
        )
        stdout, stderr = await _communicate_or_kill(proc, timeout=15)
        output = stdout.decode(errors="replace") + stderr.decode(errors="replace")
        return {"status": "ok" if proc.returncode == 0 else "error", "message": output.strip(), "name": server.name}
    except Exception as e:
        return {"status": "error", "message": str(e)}


@app.delete("/api/mcp/servers/{name}")
async def remove_mcp_server(name: str):
    """Remove an MCP server via `claude mcp remove`."""
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "mcp", "remove", name,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL
        )
        stdout, stderr = await _communicate_or_kill(proc, timeout=10)
        output = stdout.decode(errors="replace") + stderr.decode(errors="replace")
        return {"status": "ok" if proc.returncode == 0 else "error", "message": output.strip()}
    except Exception as e:
        return {"status": "error", "message": str(e)}


# Serve static frontend — MUST be last
app.mount("/", StaticFiles(directory="/app/frontend/dist", html=True), name="static")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)