diff --git a/backend/main.py b/backend/main.py index ffd26b4..e69de29 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,1191 +0,0 @@ -""" -Claude Persistent Agent - FastAPI Backend -Main application with task scheduling, live chat, and agent orchestration -""" - -from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks -from fastapi.staticfiles import StaticFiles -from fastapi.responses import FileResponse, StreamingResponse -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel -from typing import Optional, List, Dict, Any -import asyncio -import json -import sqlite3 -import subprocess -import logging -from datetime import datetime, timedelta -import uuid -from pathlib import Path -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.cron import CronTrigger -import os - -# Setup logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Initialize FastAPI app -app = FastAPI(title="Claude Persistent Agent", version="2.0.0") - -# Add CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -# Database setup -DB_PATH = Path("/app/data/tasks.db") -LOGS_PATH = Path("/app/logs") -TASKS_PATH = Path("/app/tasks") -TOKEN_FILE = Path("/app/data/.claude_token") -CHAT_HISTORY_PATH = Path("/app/data/chat_history.json") - -# Ensure directories exist -DB_PATH.parent.mkdir(parents=True, exist_ok=True) -LOGS_PATH.mkdir(parents=True, exist_ok=True) -TASKS_PATH.mkdir(parents=True, exist_ok=True) - - -# ============ Models ============ - -class Task(BaseModel): - id: Optional[str] = None - name: str - description: Optional[str] = None - prompt: str - schedule_type: str = "manual" # "once", "recurring", "manual" - schedule_value: Optional[str] = None - enabled: bool = True - created_at: Optional[str] = None - last_run: Optional[str] = None - next_run: Optional[str] = None - status: str = "idle" - # Agent orchestration fields - agent_model: Optional[str] = None # "sonnet", "opus", "haiku" - agent_tools: Optional[str] = None # comma-separated: "Bash,Read,Write,Edit" - agent_mcp_servers: Optional[str] = None # comma-separated MCP server names - agent_system_prompt: Optional[str] = None # custom system prompt - agent_max_turns: Optional[int] = None # max conversation turns - agent_permission_mode: str = "auto" # "auto", "acceptEdits", "plan" - agent_timeout: int = 300 # seconds - - -class TaskRun(BaseModel): - task_id: str - run_id: str - status: str - output: Optional[str] = None - error: Optional[str] = None - started_at: str - completed_at: Optional[str] = None - - -class ChatMessage(BaseModel): - message: str - model: Optional[str] = None - system_prompt: Optional[str] = None - tools: Optional[str] = None - session_id: Optional[str] = None - - -class TokenSubmit(BaseModel): - token: str - token_type: str = "oauth_token" - - -class McpServerAdd(BaseModel): - name: str - server_type: str = "sse" - url: Optional[str] = None - command: Optional[str] = None - args: Optional[List[str]] = None - - -# ============ Database ============ - -def init_db(): - """Initialize SQLite database""" - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - - cursor.execute(""" - CREATE TABLE IF NOT EXISTS tasks ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL, - description TEXT, - prompt TEXT NOT NULL, - schedule_type TEXT NOT NULL, - schedule_value TEXT, - enabled BOOLEAN DEFAULT 1, - created_at TEXT, - last_run TEXT, - next_run TEXT, - status TEXT DEFAULT 'idle', - agent_model TEXT, - agent_tools TEXT, - agent_mcp_servers TEXT, - agent_system_prompt TEXT, - agent_max_turns INTEGER, - agent_permission_mode TEXT DEFAULT 'auto', - agent_timeout INTEGER DEFAULT 300 - ) - """) - - cursor.execute(""" - CREATE TABLE IF NOT EXISTS task_runs ( - run_id TEXT PRIMARY KEY, - task_id TEXT NOT NULL, - status TEXT NOT NULL, - output TEXT, - error TEXT, - started_at TEXT, - completed_at TEXT, - FOREIGN KEY (task_id) REFERENCES tasks(id) - ) - """) - - cursor.execute(""" - CREATE TABLE IF NOT EXISTS chat_messages ( - id TEXT PRIMARY KEY, - session_id TEXT NOT NULL, - role TEXT NOT NULL, - content TEXT NOT NULL, - timestamp TEXT NOT NULL, - model TEXT, - metadata TEXT - ) - """) - - # Migration: add new columns if they don't exist - try: - cursor.execute("ALTER TABLE tasks ADD COLUMN agent_model TEXT") - except sqlite3.OperationalError: - pass - try: - cursor.execute("ALTER TABLE tasks ADD COLUMN agent_tools TEXT") - except sqlite3.OperationalError: - pass - try: - cursor.execute("ALTER TABLE tasks ADD COLUMN agent_mcp_servers TEXT") - except sqlite3.OperationalError: - pass - try: - cursor.execute("ALTER TABLE tasks ADD COLUMN agent_system_prompt TEXT") - except sqlite3.OperationalError: - pass - try: - cursor.execute("ALTER TABLE tasks ADD COLUMN agent_max_turns INTEGER") - except sqlite3.OperationalError: - pass - try: - cursor.execute("ALTER TABLE tasks ADD COLUMN agent_permission_mode TEXT DEFAULT 'auto'") - except sqlite3.OperationalError: - pass - try: - cursor.execute("ALTER TABLE tasks ADD COLUMN agent_timeout INTEGER DEFAULT 300") - except sqlite3.OperationalError: - pass - - conn.commit() - conn.close() - - -# ============ Auth Helpers ============ - -def _get_claude_env(): - """Get environment with auth tokens set for Claude CLI""" - env = os.environ.copy() - if TOKEN_FILE.exists(): - try: - saved = json.loads(TOKEN_FILE.read_text()) - if saved.get("type") == "oauth_token" and saved.get("token"): - env["CLAUDE_CODE_OAUTH_TOKEN"] = saved["token"] - elif saved.get("type") == "api_key" and saved.get("token"): - env["ANTHROPIC_API_KEY"] = saved["token"] - except Exception as e: - logger.error(f"Failed to load saved token: {e}") - return env - - -def _build_claude_cmd(prompt: str, model: str = None, tools: str = None, - system_prompt: str = None, permission_mode: str = "auto", - resume_session_id: str = None) -> list: - """Build a claude CLI command with agent configuration. - - Note: --session-id requires a valid UUID and creates a persistent session. - For chat continuity, use resume_session_id with --resume. - For one-shot tasks, omit resume_session_id. - """ - cmd = ["claude", "-p"] - - if model: - cmd.extend(["--model", model]) - - if tools: - cmd.extend(["--allowedTools", tools]) - else: - cmd.extend(["--allowedTools", "Bash,Read,Write,Edit"]) - - if permission_mode: - cmd.extend(["--permission-mode", permission_mode]) - - if system_prompt: - cmd.extend(["--system-prompt", system_prompt]) - - # Resume an existing Claude session for multi-turn chat - if resume_session_id: - cmd.extend(["--resume", resume_session_id]) - - cmd.append(prompt) - return cmd - - -# ============ Task Execution ============ - -async def run_claude_task(task: Task, run_id: str): - """Execute a Claude Code task as an agent""" - started = datetime.now().isoformat() - try: - env = _get_claude_env() - - cmd = _build_claude_cmd( - prompt=task.prompt, - model=task.agent_model, - tools=task.agent_tools, - system_prompt=task.agent_system_prompt, - permission_mode=task.agent_permission_mode - ) - - timeout = task.agent_timeout or 300 - - logger.info(f"Running task {task.name}: {' '.join(cmd)}") - - process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL, # Fix stdin warning - env=env - ) - - stdout, stderr = await asyncio.wait_for( - process.communicate(), timeout=timeout - ) - - output = stdout.decode(errors="replace") if stdout else "" - error = stderr.decode(errors="replace") if stderr else "" - - # Filter out non-critical warnings from stderr - error_lines = [l for l in error.split("\n") - if l.strip() and "stdin" not in l.lower() and "warning" not in l.lower()] - filtered_error = "\n".join(error_lines) - - status = "completed" if process.returncode == 0 else "failed" - - save_task_run(TaskRun( - task_id=task.id, run_id=run_id, status=status, - output=output, - error=filtered_error if status == "failed" else None, - started_at=started, completed_at=datetime.now().isoformat() - )) - update_task_status(task.id, status) - - except asyncio.TimeoutError: - save_task_run(TaskRun( - task_id=task.id, run_id=run_id, status="failed", - error=f"Task timeout (>{task.agent_timeout or 300}s)", - started_at=started, completed_at=datetime.now().isoformat() - )) - update_task_status(task.id, "failed") - except Exception as e: - save_task_run(TaskRun( - task_id=task.id, run_id=run_id, status="failed", - error=str(e), - started_at=started, completed_at=datetime.now().isoformat() - )) - update_task_status(task.id, "failed") - - -# ============ DB Operations ============ - -def save_task(task: Task): - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - if not task.id: - task.id = str(uuid.uuid4()) - if not task.created_at: - task.created_at = datetime.now().isoformat() - - cursor.execute(""" - INSERT OR REPLACE INTO tasks - (id, name, description, prompt, schedule_type, schedule_value, enabled, - created_at, status, agent_model, agent_tools, agent_mcp_servers, - agent_system_prompt, agent_max_turns, agent_permission_mode, agent_timeout) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, ( - task.id, task.name, task.description, task.prompt, - task.schedule_type, task.schedule_value, task.enabled, - task.created_at, task.status, - task.agent_model, task.agent_tools, task.agent_mcp_servers, - task.agent_system_prompt, task.agent_max_turns, - task.agent_permission_mode, task.agent_timeout - )) - conn.commit() - conn.close() - return task - - -def save_task_run(run: TaskRun): - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute(""" - INSERT INTO task_runs - (run_id, task_id, status, output, error, started_at, completed_at) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, (run.run_id, run.task_id, run.status, run.output, - run.error, run.started_at, run.completed_at)) - conn.commit() - conn.close() - - -def update_task_status(task_id: str, status: str): - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute("UPDATE tasks SET status = ?, last_run = ? WHERE id = ?", - (status, datetime.now().isoformat(), task_id)) - conn.commit() - conn.close() - - -def get_task(task_id: str) -> Optional[Task]: - conn = sqlite3.connect(DB_PATH) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - cursor.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)) - row = cursor.fetchone() - conn.close() - if row: - return Task(**dict(row)) - return None - - -def get_all_tasks() -> List[Task]: - conn = sqlite3.connect(DB_PATH) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC") - rows = cursor.fetchall() - conn.close() - return [Task(**dict(row)) for row in rows] - - -# ============ Scheduler ============ - -scheduler = BackgroundScheduler() - - -def schedule_task(task: Task): - if not task.enabled or task.schedule_type == "manual": - return - if task.schedule_type == "recurring" and task.schedule_value: - try: - scheduler.add_job( - _sync_run_task, CronTrigger.from_crontab(task.schedule_value), - id=task.id, args=[task], replace_existing=True - ) - logger.info(f"Scheduled task {task.name} with cron: {task.schedule_value}") - except Exception as e: - logger.error(f"Failed to schedule task {task.name}: {e}") - - -def _sync_run_task(task: Task): - """Sync wrapper for scheduled tasks (APScheduler doesn't support async natively)""" - loop = asyncio.new_event_loop() - run_id = str(uuid.uuid4()) - update_task_status(task.id, "running") - loop.run_until_complete(run_claude_task(task, run_id)) - loop.close() - - -# ============ Chat Sessions ============ - -# Active chat sessions: our_session_id -> process -_chat_sessions: Dict[str, asyncio.subprocess.Process] = {} -# Mapping: our_session_id -> claude_session_id (for --resume) -_claude_session_map: Dict[str, str] = {} - - -def save_chat_message(session_id: str, role: str, content: str, model: str = None, metadata: dict = None): - """Save a chat message to the database""" - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute(""" - INSERT INTO chat_messages (id, session_id, role, content, timestamp, model, metadata) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, (str(uuid.uuid4()), session_id, role, content, - datetime.now().isoformat(), model, json.dumps(metadata) if metadata else None)) - conn.commit() - conn.close() - - -def _get_claude_session_id(our_session_id: str) -> Optional[str]: - """Get the Claude CLI session ID mapped to our session ID""" - if our_session_id in _claude_session_map: - return _claude_session_map[our_session_id] - # Also check DB metadata - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute(""" - SELECT metadata FROM chat_messages - WHERE session_id = ? AND metadata IS NOT NULL - ORDER BY timestamp DESC LIMIT 1 - """, (our_session_id,)) - row = cursor.fetchone() - conn.close() - if row and row[0]: - try: - meta = json.loads(row[0]) - claude_sid = meta.get("claude_session_id") - if claude_sid: - _claude_session_map[our_session_id] = claude_sid - return claude_sid - except Exception: - pass - return None - - -def get_chat_history(session_id: str, limit: int = 50) -> List[Dict]: - """Get chat history for a session""" - conn = sqlite3.connect(DB_PATH) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - cursor.execute(""" - SELECT * FROM chat_messages WHERE session_id = ? - ORDER BY timestamp ASC LIMIT ? - """, (session_id, limit)) - rows = cursor.fetchall() - conn.close() - return [dict(row) for row in rows] - - -def list_chat_sessions() -> List[Dict]: - """List all chat sessions with latest message""" - conn = sqlite3.connect(DB_PATH) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - cursor.execute(""" - SELECT session_id, - COUNT(*) as message_count, - MIN(timestamp) as started_at, - MAX(timestamp) as last_message, - (SELECT content FROM chat_messages cm2 - WHERE cm2.session_id = cm.session_id AND cm2.role = 'user' - ORDER BY timestamp ASC LIMIT 1) as first_message - FROM chat_messages cm - GROUP BY session_id - ORDER BY MAX(timestamp) DESC - LIMIT 20 - """) - rows = cursor.fetchall() - conn.close() - return [dict(row) for row in rows] - - -# ============ API Routes ============ - -@app.on_event("startup") -async def startup(): - init_db() - scheduler.start() - - if TOKEN_FILE.exists(): - try: - saved = json.loads(TOKEN_FILE.read_text()) - if saved.get("type") == "oauth_token" and saved.get("token"): - os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = saved["token"] - logger.info("Loaded saved OAuth token") - elif saved.get("type") == "api_key" and saved.get("token"): - os.environ["ANTHROPIC_API_KEY"] = saved["token"] - logger.info("Loaded saved API key") - except Exception as e: - logger.error(f"Failed to load saved token: {e}") - - for task in get_all_tasks(): - if task.enabled: - schedule_task(task) - - logger.info("Claude Persistent Agent v2.0 started") - - -@app.on_event("shutdown") -async def shutdown(): - scheduler.shutdown() - # Kill any active chat sessions - for sid, proc in _chat_sessions.items(): - try: - proc.kill() - except Exception: - pass - - -@app.get("/health") -async def health(): - return {"status": "healthy", "timestamp": datetime.now().isoformat(), "version": "2.0.0"} - - -# ---- Task CRUD ---- - -@app.post("/api/tasks") -async def create_task(task: Task) -> Task: - saved_task = save_task(task) - schedule_task(saved_task) - return saved_task - - -@app.get("/api/tasks") -async def list_tasks() -> List[Task]: - return get_all_tasks() - - -@app.get("/api/tasks/{task_id}") -async def get_task_endpoint(task_id: str) -> Task: - task = get_task(task_id) - if not task: - raise HTTPException(status_code=404, detail="Task not found") - return task - - -@app.put("/api/tasks/{task_id}") -async def update_task_endpoint(task_id: str, task: Task) -> Task: - task.id = task_id - saved_task = save_task(task) - schedule_task(saved_task) - return saved_task - - -@app.delete("/api/tasks/{task_id}") -async def delete_task_endpoint(task_id: str): - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) - conn.commit() - conn.close() - try: - if scheduler.get_job(task_id): - scheduler.remove_job(task_id) - except Exception: - pass - return {"status": "deleted"} - - -@app.post("/api/tasks/{task_id}/run") -async def run_task_manual(task_id: str, background_tasks: BackgroundTasks): - task = get_task(task_id) - if not task: - raise HTTPException(status_code=404, detail="Task not found") - run_id = str(uuid.uuid4()) - update_task_status(task_id, "running") - background_tasks.add_task(run_claude_task, task, run_id) - return {"run_id": run_id, "status": "started"} - - -@app.get("/api/tasks/{task_id}/runs") -async def get_task_runs(task_id: str) -> List[Dict]: - conn = sqlite3.connect(DB_PATH) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM task_runs WHERE task_id = ? ORDER BY started_at DESC LIMIT 50", - (task_id,) - ) - rows = cursor.fetchall() - conn.close() - return [dict(row) for row in rows] - - -# ---- System Info ---- - -@app.get("/api/system/info") -async def system_info() -> Dict: - tasks = get_all_tasks() - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute("SELECT COUNT(*) FROM task_runs") - total_runs = cursor.fetchone()[0] - cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='completed'") - completed_runs = cursor.fetchone()[0] - cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='failed'") - failed_runs = cursor.fetchone()[0] - cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='running'") - running_runs = cursor.fetchone()[0] - conn.close() - return { - "app_name": "Claude Persistent Agent", - "version": "2.0.0", - "uptime": datetime.now().isoformat(), - "scheduler_running": scheduler.running, - "task_count": len(tasks), - "total_runs": total_runs, - "completed_runs": completed_runs, - "failed_runs": failed_runs, - "running_runs": running_runs, - "active_chat_sessions": len(_chat_sessions), - } - - -@app.get("/api/system/usage") -async def usage_stats() -> Dict: - usage = { - "models_used": [], "session_count": 0, - "last_reset": None, "next_reset": None, - "note": "Usage data sourced from ~/.claude session cache" - } - try: - claude_dir = Path("/root/.claude") - sessions = list(claude_dir.glob("**/session*.json")) + list(claude_dir.glob("**/*.jsonl")) - usage["session_count"] = len(sessions) - now = datetime.now() - next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1) - reset = next_month.replace(hour=0, minute=0, second=0) - usage["next_reset"] = reset.isoformat() - usage["days_until_reset"] = (reset - now).days - except Exception as e: - usage["error"] = str(e) - - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute("SELECT COUNT(*), MIN(started_at), MAX(started_at) FROM task_runs") - row = cursor.fetchone() - conn.close() - usage["claude_runs_total"] = row[0] - usage["first_run"] = row[1] - usage["last_run"] = row[2] - return usage - - -# ---- Auth ---- - -@app.get("/api/auth/status") -async def auth_status(): - account = None - auth_method = None - status = "logged_out" - has_saved_token = TOKEN_FILE.exists() - token_type = None - - if has_saved_token: - try: - saved = json.loads(TOKEN_FILE.read_text()) - token_type = saved.get("type") - except Exception: - pass - - try: - env = _get_claude_env() - proc = await asyncio.create_subprocess_exec( - "claude", "auth", "status", "--json", - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL, - env=env - ) - stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10) - output = stdout.decode(errors="replace").strip() - data = json.loads(output) - if data.get("loggedIn"): - status = "logged_in" - account = data.get("email") or data.get("account", {}).get("emailAddress") - auth_method = data.get("authMethod") - except Exception as e: - logger.error(f"Auth status check failed: {e}") - - return { - "status": status, "account": account, "auth_method": auth_method, - "has_saved_token": has_saved_token, "token_type": token_type - } - - -@app.post("/api/auth/token") -async def auth_set_token(payload: TokenSubmit): - token = payload.token.strip() - token_type = payload.token_type - - if not token: - return {"status": "error", "message": "Token cannot be empty"} - - # Auto-detect token type from prefix - if token.startswith("sk-ant-oat"): - token_type = "oauth_token" - elif token.startswith("sk-ant-api"): - token_type = "api_key" - else: - return { - "status": "error", - "message": f"Invalid token format. Expected 'sk-ant-oat01-...' (setup token) or 'sk-ant-api03-...' (API key). Got: '{token[:12]}...'" - } - - # Set env vars BEFORE saving so we can test - if token_type == "oauth_token": - os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = token - os.environ.pop("ANTHROPIC_API_KEY", None) - elif token_type == "api_key": - os.environ["ANTHROPIC_API_KEY"] = token - os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None) - - # Actually verify by making a real API call (not just checking auth status which only looks at env vars) - try: - env = _get_claude_env() - proc = await asyncio.create_subprocess_exec( - "claude", "-p", "--output-format", "json", "--no-session-persistence", - "--max-budget-usd", "0.01", - "Reply with just the word VERIFIED", - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL, env=env - ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30) - output = stdout.decode(errors="replace").strip() - - try: - data = json.loads(output) - result = data.get("result", "") - is_error = data.get("is_error", False) - - if is_error: - # Token was rejected — don't save it - os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None) - os.environ.pop("ANTHROPIC_API_KEY", None) - return { - "status": "error", - "message": f"Token rejected by Claude API: {result}" - } - else: - # Token works! Save it. - TOKEN_FILE.write_text(json.dumps({ - "type": token_type, "token": token, - "saved_at": datetime.now().isoformat() - })) - return { - "status": "logged_in", - "message": "Token verified and saved! Claude is ready.", - "auth_method": token_type - } - except json.JSONDecodeError: - # Couldn't parse JSON but got some output - if proc.returncode == 0: - TOKEN_FILE.write_text(json.dumps({ - "type": token_type, "token": token, - "saved_at": datetime.now().isoformat() - })) - return { - "status": "logged_in", - "message": "Token appears to work. Saved!", - "auth_method": token_type - } - else: - os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None) - os.environ.pop("ANTHROPIC_API_KEY", None) - return { - "status": "error", - "message": f"Token verification failed: {output or stderr.decode(errors='replace')}" - } - - except asyncio.TimeoutError: - # If it timed out, it probably got through auth at least - TOKEN_FILE.write_text(json.dumps({ - "type": token_type, "token": token, - "saved_at": datetime.now().isoformat() - })) - return {"status": "token_saved", "message": "Token saved but verification timed out. Try sending a chat message to confirm."} - except Exception as e: - os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None) - os.environ.pop("ANTHROPIC_API_KEY", None) - return {"status": "error", "message": f"Token verification error: {str(e)}"} - - -@app.post("/api/auth/logout") -async def auth_logout(): - if TOKEN_FILE.exists(): - TOKEN_FILE.unlink() - os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None) - os.environ.pop("ANTHROPIC_API_KEY", None) - try: - proc = await asyncio.create_subprocess_exec( - "claude", "auth", "logout", - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL - ) - await asyncio.wait_for(proc.communicate(), timeout=10) - except Exception: - pass - return {"status": "logged_out"} - - -# ---- Chat ---- - -@app.post("/api/chat/send") -async def chat_send(msg: ChatMessage): - """Send a message to Claude and get a response (non-streaming). - - Uses --output-format json to capture the Claude session_id for - conversation continuity via --resume on subsequent messages. - """ - session_id = msg.session_id or str(uuid.uuid4()) - - # Save user message - save_chat_message(session_id, "user", msg.message, model=msg.model) - - env = _get_claude_env() - - # Check if we have an existing Claude session to resume - claude_sid = _get_claude_session_id(session_id) - - cmd = _build_claude_cmd( - prompt=msg.message, - model=msg.model, - tools=msg.tools, - system_prompt=msg.system_prompt, - resume_session_id=claude_sid # None for first message, session_id for follow-ups - ) - - # Insert --output-format json after "-p" so we can parse the session_id - try: - p_idx = cmd.index("-p") - cmd.insert(p_idx + 1, "--output-format") - cmd.insert(p_idx + 2, "json") - except ValueError: - pass - - try: - logger.info(f"Chat send cmd: {' '.join(cmd[:6])}...") - process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL, - env=env - ) - - stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=120) - - raw_output = stdout.decode(errors="replace").strip() if stdout else "" - error = stderr.decode(errors="replace").strip() if stderr else "" - - # Filter warnings from stderr - error_lines = [l for l in error.split("\n") - if l.strip() and "stdin" not in l.lower() and "warning" not in l.lower()] - filtered_error = "\n".join(error_lines) - - # Try to parse JSON output for session_id and result - output = raw_output - try: - data = json.loads(raw_output) - output = data.get("result", raw_output) - # Capture Claude's session ID for future --resume calls - new_claude_sid = data.get("session_id") - if new_claude_sid: - _claude_session_map[session_id] = new_claude_sid - logger.info(f"Mapped session {session_id[:8]}... -> Claude {new_claude_sid[:8]}...") - # Check if Claude reported an error - if data.get("is_error"): - save_chat_message(session_id, "error", output, metadata={"claude_session_id": new_claude_sid}) - return {"session_id": session_id, "response": output, "status": "error"} - except (json.JSONDecodeError, TypeError): - pass - - if process.returncode == 0 and output: - new_claude_sid = _claude_session_map.get(session_id) - save_chat_message(session_id, "assistant", output, model=msg.model, - metadata={"claude_session_id": new_claude_sid} if new_claude_sid else None) - return { - "session_id": session_id, - "response": output, - "status": "ok" - } - else: - error_msg = filtered_error or output or "No response from Claude" - save_chat_message(session_id, "error", error_msg) - return { - "session_id": session_id, - "response": error_msg, - "status": "error" - } - - except asyncio.TimeoutError: - save_chat_message(session_id, "error", "Response timeout (>120s)") - return {"session_id": session_id, "response": "Response timeout (>120s)", "status": "error"} - except Exception as e: - save_chat_message(session_id, "error", str(e)) - return {"session_id": session_id, "response": str(e), "status": "error"} - - -@app.websocket("/api/chat/ws/{session_id}") -async def chat_websocket(websocket: WebSocket, session_id: str): - """WebSocket endpoint for streaming chat. - - Uses --output-format stream-json for streaming, and captures the - Claude session_id from the final 'result' message for --resume on - subsequent messages in the same chat session. - """ - await websocket.accept() - - try: - while True: - data = await websocket.receive_text() - msg = json.loads(data) - - user_message = msg.get("message", "") - model = msg.get("model") - tools = msg.get("tools") - system_prompt = msg.get("system_prompt") - - if not user_message: - await websocket.send_json({"type": "error", "content": "Empty message"}) - continue - - save_chat_message(session_id, "user", user_message, model=model) - - env = _get_claude_env() - - # Check for existing Claude session to resume - claude_sid = _get_claude_session_id(session_id) - - cmd = _build_claude_cmd( - prompt=user_message, - model=model, - tools=tools, - system_prompt=system_prompt, - resume_session_id=claude_sid - ) - - # Use stream-json for streaming output - try: - p_idx = cmd.index("-p") - cmd.insert(p_idx + 1, "--output-format") - cmd.insert(p_idx + 2, "stream-json") - except ValueError: - pass - - await websocket.send_json({"type": "status", "content": "thinking"}) - - try: - logger.info(f"Chat WS cmd: {' '.join(cmd[:6])}...") - process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL, - env=env - ) - - _chat_sessions[session_id] = process - - # Stream stdout line by line (stream-json emits one JSON object per line) - full_output = "" - while True: - try: - line = await asyncio.wait_for( - process.stdout.readline(), timeout=180 - ) - except asyncio.TimeoutError: - await websocket.send_json({"type": "error", "content": "Response timeout (>180s)"}) - break - if not line: - break - text = line.decode(errors="replace").strip() - if not text: - continue - - # Try to parse stream-json events - try: - event = json.loads(text) - event_type = event.get("type", "") - - if event_type == "assistant": - # Assistant message with content blocks - content = event.get("message", {}).get("content", []) - for block in content: - if block.get("type") == "text": - chunk = block.get("text", "") - full_output += chunk - await websocket.send_json({"type": "chunk", "content": chunk}) - elif event_type == "content_block_delta": - delta = event.get("delta", {}) - if delta.get("type") == "text_delta": - chunk = delta.get("text", "") - full_output += chunk - await websocket.send_json({"type": "chunk", "content": chunk}) - elif event_type == "result": - # Final result — capture session_id for --resume - result_text = event.get("result", "") - new_claude_sid = event.get("session_id") - is_error = event.get("is_error", False) - - if new_claude_sid: - _claude_session_map[session_id] = new_claude_sid - logger.info(f"WS mapped {session_id[:8]}... -> Claude {new_claude_sid[:8]}...") - - if is_error: - error_content = result_text or full_output or "Claude returned an error" - save_chat_message(session_id, "error", error_content, - metadata={"claude_session_id": new_claude_sid}) - await websocket.send_json({"type": "error", "content": error_content}) - elif result_text and not full_output: - # If we got no streaming chunks but have a result - full_output = result_text - await websocket.send_json({"type": "chunk", "content": result_text}) - else: - # Unknown event type — might contain text content - pass - except json.JSONDecodeError: - # Not JSON — treat as raw text output - full_output += text + "\n" - await websocket.send_json({"type": "chunk", "content": text + "\n"}) - - await process.wait() - - stderr_data = await process.stderr.read() - stderr_text = stderr_data.decode(errors="replace") if stderr_data else "" - - if process.returncode == 0 and full_output.strip(): - new_claude_sid = _claude_session_map.get(session_id) - save_chat_message(session_id, "assistant", full_output.strip(), model=model, - metadata={"claude_session_id": new_claude_sid} if new_claude_sid else None) - await websocket.send_json({ - "type": "done", - "content": full_output.strip() - }) - elif not full_output.strip(): - # No output at all - error_lines = [l for l in stderr_text.split("\n") - if l.strip() and "stdin" not in l.lower()] - clean_error = "\n".join(error_lines) or "No response from Claude" - save_chat_message(session_id, "error", clean_error) - await websocket.send_json({"type": "error", "content": clean_error}) - else: - # Had output but non-zero return code — still send done - await websocket.send_json({ - "type": "done", - "content": full_output.strip() - }) - - except Exception as e: - logger.error(f"Chat WS error: {e}") - await websocket.send_json({"type": "error", "content": str(e)}) - finally: - _chat_sessions.pop(session_id, None) - - except WebSocketDisconnect: - logger.info(f"Chat WebSocket disconnected: {session_id}") - proc = _chat_sessions.pop(session_id, None) - if proc and proc.returncode is None: - try: - proc.kill() - except Exception: - pass - - -@app.get("/api/chat/sessions") -async def get_chat_sessions(): - """List all chat sessions""" - return list_chat_sessions() - - -@app.get("/api/chat/history/{session_id}") -async def get_chat_session_history(session_id: str, limit: int = 50): - """Get chat history for a session""" - return get_chat_history(session_id, limit) - - -@app.delete("/api/chat/sessions/{session_id}") -async def delete_chat_session(session_id: str): - """Delete a chat session""" - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute("DELETE FROM chat_messages WHERE session_id = ?", (session_id,)) - conn.commit() - conn.close() - return {"status": "deleted"} - - -# ---- MCP Servers ---- - -@app.get("/api/mcp/servers") -async def list_mcp_servers(): - try: - proc = await asyncio.create_subprocess_exec( - "claude", "mcp", "list", - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL - ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10) - output = stdout.decode(errors="replace").strip() - - servers = [] - if "No MCP servers configured" in output: - return {"servers": [], "raw": output} - try: - data = json.loads(output) - if isinstance(data, list): - servers = data - elif isinstance(data, dict): - servers = list(data.values()) if data else [] - except Exception: - for line in output.split("\n"): - line = line.strip() - if line and not line.startswith("-") and not line.startswith("Name"): - parts = line.split() - if len(parts) >= 1: - servers.append({"name": parts[0], "details": " ".join(parts[1:])}) - return {"servers": servers, "raw": output} - except Exception as e: - return {"servers": [], "error": str(e)} - - -@app.post("/api/mcp/servers") -async def add_mcp_server(server: McpServerAdd): - try: - cmd = ["claude", "mcp", "add", server.name] - if server.server_type == "sse" and server.url: - cmd.extend(["--transport", "sse", server.url]) - elif server.server_type == "stdio" and server.command: - cmd.extend(["--transport", "stdio", server.command]) - if server.args: - cmd.extend(server.args) - - proc = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL - ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15) - output = stdout.decode(errors="replace") + stderr.decode(errors="replace") - return {"status": "ok" if proc.returncode == 0 else "error", - "message": output.strip(), "name": server.name} - except Exception as e: - return {"status": "error", "message": str(e)} - - -@app.delete("/api/mcp/servers/{name}") -async def remove_mcp_server(name: str): - try: - proc = await asyncio.create_subprocess_exec( - "claude", "mcp", "remove", name, - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.DEVNULL - ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10) - output = stdout.decode(errors="replace") + stderr.decode(errors="replace") - return {"status": "ok" if proc.returncode == 0 else "error", "message": output.strip()} - except Exception as e: - return {"status": "error", "message": str(e)} - - -# Serve static frontend — MUST be last -app.mount("/", StaticFiles(directory="/app/frontend/dist", html=True), name="static") - - -if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000)