This repository has been archived on 2026-04-05. You can view files and clone it, but cannot push or open issues or pull requests.
claude-persistent-agent/backend/main.py

1191 lines
41 KiB
Python

"""
Claude Persistent Agent - FastAPI Backend
Main application with task scheduling, live chat, and agent orchestration
"""
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import asyncio
import json
import sqlite3
import subprocess
import logging
from datetime import datetime, timedelta
import uuid
from pathlib import Path
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import os
# ---- Logging ----
# Root handler at INFO; this module logs through its own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name=__name__)
# ---- FastAPI application ----
app = FastAPI(title="Claude Persistent Agent", version="2.0.0")
# CORS: any origin, method and header, with credentials allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; restrict allow_origins if this API is reachable from untrusted sites.
_cors_options = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_cors_options)
# ---- Filesystem layout ----
_DATA_DIR = Path("/app/data")
DB_PATH = _DATA_DIR / "tasks.db"
LOGS_PATH = Path("/app/logs")
TASKS_PATH = Path("/app/tasks")
TOKEN_FILE = _DATA_DIR / ".claude_token"
CHAT_HISTORY_PATH = _DATA_DIR / "chat_history.json"

# Create every directory the app writes into (idempotent).
for _required_dir in (DB_PATH.parent, LOGS_PATH, TASKS_PATH):
    _required_dir.mkdir(parents=True, exist_ok=True)
del _required_dir
# ============ Models ============
class Task(BaseModel):
    """A task definition: the prompt to run plus its scheduling and agent settings.

    Mirrors the columns of the ``tasks`` table and is used both as the API
    request/response body and as the in-memory representation of a row.
    """
    id: Optional[str] = None  # filled with a uuid4 string by save_task when missing
    name: str
    description: Optional[str] = None
    prompt: str  # passed to the Claude CLI as the positional prompt
    schedule_type: str = "manual" # "once", "recurring", "manual"; schedule_task only wires "recurring" to the scheduler
    schedule_value: Optional[str] = None  # crontab expression, used when schedule_type == "recurring"
    enabled: bool = True  # disabled tasks are not scheduled
    created_at: Optional[str] = None  # ISO-8601 timestamp; set by save_task when missing
    last_run: Optional[str] = None  # ISO-8601 timestamp written by update_task_status
    next_run: Optional[str] = None  # stored column; not written by any code visible in this file
    status: str = "idle"  # "idle", "running", "completed" or "failed"
    # Agent orchestration fields
    agent_model: Optional[str] = None # passed as --model, e.g. "sonnet", "opus", "haiku"
    agent_tools: Optional[str] = None # comma-separated --allowedTools list, e.g. "Bash,Read,Write,Edit"
    agent_mcp_servers: Optional[str] = None # comma-separated MCP server names (stored; _build_claude_cmd takes no MCP argument)
    agent_system_prompt: Optional[str] = None # passed as --system-prompt
    agent_max_turns: Optional[int] = None # max conversation turns
    agent_permission_mode: str = "auto" # passed as --permission-mode; NOTE(review): confirm the installed CLI accepts "auto"
    agent_timeout: int = 300 # seconds before run_claude_task gives up on the CLI process
class TaskRun(BaseModel):
    """One execution record of a Task, stored as a row of ``task_runs``."""
    task_id: str
    run_id: str  # uuid4 string generated per execution
    status: str  # run_claude_task writes "completed" or "failed"
    output: Optional[str] = None  # decoded stdout of the claude process
    error: Optional[str] = None  # filtered stderr, timeout text or exception message
    started_at: str  # ISO-8601 timestamp
    completed_at: Optional[str] = None  # ISO-8601 timestamp
class ChatMessage(BaseModel):
    """Request body of POST /api/chat/send."""
    message: str
    model: Optional[str] = None  # forwarded as --model when set
    system_prompt: Optional[str] = None  # forwarded as --system-prompt when set
    tools: Optional[str] = None  # comma-separated allowed tools
    session_id: Optional[str] = None  # our chat session id; chat_send creates a uuid4 when omitted
class TokenSubmit(BaseModel):
    """Request body of POST /api/auth/token."""
    token: str
    token_type: str = "oauth_token"  # NOTE: auth_set_token derives the type from the token prefix instead
class McpServerAdd(BaseModel):
    """Request body of POST /api/mcp/servers."""
    name: str
    server_type: str = "sse"  # transport; "sse" uses url, "stdio" uses command + args
    url: Optional[str] = None
    command: Optional[str] = None
    args: Optional[List[str]] = None  # extra arguments appended after command
# ============ Database ============
def init_db():
    """Create the SQLite schema if needed and bring older ``tasks`` tables up to date.

    Creates the ``tasks``, ``task_runs`` and ``chat_messages`` tables, adds any
    agent-orchestration column missing from an older ``tasks`` table, and
    creates the indexes used by per-task run listing and per-session chat
    queries. Idempotent, so it is safe to call on every startup.
    """
    # Columns that ``tasks`` tables created before agent orchestration may lack.
    agent_columns = (
        ("agent_model", "TEXT"),
        ("agent_tools", "TEXT"),
        ("agent_mcp_servers", "TEXT"),
        ("agent_system_prompt", "TEXT"),
        ("agent_max_turns", "INTEGER"),
        ("agent_permission_mode", "TEXT DEFAULT 'auto'"),
        ("agent_timeout", "INTEGER DEFAULT 300"),
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                description TEXT,
                prompt TEXT NOT NULL,
                schedule_type TEXT NOT NULL,
                schedule_value TEXT,
                enabled BOOLEAN DEFAULT 1,
                created_at TEXT,
                last_run TEXT,
                next_run TEXT,
                status TEXT DEFAULT 'idle',
                agent_model TEXT,
                agent_tools TEXT,
                agent_mcp_servers TEXT,
                agent_system_prompt TEXT,
                agent_max_turns INTEGER,
                agent_permission_mode TEXT DEFAULT 'auto',
                agent_timeout INTEGER DEFAULT 300
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS task_runs (
                run_id TEXT PRIMARY KEY,
                task_id TEXT NOT NULL,
                status TEXT NOT NULL,
                output TEXT,
                error TEXT,
                started_at TEXT,
                completed_at TEXT,
                FOREIGN KEY (task_id) REFERENCES tasks(id)
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                model TEXT,
                metadata TEXT
            )
        """)
        # Migration: ALTER only the columns that are actually missing rather than
        # trying every ALTER and swallowing all OperationalErrors, which would also
        # hide genuine failures such as a locked database.
        existing = {row[1] for row in cursor.execute("PRAGMA table_info(tasks)").fetchall()}
        for column, declaration in agent_columns:
            if column not in existing:
                # Identifiers come from the constant tuple above, never from user input.
                cursor.execute(f"ALTER TABLE tasks ADD COLUMN {column} {declaration}")
        # Lookups used by get_task_runs and the chat history/session queries.
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_task_runs_task ON task_runs (task_id, started_at)"
        )
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_chat_messages_session ON chat_messages (session_id, timestamp)"
        )
        conn.commit()
    finally:
        conn.close()
# ============ Auth Helpers ============
def _get_claude_env():
    """Return a copy of the process environment with the saved Claude credential applied.

    When TOKEN_FILE holds ``{"type": "oauth_token", "token": ...}`` the token is
    exported as CLAUDE_CODE_OAUTH_TOKEN; for ``"api_key"`` it becomes
    ANTHROPIC_API_KEY. A missing file leaves the copy unchanged; an unreadable
    or malformed file is logged and ignored.
    """
    claude_env = dict(os.environ)
    if not TOKEN_FILE.exists():
        return claude_env
    try:
        stored = json.loads(TOKEN_FILE.read_text())
        for token_kind, env_var in (
            ("oauth_token", "CLAUDE_CODE_OAUTH_TOKEN"),
            ("api_key", "ANTHROPIC_API_KEY"),
        ):
            if stored.get("type") == token_kind and stored.get("token"):
                claude_env[env_var] = stored["token"]
                break
    except Exception as e:
        logger.error(f"Failed to load saved token: {e}")
    return claude_env
def _build_claude_cmd(prompt: str, model: str = None, tools: str = None,
system_prompt: str = None, permission_mode: str = "auto",
resume_session_id: str = None) -> list:
"""Build a claude CLI command with agent configuration.
Note: --session-id requires a valid UUID and creates a persistent session.
For chat continuity, use resume_session_id with --resume.
For one-shot tasks, omit resume_session_id.
"""
cmd = ["claude", "-p"]
if model:
cmd.extend(["--model", model])
if tools:
cmd.extend(["--allowedTools", tools])
else:
cmd.extend(["--allowedTools", "Bash,Read,Write,Edit"])
if permission_mode:
cmd.extend(["--permission-mode", permission_mode])
if system_prompt:
cmd.extend(["--system-prompt", system_prompt])
# Resume an existing Claude session for multi-turn chat
if resume_session_id:
cmd.extend(["--resume", resume_session_id])
cmd.append(prompt)
return cmd
# ============ Task Execution ============
async def run_claude_task(task: Task, run_id: str):
    """Run *task* through the Claude CLI and record the outcome.

    Builds the ``claude -p`` command from the task's agent settings (including
    ``agent_max_turns`` when set) and runs it with the saved credentials. It
    then stores a TaskRun row and sets the task status: "completed" when the
    CLI exits 0, otherwise "failed".

    Timeouts and unexpected errors are recorded as failed runs rather than
    raised. On timeout the child process is killed so it does not keep running
    in the background.

    Args:
        task: The task to execute.
        run_id: Identifier for this execution's ``task_runs`` row.
    """
    started = datetime.now().isoformat()
    timeout = task.agent_timeout or 300
    process = None

    def _record_failure(message: str) -> None:
        # Persist a failed run and flip the task's status accordingly.
        save_task_run(TaskRun(
            task_id=task.id, run_id=run_id, status="failed",
            error=message,
            started_at=started, completed_at=datetime.now().isoformat(),
        ))
        update_task_status(task.id, "failed")

    try:
        env = _get_claude_env()
        cmd = _build_claude_cmd(
            prompt=task.prompt,
            model=task.agent_model,
            tools=task.agent_tools,
            system_prompt=task.agent_system_prompt,
            permission_mode=task.agent_permission_mode,
        )
        if task.agent_max_turns:
            # The per-task turn limit is stored but _build_claude_cmd has no
            # parameter for it; insert it right after "-p" so it precedes the prompt.
            p_idx = cmd.index("-p")
            cmd[p_idx + 1:p_idx + 1] = ["--max-turns", str(task.agent_max_turns)]
        logger.info(f"Running task {task.name}: {' '.join(cmd)}")
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,  # no interactive input (avoids the CLI's stdin warning)
            env=env,
        )
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
        output = stdout.decode(errors="replace") if stdout else ""
        error = stderr.decode(errors="replace") if stderr else ""
        # Drop stdin notices and warnings so the stored error is the real failure.
        filtered_error = "\n".join(
            line for line in error.split("\n")
            if line.strip() and "stdin" not in line.lower() and "warning" not in line.lower()
        )
        status = "completed" if process.returncode == 0 else "failed"
        save_task_run(TaskRun(
            task_id=task.id, run_id=run_id, status=status,
            output=output,
            error=filtered_error if status == "failed" else None,
            started_at=started, completed_at=datetime.now().isoformat(),
        ))
        update_task_status(task.id, status)
    except asyncio.TimeoutError:
        # wait_for cancels communicate() but does not stop the child process.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # exited between the timeout and the kill
            await process.wait()
        _record_failure(f"Task timeout (>{timeout}s)")
    except Exception as e:
        logger.exception(f"Task {task.name} failed to run")
        _record_failure(str(e))
# ============ DB Operations ============
def save_task(task: Task):
    """Insert *task*, or update the existing row with the same id, and return it.

    Fills in ``id`` (uuid4) and ``created_at`` (now, ISO-8601) when missing.
    Uses an UPSERT instead of ``INSERT OR REPLACE``. REPLACE deletes and
    re-inserts the row, which resets unlisted columns (``last_run``,
    ``next_run``) to NULL and overwrites the original ``created_at`` on every
    edit.

    After writing, the stored ``created_at``/``last_run``/``next_run`` are
    copied back onto *task* so the returned object matches the database.
    """
    if not task.id:
        task.id = str(uuid.uuid4())
    if not task.created_at:
        task.created_at = datetime.now().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("""
            INSERT INTO tasks
            (id, name, description, prompt, schedule_type, schedule_value, enabled,
             created_at, status, agent_model, agent_tools, agent_mcp_servers,
             agent_system_prompt, agent_max_turns, agent_permission_mode, agent_timeout)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                name = excluded.name,
                description = excluded.description,
                prompt = excluded.prompt,
                schedule_type = excluded.schedule_type,
                schedule_value = excluded.schedule_value,
                enabled = excluded.enabled,
                created_at = COALESCE(tasks.created_at, excluded.created_at),
                status = excluded.status,
                agent_model = excluded.agent_model,
                agent_tools = excluded.agent_tools,
                agent_mcp_servers = excluded.agent_mcp_servers,
                agent_system_prompt = excluded.agent_system_prompt,
                agent_max_turns = excluded.agent_max_turns,
                agent_permission_mode = excluded.agent_permission_mode,
                agent_timeout = excluded.agent_timeout
        """, (
            task.id, task.name, task.description, task.prompt,
            task.schedule_type, task.schedule_value, task.enabled,
            task.created_at, task.status,
            task.agent_model, task.agent_tools, task.agent_mcp_servers,
            task.agent_system_prompt, task.agent_max_turns,
            task.agent_permission_mode, task.agent_timeout,
        ))
        conn.commit()
        stored = conn.execute(
            "SELECT created_at, last_run, next_run FROM tasks WHERE id = ?", (task.id,)
        ).fetchone()
    finally:
        conn.close()
    if stored:
        task.created_at, task.last_run, task.next_run = stored
    return task
def save_task_run(run: TaskRun):
    """Insert one TaskRun row into ``task_runs``.

    Uses a plain INSERT, so a duplicate ``run_id`` raises sqlite3.IntegrityError.
    The connection is closed even when the insert fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            """
            INSERT INTO task_runs
            (run_id, task_id, status, output, error, started_at, completed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (run.run_id, run.task_id, run.status, run.output,
             run.error, run.started_at, run.completed_at),
        )
        conn.commit()
    finally:
        conn.close()
def update_task_status(task_id: str, status: str):
    """Set the task's status and stamp ``last_run`` with the current ISO-8601 time.

    The connection is closed even when the update fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "UPDATE tasks SET status = ?, last_run = ? WHERE id = ?",
            (status, datetime.now().isoformat(), task_id),
        )
        conn.commit()
    finally:
        conn.close()
def get_task(task_id: str) -> Optional[Task]:
    """Return the task with *task_id*, or None if no such row exists."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        row = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
    finally:
        conn.close()
    return Task(**dict(row)) if row else None
def get_all_tasks() -> List[Task]:
    """Return every task, newest ``created_at`` first."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
    finally:
        conn.close()
    return [Task(**dict(row)) for row in rows]
# ============ Scheduler ============
# Process-wide APScheduler instance, started in the FastAPI startup hook.
# It invokes jobs synchronously, hence the _sync_run_task wrapper.
scheduler: BackgroundScheduler = BackgroundScheduler()
def schedule_task(task: Task):
    """Register, replace, or remove the APScheduler job for *task*.

    Enabled "recurring" tasks with a crontab ``schedule_value`` get a job with
    id == task.id (replacing any previous one). In every other case any job
    left over from an earlier version of the task is removed. This covers a
    task that was disabled, switched to manual, or given an invalid cron, so a
    stale schedule does not keep firing.

    Other schedule types (e.g. "once") are not handled here; a warning is logged.
    """
    if task.enabled and task.schedule_type == "recurring" and task.schedule_value:
        try:
            scheduler.add_job(
                _sync_run_task, CronTrigger.from_crontab(task.schedule_value),
                id=task.id, args=[task], replace_existing=True,
            )
            logger.info(f"Scheduled task {task.name} with cron: {task.schedule_value}")
            return
        except Exception as e:
            logger.error(f"Failed to schedule task {task.name}: {e}")
    elif task.enabled and task.schedule_type not in ("manual", "recurring"):
        logger.warning(
            f"Task {task.name}: schedule_type '{task.schedule_type}' is not handled "
            f"by the scheduler; run it manually"
        )
    # Not (or no longer) schedulable: make sure no stale job keeps firing.
    if task.id and scheduler.get_job(task.id):
        scheduler.remove_job(task.id)
def _sync_run_task(task: Task):
    """Run *task* from a scheduler job (APScheduler calls jobs synchronously).

    Marks the task "running", then drives run_claude_task on a private event
    loop via asyncio.run(). asyncio.run() closes that loop even if the
    coroutine raises; manual new_event_loop()/close() would leak it on error.
    """
    run_id = str(uuid.uuid4())
    update_task_status(task.id, "running")
    asyncio.run(run_claude_task(task, run_id))
# ============ Chat Sessions ============
# Active chat sessions: our_session_id -> process
_chat_sessions: Dict[str, asyncio.subprocess.Process] = {}
# Mapping: our_session_id -> claude_session_id (for --resume)
_claude_session_map: Dict[str, str] = {}
def save_chat_message(session_id: str, role: str, content: str, model: str = None, metadata: dict = None):
    """Append one message to ``chat_messages``.

    Args:
        session_id: Our chat session id.
        role: "user", "assistant" or "error" as used by the chat endpoints.
        content: Message text (column is NOT NULL).
        model: Optional model name.
        metadata: Optional dict stored as JSON (e.g. the Claude session id).

    The connection is closed even when the insert fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            """
            INSERT INTO chat_messages (id, session_id, role, content, timestamp, model, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (str(uuid.uuid4()), session_id, role, content,
             datetime.now().isoformat(), model, json.dumps(metadata) if metadata else None),
        )
        conn.commit()
    finally:
        conn.close()
def _get_claude_session_id(our_session_id: str) -> Optional[str]:
    """Return the Claude CLI session id to ``--resume`` for *our_session_id*, or None.

    Checks the in-memory map first. It then falls back to the
    ``claude_session_id`` recorded in message metadata, so resumption survives
    a restart, and caches any hit.

    Rows are scanned newest first, and rows whose metadata has no usable id
    are skipped. Error messages can store ``{"claude_session_id": null}``, and
    stopping at such a row would hide an older, still-valid id.
    """
    if our_session_id in _claude_session_map:
        return _claude_session_map[our_session_id]
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.execute(
            """
            SELECT metadata FROM chat_messages
            WHERE session_id = ? AND metadata IS NOT NULL
            ORDER BY timestamp DESC
            """,
            (our_session_id,),
        )
        for (raw_meta,) in cursor:
            try:
                meta = json.loads(raw_meta)
            except (TypeError, ValueError):
                continue  # unreadable metadata; try an older row
            claude_sid = meta.get("claude_session_id") if isinstance(meta, dict) else None
            if claude_sid:
                _claude_session_map[our_session_id] = claude_sid
                return claude_sid
    finally:
        conn.close()
    return None
def get_chat_history(session_id: str, limit: int = 50) -> List[Dict]:
    """Return the most recent *limit* messages of *session_id*, oldest first.

    The inner query keeps the newest rows and the outer query restores
    chronological order for display. Ordering ascending before LIMIT would
    return only the first messages of a long conversation, never the latest.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(
            """
            SELECT * FROM (
                SELECT * FROM chat_messages
                WHERE session_id = ?
                ORDER BY timestamp DESC
                LIMIT ?
            ) ORDER BY timestamp ASC
            """,
            (session_id, limit),
        ).fetchall()
    finally:
        conn.close()
    return [dict(row) for row in rows]
def list_chat_sessions() -> List[Dict]:
    """Summarise the 20 most recently active chat sessions.

    Each dict has ``session_id``, ``message_count``, ``started_at``,
    ``last_message`` (timestamps) and ``first_message`` (the earliest user
    message). The connection is closed even when the query fails.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute("""
            SELECT session_id,
                   COUNT(*) as message_count,
                   MIN(timestamp) as started_at,
                   MAX(timestamp) as last_message,
                   (SELECT content FROM chat_messages cm2
                    WHERE cm2.session_id = cm.session_id AND cm2.role = 'user'
                    ORDER BY timestamp ASC LIMIT 1) as first_message
            FROM chat_messages cm
            GROUP BY session_id
            ORDER BY MAX(timestamp) DESC
            LIMIT 20
        """).fetchall()
    finally:
        conn.close()
    return [dict(row) for row in rows]
# ============ API Routes ============
@app.on_event("startup")
async def startup():
    """Prepare the service: schema, scheduler, saved credentials, then task schedules."""
    init_db()
    scheduler.start()

    if TOKEN_FILE.exists():
        try:
            stored = json.loads(TOKEN_FILE.read_text())
            # Each entry: (saved token type, env var to export, log line).
            for kind, env_var, note in (
                ("oauth_token", "CLAUDE_CODE_OAUTH_TOKEN", "Loaded saved OAuth token"),
                ("api_key", "ANTHROPIC_API_KEY", "Loaded saved API key"),
            ):
                if stored.get("type") == kind and stored.get("token"):
                    os.environ[env_var] = stored["token"]
                    logger.info(note)
                    break
        except Exception as e:
            logger.error(f"Failed to load saved token: {e}")

    # Only enabled tasks are offered; schedule_task does not schedule manual ones.
    for enabled_task in (t for t in get_all_tasks() if t.enabled):
        schedule_task(enabled_task)
    logger.info("Claude Persistent Agent v2.0 started")
@app.on_event("shutdown")
async def shutdown():
    """Stop the scheduler, then kill every chat CLI process still registered."""
    scheduler.shutdown()
    for live_proc in _chat_sessions.values():
        try:
            live_proc.kill()
        except Exception:
            pass  # best-effort: the process may already have exited
@app.get("/health")
async def health():
    """Liveness probe: static status, current local time and app version."""
    now_iso = datetime.now().isoformat()
    return {"status": "healthy", "timestamp": now_iso, "version": "2.0.0"}
# ---- Task CRUD ----
@app.post("/api/tasks")
async def create_task(task: Task) -> Task:
    """Persist a new task and register its schedule, if it has one."""
    persisted = save_task(task)
    schedule_task(persisted)
    return persisted
@app.get("/api/tasks")
async def list_tasks() -> List[Task]:
    """Return all tasks, newest first."""
    all_tasks = get_all_tasks()
    return all_tasks
@app.get("/api/tasks/{task_id}")
async def get_task_endpoint(task_id: str) -> Task:
    """Return one task; respond 404 when it does not exist."""
    found = get_task(task_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Task not found")
@app.put("/api/tasks/{task_id}")
async def update_task_endpoint(task_id: str, task: Task) -> Task:
    """Save the request body under *task_id* and re-sync its schedule."""
    task.id = task_id  # the path parameter wins over any id in the body
    persisted = save_task(task)
    schedule_task(persisted)
    return persisted
@app.delete("/api/tasks/{task_id}")
async def delete_task_endpoint(task_id: str):
    """Delete a task row and its scheduler job (if any).

    Existing ``task_runs`` rows are kept. The response is the same whether or
    not the task existed.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        conn.commit()
    finally:
        conn.close()
    # Best-effort: manual or disabled tasks never had a job.
    try:
        if scheduler.get_job(task_id):
            scheduler.remove_job(task_id)
    except Exception as e:
        logger.warning(f"Could not remove scheduled job for task {task_id}: {e}")
    return {"status": "deleted"}
@app.post("/api/tasks/{task_id}/run")
async def run_task_manual(task_id: str, background_tasks: BackgroundTasks):
    """Start a task immediately in the background and return its run id."""
    target = get_task(task_id)
    if not target:
        raise HTTPException(status_code=404, detail="Task not found")
    new_run_id = str(uuid.uuid4())
    update_task_status(task_id, "running")
    # The run executes after the response is sent.
    background_tasks.add_task(run_claude_task, target, new_run_id)
    return {"run_id": new_run_id, "status": "started"}
@app.get("/api/tasks/{task_id}/runs")
async def get_task_runs(task_id: str) -> List[Dict]:
    """Return the 50 most recent runs of *task_id*, newest first."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(
            "SELECT * FROM task_runs WHERE task_id = ? ORDER BY started_at DESC LIMIT 50",
            (task_id,),
        ).fetchall()
    finally:
        conn.close()
    return [dict(row) for row in rows]
# ---- System Info ----
@app.get("/api/system/info")
async def system_info() -> Dict:
    """Summarise task/run counts plus scheduler and chat state.

    Run counts come from a single GROUP BY query, and the task count from
    COUNT(*) instead of loading every task.

    NOTE(review): ``uptime`` is the current timestamp, not a duration since
    start; it is kept as-is for API compatibility. ``running_runs`` counts rows
    with status 'running', but in this file runs are only inserted once
    finished ("completed"/"failed"), so it is expected to be 0.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
        runs_by_status = dict(
            conn.execute("SELECT status, COUNT(*) FROM task_runs GROUP BY status").fetchall()
        )
    finally:
        conn.close()
    return {
        "app_name": "Claude Persistent Agent",
        "version": "2.0.0",
        "uptime": datetime.now().isoformat(),
        "scheduler_running": scheduler.running,
        "task_count": task_count,
        "total_runs": sum(runs_by_status.values()),
        "completed_runs": runs_by_status.get("completed", 0),
        "failed_runs": runs_by_status.get("failed", 0),
        "running_runs": runs_by_status.get("running", 0),
        "active_chat_sessions": len(_chat_sessions),
    }
@app.get("/api/system/usage")
async def usage_stats() -> Dict:
    """Report local usage hints.

    Reports the count of session files under /root/.claude, the first instant
    of next month as ``next_reset``, and task-run totals from ``task_runs``.

    NOTE(review): treating the first of next month as the usage reset is an
    assumption; confirm against the actual plan's reset schedule.
    """
    usage = {
        "models_used": [], "session_count": 0,
        "last_reset": None, "next_reset": None,
        "note": "Usage data sourced from ~/.claude session cache",
    }
    try:
        claude_dir = Path("/root/.claude")
        sessions = list(claude_dir.glob("**/session*.json")) + list(claude_dir.glob("**/*.jsonl"))
        usage["session_count"] = len(sessions)
        now = datetime.now()
        # Day 1 + 32 days always lands in next month; snap back to its first day.
        next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1)
        # Zero the full time of day (including microseconds, which .replace kept
        # from `now`) so the reset is exactly midnight.
        reset = next_month.replace(hour=0, minute=0, second=0, microsecond=0)
        usage["next_reset"] = reset.isoformat()
        usage["days_until_reset"] = (reset - now).days
    except Exception as e:
        usage["error"] = str(e)
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            "SELECT COUNT(*), MIN(started_at), MAX(started_at) FROM task_runs"
        ).fetchone()
    finally:
        conn.close()
    usage["claude_runs_total"] = row[0]
    usage["first_run"] = row[1]
    usage["last_run"] = row[2]
    return usage
# ---- Auth ----
@app.get("/api/auth/status")
async def auth_status():
    """Report login state as seen by ``claude auth status --json``, plus saved-token info.

    Returns ``status`` ("logged_in"/"logged_out"), ``account``, ``auth_method``,
    ``has_saved_token`` and ``token_type``. A failing or hung CLI is logged
    and reported as logged out; a hung CLI is killed after 10 s.
    """
    account = None
    auth_method = None
    status = "logged_out"
    has_saved_token = TOKEN_FILE.exists()
    token_type = None
    if has_saved_token:
        try:
            token_type = json.loads(TOKEN_FILE.read_text()).get("type")
        except Exception:
            pass  # unreadable token file: report token_type as None
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "auth", "status", "--json",
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
            env=_get_claude_env(),
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
        data = json.loads(stdout.decode(errors="replace").strip())
        if data.get("loggedIn"):
            status = "logged_in"
            account = data.get("email") or data.get("account", {}).get("emailAddress")
            auth_method = data.get("authMethod")
    except asyncio.TimeoutError:
        logger.error("Auth status check failed: timed out after 10s")
        # wait_for does not stop the child; kill it so it is not left running.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
    except Exception as e:
        logger.error(f"Auth status check failed: {e}")
    return {
        "status": status, "account": account, "auth_method": auth_method,
        "has_saved_token": has_saved_token, "token_type": token_type,
    }
@app.post("/api/auth/token")
async def auth_set_token(payload: TokenSubmit):
    """Validate, verify and persist a Claude credential.

    The type is inferred from the prefix: ``sk-ant-oat`` means an OAuth setup
    token, ``sk-ant-api`` means an API key. ``payload.token_type`` is not
    trusted.

    The token is verified with a tiny ``claude -p`` call in an environment
    holding ONLY the candidate credential. On success, or on a verification
    timeout, it is exported to os.environ and written to TOKEN_FILE with
    owner-only (0600) permissions. On rejection the existing environment and
    saved token are left untouched.
    """
    token = payload.token.strip()
    if not token:
        return {"status": "error", "message": "Token cannot be empty"}

    if token.startswith("sk-ant-oat"):
        token_type = "oauth_token"
    elif token.startswith("sk-ant-api"):
        token_type = "api_key"
    else:
        return {
            "status": "error",
            "message": f"Invalid token format. Expected 'sk-ant-oat01-...' (setup token) or 'sk-ant-api03-...' (API key). Got: '{token[:12]}...'"
        }

    env_var, other_var = (
        ("CLAUDE_CODE_OAUTH_TOKEN", "ANTHROPIC_API_KEY")
        if token_type == "oauth_token"
        else ("ANTHROPIC_API_KEY", "CLAUDE_CODE_OAUTH_TOKEN")
    )

    def _activate_and_persist() -> None:
        """Export the token to this process and save it readable by the owner only."""
        os.environ[env_var] = token
        os.environ.pop(other_var, None)
        record = json.dumps({
            "type": token_type, "token": token,
            "saved_at": datetime.now().isoformat(),
        })
        fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as fh:
            fh.write(record)
        # The O_CREAT mode only applies to new files; tighten a pre-existing one too.
        os.chmod(TOKEN_FILE, 0o600)

    # Verify with ONLY the candidate credential. _get_claude_env() would overlay
    # the previously saved token from TOKEN_FILE, so the CLI could succeed with
    # the old credential and a bad new token would be accepted and saved.
    verify_env = os.environ.copy()
    verify_env.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
    verify_env.pop("ANTHROPIC_API_KEY", None)
    verify_env[env_var] = token

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "-p", "--output-format", "json", "--no-session-persistence",
            "--max-budget-usd", "0.01",
            "Reply with just the word VERIFIED",
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL, env=verify_env,
        )
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
        output = stdout.decode(errors="replace").strip()
        try:
            data = json.loads(output)
        except json.JSONDecodeError:
            # Unparseable output: fall back to the exit code.
            if proc.returncode == 0:
                _activate_and_persist()
                return {
                    "status": "logged_in",
                    "message": "Token appears to work. Saved!",
                    "auth_method": token_type,
                }
            return {
                "status": "error",
                "message": f"Token verification failed: {output or stderr.decode(errors='replace')}"
            }
        if data.get("is_error", False):
            # Rejected: nothing was exported or saved, so there is nothing to undo.
            return {
                "status": "error",
                "message": f"Token rejected by Claude API: {data.get('result', '')}"
            }
        _activate_and_persist()
        return {
            "status": "logged_in",
            "message": "Token verified and saved! Claude is ready.",
            "auth_method": token_type,
        }
    except asyncio.TimeoutError:
        # wait_for does not stop the child; kill it so it is not left running.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
        # A timeout most likely means auth succeeded and the request was slow.
        _activate_and_persist()
        return {"status": "token_saved", "message": "Token saved but verification timed out. Try sending a chat message to confirm."}
    except Exception as e:
        return {"status": "error", "message": f"Token verification error: {str(e)}"}
@app.post("/api/auth/logout")
async def auth_logout():
    """Forget all credentials: delete TOKEN_FILE, clear env vars, run ``claude auth logout``.

    The CLI logout is best-effort (failures are logged, a hung CLI is killed
    after 10 s); the endpoint always reports ``logged_out``.
    """
    if TOKEN_FILE.exists():
        TOKEN_FILE.unlink()
    os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
    os.environ.pop("ANTHROPIC_API_KEY", None)
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "auth", "logout",
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
        )
        await asyncio.wait_for(proc.communicate(), timeout=10)
    except Exception as e:
        # Local credentials are already gone; the CLI step is best-effort.
        logger.warning(f"'claude auth logout' did not complete cleanly: {e!r}")
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
    return {"status": "logged_out"}
# ---- Chat ----
@app.post("/api/chat/send")
async def chat_send(msg: ChatMessage):
    """Send one message to Claude and return the complete reply (non-streaming).

    Runs ``claude -p --output-format json`` so the CLI's ``session_id`` can be
    captured and passed back via ``--resume`` on this chat's next message.
    Every outcome (reply or error) is stored in ``chat_messages``.

    Returns:
        ``{"session_id", "response", "status"}`` with status "ok" or "error".
    """
    session_id = msg.session_id or str(uuid.uuid4())
    save_chat_message(session_id, "user", msg.message, model=msg.model)

    env = _get_claude_env()
    # Resume the Claude session mapped to this chat (None on the first message).
    claude_sid = _get_claude_session_id(session_id)
    cmd = _build_claude_cmd(
        prompt=msg.message,
        model=msg.model,
        tools=msg.tools,
        system_prompt=msg.system_prompt,
        resume_session_id=claude_sid,
    )
    # JSON output carries the session_id; insert right after "-p", ahead of the prompt.
    if "-p" in cmd:
        p_idx = cmd.index("-p")
        cmd[p_idx + 1:p_idx + 1] = ["--output-format", "json"]

    process = None
    try:
        logger.info(f"Chat send cmd: {' '.join(cmd[:6])}...")
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
            env=env,
        )
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=120)
        raw_output = stdout.decode(errors="replace").strip() if stdout else ""
        error = stderr.decode(errors="replace").strip() if stderr else ""
        # Drop stdin notices and warnings from stderr.
        filtered_error = "\n".join(
            line for line in error.split("\n")
            if line.strip() and "stdin" not in line.lower() and "warning" not in line.lower()
        )

        output = raw_output
        try:
            data = json.loads(raw_output)
        except json.JSONDecodeError:
            data = None  # not JSON: fall back to the raw stdout text
        if isinstance(data, dict):
            output = data.get("result", raw_output)
            new_claude_sid = data.get("session_id")
            if new_claude_sid:
                _claude_session_map[session_id] = new_claude_sid
                logger.info(f"Mapped session {session_id[:8]}... -> Claude {str(new_claude_sid)[:8]}...")
            if data.get("is_error"):
                error_text = output or "Claude returned an error"
                # Only record a real session id. A stored null would shadow an
                # older valid id when _get_claude_session_id reads the DB.
                save_chat_message(session_id, "error", error_text,
                                  metadata={"claude_session_id": new_claude_sid} if new_claude_sid else None)
                return {"session_id": session_id, "response": error_text, "status": "error"}

        if process.returncode == 0 and output:
            mapped_sid = _claude_session_map.get(session_id)
            save_chat_message(session_id, "assistant", output, model=msg.model,
                              metadata={"claude_session_id": mapped_sid} if mapped_sid else None)
            return {"session_id": session_id, "response": output, "status": "ok"}

        error_msg = filtered_error or output or "No response from Claude"
        save_chat_message(session_id, "error", error_msg)
        return {"session_id": session_id, "response": error_msg, "status": "error"}
    except asyncio.TimeoutError:
        # wait_for does not stop the child; kill it so it is not left running.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()
        save_chat_message(session_id, "error", "Response timeout (>120s)")
        return {"session_id": session_id, "response": "Response timeout (>120s)", "status": "error"}
    except Exception as e:
        save_chat_message(session_id, "error", str(e))
        return {"session_id": session_id, "response": str(e), "status": "error"}
@app.websocket("/api/chat/ws/{session_id}")
async def chat_websocket(websocket: WebSocket, session_id: str):
    """Stream Claude replies for one chat session over a WebSocket.

    Client frames are JSON objects of the form ``{"message": str, "model"?:
    str, "tools"?: str, "system_prompt"?: str}``. For each message the server
    runs ``claude -p --output-format stream-json``, resuming the mapped Claude
    session when one exists.

    Server frames have a ``type`` of:

    - ``"status"``: ``"thinking"``
    - ``"chunk"``: incremental text
    - ``"done"``: the full reply
    - ``"error"``: an error message

    The ``session_id`` from the CLI's final ``result`` event is remembered for
    ``--resume`` on the next message. The CLI process is always killed if it is
    still running when a message finishes (timeout, error or client disconnect).
    """
    # stream-json emits one JSON object per line, and a single line can exceed
    # asyncio's 64 KiB default StreamReader limit (readline would then raise).
    stream_limit = 16 * 1024 * 1024
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                msg = None
            if not isinstance(msg, dict):
                # A malformed frame must not tear down the whole connection.
                await websocket.send_json({"type": "error", "content": "Invalid message: expected a JSON object"})
                continue
            user_message = msg.get("message", "")
            model = msg.get("model")
            tools = msg.get("tools")
            system_prompt = msg.get("system_prompt")
            if not user_message:
                await websocket.send_json({"type": "error", "content": "Empty message"})
                continue

            save_chat_message(session_id, "user", user_message, model=model)
            env = _get_claude_env()
            # Resume the Claude session mapped to this chat, if any.
            claude_sid = _get_claude_session_id(session_id)
            cmd = _build_claude_cmd(
                prompt=user_message,
                model=model,
                tools=tools,
                system_prompt=system_prompt,
                resume_session_id=claude_sid,
            )
            if "-p" in cmd:
                p_idx = cmd.index("-p")
                cmd[p_idx + 1:p_idx + 1] = ["--output-format", "stream-json"]

            await websocket.send_json({"type": "status", "content": "thinking"})
            process = None
            stderr_task = None
            try:
                logger.info(f"Chat WS cmd: {' '.join(cmd[:6])}...")
                process = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.DEVNULL,
                    env=env,
                    limit=stream_limit,
                )
                _chat_sessions[session_id] = process
                # Drain stderr concurrently: reading it only after exit can
                # deadlock once the child fills the stderr pipe buffer.
                stderr_task = asyncio.create_task(process.stderr.read())

                full_output = ""
                error_reported = False
                while True:
                    try:
                        line = await asyncio.wait_for(process.stdout.readline(), timeout=180)
                    except asyncio.TimeoutError:
                        timeout_msg = "Response timeout (>180s)"
                        save_chat_message(session_id, "error", timeout_msg)
                        await websocket.send_json({"type": "error", "content": timeout_msg})
                        error_reported = True
                        # Stop the hung CLI; otherwise process.wait() below never returns.
                        try:
                            process.kill()
                        except ProcessLookupError:
                            pass
                        break
                    if not line:
                        break  # EOF on stdout
                    text = line.decode(errors="replace").strip()
                    if not text:
                        continue
                    try:
                        event = json.loads(text)
                    except json.JSONDecodeError:
                        event = None
                    if not isinstance(event, dict):
                        # Not a stream-json event: forward it as raw text.
                        full_output += text + "\n"
                        await websocket.send_json({"type": "chunk", "content": text + "\n"})
                        continue

                    event_type = event.get("type", "")
                    if event_type == "assistant":
                        # Assistant message with content blocks; forward the text blocks.
                        for block in event.get("message", {}).get("content", []):
                            if block.get("type") == "text":
                                chunk = block.get("text", "")
                                full_output += chunk
                                await websocket.send_json({"type": "chunk", "content": chunk})
                    elif event_type == "content_block_delta":
                        delta = event.get("delta", {})
                        if delta.get("type") == "text_delta":
                            chunk = delta.get("text", "")
                            full_output += chunk
                            await websocket.send_json({"type": "chunk", "content": chunk})
                    elif event_type == "result":
                        # Final result: capture the session id for --resume.
                        result_text = event.get("result", "")
                        new_claude_sid = event.get("session_id")
                        if new_claude_sid:
                            _claude_session_map[session_id] = new_claude_sid
                            logger.info(f"WS mapped {session_id[:8]}... -> Claude {str(new_claude_sid)[:8]}...")
                        if event.get("is_error", False):
                            error_content = result_text or full_output or "Claude returned an error"
                            # Only record a real session id; a stored null would
                            # shadow an older valid id after a restart.
                            save_chat_message(session_id, "error", error_content,
                                              metadata={"claude_session_id": new_claude_sid} if new_claude_sid else None)
                            await websocket.send_json({"type": "error", "content": error_content})
                            error_reported = True
                        elif result_text and not full_output:
                            # No streamed chunks arrived, but the result carries the reply.
                            full_output = result_text
                            await websocket.send_json({"type": "chunk", "content": result_text})
                    # Other event types are ignored.

                await process.wait()
                try:
                    # Bounded: a grandchild that inherited stderr could keep it open.
                    stderr_data = await asyncio.wait_for(stderr_task, timeout=5)
                except asyncio.TimeoutError:
                    stderr_data = b""
                stderr_text = stderr_data.decode(errors="replace") if stderr_data else ""

                reply = full_output.strip()
                if reply:
                    # Persist exactly what the client was shown, even if the CLI
                    # exited non-zero after streaming a (partial) answer.
                    mapped_sid = _claude_session_map.get(session_id)
                    save_chat_message(session_id, "assistant", reply, model=model,
                                      metadata={"claude_session_id": mapped_sid} if mapped_sid else None)
                    await websocket.send_json({"type": "done", "content": reply})
                elif not error_reported:
                    # No output and no error sent yet: surface stderr instead.
                    clean_error = "\n".join(
                        l for l in stderr_text.split("\n")
                        if l.strip() and "stdin" not in l.lower()
                    ) or "No response from Claude"
                    save_chat_message(session_id, "error", clean_error)
                    await websocket.send_json({"type": "error", "content": clean_error})
            except WebSocketDisconnect:
                raise  # handled by the outer handler; the finally below still cleans up
            except Exception as e:
                logger.error(f"Chat WS error: {e}")
                await websocket.send_json({"type": "error", "content": str(e)})
            finally:
                _chat_sessions.pop(session_id, None)
                if stderr_task is not None and not stderr_task.done():
                    stderr_task.cancel()
                # Never leave the CLI running after this message is finished.
                if process is not None and process.returncode is None:
                    try:
                        process.kill()
                    except ProcessLookupError:
                        pass
                    await process.wait()
    except WebSocketDisconnect:
        logger.info(f"Chat WebSocket disconnected: {session_id}")
@app.get("/api/chat/sessions")
async def get_chat_sessions():
    """Return summaries of up to 20 chat sessions, most recently active first."""
    sessions = list_chat_sessions()
    return sessions
@app.get("/api/chat/history/{session_id}")
async def get_chat_session_history(session_id: str, limit: int = 50):
    """Return at most *limit* stored messages of one chat session."""
    return get_chat_history(session_id=session_id, limit=limit)
@app.delete("/api/chat/sessions/{session_id}")
async def delete_chat_session(session_id: str):
    """Delete every stored message of *session_id* and forget its Claude session mapping.

    The in-memory ``--resume`` mapping is dropped too, so a later message that
    reuses this id starts a fresh Claude conversation instead of silently
    resuming the deleted one.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("DELETE FROM chat_messages WHERE session_id = ?", (session_id,))
        conn.commit()
    finally:
        conn.close()
    _claude_session_map.pop(session_id, None)
    return {"status": "deleted"}
# ---- MCP Servers ----
@app.get("/api/mcp/servers")
async def list_mcp_servers():
    """List configured MCP servers via ``claude mcp list``.

    JSON output is used when the CLI emits it (a list, or a dict whose values
    are the servers). Otherwise each non-header line becomes ``{"name": first
    word, "details": rest}``. Always returns a dict with ``servers``, plus
    ``raw`` output or an ``error`` message. A hung CLI is killed after 10 s.
    """
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "mcp", "list",
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
        output = stdout.decode(errors="replace").strip()
        if "No MCP servers configured" in output:
            return {"servers": [], "raw": output}
        servers = []
        try:
            data = json.loads(output)
            if isinstance(data, list):
                servers = data
            elif isinstance(data, dict):
                servers = list(data.values())
        except ValueError:
            # Plain-text table: skip separator ("-") and header ("Name") lines.
            for line in output.split("\n"):
                line = line.strip()
                if line and not line.startswith("-") and not line.startswith("Name"):
                    parts = line.split()
                    servers.append({"name": parts[0], "details": " ".join(parts[1:])})
        return {"servers": servers, "raw": output}
    except asyncio.TimeoutError:
        # wait_for does not stop the child; kill it so it is not left running.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
        return {"servers": [], "error": "Timed out listing MCP servers (>10s)"}
    except Exception as e:
        return {"servers": [], "error": str(e)}
@app.post("/api/mcp/servers")
async def add_mcp_server(server: McpServerAdd):
    """Register an MCP server with ``claude mcp add``.

    ``server_type`` "sse" or "http" requires ``url``; "stdio" requires
    ``command`` (plus optional ``args``). Incomplete or unknown configurations
    are rejected up front instead of running an incomplete CLI command.

    Returns:
        ``{"status": "ok"|"error", "message": <CLI output>, "name": ...}``;
        validation failures and exceptions return only ``status``/``message``.
        A hung CLI is killed after 15 s.
    """
    cmd = ["claude", "mcp", "add", server.name]
    if server.server_type in ("sse", "http"):
        if not server.url:
            return {"status": "error", "message": f"server_type '{server.server_type}' requires a url"}
        cmd.extend(["--transport", server.server_type, server.url])
    elif server.server_type == "stdio":
        if not server.command:
            return {"status": "error", "message": "server_type 'stdio' requires a command"}
        # "--" ends claude's own option parsing, so server args such as "-y"
        # reach the MCP server command instead of being read as claude flags.
        cmd.extend(["--transport", "stdio", "--", server.command])
        if server.args:
            cmd.extend(server.args)
    else:
        return {
            "status": "error",
            "message": f"Unsupported server_type '{server.server_type}' (expected 'sse', 'http' or 'stdio')",
        }

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
        )
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15)
        output = stdout.decode(errors="replace") + stderr.decode(errors="replace")
        return {"status": "ok" if proc.returncode == 0 else "error",
                "message": output.strip(), "name": server.name}
    except asyncio.TimeoutError:
        # wait_for does not stop the child; kill it so it is not left running.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
        return {"status": "error", "message": "Timed out adding MCP server (>15s)"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@app.delete("/api/mcp/servers/{name}")
async def remove_mcp_server(name: str):
    """Remove an MCP server via ``claude mcp remove <name>``.

    Returns ``{"status": "ok"|"error", "message": <combined CLI output>}``.
    A hung CLI is killed after 10 s.
    """
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "mcp", "remove", name,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
        )
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
        output = stdout.decode(errors="replace") + stderr.decode(errors="replace")
        return {"status": "ok" if proc.returncode == 0 else "error", "message": output.strip()}
    except asyncio.TimeoutError:
        # wait_for does not stop the child; kill it so it is not left running.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
        return {"status": "error", "message": "Timed out removing MCP server (>10s)"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
# Serve the built frontend at "/". Keep this registration after every API
# route: a root mount added earlier would match their paths first.
app.mount(path="/", app=StaticFiles(directory="/app/frontend/dist", html=True), name="static")
if __name__ == "__main__":
    # Direct-execution entry point: serve on all interfaces, port 8000.
    import uvicorn
    uvicorn.run(app, port=8000, host="0.0.0.0")