1005 lines
32 KiB
Python
1005 lines
32 KiB
Python
"""
|
|
Claude Persistent Agent - FastAPI Backend
|
|
Interfaces with a long-running interactive Claude Code process via stdin/stdout pipes.
|
|
"""
|
|
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List, Dict, Any
|
|
import asyncio
|
|
import json
|
|
import sqlite3
|
|
import subprocess
|
|
import logging
|
|
from datetime import datetime
|
|
import uuid
|
|
from pathlib import Path
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
import os
|
|
import threading
|
|
|
|
# Logging: INFO-level root configuration plus a module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The FastAPI application every route below is registered on.
app = FastAPI(title="Claude Persistent Agent", version="3.0.0")

# CORS: any origin, method and header is accepted, with credentials allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Filesystem locations used by the service.
DB_PATH = Path("/app/data/tasks.db")
LOGS_PATH = Path("/app/logs")
TASKS_PATH = Path("/app/tasks")

# Make sure the directories exist as soon as the module is imported.
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
LOGS_PATH.mkdir(parents=True, exist_ok=True)
TASKS_PATH.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
# ============ Models ============
|
|
|
|
class Task(BaseModel):
    """A stored prompt that can be run on demand or on a schedule."""

    # Identity and description.
    id: Optional[str] = None  # assigned by the create route
    name: str
    description: Optional[str] = None
    prompt: str

    # Scheduling: only "recurring" tasks with a crontab string in
    # `schedule_value` are registered with the scheduler.
    schedule_type: str = "manual"
    schedule_value: Optional[str] = None
    enabled: bool = True

    # Bookkeeping (ISO-8601 timestamps stored as text).
    created_at: Optional[str] = None
    last_run: Optional[str] = None
    next_run: Optional[str] = None
    status: str = "idle"

    # Options for the one-shot `claude -p` run.
    agent_tools: Optional[str] = None  # forwarded as --allowedTools
    agent_system_prompt: Optional[str] = None  # forwarded as --system-prompt
    agent_max_turns: Optional[int] = None  # persisted; not forwarded by the visible code
    agent_permission_mode: str = "auto"  # forwarded as --permission-mode
    agent_timeout: int = 300  # seconds before the run is reported as timed out
|
|
|
|
|
|
class TaskRun(BaseModel):
    """One execution of a task, mirroring the columns of `task_runs`."""

    task_id: str
    run_id: str
    status: str
    output: Optional[str] = None
    error: Optional[str] = None
    started_at: str
    completed_at: Optional[str] = None
|
|
|
|
|
|
class ChatSend(BaseModel):
    """Request body for the HTTP chat-send route."""

    message: str
    # Tracking id for stored chat history; the route generates one if omitted.
    session_id: Optional[str] = None
|
|
|
|
|
|
class McpServerAdd(BaseModel):
    """Request body for registering an MCP server in the config file."""

    name: str  # key under "mcpServers"
    server_type: str = "sse"  # written to the entry as "type"
    url: Optional[str] = None
    command: Optional[str] = None
    args: Optional[List[str]] = None
|
|
|
|
|
|
# ============ Database ============
|
|
|
|
def init_db():
    """Create the SQLite schema if it is missing and migrate legacy tables.

    Creates the `tasks`, `task_runs` and `chat_messages` tables, then adds any
    agent_* column that an older `tasks` table does not have yet. Safe to call
    repeatedly. The connection is always closed, even when a statement fails.

    Raises:
        sqlite3.Error: if the schema cannot be created or migrated.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        cur.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                description TEXT,
                prompt TEXT NOT NULL,
                schedule_type TEXT NOT NULL,
                schedule_value TEXT,
                enabled BOOLEAN DEFAULT 1,
                created_at TEXT,
                last_run TEXT,
                next_run TEXT,
                status TEXT DEFAULT 'idle',
                agent_tools TEXT,
                agent_system_prompt TEXT,
                agent_max_turns INTEGER,
                agent_permission_mode TEXT DEFAULT 'auto',
                agent_timeout INTEGER DEFAULT 300
            )
        """)

        cur.execute("""
            CREATE TABLE IF NOT EXISTS task_runs (
                run_id TEXT PRIMARY KEY,
                task_id TEXT NOT NULL,
                status TEXT NOT NULL,
                output TEXT,
                error TEXT,
                started_at TEXT,
                completed_at TEXT,
                FOREIGN KEY (task_id) REFERENCES tasks(id)
            )
        """)

        cur.execute("""
            CREATE TABLE IF NOT EXISTS chat_messages (
                id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                metadata TEXT
            )
        """)

        # Migrations for legacy databases: look at the columns that actually
        # exist instead of swallowing every OperationalError, so genuine
        # failures (locked database, disk errors) are not hidden.
        cur.execute("PRAGMA table_info(tasks)")
        existing_columns = {info[1] for info in cur.fetchall()}
        migrations = (
            "agent_tools TEXT",
            "agent_system_prompt TEXT",
            "agent_max_turns INTEGER",
            "agent_permission_mode TEXT DEFAULT 'auto'",
            "agent_timeout INTEGER DEFAULT 300",
        )
        for column_def in migrations:
            column_name = column_def.split()[0]
            if column_name not in existing_columns:
                cur.execute(f"ALTER TABLE tasks ADD COLUMN {column_def}")

        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def db_save_message(session_id: str, role: str, content: str, metadata: Optional[dict] = None):
    """Persist one chat message.

    Args:
        session_id: Tracking id of the conversation the message belongs to.
        role: Message author, e.g. "user".
        content: Message text.
        metadata: Optional JSON-serialisable dict; stored as JSON text, or as
            NULL when omitted or empty.

    The row gets a fresh UUID and the current local time as its timestamp.
    The connection is closed even if the insert fails.
    """
    row = (
        str(uuid.uuid4()),
        session_id,
        role,
        content,
        datetime.now().isoformat(),
        json.dumps(metadata) if metadata else None,
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "INSERT INTO chat_messages (id, session_id, role, content, timestamp, metadata) VALUES (?,?,?,?,?,?)",
            row,
        )
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def db_get_messages(session_id: str) -> list:
    """Return every stored message of a session, ordered by timestamp.

    Each item is a dict with the keys id, role, content, timestamp and
    metadata; metadata is decoded from JSON, or None when nothing was stored.
    """
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute(
        "SELECT id, role, content, timestamp, metadata FROM chat_messages WHERE session_id=? ORDER BY timestamp",
        (session_id,)
    )
    records = cursor.fetchall()
    db.close()

    history = []
    for message_id, role, content, timestamp, raw_metadata in records:
        if raw_metadata:
            metadata = json.loads(raw_metadata)
        else:
            metadata = None
        history.append({
            "id": message_id,
            "role": role,
            "content": content,
            "timestamp": timestamp,
            "metadata": metadata,
        })
    return history
|
|
|
|
|
|
def db_get_sessions() -> list:
    """Summarise all chat sessions, most recently active first.

    Each item carries session_id, started (earliest timestamp), last_active
    (latest timestamp) and message_count.
    """
    summary_sql = """
        SELECT session_id,
               MIN(timestamp) as started,
               MAX(timestamp) as last_active,
               COUNT(*) as message_count
        FROM chat_messages
        GROUP BY session_id
        ORDER BY last_active DESC
    """
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute(summary_sql)
    records = cursor.fetchall()
    db.close()

    field_names = ("session_id", "started", "last_active", "message_count")
    sessions = []
    for record in records:
        sessions.append({name: record[pos] for pos, name in enumerate(field_names)})
    return sessions
|
|
|
|
|
|
# ============ Persistent Claude Process Manager ============
|
|
|
|
class ClaudeProcessManager:
    """
    Manages a long-running `claude` interactive process.
    Communicates via stdin/stdout using stream-json format.

    The process is started once (after auth is confirmed) and kept alive.
    Messages are sent as JSON lines to stdin; responses arrive as JSON lines from stdout.

    Claude stream-json input format (one line per message):
      {"type": "user", "message": {"role": "user", "content": "your prompt"}}

    Claude stream-json output events (one line per event):
      {"type": "system", "subtype": "init", "session_id": "...", ...}
      {"type": "assistant", "message": {"role": "assistant", "content": [...]}}
      {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}}
      {"type": "result", "subtype": "success", "result": "...", "session_id": "..."}
      {"type": "result", "subtype": "error_max_turns", ...}

    Every event read from stdout is fanned out to the queues handed out by
    `subscribe()`. stderr is drained in the background so the child can never
    block on a full stderr pipe.
    """

    # Per-line buffer limit for the child's stdout/stderr readers. asyncio's
    # default is 64 KiB, and a single longer JSON line would make readline()
    # raise and end the reader.
    _STREAM_LIMIT = 16 * 1024 * 1024

    def __init__(self):
        self._process: Optional[asyncio.subprocess.Process] = None
        self._lock = asyncio.Lock()
        self._response_buffer: List[str] = []
        self._waiting_for_response = False
        self._stdout_task: Optional[asyncio.Task] = None
        self._stderr_task: Optional[asyncio.Task] = None
        self._current_session_id: Optional[str] = None
        # Subscribers: session_id -> list of WebSocket queues
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}
        # Global broadcast queues for all connected clients
        self._broadcast_queues: List[asyncio.Queue] = []
        self._is_ready = False
        self._status = "not_started"  # not_started, starting, ready, dead

    @property
    def status(self):
        """Lifecycle state: not_started, starting, ready or dead."""
        return self._status

    @property
    def session_id(self):
        """Session id most recently reported by Claude, or None."""
        return self._current_session_id

    def _cancel_reader_tasks(self):
        """Cancel and forget the stdout/stderr reader tasks, if any."""
        for attr in ("_stdout_task", "_stderr_task"):
            task = getattr(self, attr, None)
            if task is not None:
                task.cancel()
            setattr(self, attr, None)

    async def start(self) -> bool:
        """Start the Claude interactive process.

        Returns True when a process is (or already was) running, False when
        it could not be launched or exited while starting up.
        """
        async with self._lock:
            if self._process and self._process.returncode is None:
                return True  # Already running

            self._status = "starting"
            logger.info("Starting Claude interactive process...")

            env = os.environ.copy()

            # Build command: interactive claude with stream-json I/O
            cmd = [
                "claude",
                "--output-format", "stream-json",
                "--input-format", "stream-json",
                "--verbose",
            ]

            try:
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdin=asyncio.subprocess.PIPE,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    limit=self._STREAM_LIMIT,
                )
                self._process = proc
                self._is_ready = False
                self._status = "starting"

                # Replace any readers left over from a previous process.
                self._cancel_reader_tasks()
                self._stdout_task = asyncio.create_task(self._read_stdout())
                self._stderr_task = asyncio.create_task(self._read_stderr())

                # Wait for the init event (up to 15 seconds), but stop early
                # if the process has already exited.
                for _ in range(150):
                    await asyncio.sleep(0.1)
                    if self._is_ready or proc.returncode is not None:
                        break

                if self._is_ready:
                    logger.info(f"Claude process ready, session: {self._current_session_id}")
                    self._status = "ready"
                    return True

                if proc.returncode is not None:
                    # A process that is already gone must not be reported as ready.
                    self._status = "dead"
                    logger.error(f"Claude process exited during startup (exit code {proc.returncode})")
                    return False

                logger.warning("Claude process started but no init event yet - proceeding anyway")
                self._status = "ready"
                return True

            except FileNotFoundError:
                self._status = "dead"
                logger.error("claude CLI not found in PATH")
                return False
            except Exception as e:
                self._status = "dead"
                logger.error(f"Failed to start Claude process: {e}")
                return False

    async def stop(self):
        """Stop the Claude process.

        Safe to call when no process exists or when it has already exited.
        """
        proc = self._process
        # Record the stop as intentional before the pipes close; otherwise
        # the stdout reader would see "ready" and broadcast a process_dead
        # event for a deliberate shutdown.
        self._process = None
        self._status = "not_started"
        self._is_ready = False

        # Only signal a live process: terminate()/kill() raise
        # ProcessLookupError once asyncio has released an exited child.
        if proc is not None and proc.returncode is None:
            try:
                proc.terminate()
                await asyncio.wait_for(proc.wait(), timeout=5)
            except ProcessLookupError:
                pass  # exited in the meantime
            except Exception:
                # Did not exit within the grace period: force it and reap it.
                try:
                    proc.kill()
                    await proc.wait()
                except ProcessLookupError:
                    pass

        self._cancel_reader_tasks()

    async def restart(self) -> bool:
        """Restart the Claude process."""
        await self.stop()
        return await self.start()

    async def _read_stdout(self):
        """Continuously read stdout from the Claude process and broadcast events."""
        proc = self._process
        try:
            while proc is not None and proc.returncode is None:
                line = await proc.stdout.readline()
                if not line:
                    break
                line_str = line.decode("utf-8", errors="replace").strip()
                if not line_str:
                    continue

                try:
                    event = json.loads(line_str)
                except json.JSONDecodeError:
                    logger.debug(f"Non-JSON stdout: {line_str}")
                    continue

                await self._handle_event(event)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error reading Claude stdout: {e}")
        finally:
            # Only the reader of the current process may declare it dead.
            if self._status == "ready" and self._process is proc:
                self._status = "dead"
                logger.warning("Claude process stdout ended")
                # Notify all subscribers the process died
                await self._broadcast({"type": "system", "subtype": "process_dead",
                                       "message": "Claude process ended unexpectedly"})

    async def _read_stderr(self):
        """Drain the child's stderr, logging each line at debug level."""
        proc = self._process
        if proc is None or proc.stderr is None:
            return
        try:
            while True:
                line = await proc.stderr.readline()
                if not line:
                    break
                text = line.decode("utf-8", errors="replace").rstrip()
                if text:
                    logger.debug(f"Claude stderr: {text}")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error reading Claude stderr: {e}")

    async def _handle_event(self, event: dict):
        """Process an event from Claude's stdout."""
        event_type = event.get("type", "")

        if event_type == "system" and event.get("subtype") == "init":
            self._current_session_id = event.get("session_id")
            self._is_ready = True
            logger.info(f"Claude session initialized: {self._current_session_id}")

        elif event_type == "result":
            # End of a response turn
            session_id = event.get("session_id", self._current_session_id)
            self._current_session_id = session_id

        await self._broadcast(event)

    async def _broadcast(self, event: dict):
        """Send event to all connected WebSocket queues.

        A queue that is full is dropped from the subscriber list.
        """
        dead_queues = []
        for q in self._broadcast_queues:
            try:
                q.put_nowait(event)
            except asyncio.QueueFull:
                dead_queues.append(q)
        for q in dead_queues:
            if q in self._broadcast_queues:
                self._broadcast_queues.remove(q)
        if dead_queues:
            logger.warning(f"Dropped {len(dead_queues)} subscriber queue(s) that were full")

    def subscribe(self) -> asyncio.Queue:
        """Subscribe to Claude output events. Returns a queue."""
        q = asyncio.Queue(maxsize=200)
        self._broadcast_queues.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue):
        """Unsubscribe a queue."""
        if q in self._broadcast_queues:
            self._broadcast_queues.remove(q)

    async def send_message(self, content: str) -> bool:
        """Send a message to the running Claude process.

        Returns False when no live, ready process exists or the write fails.
        """
        proc = self._process
        if proc is None or proc.returncode is not None:
            return False
        if self._status != "ready":
            return False

        # Claude stream-json input format
        msg = json.dumps({
            "type": "user",
            "message": {"role": "user", "content": content}
        }) + "\n"

        try:
            proc.stdin.write(msg.encode("utf-8"))
            await proc.stdin.drain()
            return True
        except Exception as e:
            logger.error(f"Failed to send message to Claude: {e}")
            return False

    async def send_task_noninteractive(self, prompt: str, tools: str = None,
                                       system_prompt: str = None,
                                       permission_mode: str = "auto",
                                       timeout: int = 300) -> dict:
        """
        Run a one-shot task using a separate claude -p invocation
        (so it doesn't interfere with the interactive session).

        Args:
            prompt: Prompt text passed as the final command-line argument.
            tools: Value for --allowedTools; omitted when falsy.
            system_prompt: Value for --system-prompt; omitted when falsy.
            permission_mode: Value for --permission-mode; omitted when falsy.
            timeout: Seconds to wait before the run is killed.

        Returns {output, error, exit_code}. Failures to launch, timeouts and
        unexpected errors are reported with exit_code -1 rather than raised.
        """
        cmd = ["claude", "-p",
               "--output-format", "json"]

        if tools:
            cmd.extend(["--allowedTools", tools])
        if system_prompt:
            cmd.extend(["--system-prompt", system_prompt])
        if permission_mode:
            cmd.extend(["--permission-mode", permission_mode])

        cmd.append(prompt)

        env = os.environ.copy()

        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env
            )
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
            stdout_str = stdout.decode("utf-8", errors="replace")
            stderr_str = stderr.decode("utf-8", errors="replace")

            try:
                data = json.loads(stdout_str)
            except json.JSONDecodeError:
                result_text = stdout_str
            else:
                # Valid JSON that is not an object has no "result" field;
                # fall back to the raw text instead of failing the run.
                result_text = data.get("result", stdout_str) if isinstance(data, dict) else stdout_str

            return {
                "output": result_text,
                "error": stderr_str if proc.returncode != 0 else None,
                "exit_code": proc.returncode
            }
        except asyncio.TimeoutError:
            # wait_for only cancels communicate(); the child keeps running
            # unless it is killed and reaped here.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                    await proc.wait()
                except ProcessLookupError:
                    pass
            return {"output": None, "error": f"Task timed out after {timeout}s", "exit_code": -1}
        except FileNotFoundError:
            return {"output": None, "error": "claude CLI not found", "exit_code": -1}
        except Exception as e:
            return {"output": None, "error": str(e), "exit_code": -1}
|
|
|
|
|
|
# Global process manager instance
|
|
# Single module-wide manager used by the routes, the WebSocket handler and
# the task runner below.
claude_mgr = ClaudeProcessManager()
|
|
|
|
|
|
# ============ Scheduler ============
|
|
|
|
# APScheduler instance that fires recurring tasks; started in startup().
scheduler = BackgroundScheduler()
|
|
|
|
|
|
def _schedule_task_sync(task_id: str):
    """Synchronous wrapper for scheduled tasks (runs in APScheduler thread).

    asyncio.run() creates a fresh event loop, runs the task to completion,
    then closes the loop and unregisters it from the thread, so a pooled
    worker thread is never left with a closed loop as its current loop.
    """
    asyncio.run(_run_scheduled_task(task_id))
|
|
|
|
|
|
async def _run_scheduled_task(task_id: str):
    """Execute a scheduled task via the non-interactive claude -p path.

    Loads the task, records a `task_runs` row with status 'running', runs the
    prompt in a separate one-shot Claude process, then stores the outcome and
    puts the task back to 'idle'. The final bookkeeping runs in a `finally`
    block, so a cancelled or crashing run is recorded as failed instead of
    leaving the task and the run stuck in 'running'.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # Map columns by name so the code does not depend on column order.
        conn.row_factory = sqlite3.Row
        row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
    finally:
        conn.close()

    if not row:
        logger.error(f"Scheduled task {task_id} not found")
        return

    task = Task(**{k: row[k] for k in row.keys() if k in Task.model_fields})

    run_id = str(uuid.uuid4())
    started = datetime.now().isoformat()

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "INSERT INTO task_runs (run_id, task_id, status, started_at) VALUES (?,?,?,?)",
            (run_id, task_id, "running", started)
        )
        conn.execute("UPDATE tasks SET status='running', last_run=? WHERE id=?", (started, task_id))
        conn.commit()
    finally:
        conn.close()

    # Placeholder outcome, overwritten on normal completion.
    result = {"output": None, "error": "Task run was interrupted before completion", "exit_code": -1}
    try:
        result = await claude_mgr.send_task_noninteractive(
            prompt=task.prompt,
            tools=task.agent_tools,
            system_prompt=task.agent_system_prompt,
            permission_mode=task.agent_permission_mode,
            timeout=task.agent_timeout
        )
    finally:
        completed = datetime.now().isoformat()
        status = "completed" if result["exit_code"] == 0 else "failed"

        conn = sqlite3.connect(DB_PATH)
        try:
            conn.execute(
                "UPDATE task_runs SET status=?, output=?, error=?, completed_at=? WHERE run_id=?",
                (status, result["output"], result["error"], completed, run_id)
            )
            conn.execute("UPDATE tasks SET status='idle' WHERE id=?", (task_id,))
            conn.commit()
        finally:
            conn.close()
        logger.info(f"Task {task_id} ({task.name}) completed with status {status}")
|
|
|
|
|
|
# ============ Startup / Shutdown ============
|
|
|
|
@app.on_event("startup")
async def startup():
    """Initialise the database, start the scheduler and launch Claude.

    The auto-start task is stored on `app.state`: the event loop keeps only a
    weak reference to tasks, so an unreferenced fire-and-forget task can be
    garbage-collected before it finishes.
    """
    init_db()
    scheduler.start()
    _reload_schedules()
    # Auto-start the Claude interactive process
    app.state.claude_autostart_task = asyncio.create_task(_auto_start_claude())
|
|
|
|
|
|
async def _auto_start_claude():
    """Launch the Claude process shortly after server startup and log the outcome."""
    # Give the server a moment to finish starting before spawning the child.
    await asyncio.sleep(2)
    started = await claude_mgr.start()
    if not started:
        logger.warning("Could not start Claude process on startup (auth may be needed)")
        return
    logger.info("Claude interactive process started on startup")
|
|
|
|
|
|
@app.on_event("shutdown")
async def shutdown():
    """Stop the scheduler and the Claude process on application shutdown.

    The Claude process is stopped in a `finally` block so it is not left
    orphaned when `scheduler.shutdown()` raises (for example because the
    scheduler was never started).
    """
    try:
        scheduler.shutdown()
    finally:
        await claude_mgr.stop()
|
|
|
|
|
|
def _reload_schedules():
    """Register every enabled recurring task with the scheduler.

    Tasks whose crontab expression cannot be scheduled are logged and skipped.
    """
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute("SELECT id, schedule_type, schedule_value FROM tasks WHERE enabled=1")
    enabled_tasks = cursor.fetchall()
    db.close()

    for task_id, stype, svalue in enabled_tasks:
        # Only recurring tasks that carry a crontab expression get a job.
        if stype != "recurring" or not svalue:
            continue
        try:
            scheduler.add_job(
                _schedule_task_sync,
                CronTrigger.from_crontab(svalue),
                args=[task_id],
                id=f"task_{task_id}",
                replace_existing=True,
                misfire_grace_time=60,
            )
        except Exception as e:
            logger.error(f"Failed to schedule task {task_id}: {e}")
|
|
|
|
|
|
# ============ API Routes ============
|
|
|
|
@app.get("/api/health")
async def health():
    """Report API liveness together with the Claude process state."""
    report = {"status": "ok"}
    report["claude_status"] = claude_mgr.status
    report["claude_session_id"] = claude_mgr.session_id
    report["version"] = "3.0.0"
    return report
|
|
|
|
|
|
# -------- Claude Process Control --------
|
|
|
|
@app.post("/api/claude/start")
async def start_claude():
    """Start the Claude interactive process and report the resulting state."""
    started = await claude_mgr.start()
    response = {"success": started}
    response["status"] = claude_mgr.status
    response["session_id"] = claude_mgr.session_id
    return response
|
|
|
|
|
|
@app.post("/api/claude/stop")
async def stop_claude():
    """Stop the Claude interactive process and report the resulting state."""
    await claude_mgr.stop()
    current_status = claude_mgr.status
    return {"success": True, "status": current_status}
|
|
|
|
|
|
@app.post("/api/claude/restart")
async def restart_claude():
    """Restart the Claude interactive process and report the resulting state."""
    restarted = await claude_mgr.restart()
    response = {"success": restarted}
    response["status"] = claude_mgr.status
    response["session_id"] = claude_mgr.session_id
    return response
|
|
|
|
|
|
@app.get("/api/claude/status")
async def claude_status():
    """Return the manager status, session id and whether the OS process is alive."""
    process = claude_mgr._process
    if process is None:
        proc_alive = False
    else:
        # A return code of None means the process has not exited.
        proc_alive = process.returncode is None
    return {
        "status": claude_mgr.status,
        "session_id": claude_mgr.session_id,
        "process_alive": proc_alive,
    }
|
|
|
|
|
|
# -------- Chat --------
|
|
|
|
@app.get("/api/chat/sessions")
async def get_sessions():
    """List stored chat sessions, most recently active first."""
    sessions = db_get_sessions()
    return sessions
|
|
|
|
|
|
@app.get("/api/chat/sessions/{session_id}/messages")
async def get_session_messages(session_id: str):
    """Return the stored messages of one chat session."""
    messages = db_get_messages(session_id)
    return messages
|
|
|
|
|
|
@app.post("/api/chat/send")
async def send_chat(msg: ChatSend):
    """
    Forward a chat message to the running Claude process.

    The session_id in the request and response is this service's own tracking
    id (the one stored in the DB), not necessarily Claude's internal session
    id. Responds with 503 when the process is not ready or the write fails.
    """
    current_status = claude_mgr.status
    if current_status != "ready":
        raise HTTPException(503, f"Claude process not ready (status: {current_status}). "
                                 "Use /api/claude/start to start it.")

    # Reuse the caller's session id, or mint a new one.
    session_id = msg.session_id if msg.session_id else str(uuid.uuid4())

    # Persist the user message, then hand it to the Claude process.
    db_save_message(session_id, "user", msg.message)
    delivered = await claude_mgr.send_message(msg.message)
    if delivered:
        return {"session_id": session_id, "status": "sent"}

    raise HTTPException(503, "Failed to send message to Claude process")
|
|
|
|
|
|
@app.websocket("/api/chat/ws")
async def chat_websocket(ws: WebSocket):
    """
    WebSocket endpoint for real-time chat.

    Client sends: {"message": "...", "session_id": "..."} with an optional
    "action" of "send" (default), "start_claude", "restart_claude" or
    "get_status".
    Server streams: Claude output events as JSON, plus a {"type": "ping"}
    keepalive after 30 seconds without events.

    The subscription is created inside the try block so it is always released,
    even when the initial status message cannot be sent. A JSON payload that
    is not an object gets an error reply instead of ending the connection.
    """
    await ws.accept()
    queue = claude_mgr.subscribe()
    session_id = None
    send_task = None

    async def send_from_queue():
        """Forward Claude events to the WebSocket client."""
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=30)
                await ws.send_json(event)
            except asyncio.TimeoutError:
                # Send keepalive ping
                try:
                    await ws.send_json({"type": "ping"})
                except Exception:
                    break
            except Exception:
                break

    try:
        # Send current process status immediately
        await ws.send_json({
            "type": "system",
            "subtype": "connected",
            "claude_status": claude_mgr.status,
            "claude_session_id": claude_mgr.session_id
        })

        send_task = asyncio.create_task(send_from_queue())

        while True:
            data = await ws.receive_json()
            if not isinstance(data, dict):
                await ws.send_json({
                    "type": "error",
                    "message": "Expected a JSON object"
                })
                continue

            action = data.get("action", "send")

            if action == "send":
                text = data.get("message", "")
                session_id = data.get("session_id") or session_id or str(uuid.uuid4())

                if not text:
                    continue

                # Save user message
                db_save_message(session_id, "user", text)

                if claude_mgr.status != "ready":
                    await ws.send_json({
                        "type": "error",
                        "message": f"Claude process not ready (status: {claude_mgr.status})"
                    })
                    continue

                ok = await claude_mgr.send_message(text)
                if not ok:
                    await ws.send_json({
                        "type": "error",
                        "message": "Failed to send to Claude process"
                    })
                else:
                    # Acknowledge
                    await ws.send_json({
                        "type": "user_ack",
                        "session_id": session_id,
                        "message": text
                    })

            elif action == "start_claude":
                ok = await claude_mgr.start()
                await ws.send_json({
                    "type": "system",
                    "subtype": "start_result",
                    "success": ok,
                    "claude_status": claude_mgr.status
                })

            elif action == "restart_claude":
                ok = await claude_mgr.restart()
                await ws.send_json({
                    "type": "system",
                    "subtype": "restart_result",
                    "success": ok,
                    "claude_status": claude_mgr.status
                })

            elif action == "get_status":
                await ws.send_json({
                    "type": "system",
                    "subtype": "status",
                    "claude_status": claude_mgr.status,
                    "claude_session_id": claude_mgr.session_id
                })

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        if send_task is not None:
            send_task.cancel()
        claude_mgr.unsubscribe(queue)

        # Assistant replies are not persisted by this handler.
        logger.info(f"WebSocket disconnected, session: {session_id}")
|
|
|
|
|
|
# -------- Tasks --------
|
|
|
|
@app.get("/api/tasks")
async def list_tasks():
    """Return every task as a dict, newest first."""
    column_names = (
        "id", "name", "description", "prompt", "schedule_type", "schedule_value",
        "enabled", "created_at", "last_run", "next_run", "status",
        "agent_tools", "agent_system_prompt", "agent_max_turns",
        "agent_permission_mode", "agent_timeout",
    )
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC")
    records = cursor.fetchall()
    db.close()

    tasks = []
    for record in records:
        tasks.append(dict(zip(column_names, record)))
    return tasks
|
|
|
|
|
|
@app.post("/api/tasks")
async def create_task(task: Task):
    """Store a new task and, if it is an enabled recurring task, schedule it.

    The id, creation time and status supplied by the client are overwritten.
    A crontab expression that cannot be scheduled is logged; the task is
    still created and returned.
    """
    task.id = str(uuid.uuid4())
    task.created_at = datetime.now().isoformat()
    task.status = "idle"

    values = (
        task.id, task.name, task.description, task.prompt, task.schedule_type,
        task.schedule_value, task.enabled, task.created_at, task.status,
        task.agent_tools, task.agent_system_prompt, task.agent_max_turns,
        task.agent_permission_mode, task.agent_timeout,
    )
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute("""
        INSERT INTO tasks (id, name, description, prompt, schedule_type, schedule_value,
            enabled, created_at, status, agent_tools, agent_system_prompt,
            agent_max_turns, agent_permission_mode, agent_timeout)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, values)
    db.commit()
    db.close()

    should_schedule = task.schedule_type == "recurring" and task.schedule_value and task.enabled
    if not should_schedule:
        return task

    try:
        scheduler.add_job(
            _schedule_task_sync,
            CronTrigger.from_crontab(task.schedule_value),
            args=[task.id],
            id=f"task_{task.id}",
            replace_existing=True,
            misfire_grace_time=60,
        )
    except Exception as e:
        logger.error(f"Failed to schedule: {e}")

    return task
|
|
|
|
|
|
@app.get("/api/tasks/{task_id}")
async def get_task(task_id: str):
    """Return one task as a dict, or respond 404 when the id is unknown."""
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute("SELECT * FROM tasks WHERE id=?", (task_id,))
    record = cursor.fetchone()
    db.close()

    if record:
        column_names = (
            "id", "name", "description", "prompt", "schedule_type", "schedule_value",
            "enabled", "created_at", "last_run", "next_run", "status",
            "agent_tools", "agent_system_prompt", "agent_max_turns",
            "agent_permission_mode", "agent_timeout",
        )
        return dict(zip(column_names, record))

    raise HTTPException(404, "Task not found")
|
|
|
|
|
|
@app.put("/api/tasks/{task_id}")
async def update_task(task_id: str, task: Task):
    """Update a task's editable fields and refresh its scheduler job.

    Responds 404 when no task with this id exists, matching the other
    task routes, instead of reporting success for an update that touched
    nothing. A crontab expression that cannot be scheduled is logged and the
    task is left without a job.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.execute("""
            UPDATE tasks SET name=?, description=?, prompt=?, schedule_type=?,
                schedule_value=?, enabled=?, agent_tools=?, agent_system_prompt=?,
                agent_max_turns=?, agent_permission_mode=?, agent_timeout=?
            WHERE id=?
        """, (task.name, task.description, task.prompt, task.schedule_type,
              task.schedule_value, task.enabled, task.agent_tools, task.agent_system_prompt,
              task.agent_max_turns, task.agent_permission_mode, task.agent_timeout, task_id))
        updated_rows = cur.rowcount
        conn.commit()
    finally:
        conn.close()

    if updated_rows == 0:
        raise HTTPException(404, "Task not found")

    # Reschedule: drop any existing job first; a missing job is not an error.
    try:
        scheduler.remove_job(f"task_{task_id}")
    except Exception:
        pass
    if task.schedule_type == "recurring" and task.schedule_value and task.enabled:
        try:
            scheduler.add_job(
                _schedule_task_sync,
                CronTrigger.from_crontab(task.schedule_value),
                args=[task_id],
                id=f"task_{task_id}",
                replace_existing=True,
                misfire_grace_time=60
            )
        except Exception as e:
            logger.error(f"Failed to reschedule: {e}")

    return {"success": True}
|
|
|
|
|
|
@app.delete("/api/tasks/{task_id}")
async def delete_task(task_id: str):
    """Delete a task row and remove its scheduler job, if one exists."""
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute("DELETE FROM tasks WHERE id=?", (task_id,))
    db.commit()
    db.close()

    job_id = f"task_{task_id}"
    try:
        scheduler.remove_job(job_id)
    except Exception:
        # No job registered for this task; nothing to remove.
        pass

    return {"success": True}
|
|
|
|
|
|
@app.post("/api/tasks/{task_id}/run")
async def run_task_now(task_id: str, background_tasks: BackgroundTasks):
    """Queue an immediate run of a task (non-interactive, separate process).

    Responds 404 when the task does not exist. The run itself happens in a
    background task after the response has been sent.
    """
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute("SELECT * FROM tasks WHERE id=?", (task_id,))
    record = cursor.fetchone()
    db.close()

    if not record:
        raise HTTPException(404, "Task not found")

    column_names = (
        "id", "name", "description", "prompt", "schedule_type", "schedule_value",
        "enabled", "created_at", "last_run", "next_run", "status",
        "agent_tools", "agent_system_prompt", "agent_max_turns",
        "agent_permission_mode", "agent_timeout",
    )
    # Build the model from the stored row; only known model fields are passed.
    model_kwargs = {}
    for column, value in zip(column_names, record):
        if column in Task.model_fields:
            model_kwargs[column] = value
    task = Task(**model_kwargs)

    background_tasks.add_task(_run_scheduled_task, task_id)
    return {"success": True, "message": f"Task '{task.name}' started"}
|
|
|
|
|
|
@app.get("/api/tasks/{task_id}/runs")
async def get_task_runs(task_id: str):
    """Return the 20 most recent runs of a task, newest first."""
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    cursor.execute("SELECT * FROM task_runs WHERE task_id=? ORDER BY started_at DESC LIMIT 20", (task_id,))
    records = cursor.fetchall()
    db.close()

    column_names = ("run_id", "task_id", "status", "output", "error", "started_at", "completed_at")
    runs = []
    for record in records:
        runs.append(dict(zip(column_names, record)))
    return runs
|
|
|
|
|
|
# -------- MCP Servers (config file based) --------
|
|
|
|
MCP_CONFIG_PATH = Path("/app/data/mcp_config.json")
|
|
|
|
|
|
def _load_mcp_config() -> dict:
|
|
if MCP_CONFIG_PATH.exists():
|
|
try:
|
|
return json.loads(MCP_CONFIG_PATH.read_text())
|
|
except Exception:
|
|
pass
|
|
return {"mcpServers": {}}
|
|
|
|
|
|
def _save_mcp_config(config: dict):
|
|
MCP_CONFIG_PATH.write_text(json.dumps(config, indent=2))
|
|
|
|
|
|
@app.get("/api/mcp/servers")
async def list_mcp_servers():
    """List the configured MCP servers, each entry merged with its name."""
    configured = _load_mcp_config().get("mcpServers", {})
    # Keys inside an entry take precedence over the injected "name".
    return [{"name": server_name, **settings} for server_name, settings in configured.items()]
|
|
|
|
|
|
@app.post("/api/mcp/servers")
async def add_mcp_server(server: McpServerAdd):
    """Add or replace an MCP server entry in the config file."""
    entry = {"type": server.server_type}
    # Copy over only the optional settings that were actually provided.
    optional_settings = (
        ("url", server.url),
        ("command", server.command),
        ("args", server.args),
    )
    for key, value in optional_settings:
        if value:
            entry[key] = value

    config = _load_mcp_config()
    servers = config.setdefault("mcpServers", {})
    servers[server.name] = entry
    _save_mcp_config(config)
    return {"success": True}
|
|
|
|
|
|
@app.delete("/api/mcp/servers/{name}")
async def remove_mcp_server(name: str):
    """Remove an MCP server entry; an unknown name is silently ignored."""
    config = _load_mcp_config()
    servers = config.get("mcpServers", {})
    if name in servers:
        del servers[name]
    _save_mcp_config(config)
    return {"success": True}
|
|
|
|
|
|
# -------- Static files / React SPA --------
|
|
|
|
# Frontend build output: use /app/static when it exists, otherwise fall back
# to /app/frontend/dist (whether or not that directory exists).
STATIC_DIR = Path("/app/static") if Path("/app/static").exists() else Path("/app/frontend/dist")
|
|
|
|
|
|
@app.get("/")
async def serve_index():
    """Serve the SPA's index.html, or a small JSON banner when no build is present."""
    index = STATIC_DIR / "index.html"
    if not index.exists():
        return {"message": "Claude Persistent Agent API v3.0", "docs": "/docs"}
    return FileResponse(str(index))
|
|
|
|
|
|
# The static mount is registered last, after every API and WebSocket route,
# so that /api/* and the WebSocket endpoint are matched before it.
if STATIC_DIR.exists():
    app.mount(
        "/",
        StaticFiles(directory=str(STATIC_DIR), html=True),
        name="spa",
    )
|