1191 lines
41 KiB
Python
1191 lines
41 KiB
Python
"""
|
|
Claude Persistent Agent - FastAPI Backend
|
|
Main application with task scheduling, live chat, and agent orchestration
|
|
"""
|
|
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse, StreamingResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List, Dict, Any
|
|
import asyncio
|
|
import json
|
|
import sqlite3
|
|
import subprocess
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
import uuid
|
|
from pathlib import Path
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
import os
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Initialize FastAPI app
|
|
app = FastAPI(title="Claude Persistent Agent", version="2.0.0")
|
|
|
|
# Add CORS middleware
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# Database setup
|
|
DB_PATH = Path("/app/data/tasks.db")
|
|
LOGS_PATH = Path("/app/logs")
|
|
TASKS_PATH = Path("/app/tasks")
|
|
TOKEN_FILE = Path("/app/data/.claude_token")
|
|
CHAT_HISTORY_PATH = Path("/app/data/chat_history.json")
|
|
|
|
# Ensure directories exist
|
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
LOGS_PATH.mkdir(parents=True, exist_ok=True)
|
|
TASKS_PATH.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
# ============ Models ============
|
|
|
|
class Task(BaseModel):
|
|
id: Optional[str] = None
|
|
name: str
|
|
description: Optional[str] = None
|
|
prompt: str
|
|
schedule_type: str = "manual" # "once", "recurring", "manual"
|
|
schedule_value: Optional[str] = None
|
|
enabled: bool = True
|
|
created_at: Optional[str] = None
|
|
last_run: Optional[str] = None
|
|
next_run: Optional[str] = None
|
|
status: str = "idle"
|
|
# Agent orchestration fields
|
|
agent_model: Optional[str] = None # "sonnet", "opus", "haiku"
|
|
agent_tools: Optional[str] = None # comma-separated: "Bash,Read,Write,Edit"
|
|
agent_mcp_servers: Optional[str] = None # comma-separated MCP server names
|
|
agent_system_prompt: Optional[str] = None # custom system prompt
|
|
agent_max_turns: Optional[int] = None # max conversation turns
|
|
agent_permission_mode: str = "auto" # "auto", "acceptEdits", "plan"
|
|
agent_timeout: int = 300 # seconds
|
|
|
|
|
|
class TaskRun(BaseModel):
|
|
task_id: str
|
|
run_id: str
|
|
status: str
|
|
output: Optional[str] = None
|
|
error: Optional[str] = None
|
|
started_at: str
|
|
completed_at: Optional[str] = None
|
|
|
|
|
|
class ChatMessage(BaseModel):
|
|
message: str
|
|
model: Optional[str] = None
|
|
system_prompt: Optional[str] = None
|
|
tools: Optional[str] = None
|
|
session_id: Optional[str] = None
|
|
|
|
|
|
class TokenSubmit(BaseModel):
|
|
token: str
|
|
token_type: str = "oauth_token"
|
|
|
|
|
|
class McpServerAdd(BaseModel):
|
|
name: str
|
|
server_type: str = "sse"
|
|
url: Optional[str] = None
|
|
command: Optional[str] = None
|
|
args: Optional[List[str]] = None
|
|
|
|
|
|
# ============ Database ============
|
|
|
|
def init_db():
|
|
"""Initialize SQLite database"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS tasks (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
description TEXT,
|
|
prompt TEXT NOT NULL,
|
|
schedule_type TEXT NOT NULL,
|
|
schedule_value TEXT,
|
|
enabled BOOLEAN DEFAULT 1,
|
|
created_at TEXT,
|
|
last_run TEXT,
|
|
next_run TEXT,
|
|
status TEXT DEFAULT 'idle',
|
|
agent_model TEXT,
|
|
agent_tools TEXT,
|
|
agent_mcp_servers TEXT,
|
|
agent_system_prompt TEXT,
|
|
agent_max_turns INTEGER,
|
|
agent_permission_mode TEXT DEFAULT 'auto',
|
|
agent_timeout INTEGER DEFAULT 300
|
|
)
|
|
""")
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS task_runs (
|
|
run_id TEXT PRIMARY KEY,
|
|
task_id TEXT NOT NULL,
|
|
status TEXT NOT NULL,
|
|
output TEXT,
|
|
error TEXT,
|
|
started_at TEXT,
|
|
completed_at TEXT,
|
|
FOREIGN KEY (task_id) REFERENCES tasks(id)
|
|
)
|
|
""")
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS chat_messages (
|
|
id TEXT PRIMARY KEY,
|
|
session_id TEXT NOT NULL,
|
|
role TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
timestamp TEXT NOT NULL,
|
|
model TEXT,
|
|
metadata TEXT
|
|
)
|
|
""")
|
|
|
|
# Migration: add new columns if they don't exist
|
|
try:
|
|
cursor.execute("ALTER TABLE tasks ADD COLUMN agent_model TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
try:
|
|
cursor.execute("ALTER TABLE tasks ADD COLUMN agent_tools TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
try:
|
|
cursor.execute("ALTER TABLE tasks ADD COLUMN agent_mcp_servers TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
try:
|
|
cursor.execute("ALTER TABLE tasks ADD COLUMN agent_system_prompt TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
try:
|
|
cursor.execute("ALTER TABLE tasks ADD COLUMN agent_max_turns INTEGER")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
try:
|
|
cursor.execute("ALTER TABLE tasks ADD COLUMN agent_permission_mode TEXT DEFAULT 'auto'")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
try:
|
|
cursor.execute("ALTER TABLE tasks ADD COLUMN agent_timeout INTEGER DEFAULT 300")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
# ============ Auth Helpers ============
|
|
|
|
def _get_claude_env():
|
|
"""Get environment with auth tokens set for Claude CLI"""
|
|
env = os.environ.copy()
|
|
if TOKEN_FILE.exists():
|
|
try:
|
|
saved = json.loads(TOKEN_FILE.read_text())
|
|
if saved.get("type") == "oauth_token" and saved.get("token"):
|
|
env["CLAUDE_CODE_OAUTH_TOKEN"] = saved["token"]
|
|
elif saved.get("type") == "api_key" and saved.get("token"):
|
|
env["ANTHROPIC_API_KEY"] = saved["token"]
|
|
except Exception as e:
|
|
logger.error(f"Failed to load saved token: {e}")
|
|
return env
|
|
|
|
|
|
def _build_claude_cmd(prompt: str, model: str = None, tools: str = None,
|
|
system_prompt: str = None, permission_mode: str = "auto",
|
|
resume_session_id: str = None) -> list:
|
|
"""Build a claude CLI command with agent configuration.
|
|
|
|
Note: --session-id requires a valid UUID and creates a persistent session.
|
|
For chat continuity, use resume_session_id with --resume.
|
|
For one-shot tasks, omit resume_session_id.
|
|
"""
|
|
cmd = ["claude", "-p"]
|
|
|
|
if model:
|
|
cmd.extend(["--model", model])
|
|
|
|
if tools:
|
|
cmd.extend(["--allowedTools", tools])
|
|
else:
|
|
cmd.extend(["--allowedTools", "Bash,Read,Write,Edit"])
|
|
|
|
if permission_mode:
|
|
cmd.extend(["--permission-mode", permission_mode])
|
|
|
|
if system_prompt:
|
|
cmd.extend(["--system-prompt", system_prompt])
|
|
|
|
# Resume an existing Claude session for multi-turn chat
|
|
if resume_session_id:
|
|
cmd.extend(["--resume", resume_session_id])
|
|
|
|
cmd.append(prompt)
|
|
return cmd
|
|
|
|
|
|
# ============ Task Execution ============
|
|
|
|
async def run_claude_task(task: Task, run_id: str):
|
|
"""Execute a Claude Code task as an agent"""
|
|
started = datetime.now().isoformat()
|
|
try:
|
|
env = _get_claude_env()
|
|
|
|
cmd = _build_claude_cmd(
|
|
prompt=task.prompt,
|
|
model=task.agent_model,
|
|
tools=task.agent_tools,
|
|
system_prompt=task.agent_system_prompt,
|
|
permission_mode=task.agent_permission_mode
|
|
)
|
|
|
|
timeout = task.agent_timeout or 300
|
|
|
|
logger.info(f"Running task {task.name}: {' '.join(cmd)}")
|
|
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL, # Fix stdin warning
|
|
env=env
|
|
)
|
|
|
|
stdout, stderr = await asyncio.wait_for(
|
|
process.communicate(), timeout=timeout
|
|
)
|
|
|
|
output = stdout.decode(errors="replace") if stdout else ""
|
|
error = stderr.decode(errors="replace") if stderr else ""
|
|
|
|
# Filter out non-critical warnings from stderr
|
|
error_lines = [l for l in error.split("\n")
|
|
if l.strip() and "stdin" not in l.lower() and "warning" not in l.lower()]
|
|
filtered_error = "\n".join(error_lines)
|
|
|
|
status = "completed" if process.returncode == 0 else "failed"
|
|
|
|
save_task_run(TaskRun(
|
|
task_id=task.id, run_id=run_id, status=status,
|
|
output=output,
|
|
error=filtered_error if status == "failed" else None,
|
|
started_at=started, completed_at=datetime.now().isoformat()
|
|
))
|
|
update_task_status(task.id, status)
|
|
|
|
except asyncio.TimeoutError:
|
|
save_task_run(TaskRun(
|
|
task_id=task.id, run_id=run_id, status="failed",
|
|
error=f"Task timeout (>{task.agent_timeout or 300}s)",
|
|
started_at=started, completed_at=datetime.now().isoformat()
|
|
))
|
|
update_task_status(task.id, "failed")
|
|
except Exception as e:
|
|
save_task_run(TaskRun(
|
|
task_id=task.id, run_id=run_id, status="failed",
|
|
error=str(e),
|
|
started_at=started, completed_at=datetime.now().isoformat()
|
|
))
|
|
update_task_status(task.id, "failed")
|
|
|
|
|
|
# ============ DB Operations ============
|
|
|
|
def save_task(task: Task):
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
if not task.id:
|
|
task.id = str(uuid.uuid4())
|
|
if not task.created_at:
|
|
task.created_at = datetime.now().isoformat()
|
|
|
|
cursor.execute("""
|
|
INSERT OR REPLACE INTO tasks
|
|
(id, name, description, prompt, schedule_type, schedule_value, enabled,
|
|
created_at, status, agent_model, agent_tools, agent_mcp_servers,
|
|
agent_system_prompt, agent_max_turns, agent_permission_mode, agent_timeout)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
task.id, task.name, task.description, task.prompt,
|
|
task.schedule_type, task.schedule_value, task.enabled,
|
|
task.created_at, task.status,
|
|
task.agent_model, task.agent_tools, task.agent_mcp_servers,
|
|
task.agent_system_prompt, task.agent_max_turns,
|
|
task.agent_permission_mode, task.agent_timeout
|
|
))
|
|
conn.commit()
|
|
conn.close()
|
|
return task
|
|
|
|
|
|
def save_task_run(run: TaskRun):
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO task_runs
|
|
(run_id, task_id, status, output, error, started_at, completed_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (run.run_id, run.task_id, run.status, run.output,
|
|
run.error, run.started_at, run.completed_at))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def update_task_status(task_id: str, status: str):
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE tasks SET status = ?, last_run = ? WHERE id = ?",
|
|
(status, datetime.now().isoformat(), task_id))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def get_task(task_id: str) -> Optional[Task]:
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM tasks WHERE id = ?", (task_id,))
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
if row:
|
|
return Task(**dict(row))
|
|
return None
|
|
|
|
|
|
def get_all_tasks() -> List[Task]:
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC")
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
return [Task(**dict(row)) for row in rows]
|
|
|
|
|
|
# ============ Scheduler ============
|
|
|
|
scheduler = BackgroundScheduler()
|
|
|
|
|
|
def schedule_task(task: Task):
|
|
if not task.enabled or task.schedule_type == "manual":
|
|
return
|
|
if task.schedule_type == "recurring" and task.schedule_value:
|
|
try:
|
|
scheduler.add_job(
|
|
_sync_run_task, CronTrigger.from_crontab(task.schedule_value),
|
|
id=task.id, args=[task], replace_existing=True
|
|
)
|
|
logger.info(f"Scheduled task {task.name} with cron: {task.schedule_value}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to schedule task {task.name}: {e}")
|
|
|
|
|
|
def _sync_run_task(task: Task):
|
|
"""Sync wrapper for scheduled tasks (APScheduler doesn't support async natively)"""
|
|
loop = asyncio.new_event_loop()
|
|
run_id = str(uuid.uuid4())
|
|
update_task_status(task.id, "running")
|
|
loop.run_until_complete(run_claude_task(task, run_id))
|
|
loop.close()
|
|
|
|
|
|
# ============ Chat Sessions ============
|
|
|
|
# Active chat sessions: our_session_id -> process
|
|
_chat_sessions: Dict[str, asyncio.subprocess.Process] = {}
|
|
# Mapping: our_session_id -> claude_session_id (for --resume)
|
|
_claude_session_map: Dict[str, str] = {}
|
|
|
|
|
|
def save_chat_message(session_id: str, role: str, content: str, model: str = None, metadata: dict = None):
|
|
"""Save a chat message to the database"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO chat_messages (id, session_id, role, content, timestamp, model, metadata)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (str(uuid.uuid4()), session_id, role, content,
|
|
datetime.now().isoformat(), model, json.dumps(metadata) if metadata else None))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def _get_claude_session_id(our_session_id: str) -> Optional[str]:
|
|
"""Get the Claude CLI session ID mapped to our session ID"""
|
|
if our_session_id in _claude_session_map:
|
|
return _claude_session_map[our_session_id]
|
|
# Also check DB metadata
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT metadata FROM chat_messages
|
|
WHERE session_id = ? AND metadata IS NOT NULL
|
|
ORDER BY timestamp DESC LIMIT 1
|
|
""", (our_session_id,))
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
if row and row[0]:
|
|
try:
|
|
meta = json.loads(row[0])
|
|
claude_sid = meta.get("claude_session_id")
|
|
if claude_sid:
|
|
_claude_session_map[our_session_id] = claude_sid
|
|
return claude_sid
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def get_chat_history(session_id: str, limit: int = 50) -> List[Dict]:
|
|
"""Get chat history for a session"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT * FROM chat_messages WHERE session_id = ?
|
|
ORDER BY timestamp ASC LIMIT ?
|
|
""", (session_id, limit))
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
def list_chat_sessions() -> List[Dict]:
|
|
"""List all chat sessions with latest message"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT session_id,
|
|
COUNT(*) as message_count,
|
|
MIN(timestamp) as started_at,
|
|
MAX(timestamp) as last_message,
|
|
(SELECT content FROM chat_messages cm2
|
|
WHERE cm2.session_id = cm.session_id AND cm2.role = 'user'
|
|
ORDER BY timestamp ASC LIMIT 1) as first_message
|
|
FROM chat_messages cm
|
|
GROUP BY session_id
|
|
ORDER BY MAX(timestamp) DESC
|
|
LIMIT 20
|
|
""")
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
# ============ API Routes ============
|
|
|
|
@app.on_event("startup")
|
|
async def startup():
|
|
init_db()
|
|
scheduler.start()
|
|
|
|
if TOKEN_FILE.exists():
|
|
try:
|
|
saved = json.loads(TOKEN_FILE.read_text())
|
|
if saved.get("type") == "oauth_token" and saved.get("token"):
|
|
os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = saved["token"]
|
|
logger.info("Loaded saved OAuth token")
|
|
elif saved.get("type") == "api_key" and saved.get("token"):
|
|
os.environ["ANTHROPIC_API_KEY"] = saved["token"]
|
|
logger.info("Loaded saved API key")
|
|
except Exception as e:
|
|
logger.error(f"Failed to load saved token: {e}")
|
|
|
|
for task in get_all_tasks():
|
|
if task.enabled:
|
|
schedule_task(task)
|
|
|
|
logger.info("Claude Persistent Agent v2.0 started")
|
|
|
|
|
|
@app.on_event("shutdown")
|
|
async def shutdown():
|
|
scheduler.shutdown()
|
|
# Kill any active chat sessions
|
|
for sid, proc in _chat_sessions.items():
|
|
try:
|
|
proc.kill()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "healthy", "timestamp": datetime.now().isoformat(), "version": "2.0.0"}
|
|
|
|
|
|
# ---- Task CRUD ----
|
|
|
|
@app.post("/api/tasks")
|
|
async def create_task(task: Task) -> Task:
|
|
saved_task = save_task(task)
|
|
schedule_task(saved_task)
|
|
return saved_task
|
|
|
|
|
|
@app.get("/api/tasks")
|
|
async def list_tasks() -> List[Task]:
|
|
return get_all_tasks()
|
|
|
|
|
|
@app.get("/api/tasks/{task_id}")
|
|
async def get_task_endpoint(task_id: str) -> Task:
|
|
task = get_task(task_id)
|
|
if not task:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
return task
|
|
|
|
|
|
@app.put("/api/tasks/{task_id}")
|
|
async def update_task_endpoint(task_id: str, task: Task) -> Task:
|
|
task.id = task_id
|
|
saved_task = save_task(task)
|
|
schedule_task(saved_task)
|
|
return saved_task
|
|
|
|
|
|
@app.delete("/api/tasks/{task_id}")
|
|
async def delete_task_endpoint(task_id: str):
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
try:
|
|
if scheduler.get_job(task_id):
|
|
scheduler.remove_job(task_id)
|
|
except Exception:
|
|
pass
|
|
return {"status": "deleted"}
|
|
|
|
|
|
@app.post("/api/tasks/{task_id}/run")
|
|
async def run_task_manual(task_id: str, background_tasks: BackgroundTasks):
|
|
task = get_task(task_id)
|
|
if not task:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
run_id = str(uuid.uuid4())
|
|
update_task_status(task_id, "running")
|
|
background_tasks.add_task(run_claude_task, task, run_id)
|
|
return {"run_id": run_id, "status": "started"}
|
|
|
|
|
|
@app.get("/api/tasks/{task_id}/runs")
|
|
async def get_task_runs(task_id: str) -> List[Dict]:
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT * FROM task_runs WHERE task_id = ? ORDER BY started_at DESC LIMIT 50",
|
|
(task_id,)
|
|
)
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
# ---- System Info ----
|
|
|
|
@app.get("/api/system/info")
|
|
async def system_info() -> Dict:
|
|
tasks = get_all_tasks()
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT COUNT(*) FROM task_runs")
|
|
total_runs = cursor.fetchone()[0]
|
|
cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='completed'")
|
|
completed_runs = cursor.fetchone()[0]
|
|
cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='failed'")
|
|
failed_runs = cursor.fetchone()[0]
|
|
cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='running'")
|
|
running_runs = cursor.fetchone()[0]
|
|
conn.close()
|
|
return {
|
|
"app_name": "Claude Persistent Agent",
|
|
"version": "2.0.0",
|
|
"uptime": datetime.now().isoformat(),
|
|
"scheduler_running": scheduler.running,
|
|
"task_count": len(tasks),
|
|
"total_runs": total_runs,
|
|
"completed_runs": completed_runs,
|
|
"failed_runs": failed_runs,
|
|
"running_runs": running_runs,
|
|
"active_chat_sessions": len(_chat_sessions),
|
|
}
|
|
|
|
|
|
@app.get("/api/system/usage")
|
|
async def usage_stats() -> Dict:
|
|
usage = {
|
|
"models_used": [], "session_count": 0,
|
|
"last_reset": None, "next_reset": None,
|
|
"note": "Usage data sourced from ~/.claude session cache"
|
|
}
|
|
try:
|
|
claude_dir = Path("/root/.claude")
|
|
sessions = list(claude_dir.glob("**/session*.json")) + list(claude_dir.glob("**/*.jsonl"))
|
|
usage["session_count"] = len(sessions)
|
|
now = datetime.now()
|
|
next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1)
|
|
reset = next_month.replace(hour=0, minute=0, second=0)
|
|
usage["next_reset"] = reset.isoformat()
|
|
usage["days_until_reset"] = (reset - now).days
|
|
except Exception as e:
|
|
usage["error"] = str(e)
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT COUNT(*), MIN(started_at), MAX(started_at) FROM task_runs")
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
usage["claude_runs_total"] = row[0]
|
|
usage["first_run"] = row[1]
|
|
usage["last_run"] = row[2]
|
|
return usage
|
|
|
|
|
|
# ---- Auth ----
|
|
|
|
@app.get("/api/auth/status")
|
|
async def auth_status():
|
|
account = None
|
|
auth_method = None
|
|
status = "logged_out"
|
|
has_saved_token = TOKEN_FILE.exists()
|
|
token_type = None
|
|
|
|
if has_saved_token:
|
|
try:
|
|
saved = json.loads(TOKEN_FILE.read_text())
|
|
token_type = saved.get("type")
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
env = _get_claude_env()
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"claude", "auth", "status", "--json",
|
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL,
|
|
env=env
|
|
)
|
|
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
|
|
output = stdout.decode(errors="replace").strip()
|
|
data = json.loads(output)
|
|
if data.get("loggedIn"):
|
|
status = "logged_in"
|
|
account = data.get("email") or data.get("account", {}).get("emailAddress")
|
|
auth_method = data.get("authMethod")
|
|
except Exception as e:
|
|
logger.error(f"Auth status check failed: {e}")
|
|
|
|
return {
|
|
"status": status, "account": account, "auth_method": auth_method,
|
|
"has_saved_token": has_saved_token, "token_type": token_type
|
|
}
|
|
|
|
|
|
@app.post("/api/auth/token")
|
|
async def auth_set_token(payload: TokenSubmit):
|
|
token = payload.token.strip()
|
|
token_type = payload.token_type
|
|
|
|
if not token:
|
|
return {"status": "error", "message": "Token cannot be empty"}
|
|
|
|
# Auto-detect token type from prefix
|
|
if token.startswith("sk-ant-oat"):
|
|
token_type = "oauth_token"
|
|
elif token.startswith("sk-ant-api"):
|
|
token_type = "api_key"
|
|
else:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Invalid token format. Expected 'sk-ant-oat01-...' (setup token) or 'sk-ant-api03-...' (API key). Got: '{token[:12]}...'"
|
|
}
|
|
|
|
# Set env vars BEFORE saving so we can test
|
|
if token_type == "oauth_token":
|
|
os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = token
|
|
os.environ.pop("ANTHROPIC_API_KEY", None)
|
|
elif token_type == "api_key":
|
|
os.environ["ANTHROPIC_API_KEY"] = token
|
|
os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
|
|
|
|
# Actually verify by making a real API call (not just checking auth status which only looks at env vars)
|
|
try:
|
|
env = _get_claude_env()
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"claude", "-p", "--output-format", "json", "--no-session-persistence",
|
|
"--max-budget-usd", "0.01",
|
|
"Reply with just the word VERIFIED",
|
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL, env=env
|
|
)
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
|
|
output = stdout.decode(errors="replace").strip()
|
|
|
|
try:
|
|
data = json.loads(output)
|
|
result = data.get("result", "")
|
|
is_error = data.get("is_error", False)
|
|
|
|
if is_error:
|
|
# Token was rejected — don't save it
|
|
os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
|
|
os.environ.pop("ANTHROPIC_API_KEY", None)
|
|
return {
|
|
"status": "error",
|
|
"message": f"Token rejected by Claude API: {result}"
|
|
}
|
|
else:
|
|
# Token works! Save it.
|
|
TOKEN_FILE.write_text(json.dumps({
|
|
"type": token_type, "token": token,
|
|
"saved_at": datetime.now().isoformat()
|
|
}))
|
|
return {
|
|
"status": "logged_in",
|
|
"message": "Token verified and saved! Claude is ready.",
|
|
"auth_method": token_type
|
|
}
|
|
except json.JSONDecodeError:
|
|
# Couldn't parse JSON but got some output
|
|
if proc.returncode == 0:
|
|
TOKEN_FILE.write_text(json.dumps({
|
|
"type": token_type, "token": token,
|
|
"saved_at": datetime.now().isoformat()
|
|
}))
|
|
return {
|
|
"status": "logged_in",
|
|
"message": "Token appears to work. Saved!",
|
|
"auth_method": token_type
|
|
}
|
|
else:
|
|
os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
|
|
os.environ.pop("ANTHROPIC_API_KEY", None)
|
|
return {
|
|
"status": "error",
|
|
"message": f"Token verification failed: {output or stderr.decode(errors='replace')}"
|
|
}
|
|
|
|
except asyncio.TimeoutError:
|
|
# If it timed out, it probably got through auth at least
|
|
TOKEN_FILE.write_text(json.dumps({
|
|
"type": token_type, "token": token,
|
|
"saved_at": datetime.now().isoformat()
|
|
}))
|
|
return {"status": "token_saved", "message": "Token saved but verification timed out. Try sending a chat message to confirm."}
|
|
except Exception as e:
|
|
os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
|
|
os.environ.pop("ANTHROPIC_API_KEY", None)
|
|
return {"status": "error", "message": f"Token verification error: {str(e)}"}
|
|
|
|
|
|
@app.post("/api/auth/logout")
|
|
async def auth_logout():
|
|
if TOKEN_FILE.exists():
|
|
TOKEN_FILE.unlink()
|
|
os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
|
|
os.environ.pop("ANTHROPIC_API_KEY", None)
|
|
try:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"claude", "auth", "logout",
|
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL
|
|
)
|
|
await asyncio.wait_for(proc.communicate(), timeout=10)
|
|
except Exception:
|
|
pass
|
|
return {"status": "logged_out"}
|
|
|
|
|
|
# ---- Chat ----
|
|
|
|
@app.post("/api/chat/send")
|
|
async def chat_send(msg: ChatMessage):
|
|
"""Send a message to Claude and get a response (non-streaming).
|
|
|
|
Uses --output-format json to capture the Claude session_id for
|
|
conversation continuity via --resume on subsequent messages.
|
|
"""
|
|
session_id = msg.session_id or str(uuid.uuid4())
|
|
|
|
# Save user message
|
|
save_chat_message(session_id, "user", msg.message, model=msg.model)
|
|
|
|
env = _get_claude_env()
|
|
|
|
# Check if we have an existing Claude session to resume
|
|
claude_sid = _get_claude_session_id(session_id)
|
|
|
|
cmd = _build_claude_cmd(
|
|
prompt=msg.message,
|
|
model=msg.model,
|
|
tools=msg.tools,
|
|
system_prompt=msg.system_prompt,
|
|
resume_session_id=claude_sid # None for first message, session_id for follow-ups
|
|
)
|
|
|
|
# Insert --output-format json after "-p" so we can parse the session_id
|
|
try:
|
|
p_idx = cmd.index("-p")
|
|
cmd.insert(p_idx + 1, "--output-format")
|
|
cmd.insert(p_idx + 2, "json")
|
|
except ValueError:
|
|
pass
|
|
|
|
try:
|
|
logger.info(f"Chat send cmd: {' '.join(cmd[:6])}...")
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL,
|
|
env=env
|
|
)
|
|
|
|
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=120)
|
|
|
|
raw_output = stdout.decode(errors="replace").strip() if stdout else ""
|
|
error = stderr.decode(errors="replace").strip() if stderr else ""
|
|
|
|
# Filter warnings from stderr
|
|
error_lines = [l for l in error.split("\n")
|
|
if l.strip() and "stdin" not in l.lower() and "warning" not in l.lower()]
|
|
filtered_error = "\n".join(error_lines)
|
|
|
|
# Try to parse JSON output for session_id and result
|
|
output = raw_output
|
|
try:
|
|
data = json.loads(raw_output)
|
|
output = data.get("result", raw_output)
|
|
# Capture Claude's session ID for future --resume calls
|
|
new_claude_sid = data.get("session_id")
|
|
if new_claude_sid:
|
|
_claude_session_map[session_id] = new_claude_sid
|
|
logger.info(f"Mapped session {session_id[:8]}... -> Claude {new_claude_sid[:8]}...")
|
|
# Check if Claude reported an error
|
|
if data.get("is_error"):
|
|
save_chat_message(session_id, "error", output, metadata={"claude_session_id": new_claude_sid})
|
|
return {"session_id": session_id, "response": output, "status": "error"}
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
|
|
if process.returncode == 0 and output:
|
|
new_claude_sid = _claude_session_map.get(session_id)
|
|
save_chat_message(session_id, "assistant", output, model=msg.model,
|
|
metadata={"claude_session_id": new_claude_sid} if new_claude_sid else None)
|
|
return {
|
|
"session_id": session_id,
|
|
"response": output,
|
|
"status": "ok"
|
|
}
|
|
else:
|
|
error_msg = filtered_error or output or "No response from Claude"
|
|
save_chat_message(session_id, "error", error_msg)
|
|
return {
|
|
"session_id": session_id,
|
|
"response": error_msg,
|
|
"status": "error"
|
|
}
|
|
|
|
except asyncio.TimeoutError:
|
|
save_chat_message(session_id, "error", "Response timeout (>120s)")
|
|
return {"session_id": session_id, "response": "Response timeout (>120s)", "status": "error"}
|
|
except Exception as e:
|
|
save_chat_message(session_id, "error", str(e))
|
|
return {"session_id": session_id, "response": str(e), "status": "error"}
|
|
|
|
|
|
@app.websocket("/api/chat/ws/{session_id}")
|
|
async def chat_websocket(websocket: WebSocket, session_id: str):
|
|
"""WebSocket endpoint for streaming chat.
|
|
|
|
Uses --output-format stream-json for streaming, and captures the
|
|
Claude session_id from the final 'result' message for --resume on
|
|
subsequent messages in the same chat session.
|
|
"""
|
|
await websocket.accept()
|
|
|
|
try:
|
|
while True:
|
|
data = await websocket.receive_text()
|
|
msg = json.loads(data)
|
|
|
|
user_message = msg.get("message", "")
|
|
model = msg.get("model")
|
|
tools = msg.get("tools")
|
|
system_prompt = msg.get("system_prompt")
|
|
|
|
if not user_message:
|
|
await websocket.send_json({"type": "error", "content": "Empty message"})
|
|
continue
|
|
|
|
save_chat_message(session_id, "user", user_message, model=model)
|
|
|
|
env = _get_claude_env()
|
|
|
|
# Check for existing Claude session to resume
|
|
claude_sid = _get_claude_session_id(session_id)
|
|
|
|
cmd = _build_claude_cmd(
|
|
prompt=user_message,
|
|
model=model,
|
|
tools=tools,
|
|
system_prompt=system_prompt,
|
|
resume_session_id=claude_sid
|
|
)
|
|
|
|
# Use stream-json for streaming output
|
|
try:
|
|
p_idx = cmd.index("-p")
|
|
cmd.insert(p_idx + 1, "--output-format")
|
|
cmd.insert(p_idx + 2, "stream-json")
|
|
except ValueError:
|
|
pass
|
|
|
|
await websocket.send_json({"type": "status", "content": "thinking"})
|
|
|
|
try:
|
|
logger.info(f"Chat WS cmd: {' '.join(cmd[:6])}...")
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL,
|
|
env=env
|
|
)
|
|
|
|
_chat_sessions[session_id] = process
|
|
|
|
# Stream stdout line by line (stream-json emits one JSON object per line)
|
|
full_output = ""
|
|
while True:
|
|
try:
|
|
line = await asyncio.wait_for(
|
|
process.stdout.readline(), timeout=180
|
|
)
|
|
except asyncio.TimeoutError:
|
|
await websocket.send_json({"type": "error", "content": "Response timeout (>180s)"})
|
|
break
|
|
if not line:
|
|
break
|
|
text = line.decode(errors="replace").strip()
|
|
if not text:
|
|
continue
|
|
|
|
# Try to parse stream-json events
|
|
try:
|
|
event = json.loads(text)
|
|
event_type = event.get("type", "")
|
|
|
|
if event_type == "assistant":
|
|
# Assistant message with content blocks
|
|
content = event.get("message", {}).get("content", [])
|
|
for block in content:
|
|
if block.get("type") == "text":
|
|
chunk = block.get("text", "")
|
|
full_output += chunk
|
|
await websocket.send_json({"type": "chunk", "content": chunk})
|
|
elif event_type == "content_block_delta":
|
|
delta = event.get("delta", {})
|
|
if delta.get("type") == "text_delta":
|
|
chunk = delta.get("text", "")
|
|
full_output += chunk
|
|
await websocket.send_json({"type": "chunk", "content": chunk})
|
|
elif event_type == "result":
|
|
# Final result — capture session_id for --resume
|
|
result_text = event.get("result", "")
|
|
new_claude_sid = event.get("session_id")
|
|
is_error = event.get("is_error", False)
|
|
|
|
if new_claude_sid:
|
|
_claude_session_map[session_id] = new_claude_sid
|
|
logger.info(f"WS mapped {session_id[:8]}... -> Claude {new_claude_sid[:8]}...")
|
|
|
|
if is_error:
|
|
error_content = result_text or full_output or "Claude returned an error"
|
|
save_chat_message(session_id, "error", error_content,
|
|
metadata={"claude_session_id": new_claude_sid})
|
|
await websocket.send_json({"type": "error", "content": error_content})
|
|
elif result_text and not full_output:
|
|
# If we got no streaming chunks but have a result
|
|
full_output = result_text
|
|
await websocket.send_json({"type": "chunk", "content": result_text})
|
|
else:
|
|
# Unknown event type — might contain text content
|
|
pass
|
|
except json.JSONDecodeError:
|
|
# Not JSON — treat as raw text output
|
|
full_output += text + "\n"
|
|
await websocket.send_json({"type": "chunk", "content": text + "\n"})
|
|
|
|
await process.wait()
|
|
|
|
stderr_data = await process.stderr.read()
|
|
stderr_text = stderr_data.decode(errors="replace") if stderr_data else ""
|
|
|
|
if process.returncode == 0 and full_output.strip():
|
|
new_claude_sid = _claude_session_map.get(session_id)
|
|
save_chat_message(session_id, "assistant", full_output.strip(), model=model,
|
|
metadata={"claude_session_id": new_claude_sid} if new_claude_sid else None)
|
|
await websocket.send_json({
|
|
"type": "done",
|
|
"content": full_output.strip()
|
|
})
|
|
elif not full_output.strip():
|
|
# No output at all
|
|
error_lines = [l for l in stderr_text.split("\n")
|
|
if l.strip() and "stdin" not in l.lower()]
|
|
clean_error = "\n".join(error_lines) or "No response from Claude"
|
|
save_chat_message(session_id, "error", clean_error)
|
|
await websocket.send_json({"type": "error", "content": clean_error})
|
|
else:
|
|
# Had output but non-zero return code — still send done
|
|
await websocket.send_json({
|
|
"type": "done",
|
|
"content": full_output.strip()
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Chat WS error: {e}")
|
|
await websocket.send_json({"type": "error", "content": str(e)})
|
|
finally:
|
|
_chat_sessions.pop(session_id, None)
|
|
|
|
except WebSocketDisconnect:
|
|
logger.info(f"Chat WebSocket disconnected: {session_id}")
|
|
proc = _chat_sessions.pop(session_id, None)
|
|
if proc and proc.returncode is None:
|
|
try:
|
|
proc.kill()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
@app.get("/api/chat/sessions")
|
|
async def get_chat_sessions():
|
|
"""List all chat sessions"""
|
|
return list_chat_sessions()
|
|
|
|
|
|
@app.get("/api/chat/history/{session_id}")
|
|
async def get_chat_session_history(session_id: str, limit: int = 50):
|
|
"""Get chat history for a session"""
|
|
return get_chat_history(session_id, limit)
|
|
|
|
|
|
@app.delete("/api/chat/sessions/{session_id}")
|
|
async def delete_chat_session(session_id: str):
|
|
"""Delete a chat session"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM chat_messages WHERE session_id = ?", (session_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
return {"status": "deleted"}
|
|
|
|
|
|
# ---- MCP Servers ----
|
|
|
|
@app.get("/api/mcp/servers")
|
|
async def list_mcp_servers():
|
|
try:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"claude", "mcp", "list",
|
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL
|
|
)
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
|
|
output = stdout.decode(errors="replace").strip()
|
|
|
|
servers = []
|
|
if "No MCP servers configured" in output:
|
|
return {"servers": [], "raw": output}
|
|
try:
|
|
data = json.loads(output)
|
|
if isinstance(data, list):
|
|
servers = data
|
|
elif isinstance(data, dict):
|
|
servers = list(data.values()) if data else []
|
|
except Exception:
|
|
for line in output.split("\n"):
|
|
line = line.strip()
|
|
if line and not line.startswith("-") and not line.startswith("Name"):
|
|
parts = line.split()
|
|
if len(parts) >= 1:
|
|
servers.append({"name": parts[0], "details": " ".join(parts[1:])})
|
|
return {"servers": servers, "raw": output}
|
|
except Exception as e:
|
|
return {"servers": [], "error": str(e)}
|
|
|
|
|
|
@app.post("/api/mcp/servers")
|
|
async def add_mcp_server(server: McpServerAdd):
|
|
try:
|
|
cmd = ["claude", "mcp", "add", server.name]
|
|
if server.server_type == "sse" and server.url:
|
|
cmd.extend(["--transport", "sse", server.url])
|
|
elif server.server_type == "stdio" and server.command:
|
|
cmd.extend(["--transport", "stdio", server.command])
|
|
if server.args:
|
|
cmd.extend(server.args)
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL
|
|
)
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15)
|
|
output = stdout.decode(errors="replace") + stderr.decode(errors="replace")
|
|
return {"status": "ok" if proc.returncode == 0 else "error",
|
|
"message": output.strip(), "name": server.name}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
|
|
@app.delete("/api/mcp/servers/{name}")
|
|
async def remove_mcp_server(name: str):
|
|
try:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"claude", "mcp", "remove", name,
|
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL
|
|
)
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
|
|
output = stdout.decode(errors="replace") + stderr.decode(errors="replace")
|
|
return {"status": "ok" if proc.returncode == 0 else "error", "message": output.strip()}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
|
|
# Serve static frontend — MUST be last
|
|
app.mount("/", StaticFiles(directory="/app/frontend/dist", html=True), name="static")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|