423 lines
11 KiB
Python
423 lines
11 KiB
Python
"""
|
|
Claude Persistent Agent - FastAPI Backend
|
|
Main application with task scheduling and management
|
|
"""
|
|
|
|
from fastapi import FastAPI, WebSocket, HTTPException, BackgroundTasks
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List, Dict, Any
|
|
import asyncio
|
|
import json
|
|
import sqlite3
|
|
import subprocess
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
import uuid
|
|
from pathlib import Path
|
|
import APScheduler
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
import os
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Initialize FastAPI app
|
|
app = FastAPI(title="Claude Persistent Agent", version="1.0.0")
|
|
|
|
# Add CORS middleware
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# Database setup
|
|
DB_PATH = Path("/app/data/tasks.db")
|
|
LOGS_PATH = Path("/app/logs")
|
|
TASKS_PATH = Path("/app/tasks")
|
|
|
|
# Ensure directories exist
|
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
LOGS_PATH.mkdir(parents=True, exist_ok=True)
|
|
TASKS_PATH.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
class Task(BaseModel):
|
|
id: Optional[str] = None
|
|
name: str
|
|
description: Optional[str] = None
|
|
prompt: str
|
|
schedule_type: str # "once", "recurring", "manual"
|
|
schedule_value: Optional[str] = None # cron expression or ISO datetime
|
|
enabled: bool = True
|
|
created_at: Optional[str] = None
|
|
last_run: Optional[str] = None
|
|
next_run: Optional[str] = None
|
|
status: str = "idle" # idle, running, completed, failed
|
|
|
|
|
|
class TaskRun(BaseModel):
|
|
task_id: str
|
|
run_id: str
|
|
status: str # running, completed, failed
|
|
output: Optional[str] = None
|
|
error: Optional[str] = None
|
|
started_at: str
|
|
completed_at: Optional[str] = None
|
|
|
|
|
|
def init_db():
|
|
"""Initialize SQLite database"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS tasks (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
description TEXT,
|
|
prompt TEXT NOT NULL,
|
|
schedule_type TEXT NOT NULL,
|
|
schedule_value TEXT,
|
|
enabled BOOLEAN DEFAULT 1,
|
|
created_at TEXT,
|
|
last_run TEXT,
|
|
next_run TEXT,
|
|
status TEXT DEFAULT 'idle'
|
|
)
|
|
""")
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS task_runs (
|
|
run_id TEXT PRIMARY KEY,
|
|
task_id TEXT NOT NULL,
|
|
status TEXT NOT NULL,
|
|
output TEXT,
|
|
error TEXT,
|
|
started_at TEXT,
|
|
completed_at TEXT,
|
|
FOREIGN KEY (task_id) REFERENCES tasks(id)
|
|
)
|
|
""")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
async def run_claude_task(task: Task, run_id: str):
|
|
"""Execute a Claude Code task"""
|
|
try:
|
|
# Save task prompt to temp file
|
|
prompt_file = TASKS_PATH / f"{run_id}.md"
|
|
prompt_file.write_text(task.prompt)
|
|
|
|
# Run Claude Code
|
|
process = await asyncio.create_subprocess_exec(
|
|
"claude",
|
|
"task",
|
|
str(prompt_file),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
|
|
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
|
|
|
|
output = stdout.decode() if stdout else ""
|
|
error = stderr.decode() if stderr else ""
|
|
|
|
# Save run result
|
|
save_task_run(TaskRun(
|
|
task_id=task.id,
|
|
run_id=run_id,
|
|
status="completed" if process.returncode == 0 else "failed",
|
|
output=output,
|
|
error=error if process.returncode != 0 else None,
|
|
started_at=datetime.now().isoformat(),
|
|
completed_at=datetime.now().isoformat()
|
|
))
|
|
|
|
# Update task
|
|
update_task_status(task.id, "completed")
|
|
|
|
except asyncio.TimeoutError:
|
|
save_task_run(TaskRun(
|
|
task_id=task.id,
|
|
run_id=run_id,
|
|
status="failed",
|
|
error="Task timeout (>5 minutes)",
|
|
started_at=datetime.now().isoformat(),
|
|
completed_at=datetime.now().isoformat()
|
|
))
|
|
update_task_status(task.id, "failed")
|
|
except Exception as e:
|
|
save_task_run(TaskRun(
|
|
task_id=task.id,
|
|
run_id=run_id,
|
|
status="failed",
|
|
error=str(e),
|
|
started_at=datetime.now().isoformat(),
|
|
completed_at=datetime.now().isoformat()
|
|
))
|
|
update_task_status(task.id, "failed")
|
|
|
|
|
|
def save_task(task: Task):
|
|
"""Save task to database"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
if not task.id:
|
|
task.id = str(uuid.uuid4())
|
|
|
|
if not task.created_at:
|
|
task.created_at = datetime.now().isoformat()
|
|
|
|
cursor.execute("""
|
|
INSERT OR REPLACE INTO tasks
|
|
(id, name, description, prompt, schedule_type, schedule_value, enabled, created_at, status)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
task.id, task.name, task.description, task.prompt,
|
|
task.schedule_type, task.schedule_value, task.enabled,
|
|
task.created_at, task.status
|
|
))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
return task
|
|
|
|
|
|
def save_task_run(run: TaskRun):
|
|
"""Save task run result"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
INSERT INTO task_runs
|
|
(run_id, task_id, status, output, error, started_at, completed_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
run.run_id, run.task_id, run.status, run.output,
|
|
run.error, run.started_at, run.completed_at
|
|
))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def update_task_status(task_id: str, status: str):
|
|
"""Update task status"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE tasks SET status = ?, last_run = ? WHERE id = ?",
|
|
(status, datetime.now().isoformat(), task_id))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def get_task(task_id: str) -> Optional[Task]:
|
|
"""Get task from database"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM tasks WHERE id = ?", (task_id,))
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if row:
|
|
return Task(
|
|
id=row[0], name=row[1], description=row[2], prompt=row[3],
|
|
schedule_type=row[4], schedule_value=row[5], enabled=row[6],
|
|
created_at=row[7], last_run=row[8], next_run=row[9], status=row[10]
|
|
)
|
|
return None
|
|
|
|
|
|
def get_all_tasks() -> List[Task]:
|
|
"""Get all tasks"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC")
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
|
|
return [
|
|
Task(
|
|
id=row[0], name=row[1], description=row[2], prompt=row[3],
|
|
schedule_type=row[4], schedule_value=row[5], enabled=row[6],
|
|
created_at=row[7], last_run=row[8], next_run=row[9], status=row[10]
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
|
|
# Initialize scheduler
|
|
scheduler = BackgroundScheduler()
|
|
|
|
|
|
def schedule_task(task: Task):
|
|
"""Schedule a task with APScheduler"""
|
|
if not task.enabled or task.schedule_type == "manual":
|
|
return
|
|
|
|
if task.schedule_type == "recurring" and task.schedule_value:
|
|
try:
|
|
scheduler.add_job(
|
|
run_task_job,
|
|
CronTrigger.from_crontab(task.schedule_value),
|
|
id=task.id,
|
|
args=[task],
|
|
replace_existing=True
|
|
)
|
|
logger.info(f"Scheduled task {task.name} with cron: {task.schedule_value}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to schedule task {task.name}: {e}")
|
|
|
|
|
|
async def run_task_job(task: Task):
|
|
"""Background job to run a task"""
|
|
run_id = str(uuid.uuid4())
|
|
update_task_status(task.id, "running")
|
|
await run_claude_task(task, run_id)
|
|
|
|
|
|
# ============ API Routes ============
|
|
|
|
@app.on_event("startup")
|
|
async def startup():
|
|
"""Initialize on startup"""
|
|
init_db()
|
|
scheduler.start()
|
|
|
|
# Schedule existing tasks
|
|
for task in get_all_tasks():
|
|
if task.enabled:
|
|
schedule_task(task)
|
|
|
|
logger.info("Claude Persistent Agent started")
|
|
|
|
|
|
@app.on_event("shutdown")
|
|
async def shutdown():
|
|
"""Cleanup on shutdown"""
|
|
scheduler.shutdown()
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
"""Health check endpoint"""
|
|
return {"status": "healthy", "timestamp": datetime.now().isoformat()}
|
|
|
|
|
|
@app.post("/api/tasks")
|
|
async def create_task(task: Task) -> Task:
|
|
"""Create a new task"""
|
|
saved_task = save_task(task)
|
|
schedule_task(saved_task)
|
|
return saved_task
|
|
|
|
|
|
@app.get("/api/tasks")
|
|
async def list_tasks() -> List[Task]:
|
|
"""List all tasks"""
|
|
return get_all_tasks()
|
|
|
|
|
|
@app.get("/api/tasks/{task_id}")
|
|
async def get_task_endpoint(task_id: str) -> Task:
|
|
"""Get a specific task"""
|
|
task = get_task(task_id)
|
|
if not task:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
return task
|
|
|
|
|
|
@app.put("/api/tasks/{task_id}")
|
|
async def update_task_endpoint(task_id: str, task: Task) -> Task:
|
|
"""Update a task"""
|
|
task.id = task_id
|
|
saved_task = save_task(task)
|
|
schedule_task(saved_task)
|
|
return saved_task
|
|
|
|
|
|
@app.delete("/api/tasks/{task_id}")
|
|
async def delete_task_endpoint(task_id: str):
|
|
"""Delete a task"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
if scheduler.get_job(task_id):
|
|
scheduler.remove_job(task_id)
|
|
|
|
return {"status": "deleted"}
|
|
|
|
|
|
@app.post("/api/tasks/{task_id}/run")
|
|
async def run_task_manual(task_id: str, background_tasks: BackgroundTasks):
|
|
"""Manually trigger a task"""
|
|
task = get_task(task_id)
|
|
if not task:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
run_id = str(uuid.uuid4())
|
|
update_task_status(task_id, "running")
|
|
background_tasks.add_task(run_claude_task, task, run_id)
|
|
|
|
return {"run_id": run_id, "status": "started"}
|
|
|
|
|
|
@app.get("/api/tasks/{task_id}/runs")
|
|
async def get_task_runs(task_id: str) -> List[Dict]:
|
|
"""Get runs for a task"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT * FROM task_runs WHERE task_id = ? ORDER BY started_at DESC LIMIT 50",
|
|
(task_id,)
|
|
)
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
|
|
return [
|
|
{
|
|
"run_id": row[0],
|
|
"task_id": row[1],
|
|
"status": row[2],
|
|
"output": row[3],
|
|
"error": row[4],
|
|
"started_at": row[5],
|
|
"completed_at": row[6]
|
|
}
|
|
for row in rows
|
|
]
|
|
|
|
|
|
@app.get("/api/system/info")
|
|
async def system_info() -> Dict:
|
|
"""Get system information"""
|
|
return {
|
|
"app_name": "Claude Persistent Agent",
|
|
"version": "1.0.0",
|
|
"uptime": datetime.now().isoformat(),
|
|
"scheduler_running": scheduler.running,
|
|
"task_count": len(get_all_tasks())
|
|
}
|
|
|
|
|
|
# Serve static frontend
|
|
app.mount("/", StaticFiles(directory="/app/frontend/dist", html=True), name="static")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|