This repository has been archived on 2026-04-05. You can view files and clone it, but cannot push or open issues or pull requests.
claude-persistent-agent/backend/main.py

782 lines
24 KiB
Python

"""
Claude Persistent Agent - FastAPI Backend
Main application with task scheduling and management
"""
from fastapi import FastAPI, WebSocket, HTTPException, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import asyncio
import json
import sqlite3
import subprocess
import logging
from datetime import datetime, timedelta
import uuid
from pathlib import Path
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import os
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(title="Claude Persistent Agent", version="1.0.0")
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Database setup
DB_PATH = Path("/app/data/tasks.db")
LOGS_PATH = Path("/app/logs")
TASKS_PATH = Path("/app/tasks")
# Ensure directories exist
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
LOGS_PATH.mkdir(parents=True, exist_ok=True)
TASKS_PATH.mkdir(parents=True, exist_ok=True)
class Task(BaseModel):
id: Optional[str] = None
name: str
description: Optional[str] = None
prompt: str
schedule_type: str # "once", "recurring", "manual"
schedule_value: Optional[str] = None # cron expression or ISO datetime
enabled: bool = True
created_at: Optional[str] = None
last_run: Optional[str] = None
next_run: Optional[str] = None
status: str = "idle" # idle, running, completed, failed
class TaskRun(BaseModel):
task_id: str
run_id: str
status: str # running, completed, failed
output: Optional[str] = None
error: Optional[str] = None
started_at: str
completed_at: Optional[str] = None
def init_db():
"""Initialize SQLite database"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
prompt TEXT NOT NULL,
schedule_type TEXT NOT NULL,
schedule_value TEXT,
enabled BOOLEAN DEFAULT 1,
created_at TEXT,
last_run TEXT,
next_run TEXT,
status TEXT DEFAULT 'idle'
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS task_runs (
run_id TEXT PRIMARY KEY,
task_id TEXT NOT NULL,
status TEXT NOT NULL,
output TEXT,
error TEXT,
started_at TEXT,
completed_at TEXT,
FOREIGN KEY (task_id) REFERENCES tasks(id)
)
""")
conn.commit()
conn.close()
async def run_claude_task(task: Task, run_id: str):
"""Execute a Claude Code task"""
try:
# Save task prompt to temp file
prompt_file = TASKS_PATH / f"{run_id}.md"
prompt_file.write_text(task.prompt)
# Run Claude Code
process = await asyncio.create_subprocess_exec(
"claude",
"task",
str(prompt_file),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
output = stdout.decode() if stdout else ""
error = stderr.decode() if stderr else ""
# Save run result
save_task_run(TaskRun(
task_id=task.id,
run_id=run_id,
status="completed" if process.returncode == 0 else "failed",
output=output,
error=error if process.returncode != 0 else None,
started_at=datetime.now().isoformat(),
completed_at=datetime.now().isoformat()
))
# Update task
update_task_status(task.id, "completed")
except asyncio.TimeoutError:
save_task_run(TaskRun(
task_id=task.id,
run_id=run_id,
status="failed",
error="Task timeout (>5 minutes)",
started_at=datetime.now().isoformat(),
completed_at=datetime.now().isoformat()
))
update_task_status(task.id, "failed")
except Exception as e:
save_task_run(TaskRun(
task_id=task.id,
run_id=run_id,
status="failed",
error=str(e),
started_at=datetime.now().isoformat(),
completed_at=datetime.now().isoformat()
))
update_task_status(task.id, "failed")
def save_task(task: Task):
"""Save task to database"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
if not task.id:
task.id = str(uuid.uuid4())
if not task.created_at:
task.created_at = datetime.now().isoformat()
cursor.execute("""
INSERT OR REPLACE INTO tasks
(id, name, description, prompt, schedule_type, schedule_value, enabled, created_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
task.id, task.name, task.description, task.prompt,
task.schedule_type, task.schedule_value, task.enabled,
task.created_at, task.status
))
conn.commit()
conn.close()
return task
def save_task_run(run: TaskRun):
"""Save task run result"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO task_runs
(run_id, task_id, status, output, error, started_at, completed_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
run.run_id, run.task_id, run.status, run.output,
run.error, run.started_at, run.completed_at
))
conn.commit()
conn.close()
def update_task_status(task_id: str, status: str):
"""Update task status"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("UPDATE tasks SET status = ?, last_run = ? WHERE id = ?",
(status, datetime.now().isoformat(), task_id))
conn.commit()
conn.close()
def get_task(task_id: str) -> Optional[Task]:
"""Get task from database"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM tasks WHERE id = ?", (task_id,))
row = cursor.fetchone()
conn.close()
if row:
return Task(
id=row[0], name=row[1], description=row[2], prompt=row[3],
schedule_type=row[4], schedule_value=row[5], enabled=row[6],
created_at=row[7], last_run=row[8], next_run=row[9], status=row[10]
)
return None
def get_all_tasks() -> List[Task]:
"""Get all tasks"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC")
rows = cursor.fetchall()
conn.close()
return [
Task(
id=row[0], name=row[1], description=row[2], prompt=row[3],
schedule_type=row[4], schedule_value=row[5], enabled=row[6],
created_at=row[7], last_run=row[8], next_run=row[9], status=row[10]
)
for row in rows
]
# Initialize scheduler
scheduler = BackgroundScheduler()
def schedule_task(task: Task):
"""Schedule a task with APScheduler"""
if not task.enabled or task.schedule_type == "manual":
return
if task.schedule_type == "recurring" and task.schedule_value:
try:
scheduler.add_job(
run_task_job,
CronTrigger.from_crontab(task.schedule_value),
id=task.id,
args=[task],
replace_existing=True
)
logger.info(f"Scheduled task {task.name} with cron: {task.schedule_value}")
except Exception as e:
logger.error(f"Failed to schedule task {task.name}: {e}")
async def run_task_job(task: Task):
"""Background job to run a task"""
run_id = str(uuid.uuid4())
update_task_status(task.id, "running")
await run_claude_task(task, run_id)
# ============ API Routes ============
@app.on_event("startup")
async def startup():
"""Initialize on startup"""
init_db()
scheduler.start()
# Schedule existing tasks
for task in get_all_tasks():
if task.enabled:
schedule_task(task)
logger.info("Claude Persistent Agent started")
@app.on_event("shutdown")
async def shutdown():
"""Cleanup on shutdown"""
scheduler.shutdown()
@app.get("/health")
async def health():
"""Health check endpoint"""
return {"status": "healthy", "timestamp": datetime.now().isoformat()}
@app.post("/api/tasks")
async def create_task(task: Task) -> Task:
"""Create a new task"""
saved_task = save_task(task)
schedule_task(saved_task)
return saved_task
@app.get("/api/tasks")
async def list_tasks() -> List[Task]:
"""List all tasks"""
return get_all_tasks()
@app.get("/api/tasks/{task_id}")
async def get_task_endpoint(task_id: str) -> Task:
"""Get a specific task"""
task = get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.put("/api/tasks/{task_id}")
async def update_task_endpoint(task_id: str, task: Task) -> Task:
"""Update a task"""
task.id = task_id
saved_task = save_task(task)
schedule_task(saved_task)
return saved_task
@app.delete("/api/tasks/{task_id}")
async def delete_task_endpoint(task_id: str):
"""Delete a task"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
conn.commit()
conn.close()
if scheduler.get_job(task_id):
scheduler.remove_job(task_id)
return {"status": "deleted"}
@app.post("/api/tasks/{task_id}/run")
async def run_task_manual(task_id: str, background_tasks: BackgroundTasks):
"""Manually trigger a task"""
task = get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
run_id = str(uuid.uuid4())
update_task_status(task_id, "running")
background_tasks.add_task(run_claude_task, task, run_id)
return {"run_id": run_id, "status": "started"}
@app.get("/api/tasks/{task_id}/runs")
async def get_task_runs(task_id: str) -> List[Dict]:
"""Get runs for a task"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM task_runs WHERE task_id = ? ORDER BY started_at DESC LIMIT 50",
(task_id,)
)
rows = cursor.fetchall()
conn.close()
return [
{
"run_id": row[0],
"task_id": row[1],
"status": row[2],
"output": row[3],
"error": row[4],
"started_at": row[5],
"completed_at": row[6]
}
for row in rows
]
@app.get("/api/system/info")
async def system_info() -> Dict:
"""Get system information"""
tasks = get_all_tasks()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM task_runs")
total_runs = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='completed'")
completed_runs = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='failed'")
failed_runs = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='running'")
running_runs = cursor.fetchone()[0]
conn.close()
return {
"app_name": "Claude Persistent Agent",
"version": "1.0.0",
"uptime": datetime.now().isoformat(),
"scheduler_running": scheduler.running,
"task_count": len(tasks),
"total_runs": total_runs,
"completed_runs": completed_runs,
"failed_runs": failed_runs,
"running_runs": running_runs,
}
@app.get("/api/system/usage")
async def usage_stats() -> Dict:
"""Get Claude API usage stats from session files if available"""
import glob, json as _json
usage = {
"models_used": [],
"session_count": 0,
"last_reset": None,
"next_reset": None,
"note": "Usage data sourced from ~/.claude session cache"
}
try:
# Claude Code stores session info under ~/.claude
claude_dir = Path("/root/.claude")
sessions = list(claude_dir.glob("**/session*.json")) + list(claude_dir.glob("**/*.jsonl"))
usage["session_count"] = len(sessions)
# Try to parse any usage metadata
stats_file = claude_dir / "usage_stats.json"
if stats_file.exists():
with open(stats_file) as f:
saved = _json.load(f)
usage.update(saved)
else:
# Estimate reset time: Anthropic resets usage monthly
now = datetime.now()
if now.day <= 1:
reset = now.replace(day=1, hour=0, minute=0, second=0)
else:
next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1)
reset = next_month.replace(hour=0, minute=0, second=0)
usage["next_reset"] = reset.isoformat()
usage["days_until_reset"] = (reset - now).days
except Exception as e:
usage["error"] = str(e)
# Add run stats for context
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*), MIN(started_at), MAX(started_at) FROM task_runs")
row = cursor.fetchone()
conn.close()
usage["claude_runs_total"] = row[0]
usage["first_run"] = row[1]
usage["last_run"] = row[2]
return usage
# Auth state stored in memory (persists while container runs)
_auth_process = None
_auth_url = None
_auth_status = "unknown" # unknown | logged_in | logged_out | pending
async def _check_claude_auth() -> str:
"""Check if claude is authenticated by running claude auth status"""
global _auth_status
try:
proc = await asyncio.create_subprocess_exec(
"claude", "auth", "status",
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
output = stdout.decode(errors="replace").strip()
logger.info(f"claude auth status output: {output}")
try:
import json as _json
data = _json.loads(output)
if data.get("loggedIn"):
_auth_status = "logged_in"
return "logged_in"
except Exception:
pass
_auth_status = "logged_out"
return "logged_out"
except Exception as e:
logger.error(f"Auth check failed: {e}")
_auth_status = "unknown"
return "unknown"
@app.get("/api/auth/status")
async def auth_status():
"""Check Claude auth status using claude auth status JSON"""
global _auth_status, _auth_url
account = None
auth_method = None
try:
proc = await asyncio.create_subprocess_exec(
"claude", "auth", "status",
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
output = stdout.decode(errors="replace").strip()
import json as _json
data = _json.loads(output)
if data.get("loggedIn"):
_auth_status = "logged_in"
account = data.get("email") or data.get("account", {}).get("emailAddress")
auth_method = data.get("authMethod")
else:
_auth_status = "logged_out"
except Exception as e:
logger.error(f"Auth status check failed: {e}")
# Fall back to existing status
pass
return {
"status": _auth_status,
"account": account,
"auth_method": auth_method,
"auth_url": _auth_url if _auth_status == "pending" else None
}
@app.post("/api/auth/login")
async def auth_login():
"""Initiate Claude OAuth login - returns the URL to open in browser"""
global _auth_process, _auth_url, _auth_status
# Kill any existing auth process
if _auth_process and _auth_process.returncode is None:
try:
_auth_process.kill()
except Exception:
pass
_auth_status = "pending"
_auth_url = None
try:
# Run claude auth login --claudeai with stdin PIPE so we can send the code later
proc = await asyncio.create_subprocess_exec(
"claude", "auth", "login", "--claudeai",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
stdin=asyncio.subprocess.PIPE
)
_auth_process = proc
# Read output looking for the OAuth URL
url = None
try:
async def _read_url():
collected = ""
while True:
line = await proc.stdout.readline()
if not line:
break
text = line.decode(errors="replace")
collected += text
logger.info(f"claude auth: {text.strip()}")
for part in text.split():
cleaned = part.strip().rstrip("\\n")
if cleaned.startswith("https://") and ("oauth" in cleaned or "claude.com" in cleaned):
return cleaned
logger.info(f"Full auth output: {collected}")
return None
url = await asyncio.wait_for(_read_url(), timeout=15)
except asyncio.TimeoutError:
logger.warning("Timed out waiting for auth URL")
if url:
_auth_url = url
return {
"status": "pending",
"auth_url": url,
"message": "Open this URL, authorize, then paste the code you receive back here."
}
else:
return {
"status": "error",
"message": "Could not extract login URL. Try: docker exec -it claude-persistent-agent claude auth login --claudeai"
}
except Exception as e:
logger.error(f"Auth login failed: {e}")
return {"status": "error", "message": str(e)}
class AuthCode(BaseModel):
code: str
@app.post("/api/auth/code")
async def auth_submit_code(payload: AuthCode):
"""Submit the OAuth authorization code from the browser callback"""
global _auth_process, _auth_status
if not _auth_process or _auth_process.returncode is not None:
return {"status": "error", "message": "No active login session. Click 'Login' first."}
try:
# Send the code to the waiting claude auth login process via stdin
code = payload.code.strip() + "\n"
_auth_process.stdin.write(code.encode())
await _auth_process.stdin.drain()
_auth_process.stdin.close()
# Wait for the process to finish
try:
stdout, _ = await asyncio.wait_for(_auth_process.communicate(), timeout=30)
output = stdout.decode(errors="replace") if stdout else ""
logger.info(f"Auth code result: {output}")
except asyncio.TimeoutError:
logger.warning("Auth process timed out after submitting code")
# Check if login succeeded
check_proc = await asyncio.create_subprocess_exec(
"claude", "auth", "status",
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
check_stdout, _ = await asyncio.wait_for(check_proc.communicate(), timeout=10)
check_output = check_stdout.decode(errors="replace").strip()
try:
import json as _json
data = _json.loads(check_output)
if data.get("loggedIn"):
_auth_status = "logged_in"
return {"status": "logged_in", "message": "Successfully logged in!", "account": data.get("email")}
except Exception:
pass
_auth_status = "logged_out"
return {"status": "error", "message": f"Login may have failed. Auth status: {check_output}"}
except Exception as e:
logger.error(f"Auth code submission failed: {e}")
return {"status": "error", "message": str(e)}
@app.post("/api/auth/logout")
async def auth_logout():
"""Log out of Claude"""
global _auth_status
try:
proc = await asyncio.create_subprocess_exec(
"claude", "auth", "logout",
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
await asyncio.wait_for(proc.communicate(), timeout=10)
_auth_status = "logged_out"
return {"status": "logged_out"}
except Exception as e:
return {"status": "error", "message": str(e)}
# ============ MCP Server Management ============
@app.get("/api/mcp/servers")
async def list_mcp_servers():
"""List configured MCP servers"""
try:
proc = await asyncio.create_subprocess_exec(
"claude", "mcp", "list",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
output = stdout.decode(errors="replace").strip()
# Parse the output — claude mcp list shows servers in a table or JSON
servers = []
if "No MCP servers configured" in output:
return {"servers": [], "raw": output}
# Try JSON parse first
try:
import json as _json
data = _json.loads(output)
if isinstance(data, list):
servers = data
elif isinstance(data, dict):
servers = list(data.values()) if data else []
except Exception:
# Parse text output line by line
for line in output.split("\n"):
line = line.strip()
if line and not line.startswith("-") and not line.startswith("Name"):
parts = line.split()
if len(parts) >= 1:
servers.append({"name": parts[0], "details": " ".join(parts[1:])})
return {"servers": servers, "raw": output}
except Exception as e:
return {"servers": [], "error": str(e)}
class McpServerAdd(BaseModel):
name: str
server_type: str = "sse" # sse | stdio
url: Optional[str] = None
command: Optional[str] = None
args: Optional[List[str]] = None
@app.post("/api/mcp/servers")
async def add_mcp_server(server: McpServerAdd):
"""Add an MCP server"""
try:
cmd = ["claude", "mcp", "add", server.name]
if server.server_type == "sse" and server.url:
cmd.extend(["--transport", "sse", server.url])
elif server.server_type == "stdio" and server.command:
cmd.extend(["--transport", "stdio", server.command])
if server.args:
cmd.extend(server.args)
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15)
output = stdout.decode(errors="replace") + stderr.decode(errors="replace")
return {
"status": "ok" if proc.returncode == 0 else "error",
"message": output.strip(),
"name": server.name
}
except Exception as e:
return {"status": "error", "message": str(e)}
@app.delete("/api/mcp/servers/{name}")
async def remove_mcp_server(name: str):
"""Remove an MCP server"""
try:
proc = await asyncio.create_subprocess_exec(
"claude", "mcp", "remove", name,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
output = stdout.decode(errors="replace") + stderr.decode(errors="replace")
return {"status": "ok" if proc.returncode == 0 else "error", "message": output.strip()}
except Exception as e:
return {"status": "error", "message": str(e)}
# Serve static frontend
app.mount("/", StaticFiles(directory="/app/frontend/dist", html=True), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)