""" Claude Persistent Agent - FastAPI Backend Main application with task scheduling and management """ from fastapi import FastAPI, WebSocket, HTTPException, BackgroundTasks from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import Optional, List, Dict, Any import asyncio import json import sqlite3 import subprocess import logging from datetime import datetime, timedelta import uuid from pathlib import Path from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger import os # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize FastAPI app app = FastAPI(title="Claude Persistent Agent", version="1.0.0") # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Database setup DB_PATH = Path("/app/data/tasks.db") LOGS_PATH = Path("/app/logs") TASKS_PATH = Path("/app/tasks") TOKEN_FILE = Path("/app/data/.claude_token") # persist token across restarts # Ensure directories exist DB_PATH.parent.mkdir(parents=True, exist_ok=True) LOGS_PATH.mkdir(parents=True, exist_ok=True) TASKS_PATH.mkdir(parents=True, exist_ok=True) class Task(BaseModel): id: Optional[str] = None name: str description: Optional[str] = None prompt: str schedule_type: str # "once", "recurring", "manual" schedule_value: Optional[str] = None # cron expression or ISO datetime enabled: bool = True created_at: Optional[str] = None last_run: Optional[str] = None next_run: Optional[str] = None status: str = "idle" # idle, running, completed, failed class TaskRun(BaseModel): task_id: str run_id: str status: str # running, completed, failed output: Optional[str] = None error: Optional[str] = None started_at: str completed_at: Optional[str] = None def init_db(): """Initialize SQLite database""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, prompt TEXT NOT NULL, schedule_type TEXT NOT NULL, schedule_value TEXT, enabled BOOLEAN DEFAULT 1, created_at TEXT, last_run TEXT, next_run TEXT, status TEXT DEFAULT 'idle' ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS task_runs ( run_id TEXT PRIMARY KEY, task_id TEXT NOT NULL, status TEXT NOT NULL, output TEXT, error TEXT, started_at TEXT, completed_at TEXT, FOREIGN KEY (task_id) REFERENCES tasks(id) ) """) conn.commit() conn.close() def _get_claude_env(): """Get environment with auth tokens set for Claude CLI""" env = os.environ.copy() # Load saved token if it exists if TOKEN_FILE.exists(): try: saved = json.loads(TOKEN_FILE.read_text()) if saved.get("type") == "oauth_token" and saved.get("token"): env["CLAUDE_CODE_OAUTH_TOKEN"] = saved["token"] elif saved.get("type") == "api_key" and saved.get("token"): env["ANTHROPIC_API_KEY"] = saved["token"] except Exception as e: logger.error(f"Failed to load saved token: {e}") return env async def run_claude_task(task: Task, run_id: str): """Execute a Claude Code task""" started = datetime.now().isoformat() try: env = _get_claude_env() # Run Claude Code in print mode (non-interactive) process = await asyncio.create_subprocess_exec( "claude", "-p", "--allowedTools", "Bash,Read,Write,Edit", "--permission-mode", "auto", task.prompt, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300) output = stdout.decode(errors="replace") if stdout else "" error = stderr.decode(errors="replace") if stderr else "" status = "completed" if process.returncode == 0 else "failed" # Save run result save_task_run(TaskRun( task_id=task.id, run_id=run_id, status=status, output=output, error=error if status == "failed" else None, started_at=started, completed_at=datetime.now().isoformat() )) update_task_status(task.id, status) except asyncio.TimeoutError: save_task_run(TaskRun( task_id=task.id, run_id=run_id, status="failed", error="Task timeout (>5 minutes)", started_at=started, completed_at=datetime.now().isoformat() )) update_task_status(task.id, "failed") except Exception as e: save_task_run(TaskRun( task_id=task.id, run_id=run_id, status="failed", error=str(e), started_at=started, completed_at=datetime.now().isoformat() )) update_task_status(task.id, "failed") def save_task(task: Task): """Save task to database""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() if not task.id: task.id = str(uuid.uuid4()) if not task.created_at: task.created_at = datetime.now().isoformat() cursor.execute(""" INSERT OR REPLACE INTO tasks (id, name, description, prompt, schedule_type, schedule_value, enabled, created_at, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( task.id, task.name, task.description, task.prompt, task.schedule_type, task.schedule_value, task.enabled, task.created_at, task.status )) conn.commit() conn.close() return task def save_task_run(run: TaskRun): """Save task run result""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" INSERT INTO task_runs (run_id, task_id, status, output, error, started_at, completed_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( run.run_id, run.task_id, run.status, run.output, run.error, run.started_at, run.completed_at )) conn.commit() conn.close() def update_task_status(task_id: str, status: str): """Update task status""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("UPDATE tasks SET status = ?, last_run = ? WHERE id = ?", (status, datetime.now().isoformat(), task_id)) conn.commit() conn.close() def get_task(task_id: str) -> Optional[Task]: """Get task from database""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)) row = cursor.fetchone() conn.close() if row: return Task( id=row[0], name=row[1], description=row[2], prompt=row[3], schedule_type=row[4], schedule_value=row[5], enabled=row[6], created_at=row[7], last_run=row[8], next_run=row[9], status=row[10] ) return None def get_all_tasks() -> List[Task]: """Get all tasks""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC") rows = cursor.fetchall() conn.close() return [ Task( id=row[0], name=row[1], description=row[2], prompt=row[3], schedule_type=row[4], schedule_value=row[5], enabled=row[6], created_at=row[7], last_run=row[8], next_run=row[9], status=row[10] ) for row in rows ] # Initialize scheduler scheduler = BackgroundScheduler() def schedule_task(task: Task): """Schedule a task with APScheduler""" if not task.enabled or task.schedule_type == "manual": return if task.schedule_type == "recurring" and task.schedule_value: try: scheduler.add_job( run_task_job, CronTrigger.from_crontab(task.schedule_value), id=task.id, args=[task], replace_existing=True ) logger.info(f"Scheduled task {task.name} with cron: {task.schedule_value}") except Exception as e: logger.error(f"Failed to schedule task {task.name}: {e}") async def run_task_job(task: Task): """Background job to run a task""" run_id = str(uuid.uuid4()) update_task_status(task.id, "running") await run_claude_task(task, run_id) # ============ API Routes ============ @app.on_event("startup") async def startup(): """Initialize on startup""" init_db() scheduler.start() # Load saved token into environment on startup if TOKEN_FILE.exists(): try: saved = json.loads(TOKEN_FILE.read_text()) if saved.get("type") == "oauth_token" and saved.get("token"): os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = saved["token"] logger.info("Loaded saved OAuth token") elif saved.get("type") == "api_key" and saved.get("token"): os.environ["ANTHROPIC_API_KEY"] = saved["token"] logger.info("Loaded saved API key") except Exception as e: logger.error(f"Failed to load saved token: {e}") # Schedule existing tasks for task in get_all_tasks(): if task.enabled: schedule_task(task) logger.info("Claude Persistent Agent started") @app.on_event("shutdown") async def shutdown(): """Cleanup on shutdown""" scheduler.shutdown() @app.get("/health") async def health(): """Health check endpoint""" return {"status": "healthy", "timestamp": datetime.now().isoformat()} @app.post("/api/tasks") async def create_task(task: Task) -> Task: """Create a new task""" saved_task = save_task(task) schedule_task(saved_task) return saved_task @app.get("/api/tasks") async def list_tasks() -> List[Task]: """List all tasks""" return get_all_tasks() @app.get("/api/tasks/{task_id}") async def get_task_endpoint(task_id: str) -> Task: """Get a specific task""" task = get_task(task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") return task @app.put("/api/tasks/{task_id}") async def update_task_endpoint(task_id: str, task: Task) -> Task: """Update a task""" task.id = task_id saved_task = save_task(task) schedule_task(saved_task) return saved_task @app.delete("/api/tasks/{task_id}") async def delete_task_endpoint(task_id: str): """Delete a task""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) conn.commit() conn.close() if scheduler.get_job(task_id): scheduler.remove_job(task_id) return {"status": "deleted"} @app.post("/api/tasks/{task_id}/run") async def run_task_manual(task_id: str, background_tasks: BackgroundTasks): """Manually trigger a task""" task = get_task(task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") run_id = str(uuid.uuid4()) update_task_status(task_id, "running") background_tasks.add_task(run_claude_task, task, run_id) return {"run_id": run_id, "status": "started"} @app.get("/api/tasks/{task_id}/runs") async def get_task_runs(task_id: str) -> List[Dict]: """Get runs for a task""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute( "SELECT * FROM task_runs WHERE task_id = ? ORDER BY started_at DESC LIMIT 50", (task_id,) ) rows = cursor.fetchall() conn.close() return [ { "run_id": row[0], "task_id": row[1], "status": row[2], "output": row[3], "error": row[4], "started_at": row[5], "completed_at": row[6] } for row in rows ] @app.get("/api/system/info") async def system_info() -> Dict: """Get system information""" tasks = get_all_tasks() conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM task_runs") total_runs = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='completed'") completed_runs = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='failed'") failed_runs = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM task_runs WHERE status='running'") running_runs = cursor.fetchone()[0] conn.close() return { "app_name": "Claude Persistent Agent", "version": "1.0.0", "uptime": datetime.now().isoformat(), "scheduler_running": scheduler.running, "task_count": len(tasks), "total_runs": total_runs, "completed_runs": completed_runs, "failed_runs": failed_runs, "running_runs": running_runs, } @app.get("/api/system/usage") async def usage_stats() -> Dict: """Get Claude API usage stats from session files if available""" usage = { "models_used": [], "session_count": 0, "last_reset": None, "next_reset": None, "note": "Usage data sourced from ~/.claude session cache" } try: claude_dir = Path("/root/.claude") sessions = list(claude_dir.glob("**/session*.json")) + list(claude_dir.glob("**/*.jsonl")) usage["session_count"] = len(sessions) stats_file = claude_dir / "usage_stats.json" if stats_file.exists(): with open(stats_file) as f: saved = json.load(f) usage.update(saved) else: now = datetime.now() if now.day <= 1: reset = now.replace(day=1, hour=0, minute=0, second=0) else: next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1) reset = next_month.replace(hour=0, minute=0, second=0) usage["next_reset"] = reset.isoformat() usage["days_until_reset"] = (reset - now).days except Exception as e: usage["error"] = str(e) conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT COUNT(*), MIN(started_at), MAX(started_at) FROM task_runs") row = cursor.fetchone() conn.close() usage["claude_runs_total"] = row[0] usage["first_run"] = row[1] usage["last_run"] = row[2] return usage # ============ Auth Endpoints ============ @app.get("/api/auth/status") async def auth_status(): """Check Claude auth status""" account = None auth_method = None status = "logged_out" has_saved_token = TOKEN_FILE.exists() token_type = None if has_saved_token: try: saved = json.loads(TOKEN_FILE.read_text()) token_type = saved.get("type") except Exception: pass try: env = _get_claude_env() proc = await asyncio.create_subprocess_exec( "claude", "auth", "status", "--json", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10) output = stdout.decode(errors="replace").strip() logger.info(f"claude auth status: {output}") data = json.loads(output) if data.get("loggedIn"): status = "logged_in" account = data.get("email") or data.get("account", {}).get("emailAddress") auth_method = data.get("authMethod") except Exception as e: logger.error(f"Auth status check failed: {e}") return { "status": status, "account": account, "auth_method": auth_method, "has_saved_token": has_saved_token, "token_type": token_type } class TokenSubmit(BaseModel): token: str token_type: str = "oauth_token" # "oauth_token" or "api_key" @app.post("/api/auth/token") async def auth_set_token(payload: TokenSubmit): """Save an auth token (OAuth setup token or API key)""" token = payload.token.strip() token_type = payload.token_type if not token: return {"status": "error", "message": "Token cannot be empty"} # Auto-detect token type if token.startswith("sk-ant-oat"): token_type = "oauth_token" elif token.startswith("sk-ant-api"): token_type = "api_key" # Save to file for persistence TOKEN_FILE.write_text(json.dumps({ "type": token_type, "token": token, "saved_at": datetime.now().isoformat() })) # Set in current process environment if token_type == "oauth_token": os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = token os.environ.pop("ANTHROPIC_API_KEY", None) elif token_type == "api_key": os.environ["ANTHROPIC_API_KEY"] = token os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None) # Verify auth works try: env = _get_claude_env() proc = await asyncio.create_subprocess_exec( "claude", "auth", "status", "--json", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10) output = stdout.decode(errors="replace").strip() data = json.loads(output) if data.get("loggedIn"): return { "status": "logged_in", "message": "Token saved and verified!", "account": data.get("email"), "auth_method": data.get("authMethod") } else: return { "status": "token_saved", "message": "Token saved but auth status shows not logged in. The token may still work for API calls.", "raw": output } except Exception as e: return { "status": "token_saved", "message": f"Token saved. Could not verify: {e}" } @app.post("/api/auth/logout") async def auth_logout(): """Clear saved auth token""" if TOKEN_FILE.exists(): TOKEN_FILE.unlink() os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None) os.environ.pop("ANTHROPIC_API_KEY", None) try: proc = await asyncio.create_subprocess_exec( "claude", "auth", "logout", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await asyncio.wait_for(proc.communicate(), timeout=10) except Exception: pass return {"status": "logged_out"} # ============ MCP Server Management ============ @app.get("/api/mcp/servers") async def list_mcp_servers(): """List configured MCP servers""" try: proc = await asyncio.create_subprocess_exec( "claude", "mcp", "list", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10) output = stdout.decode(errors="replace").strip() servers = [] if "No MCP servers configured" in output: return {"servers": [], "raw": output} try: data = json.loads(output) if isinstance(data, list): servers = data elif isinstance(data, dict): servers = list(data.values()) if data else [] except Exception: for line in output.split("\n"): line = line.strip() if line and not line.startswith("-") and not line.startswith("Name"): parts = line.split() if len(parts) >= 1: servers.append({"name": parts[0], "details": " ".join(parts[1:])}) return {"servers": servers, "raw": output} except Exception as e: return {"servers": [], "error": str(e)} class McpServerAdd(BaseModel): name: str server_type: str = "sse" # sse | stdio url: Optional[str] = None command: Optional[str] = None args: Optional[List[str]] = None @app.post("/api/mcp/servers") async def add_mcp_server(server: McpServerAdd): """Add an MCP server""" try: cmd = ["claude", "mcp", "add", server.name] if server.server_type == "sse" and server.url: cmd.extend(["--transport", "sse", server.url]) elif server.server_type == "stdio" and server.command: cmd.extend(["--transport", "stdio", server.command]) if server.args: cmd.extend(server.args) proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15) output = stdout.decode(errors="replace") + stderr.decode(errors="replace") return { "status": "ok" if proc.returncode == 0 else "error", "message": output.strip(), "name": server.name } except Exception as e: return {"status": "error", "message": str(e)} @app.delete("/api/mcp/servers/{name}") async def remove_mcp_server(name: str): """Remove an MCP server""" try: proc = await asyncio.create_subprocess_exec( "claude", "mcp", "remove", name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10) output = stdout.decode(errors="replace") + stderr.decode(errors="replace") return {"status": "ok" if proc.returncode == 0 else "error", "message": output.strip()} except Exception as e: return {"status": "error", "message": str(e)} # Serve static frontend app.mount("/", StaticFiles(directory="/app/frontend/dist", html=True), name="static") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)