From 34998a5ef1d77743978f6e75164b5a5930136753 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Sat, 4 Apr 2026 22:40:49 -0400 Subject: [PATCH] Add backend/main.py --- backend/main.py | 423 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 423 insertions(+) create mode 100644 backend/main.py diff --git a/backend/main.py b/backend/main.py new file mode 100644 index 0000000..5cab2f8 --- /dev/null +++ b/backend/main.py @@ -0,0 +1,423 @@ +""" +Claude Persistent Agent - FastAPI Backend +Main application with task scheduling and management +""" + +from fastapi import FastAPI, WebSocket, HTTPException, BackgroundTasks +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from typing import Optional, List, Dict, Any +import asyncio +import json +import sqlite3 +import subprocess +import logging +from datetime import datetime, timedelta +import uuid +from pathlib import Path +import APScheduler +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +import os + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize FastAPI app +app = FastAPI(title="Claude Persistent Agent", version="1.0.0") + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Database setup +DB_PATH = Path("/app/data/tasks.db") +LOGS_PATH = Path("/app/logs") +TASKS_PATH = Path("/app/tasks") + +# Ensure directories exist +DB_PATH.parent.mkdir(parents=True, exist_ok=True) +LOGS_PATH.mkdir(parents=True, exist_ok=True) +TASKS_PATH.mkdir(parents=True, exist_ok=True) + + +class Task(BaseModel): + id: Optional[str] = None + name: str + description: Optional[str] = None + prompt: str + schedule_type: str # "once", "recurring", "manual" + schedule_value: Optional[str] = None # cron expression or ISO datetime + enabled: bool = True + created_at: Optional[str] = None + last_run: Optional[str] = None + next_run: Optional[str] = None + status: str = "idle" # idle, running, completed, failed + + +class TaskRun(BaseModel): + task_id: str + run_id: str + status: str # running, completed, failed + output: Optional[str] = None + error: Optional[str] = None + started_at: str + completed_at: Optional[str] = None + + +def init_db(): + """Initialize SQLite database""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + description TEXT, + prompt TEXT NOT NULL, + schedule_type TEXT NOT NULL, + schedule_value TEXT, + enabled BOOLEAN DEFAULT 1, + created_at TEXT, + last_run TEXT, + next_run TEXT, + status TEXT DEFAULT 'idle' + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS task_runs ( + run_id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + status TEXT NOT NULL, + output TEXT, + error TEXT, + started_at TEXT, + completed_at TEXT, + FOREIGN KEY (task_id) REFERENCES tasks(id) + ) + """) + + conn.commit() + conn.close() + + +async def run_claude_task(task: Task, run_id: str): + """Execute a Claude Code task""" + try: + # Save task prompt to temp file + prompt_file = TASKS_PATH / f"{run_id}.md" + prompt_file.write_text(task.prompt) + + # Run Claude Code + process = await asyncio.create_subprocess_exec( + "claude", + "task", + str(prompt_file), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300) + + output = stdout.decode() if stdout else "" + error = stderr.decode() if stderr else "" + + # Save run result + save_task_run(TaskRun( + task_id=task.id, + run_id=run_id, + status="completed" if process.returncode == 0 else "failed", + output=output, + error=error if process.returncode != 0 else None, + started_at=datetime.now().isoformat(), + completed_at=datetime.now().isoformat() + )) + + # Update task + update_task_status(task.id, "completed") + + except asyncio.TimeoutError: + save_task_run(TaskRun( + task_id=task.id, + run_id=run_id, + status="failed", + error="Task timeout (>5 minutes)", + started_at=datetime.now().isoformat(), + completed_at=datetime.now().isoformat() + )) + update_task_status(task.id, "failed") + except Exception as e: + save_task_run(TaskRun( + task_id=task.id, + run_id=run_id, + status="failed", + error=str(e), + started_at=datetime.now().isoformat(), + completed_at=datetime.now().isoformat() + )) + update_task_status(task.id, "failed") + + +def save_task(task: Task): + """Save task to database""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + if not task.id: + task.id = str(uuid.uuid4()) + + if not task.created_at: + task.created_at = datetime.now().isoformat() + + cursor.execute(""" + INSERT OR REPLACE INTO tasks + (id, name, description, prompt, schedule_type, schedule_value, enabled, created_at, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + task.id, task.name, task.description, task.prompt, + task.schedule_type, task.schedule_value, task.enabled, + task.created_at, task.status + )) + + conn.commit() + conn.close() + return task + + +def save_task_run(run: TaskRun): + """Save task run result""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + INSERT INTO task_runs + (run_id, task_id, status, output, error, started_at, completed_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, ( + run.run_id, run.task_id, run.status, run.output, + run.error, run.started_at, run.completed_at + )) + + conn.commit() + conn.close() + + +def update_task_status(task_id: str, status: str): + """Update task status""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("UPDATE tasks SET status = ?, last_run = ? WHERE id = ?", + (status, datetime.now().isoformat(), task_id)) + conn.commit() + conn.close() + + +def get_task(task_id: str) -> Optional[Task]: + """Get task from database""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)) + row = cursor.fetchone() + conn.close() + + if row: + return Task( + id=row[0], name=row[1], description=row[2], prompt=row[3], + schedule_type=row[4], schedule_value=row[5], enabled=row[6], + created_at=row[7], last_run=row[8], next_run=row[9], status=row[10] + ) + return None + + +def get_all_tasks() -> List[Task]: + """Get all tasks""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC") + rows = cursor.fetchall() + conn.close() + + return [ + Task( + id=row[0], name=row[1], description=row[2], prompt=row[3], + schedule_type=row[4], schedule_value=row[5], enabled=row[6], + created_at=row[7], last_run=row[8], next_run=row[9], status=row[10] + ) + for row in rows + ] + + +# Initialize scheduler +scheduler = BackgroundScheduler() + + +def schedule_task(task: Task): + """Schedule a task with APScheduler""" + if not task.enabled or task.schedule_type == "manual": + return + + if task.schedule_type == "recurring" and task.schedule_value: + try: + scheduler.add_job( + run_task_job, + CronTrigger.from_crontab(task.schedule_value), + id=task.id, + args=[task], + replace_existing=True + ) + logger.info(f"Scheduled task {task.name} with cron: {task.schedule_value}") + except Exception as e: + logger.error(f"Failed to schedule task {task.name}: {e}") + + +async def run_task_job(task: Task): + """Background job to run a task""" + run_id = str(uuid.uuid4()) + update_task_status(task.id, "running") + await run_claude_task(task, run_id) + + +# ============ API Routes ============ + +@app.on_event("startup") +async def startup(): + """Initialize on startup""" + init_db() + scheduler.start() + + # Schedule existing tasks + for task in get_all_tasks(): + if task.enabled: + schedule_task(task) + + logger.info("Claude Persistent Agent started") + + +@app.on_event("shutdown") +async def shutdown(): + """Cleanup on shutdown""" + scheduler.shutdown() + + +@app.get("/health") +async def health(): + """Health check endpoint""" + return {"status": "healthy", "timestamp": datetime.now().isoformat()} + + +@app.post("/api/tasks") +async def create_task(task: Task) -> Task: + """Create a new task""" + saved_task = save_task(task) + schedule_task(saved_task) + return saved_task + + +@app.get("/api/tasks") +async def list_tasks() -> List[Task]: + """List all tasks""" + return get_all_tasks() + + +@app.get("/api/tasks/{task_id}") +async def get_task_endpoint(task_id: str) -> Task: + """Get a specific task""" + task = get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + return task + + +@app.put("/api/tasks/{task_id}") +async def update_task_endpoint(task_id: str, task: Task) -> Task: + """Update a task""" + task.id = task_id + saved_task = save_task(task) + schedule_task(saved_task) + return saved_task + + +@app.delete("/api/tasks/{task_id}") +async def delete_task_endpoint(task_id: str): + """Delete a task""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) + conn.commit() + conn.close() + + if scheduler.get_job(task_id): + scheduler.remove_job(task_id) + + return {"status": "deleted"} + + +@app.post("/api/tasks/{task_id}/run") +async def run_task_manual(task_id: str, background_tasks: BackgroundTasks): + """Manually trigger a task""" + task = get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + run_id = str(uuid.uuid4()) + update_task_status(task_id, "running") + background_tasks.add_task(run_claude_task, task, run_id) + + return {"run_id": run_id, "status": "started"} + + +@app.get("/api/tasks/{task_id}/runs") +async def get_task_runs(task_id: str) -> List[Dict]: + """Get runs for a task""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute( + "SELECT * FROM task_runs WHERE task_id = ? ORDER BY started_at DESC LIMIT 50", + (task_id,) + ) + rows = cursor.fetchall() + conn.close() + + return [ + { + "run_id": row[0], + "task_id": row[1], + "status": row[2], + "output": row[3], + "error": row[4], + "started_at": row[5], + "completed_at": row[6] + } + for row in rows + ] + + +@app.get("/api/system/info") +async def system_info() -> Dict: + """Get system information""" + return { + "app_name": "Claude Persistent Agent", + "version": "1.0.0", + "uptime": datetime.now().isoformat(), + "scheduler_running": scheduler.running, + "task_count": len(get_all_tasks()) + } + + +# Serve static frontend +app.mount("/", StaticFiles(directory="/app/frontend/dist", html=True), name="static") + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000)