# This repository has been archived on 2026-04-05. You can view files and clone it, but cannot push or open issues or pull requests.
# claude-persistent-agent/backend/main.py
#
# 1019 lines
# 33 KiB
# Python

"""
Claude Persistent Agent - FastAPI Backend
Interfaces with a long-running interactive Claude Code process via stdin/stdout pipes.
"""
import asyncio
import json
import logging
import os
import sqlite3
import subprocess
import threading
import uuid
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# Setup logging
# The root logger is configured at INFO; this module logs through its own
# named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(title="Claude Persistent Agent", version="3.0.0")
# NOTE(review): every origin, method and header is allowed and credentials are
# enabled at the same time. Confirm this wide-open CORS policy is intended for
# anything other than local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Paths
# Fixed absolute locations under /app. The directories are created at import
# time so later code can open the database without checking for them.
DB_PATH = Path("/app/data/tasks.db")
LOGS_PATH = Path("/app/logs")
TASKS_PATH = Path("/app/tasks")
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
LOGS_PATH.mkdir(parents=True, exist_ok=True)
TASKS_PATH.mkdir(parents=True, exist_ok=True)
# ============ Models ============
class Task(BaseModel):
    # Request/response schema for a task. Field names mirror the columns of
    # the `tasks` table created in init_db().

    # Assigned by create_task() (uuid4 string); callers may omit it.
    id: Optional[str] = None
    name: str
    description: Optional[str] = None
    # Prompt text passed to the non-interactive `claude -p` run.
    prompt: str
    # Only the value "recurring" causes a scheduler job to be registered.
    schedule_type: str = "manual"
    # Crontab expression, parsed with CronTrigger.from_crontab when recurring.
    schedule_value: Optional[str] = None
    enabled: bool = True
    # ISO-8601 timestamp set by create_task().
    created_at: Optional[str] = None
    # ISO-8601 timestamp written when a run starts.
    last_run: Optional[str] = None
    # NOTE(review): not written anywhere in the visible code.
    next_run: Optional[str] = None
    # "idle" or "running" in the visible code.
    status: str = "idle"
    # Forwarded as --allowedTools when set.
    agent_tools: Optional[str] = None
    # Forwarded as --system-prompt when set.
    agent_system_prompt: Optional[str] = None
    # NOTE(review): stored, but not forwarded to the CLI in the visible code.
    agent_max_turns: Optional[int] = None
    # Forwarded as --permission-mode.
    agent_permission_mode: str = "auto"
    # Timeout (seconds) applied to the whole non-interactive run.
    agent_timeout: int = 300
class TaskRun(BaseModel):
    # Schema of one execution of a task. Field names mirror the columns of
    # the `task_runs` table created in init_db().
    task_id: str
    run_id: str
    # "running", "completed" or "failed" in the visible code.
    status: str
    output: Optional[str] = None
    error: Optional[str] = None
    # ISO-8601 timestamps.
    started_at: str
    completed_at: Optional[str] = None
class ChatSend(BaseModel):
    # Body of POST /api/chat/send.
    message: str
    # Our own tracking id for the conversation; send_chat() generates a new
    # uuid4 when it is omitted.
    session_id: Optional[str] = None
class McpServerAdd(BaseModel):
    # Body of POST /api/mcp/servers.
    # Key of the entry under "mcpServers" in the MCP config file.
    name: str
    # Stored in the config entry under the key "type".
    server_type: str = "sse"
    # url / command / args are copied into the entry only when truthy.
    url: Optional[str] = None
    command: Optional[str] = None
    args: Optional[List[str]] = None
# ============ Database ============
def init_db():
    """Create the SQLite schema if missing and apply additive migrations.

    Creates the ``tasks``, ``task_runs`` and ``chat_messages`` tables when
    they do not exist, then tries to add each ``agent_*`` column to ``tasks``
    so databases created by an older schema are brought up to date.

    The connection is always closed, even when a statement fails.

    Raises:
        sqlite3.OperationalError: for any migration failure other than the
            column already being present.
    """
    legacy_columns = (
        "agent_tools TEXT",
        "agent_system_prompt TEXT",
        "agent_max_turns INTEGER",
        "agent_permission_mode TEXT DEFAULT 'auto'",
        "agent_timeout INTEGER DEFAULT 300",
    )
    with closing(sqlite3.connect(DB_PATH)) as conn:
        c = conn.cursor()
        c.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                description TEXT,
                prompt TEXT NOT NULL,
                schedule_type TEXT NOT NULL,
                schedule_value TEXT,
                enabled BOOLEAN DEFAULT 1,
                created_at TEXT,
                last_run TEXT,
                next_run TEXT,
                status TEXT DEFAULT 'idle',
                agent_tools TEXT,
                agent_system_prompt TEXT,
                agent_max_turns INTEGER,
                agent_permission_mode TEXT DEFAULT 'auto',
                agent_timeout INTEGER DEFAULT 300
            )
        """)
        c.execute("""
            CREATE TABLE IF NOT EXISTS task_runs (
                run_id TEXT PRIMARY KEY,
                task_id TEXT NOT NULL,
                status TEXT NOT NULL,
                output TEXT,
                error TEXT,
                started_at TEXT,
                completed_at TEXT,
                FOREIGN KEY (task_id) REFERENCES tasks(id)
            )
        """)
        c.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                metadata TEXT
            )
        """)
        # Migrations for any legacy columns
        for col in legacy_columns:
            try:
                c.execute(f"ALTER TABLE tasks ADD COLUMN {col}")
            except sqlite3.OperationalError as exc:
                # "duplicate column name" just means the column is already
                # there; any other failure (locked or corrupt database, ...)
                # must not be hidden.
                if "duplicate column name" not in str(exc).lower():
                    raise
        conn.commit()
def db_save_message(session_id: str, role: str, content: str, metadata: Optional[dict] = None):
    """Insert one chat message into ``chat_messages``.

    Args:
        session_id: Conversation id the message belongs to.
        role: Message role string stored as-is (e.g. "user").
        content: Message text.
        metadata: Optional dict stored as JSON text; a missing or empty dict
            is stored as NULL.

    A fresh uuid4 is used as the row id and the current local time (ISO-8601)
    as the timestamp. The connection is closed even if the insert fails.
    """
    row = (
        str(uuid.uuid4()),
        session_id,
        role,
        content,
        datetime.now().isoformat(),
        json.dumps(metadata) if metadata else None,
    )
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            "INSERT INTO chat_messages (id, session_id, role, content, timestamp, metadata) VALUES (?,?,?,?,?,?)",
            row,
        )
        conn.commit()
def db_get_messages(session_id: str) -> list:
    """Return all stored messages of a session, oldest first.

    Args:
        session_id: Conversation id to look up.

    Returns:
        A list of dicts with keys ``id``, ``role``, ``content``,
        ``timestamp`` and ``metadata`` (decoded from JSON, or None). The list
        is empty when the session has no messages.

    The connection is closed even if the query fails.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(
            "SELECT id, role, content, timestamp, metadata FROM chat_messages WHERE session_id=? ORDER BY timestamp",
            (session_id,),
        ).fetchall()
    return [
        {
            "id": msg_id,
            "role": role,
            "content": content,
            "timestamp": timestamp,
            "metadata": json.loads(raw_meta) if raw_meta else None,
        }
        for msg_id, role, content, timestamp, raw_meta in rows
    ]
def db_get_sessions() -> list:
    """Summarise every chat session found in ``chat_messages``.

    Returns:
        A list of dicts with keys ``session_id``, ``started`` (earliest
        timestamp), ``last_active`` (latest timestamp) and ``message_count``,
        ordered by most recently active first.

    The connection is closed even if the query fails.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute("""
            SELECT session_id,
                   MIN(timestamp) as started,
                   MAX(timestamp) as last_active,
                   COUNT(*) as message_count
            FROM chat_messages
            GROUP BY session_id
            ORDER BY last_active DESC
        """).fetchall()
    keys = ("session_id", "started", "last_active", "message_count")
    return [dict(zip(keys, row)) for row in rows]
# ============ Persistent Claude Process Manager ============
class ClaudeProcessManager:
    """
    Manages a long-running `claude` interactive process.
    Communicates via stdin/stdout using stream-json format.
    The process is started once (after auth is confirmed) and kept alive.
    Messages are sent as JSON lines to stdin; responses arrive as JSON lines from stdout.
    Claude stream-json input format (one line per message):
    {"type": "user", "message": {"role": "user", "content": "your prompt"}}
    Claude stream-json output events (one line per event):
    {"type": "system", "subtype": "init", "session_id": "...", ...}
    {"type": "assistant", "message": {"role": "assistant", "content": [...]}}
    {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}}
    {"type": "result", "subtype": "success", "result": "...", "session_id": "..."}
    {"type": "result", "subtype": "error_max_turns", ...}

    Status values: "not_started", "starting", "ready", "dead".
    Every parsed stdout event is fanned out to the queues handed out by
    subscribe().
    """

    # Number of 0.1 s polls spent waiting for the "init" event (15 s total).
    _INIT_POLLS = 150
    # Per-line buffer limit for the child's pipes. asyncio's default is
    # 64 KiB; a single stream-json event can be larger, which would make
    # readline() raise and end the reader.
    _STREAM_LIMIT = 16 * 1024 * 1024

    def __init__(self):
        self._process: Optional[asyncio.subprocess.Process] = None
        self._lock = asyncio.Lock()
        self._response_buffer: List[str] = []
        self._waiting_for_response = False
        self._stdout_task: Optional[asyncio.Task] = None
        self._stderr_task: Optional[asyncio.Task] = None
        self._current_session_id: Optional[str] = None
        # Subscribers: session_id -> list of WebSocket queues
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}
        # Global broadcast queues for all connected clients
        self._broadcast_queues: List[asyncio.Queue] = []
        self._is_ready = False
        self._status = "not_started"  # not_started, starting, ready, dead

    @property
    def status(self):
        """Current lifecycle status string."""
        return self._status

    @property
    def session_id(self):
        """Session id last reported by the Claude process, or None."""
        return self._current_session_id

    def _cancel_io_tasks(self):
        """Cancel the stdout/stderr reader tasks, if any, and forget them."""
        for task in (self._stdout_task, self._stderr_task):
            if task is not None:
                task.cancel()
        self._stdout_task = None
        self._stderr_task = None

    async def start(self) -> bool:
        """Start the Claude interactive process.

        Returns:
            True when a process is running (already, or newly started);
            False when the CLI is missing, spawning failed, the process
            exited during startup, or stop() was called while starting.
        """
        async with self._lock:
            if self._process and self._process.returncode is None:
                return True  # Already running
            self._status = "starting"
            self._is_ready = False
            logger.info("Starting Claude interactive process...")
            env = os.environ.copy()
            # Build command: interactive claude with stream-json I/O
            cmd = [
                "claude",
                "--output-format", "stream-json",
                "--input-format", "stream-json",
                "--verbose",
            ]
            try:
                process = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdin=asyncio.subprocess.PIPE,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    limit=self._STREAM_LIMIT,
                )
                self._process = process
                # Replace any readers left over from a previous process.
                self._cancel_io_tasks()
                self._stdout_task = asyncio.create_task(self._read_stdout())
                # stderr is piped, so it must be consumed or the child can
                # block once the pipe buffer fills up.
                self._stderr_task = asyncio.create_task(self._drain_stderr(process))
                # Wait for the init event (up to 15 seconds), but stop early
                # if the process dies or is stopped in the meantime.
                for _ in range(self._INIT_POLLS):
                    await asyncio.sleep(0.1)
                    if (self._is_ready or process.returncode is not None
                            or self._process is not process
                            or self._status == "dead"):
                        break
                if self._process is not process:
                    logger.warning("Claude process was stopped while starting")
                    return False
                if process.returncode is not None or self._status == "dead":
                    self._status = "dead"
                    logger.error(
                        f"Claude process exited during startup (code {process.returncode})")
                    return False
                if self._is_ready:
                    logger.info(f"Claude process ready, session: {self._current_session_id}")
                else:
                    logger.warning("Claude process started but no init event yet - proceeding anyway")
                self._status = "ready"
                return True
            except FileNotFoundError:
                self._status = "dead"
                logger.error("claude CLI not found in PATH")
                return False
            except Exception as e:
                self._status = "dead"
                logger.error(f"Failed to start Claude process: {e}")
                return False

    async def stop(self):
        """Stop the Claude process.

        Safe to call when nothing is running or the process already exited.
        The process gets 5 seconds to terminate before it is killed.
        """
        process, self._process = self._process, None
        self._status = "not_started"
        self._is_ready = False
        # Cancel the readers first so an intentional stop is not reported to
        # subscribers as an unexpected process death.
        self._cancel_io_tasks()
        if process is None:
            return
        try:
            process.terminate()
            await asyncio.wait_for(process.wait(), timeout=5)
        except ProcessLookupError:
            pass  # Already exited; nothing to do.
        except Exception:
            # Did not exit in time (or terminate failed): force it.
            try:
                process.kill()
            except ProcessLookupError:
                pass
            else:
                await process.wait()

    async def restart(self) -> bool:
        """Restart the Claude process. Returns the result of start()."""
        await self.stop()
        return await self.start()

    async def _drain_stderr(self, process):
        """Consume the child's stderr until EOF, logging it at debug level."""
        try:
            while True:
                chunk = await process.stderr.read(65536)
                if not chunk:
                    break
                logger.debug(
                    f"Claude stderr: {chunk.decode('utf-8', errors='replace').rstrip()}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.debug(f"Error reading Claude stderr: {e}")

    async def _read_stdout(self):
        """Continuously read stdout from the Claude process and broadcast events."""
        process = self._process
        if process is None:
            return
        cancelled = False
        try:
            while True:
                line = await process.stdout.readline()
                if not line:
                    break
                line_str = line.decode("utf-8", errors="replace").strip()
                if not line_str:
                    continue
                try:
                    event = json.loads(line_str)
                except json.JSONDecodeError:
                    logger.debug(f"Non-JSON stdout: {line_str}")
                    continue
                if not isinstance(event, dict):
                    # Valid JSON but not an event object; _handle_event
                    # would fail on it and end the reader.
                    logger.debug(f"Ignoring non-object stdout line: {line_str}")
                    continue
                await self._handle_event(event)
        except asyncio.CancelledError:
            cancelled = True
        except Exception as e:
            logger.error(f"Error reading Claude stdout: {e}")
        finally:
            # Only report a death for the process this reader belongs to,
            # and never when the reader was cancelled on purpose.
            if (not cancelled and self._process is process
                    and self._status in ("ready", "starting")):
                self._status = "dead"
                logger.warning("Claude process stdout ended")
                # Notify all subscribers the process died
                await self._broadcast({"type": "system", "subtype": "process_dead",
                                       "message": "Claude process ended unexpectedly"})

    async def _handle_event(self, event: dict):
        """Process an event from Claude's stdout, then broadcast it."""
        event_type = event.get("type", "")
        if event_type == "system" and event.get("subtype") == "init":
            self._current_session_id = event.get("session_id")
            self._is_ready = True
            logger.info(f"Claude session initialized: {self._current_session_id}")
        elif event_type == "result":
            # End of a response turn
            session_id = event.get("session_id", self._current_session_id)
            self._current_session_id = session_id
        await self._broadcast(event)

    async def _broadcast(self, event: dict):
        """Send event to all subscribed queues; a full queue is unsubscribed."""
        dead_queues = []
        for q in self._broadcast_queues:
            try:
                q.put_nowait(event)
            except asyncio.QueueFull:
                dead_queues.append(q)
        for q in dead_queues:
            if q in self._broadcast_queues:
                self._broadcast_queues.remove(q)

    def subscribe(self) -> asyncio.Queue:
        """Subscribe to Claude output events. Returns a queue (max 200 items)."""
        q = asyncio.Queue(maxsize=200)
        self._broadcast_queues.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue):
        """Unsubscribe a queue. Unknown queues are ignored."""
        if q in self._broadcast_queues:
            self._broadcast_queues.remove(q)

    async def send_message(self, content: str) -> bool:
        """Send a message to the running Claude process.

        Returns:
            True when the message was written to stdin; False when no
            process is running, the status is not "ready", or writing failed.
        """
        if not self._process or self._process.returncode is not None:
            return False
        if self._status != "ready":
            return False
        # Claude stream-json input format
        msg = json.dumps({
            "type": "user",
            "message": {"role": "user", "content": content}
        }) + "\n"
        try:
            self._process.stdin.write(msg.encode("utf-8"))
            await self._process.stdin.drain()
            return True
        except Exception as e:
            logger.error(f"Failed to send message to Claude: {e}")
            return False

    async def send_task_noninteractive(self, prompt: str, tools: str = None,
                                       system_prompt: str = None,
                                       permission_mode: str = "auto",
                                       timeout: int = 300) -> dict:
        """
        Run a one-shot task using a separate claude -p invocation
        (so it doesn't interfere with the interactive session).
        Returns {output, error, exit_code}.

        exit_code is -1 when the run timed out, the CLI is missing, or any
        other exception occurred. A child that is still running when this
        method exits (timeout, cancellation, error) is killed.
        """
        cmd = ["claude", "-p",
               "--output-format", "json"]
        if tools:
            cmd.extend(["--allowedTools", tools])
        if system_prompt:
            cmd.extend(["--system-prompt", system_prompt])
        if permission_mode:
            cmd.extend(["--permission-mode", permission_mode])
        cmd.append(prompt)
        env = os.environ.copy()
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env
            )
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
            stdout_str = stdout.decode("utf-8", errors="replace")
            stderr_str = stderr.decode("utf-8", errors="replace")
            result_text = stdout_str
            try:
                data = json.loads(stdout_str)
            except json.JSONDecodeError:
                data = None
            if isinstance(data, dict):
                result_text = data.get("result", stdout_str)
            return {
                "output": result_text,
                "error": stderr_str if proc.returncode != 0 else None,
                "exit_code": proc.returncode
            }
        except asyncio.TimeoutError:
            return {"output": None, "error": f"Task timed out after {timeout}s", "exit_code": -1}
        except FileNotFoundError:
            return {"output": None, "error": "claude CLI not found", "exit_code": -1}
        except Exception as e:
            return {"output": None, "error": str(e), "exit_code": -1}
        finally:
            # Never leave an orphaned child behind.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                else:
                    await proc.wait()
# Global process manager instance
# Shared by the HTTP routes, the WebSocket endpoint and scheduled task runs.
claude_mgr = ClaudeProcessManager()
# ============ Scheduler ============
# Started in the startup hook and shut down in the shutdown hook.
scheduler = BackgroundScheduler()
def _schedule_task_sync(task_id: str):
    """Synchronous wrapper for scheduled tasks (runs in APScheduler thread).

    Runs ``_run_scheduled_task(task_id)`` to completion on a fresh event
    loop. ``asyncio.run`` is used so the loop is always closed, leftover
    tasks are cancelled, and no closed loop stays installed as the current
    loop of the scheduler's worker thread.
    """
    asyncio.run(_run_scheduled_task(task_id))
async def _run_scheduled_task(task_id: str):
    """Execute a scheduled task via the non-interactive claude -p path.

    Loads the task, records a ``task_runs`` row with status "running", runs
    the prompt through ``claude_mgr.send_task_noninteractive`` and then
    stores the outcome ("completed" when the exit code is 0, otherwise
    "failed") and resets the task status to "idle".

    Columns are selected by name, so the mapping does not depend on the
    physical column order of a migrated database. The final bookkeeping runs
    in a ``finally`` block so an interrupted run is not left "running".
    """
    cols = ("id", "name", "description", "prompt", "schedule_type", "schedule_value",
            "enabled", "created_at", "last_run", "next_run", "status",
            "agent_tools", "agent_system_prompt", "agent_max_turns",
            "agent_permission_mode", "agent_timeout")
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            f"SELECT {', '.join(cols)} FROM tasks WHERE id=?", (task_id,)
        ).fetchone()
    if not row:
        logger.error(f"Scheduled task {task_id} not found")
        return
    task_data = dict(zip(cols, row))
    task = Task(**{k: v for k, v in task_data.items() if k in Task.model_fields})
    run_id = str(uuid.uuid4())
    started = datetime.now().isoformat()
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            "INSERT INTO task_runs (run_id, task_id, status, started_at) VALUES (?,?,?,?)",
            (run_id, task_id, "running", started)
        )
        conn.execute("UPDATE tasks SET status='running', last_run=? WHERE id=?", (started, task_id))
        conn.commit()
    # Recorded if the run is interrupted before a result is available.
    result = {"output": None, "error": "Task run was interrupted", "exit_code": -1}
    try:
        result = await claude_mgr.send_task_noninteractive(
            prompt=task.prompt,
            tools=task.agent_tools,
            system_prompt=task.agent_system_prompt,
            permission_mode=task.agent_permission_mode,
            timeout=task.agent_timeout
        )
    finally:
        completed = datetime.now().isoformat()
        status = "completed" if result["exit_code"] == 0 else "failed"
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.execute(
                "UPDATE task_runs SET status=?, output=?, error=?, completed_at=? WHERE run_id=?",
                (status, result["output"], result["error"], completed, run_id)
            )
            conn.execute("UPDATE tasks SET status='idle' WHERE id=?", (task_id,))
            conn.commit()
    logger.info(f"Task {task_id} ({task.name}) completed with status {status}")
# ============ Startup / Shutdown ============
@app.on_event("startup")
async def startup():
    """Initialise the database and scheduler, then start Claude in the background."""
    init_db()
    scheduler.start()
    _reload_schedules()
    # Auto-start the Claude interactive process.
    # The event loop only keeps weak references to tasks, so the task is
    # stored on the app to keep it from being garbage-collected mid-run.
    app.state.claude_start_task = asyncio.create_task(_auto_start_claude())
async def _auto_start_claude():
    """Start the Claude process shortly after server startup and log the outcome."""
    # Give the server a moment to finish starting before spawning the CLI.
    await asyncio.sleep(2)
    started = await claude_mgr.start()
    if not started:
        logger.warning("Could not start Claude process on startup (auth may be needed)")
        return
    logger.info("Claude interactive process started on startup")
@app.on_event("shutdown")
async def shutdown():
    """Shut down the scheduler and stop the Claude process.

    The Claude process is stopped even if shutting down the scheduler
    raises, so the child process is not left behind.
    """
    try:
        scheduler.shutdown()
    finally:
        await claude_mgr.stop()
def _reload_schedules():
    """Load all enabled recurring tasks into the scheduler.

    Each enabled task whose ``schedule_type`` is "recurring" and that has a
    ``schedule_value`` is registered as job ``task_<id>`` with a cron trigger
    built from that value. A task that cannot be scheduled is logged and
    skipped. The connection is closed even if the query fails.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(
            "SELECT id, schedule_type, schedule_value FROM tasks WHERE enabled=1"
        ).fetchall()
    for task_id, stype, svalue in rows:
        if stype != "recurring" or not svalue:
            continue
        try:
            scheduler.add_job(
                _schedule_task_sync,
                CronTrigger.from_crontab(svalue),
                args=[task_id],
                id=f"task_{task_id}",
                replace_existing=True,
                misfire_grace_time=60
            )
        except Exception as e:
            logger.error(f"Failed to schedule task {task_id}: {e}")
# ============ API Routes ============
@app.get("/api/health")
async def health():
    """Report API liveness together with the Claude process status and session id."""
    return dict(
        status="ok",
        claude_status=claude_mgr.status,
        claude_session_id=claude_mgr.session_id,
        version="3.0.0",
    )
# -------- Claude Process Control --------
@app.post("/api/claude/start")
async def start_claude():
    """Start the Claude interactive process and report the resulting state."""
    started = await claude_mgr.start()
    response = {"success": started}
    response["status"] = claude_mgr.status
    response["session_id"] = claude_mgr.session_id
    return response
@app.post("/api/claude/stop")
async def stop_claude():
    """Stop the Claude interactive process and report the resulting status."""
    await claude_mgr.stop()
    response = {"success": True}
    response["status"] = claude_mgr.status
    return response
@app.post("/api/claude/restart")
async def restart_claude():
    """Restart the Claude interactive process and report the resulting state."""
    restarted = await claude_mgr.restart()
    response = {"success": restarted}
    response["status"] = claude_mgr.status
    response["session_id"] = claude_mgr.session_id
    return response
@app.get("/api/claude/status")
async def claude_status():
    """Return the manager status, the session id and whether the process is alive."""
    process = claude_mgr._process
    # Alive means: a process object exists and it has not exited yet.
    if process is None:
        proc_alive = False
    else:
        proc_alive = process.returncode is None
    return dict(
        status=claude_mgr.status,
        session_id=claude_mgr.session_id,
        process_alive=proc_alive,
    )
# -------- Chat --------
@app.get("/api/chat/sessions")
async def get_sessions():
    """List the stored chat sessions as returned by db_get_sessions()."""
    sessions = db_get_sessions()
    return sessions
@app.get("/api/chat/sessions/{session_id}/messages")
async def get_session_messages(session_id: str):
    """Return the stored messages of one session as returned by db_get_messages()."""
    messages = db_get_messages(session_id)
    return messages
@app.post("/api/chat/send")
async def send_chat(msg: ChatSend):
    """
    Forward a chat message to the running Claude process.

    The session_id is our own tracking id (used for DB storage); it is not
    necessarily Claude's internal session id. A new one is generated when the
    request does not carry one.

    Responds with 503 when the process is not ready or the write fails.
    """
    if claude_mgr.status != "ready":
        raise HTTPException(503, f"Claude process not ready (status: {claude_mgr.status}). "
                                 "Use /api/claude/start to start it.")
    session_id = msg.session_id if msg.session_id else str(uuid.uuid4())
    # Persist the user's message first, then hand it to the process.
    db_save_message(session_id, "user", msg.message)
    delivered = await claude_mgr.send_message(msg.message)
    if delivered:
        return {"session_id": session_id, "status": "sent"}
    raise HTTPException(503, "Failed to send message to Claude process")
@app.websocket("/api/chat/ws")
async def chat_websocket(ws: WebSocket):
    """
    WebSocket endpoint for real-time chat.
    Client sends: {"message": "...", "session_id": "..."}
    Server streams: Claude output events as JSON

    Supported "action" values (default "send"): "send", "start_claude",
    "restart_claude", "get_status". A payload that is not valid JSON, is not
    a JSON object, or names an unknown action is answered with an
    {"type": "error"} message and the connection stays open.
    """
    await ws.accept()
    queue = claude_mgr.subscribe()
    session_id = None
    # Send current process status immediately
    await ws.send_json({
        "type": "system",
        "subtype": "connected",
        "claude_status": claude_mgr.status,
        "claude_session_id": claude_mgr.session_id
    })

    async def send_from_queue():
        """Forward Claude events to the WebSocket client."""
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=30)
                await ws.send_json(event)
            except asyncio.TimeoutError:
                # Send keepalive ping
                try:
                    await ws.send_json({"type": "ping"})
                except Exception:
                    break
            except Exception:
                break

    send_task = asyncio.create_task(send_from_queue())
    try:
        while True:
            try:
                data = await ws.receive_json()
            except (ValueError, KeyError, TypeError):
                # Frame was not decodable JSON text; tell the client instead
                # of dropping the connection.
                await ws.send_json({"type": "error", "message": "Invalid JSON payload"})
                continue
            if not isinstance(data, dict):
                await ws.send_json({"type": "error",
                                    "message": "Payload must be a JSON object"})
                continue
            action = data.get("action", "send")
            if action == "send":
                text = data.get("message", "")
                session_id = data.get("session_id") or session_id or str(uuid.uuid4())
                if not text:
                    continue
                # Save user message
                db_save_message(session_id, "user", text)
                if claude_mgr.status != "ready":
                    await ws.send_json({
                        "type": "error",
                        "message": f"Claude process not ready (status: {claude_mgr.status})"
                    })
                    continue
                ok = await claude_mgr.send_message(text)
                if not ok:
                    await ws.send_json({
                        "type": "error",
                        "message": "Failed to send to Claude process"
                    })
                else:
                    # Acknowledge
                    await ws.send_json({
                        "type": "user_ack",
                        "session_id": session_id,
                        "message": text
                    })
            elif action == "start_claude":
                ok = await claude_mgr.start()
                await ws.send_json({
                    "type": "system",
                    "subtype": "start_result",
                    "success": ok,
                    "claude_status": claude_mgr.status
                })
            elif action == "restart_claude":
                ok = await claude_mgr.restart()
                await ws.send_json({
                    "type": "system",
                    "subtype": "restart_result",
                    "success": ok,
                    "claude_status": claude_mgr.status
                })
            elif action == "get_status":
                await ws.send_json({
                    "type": "system",
                    "subtype": "status",
                    "claude_status": claude_mgr.status,
                    "claude_session_id": claude_mgr.session_id
                })
            else:
                await ws.send_json({"type": "error",
                                    "message": f"Unknown action: {action}"})
    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        send_task.cancel()
        claude_mgr.unsubscribe(queue)
        # Save assistant messages from buffer (handled via DB in handle_event for full impl)
        logger.info(f"WebSocket disconnected, session: {session_id}")
# -------- Tasks --------
@app.get("/api/tasks")
async def list_tasks():
    """Return every task as a dict, newest first.

    Columns are selected by name so each value always lands under the right
    key, whatever the physical column order of a migrated database. The
    connection is closed even if the query fails.
    """
    cols = ("id", "name", "description", "prompt", "schedule_type", "schedule_value",
            "enabled", "created_at", "last_run", "next_run", "status",
            "agent_tools", "agent_system_prompt", "agent_max_turns",
            "agent_permission_mode", "agent_timeout")
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(
            f"SELECT {', '.join(cols)} FROM tasks ORDER BY created_at DESC"
        ).fetchall()
    return [dict(zip(cols, r)) for r in rows]
@app.post("/api/tasks")
async def create_task(task: Task):
    """Store a new task and, when it is an enabled recurring task, schedule it.

    The id, creation timestamp and status supplied by the client are
    overwritten. A scheduling failure is logged; the task is still stored
    and returned. The connection is closed even if the insert fails.
    """
    task.id = str(uuid.uuid4())
    task.created_at = datetime.now().isoformat()
    task.status = "idle"
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute("""
            INSERT INTO tasks (id, name, description, prompt, schedule_type, schedule_value,
                               enabled, created_at, status, agent_tools, agent_system_prompt,
                               agent_max_turns, agent_permission_mode, agent_timeout)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        """, (task.id, task.name, task.description, task.prompt, task.schedule_type,
              task.schedule_value, task.enabled, task.created_at, task.status,
              task.agent_tools, task.agent_system_prompt, task.agent_max_turns,
              task.agent_permission_mode, task.agent_timeout))
        conn.commit()
    if task.schedule_type == "recurring" and task.schedule_value and task.enabled:
        try:
            scheduler.add_job(
                _schedule_task_sync,
                CronTrigger.from_crontab(task.schedule_value),
                args=[task.id],
                id=f"task_{task.id}",
                replace_existing=True,
                misfire_grace_time=60
            )
        except Exception as e:
            logger.error(f"Failed to schedule: {e}")
    return task
@app.get("/api/tasks/{task_id}")
async def get_task(task_id: str):
    """Return one task as a dict, or respond with 404 when it does not exist.

    Columns are selected by name so the mapping does not depend on the
    physical column order. The connection is closed even if the query fails.
    """
    cols = ("id", "name", "description", "prompt", "schedule_type", "schedule_value",
            "enabled", "created_at", "last_run", "next_run", "status",
            "agent_tools", "agent_system_prompt", "agent_max_turns",
            "agent_permission_mode", "agent_timeout")
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            f"SELECT {', '.join(cols)} FROM tasks WHERE id=?", (task_id,)
        ).fetchone()
    if not row:
        raise HTTPException(404, "Task not found")
    return dict(zip(cols, row))
@app.put("/api/tasks/{task_id}")
async def update_task(task_id: str, task: Task):
    """Update an existing task and refresh its scheduler job.

    Responds with 404 when no task has this id, instead of reporting success
    and registering a job for a task that does not exist. Any existing job
    for the task is removed; a new one is added when the task is an enabled
    recurring task. A scheduling failure is logged and does not fail the
    request.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.execute("""
            UPDATE tasks SET name=?, description=?, prompt=?, schedule_type=?,
                schedule_value=?, enabled=?, agent_tools=?, agent_system_prompt=?,
                agent_max_turns=?, agent_permission_mode=?, agent_timeout=?
            WHERE id=?
        """, (task.name, task.description, task.prompt, task.schedule_type,
              task.schedule_value, task.enabled, task.agent_tools, task.agent_system_prompt,
              task.agent_max_turns, task.agent_permission_mode, task.agent_timeout, task_id))
        conn.commit()
        matched = cursor.rowcount
    if not matched:
        raise HTTPException(404, "Task not found")
    # Reschedule
    job_id = f"task_{task_id}"
    if scheduler.get_job(job_id) is not None:
        try:
            scheduler.remove_job(job_id)
        except Exception as e:
            logger.warning(f"Failed to remove job {job_id}: {e}")
    if task.schedule_type == "recurring" and task.schedule_value and task.enabled:
        try:
            scheduler.add_job(
                _schedule_task_sync,
                CronTrigger.from_crontab(task.schedule_value),
                args=[task_id],
                id=job_id,
                replace_existing=True,
                misfire_grace_time=60
            )
        except Exception as e:
            logger.error(f"Failed to reschedule: {e}")
    return {"success": True}
@app.delete("/api/tasks/{task_id}")
async def delete_task(task_id: str):
    """Delete a task and remove its scheduler job, if any.

    Deleting an unknown id still reports success. The connection is closed
    even if the delete fails, and a job-removal failure is logged instead of
    being silently discarded.
    """
    # NOTE(review): rows in task_runs that reference this task are left in
    # place - confirm whether run history should outlive the task.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
        conn.commit()
    job_id = f"task_{task_id}"
    if scheduler.get_job(job_id) is not None:
        try:
            scheduler.remove_job(job_id)
        except Exception as e:
            logger.warning(f"Failed to remove job {job_id}: {e}")
    return {"success": True}
@app.post("/api/tasks/{task_id}/run")
async def run_task_now(task_id: str, background_tasks: BackgroundTasks):
    """Run a task immediately (non-interactive, separate process).

    Responds with 404 when the task does not exist; otherwise queues
    ``_run_scheduled_task`` as a background task and returns at once. Only
    the task name is read here, because the background run loads the task
    itself.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute("SELECT name FROM tasks WHERE id=?", (task_id,)).fetchone()
    if not row:
        raise HTTPException(404, "Task not found")
    task_name = row[0]
    background_tasks.add_task(_run_scheduled_task, task_id)
    return {"success": True, "message": f"Task '{task_name}' started"}
@app.get("/api/tasks/{task_id}/runs")
async def get_task_runs(task_id: str):
    """Return the 20 most recent runs of a task, newest first.

    Columns are selected by name and the connection is closed even if the
    query fails.
    """
    cols = ("run_id", "task_id", "status", "output", "error", "started_at", "completed_at")
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(
            f"SELECT {', '.join(cols)} FROM task_runs "
            "WHERE task_id=? ORDER BY started_at DESC LIMIT 20",
            (task_id,),
        ).fetchall()
    return [dict(zip(cols, r)) for r in rows]
# -------- MCP Servers (config file based) --------
# JSON file that stores the registered MCP servers under the "mcpServers" key.
MCP_CONFIG_PATH = Path("/app/data/mcp_config.json")
def _load_mcp_config() -> dict:
    """Read the MCP server config from ``MCP_CONFIG_PATH``.

    Returns:
        The parsed config dict. When the file is missing, unreadable, not
        valid JSON, or does not hold an object with a dict under
        "mcpServers", an empty config ``{"mcpServers": {}}`` is returned and
        the problem is logged rather than silently ignored.
    """
    empty = {"mcpServers": {}}
    if not MCP_CONFIG_PATH.exists():
        return empty
    try:
        config = json.loads(MCP_CONFIG_PATH.read_text())
    except (OSError, ValueError) as e:
        logger.warning(f"Ignoring unreadable MCP config {MCP_CONFIG_PATH}: {e}")
        return empty
    # Callers use dict methods on the config and on its "mcpServers" entry.
    if not isinstance(config, dict) or not isinstance(config.get("mcpServers", {}), dict):
        logger.warning(f"Ignoring malformed MCP config {MCP_CONFIG_PATH}")
        return empty
    return config
def _save_mcp_config(config: dict):
    """Write the MCP server config to ``MCP_CONFIG_PATH`` as indented JSON.

    The data is written to a temporary sibling file and then moved over the
    target, so an interrupted write cannot leave a truncated config behind.
    """
    payload = json.dumps(config, indent=2)
    tmp_path = MCP_CONFIG_PATH.with_name(MCP_CONFIG_PATH.name + ".tmp")
    tmp_path.write_text(payload)
    tmp_path.replace(MCP_CONFIG_PATH)
@app.get("/api/mcp/servers")
async def list_mcp_servers():
    """Return the configured MCP servers as a list, each entry carrying its name."""
    registry = _load_mcp_config().get("mcpServers", {})
    return [{"name": server_name, **settings} for server_name, settings in registry.items()]
@app.post("/api/mcp/servers")
async def add_mcp_server(server: McpServerAdd):
    """Add (or overwrite) an MCP server entry in the config file."""
    config = _load_mcp_config()
    entry = {"type": server.server_type}
    # Optional settings are stored only when they carry a value.
    optional_settings = (
        ("url", server.url),
        ("command", server.command),
        ("args", server.args),
    )
    for key, value in optional_settings:
        if value:
            entry[key] = value
    registry = config.setdefault("mcpServers", {})
    registry[server.name] = entry
    _save_mcp_config(config)
    return {"success": True}
@app.delete("/api/mcp/servers/{name}")
async def remove_mcp_server(name: str):
    """Remove an MCP server entry from the config file; unknown names are ignored."""
    config = _load_mcp_config()
    registry = config.get("mcpServers", {})
    registry.pop(name, None)
    _save_mcp_config(config)
    return {"success": True}
# -------- Static files / React SPA --------
# Try multiple possible frontend build output locations
# /app/static is preferred; /app/frontend/dist is the fallback.
STATIC_DIR = Path("/app/static")
if not STATIC_DIR.exists():
    STATIC_DIR = Path("/app/frontend/dist")
# Mount /assets for Vite-built JS/CSS bundles
# Skipped when the chosen directory has no "assets" folder.
if STATIC_DIR.exists() and (STATIC_DIR / "assets").exists():
    app.mount("/assets", StaticFiles(directory=str(STATIC_DIR / "assets")), name="assets")
@app.get("/")
async def serve_index():
    """Serve the SPA's index.html, or a small JSON banner when no build is present."""
    index = STATIC_DIR / "index.html"
    if not index.exists():
        return {"message": "Claude Persistent Agent API v3.0", "docs": "/docs"}
    return FileResponse(str(index))
@app.get("/{full_path:path}")
async def serve_spa(full_path: str):
    """Serve React SPA for all non-API routes. API and WS are handled above.

    A request that names an existing file inside ``STATIC_DIR`` gets that
    file; anything else falls back to ``index.html`` for client-side routing.

    ``full_path`` comes straight from the URL, so it is resolved and checked
    to lie inside ``STATIC_DIR`` before being served. Without that check,
    ``..`` segments or an absolute path would expose arbitrary files.

    Raises:
        HTTPException: 404 for paths starting with "api" and when no
            index.html is available.
    """
    if full_path.startswith("api"):
        raise HTTPException(404)
    # Try serving the file directly (e.g. vite.svg, favicon)
    try:
        static_root = STATIC_DIR.resolve()
        candidate = (static_root / full_path).resolve()
        # relative_to raises ValueError when candidate is outside the root.
        candidate.relative_to(static_root)
        servable = candidate.is_file()
    except (ValueError, OSError, RuntimeError):
        servable = False
    if servable:
        return FileResponse(str(candidate))
    # Fall back to index.html for SPA routing
    index = STATIC_DIR / "index.html"
    if index.exists():
        return FileResponse(str(index))
    raise HTTPException(404)