fix: replace broken OAuth login with token-based auth
This commit is contained in:
parent
379d612388
commit
0046db1849
1 changed files with 136 additions and 182 deletions
318
backend/main.py
318
backend/main.py
|
|
@ -41,6 +41,7 @@ app.add_middleware(
|
||||||
DB_PATH = Path("/app/data/tasks.db")
|
DB_PATH = Path("/app/data/tasks.db")
|
||||||
LOGS_PATH = Path("/app/logs")
|
LOGS_PATH = Path("/app/logs")
|
||||||
TASKS_PATH = Path("/app/tasks")
|
TASKS_PATH = Path("/app/tasks")
|
||||||
|
TOKEN_FILE = Path("/app/data/.claude_token") # persist token across restarts
|
||||||
|
|
||||||
# Ensure directories exist
|
# Ensure directories exist
|
||||||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
@ -110,40 +111,57 @@ def init_db():
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_claude_env():
|
||||||
|
"""Get environment with auth tokens set for Claude CLI"""
|
||||||
|
env = os.environ.copy()
|
||||||
|
# Load saved token if it exists
|
||||||
|
if TOKEN_FILE.exists():
|
||||||
|
try:
|
||||||
|
saved = json.loads(TOKEN_FILE.read_text())
|
||||||
|
if saved.get("type") == "oauth_token" and saved.get("token"):
|
||||||
|
env["CLAUDE_CODE_OAUTH_TOKEN"] = saved["token"]
|
||||||
|
elif saved.get("type") == "api_key" and saved.get("token"):
|
||||||
|
env["ANTHROPIC_API_KEY"] = saved["token"]
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to load saved token: {e}")
|
||||||
|
return env
|
||||||
|
|
||||||
|
|
||||||
async def run_claude_task(task: Task, run_id: str):
|
async def run_claude_task(task: Task, run_id: str):
|
||||||
"""Execute a Claude Code task"""
|
"""Execute a Claude Code task"""
|
||||||
|
started = datetime.now().isoformat()
|
||||||
try:
|
try:
|
||||||
# Save task prompt to temp file
|
env = _get_claude_env()
|
||||||
prompt_file = TASKS_PATH / f"{run_id}.md"
|
|
||||||
prompt_file.write_text(task.prompt)
|
|
||||||
|
|
||||||
# Run Claude Code
|
# Run Claude Code in print mode (non-interactive)
|
||||||
process = await asyncio.create_subprocess_exec(
|
process = await asyncio.create_subprocess_exec(
|
||||||
"claude",
|
"claude", "-p", "--allowedTools", "Bash,Read,Write,Edit",
|
||||||
"task",
|
"--permission-mode", "auto",
|
||||||
str(prompt_file),
|
task.prompt,
|
||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE,
|
||||||
stderr=asyncio.subprocess.PIPE
|
stderr=asyncio.subprocess.PIPE,
|
||||||
|
env=env
|
||||||
)
|
)
|
||||||
|
|
||||||
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
|
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
|
||||||
|
|
||||||
output = stdout.decode() if stdout else ""
|
output = stdout.decode(errors="replace") if stdout else ""
|
||||||
error = stderr.decode() if stderr else ""
|
error = stderr.decode(errors="replace") if stderr else ""
|
||||||
|
|
||||||
|
status = "completed" if process.returncode == 0 else "failed"
|
||||||
|
|
||||||
# Save run result
|
# Save run result
|
||||||
save_task_run(TaskRun(
|
save_task_run(TaskRun(
|
||||||
task_id=task.id,
|
task_id=task.id,
|
||||||
run_id=run_id,
|
run_id=run_id,
|
||||||
status="completed" if process.returncode == 0 else "failed",
|
status=status,
|
||||||
output=output,
|
output=output,
|
||||||
error=error if process.returncode != 0 else None,
|
error=error if status == "failed" else None,
|
||||||
started_at=datetime.now().isoformat(),
|
started_at=started,
|
||||||
completed_at=datetime.now().isoformat()
|
completed_at=datetime.now().isoformat()
|
||||||
))
|
))
|
||||||
|
|
||||||
# Update task
|
update_task_status(task.id, status)
|
||||||
update_task_status(task.id, "completed")
|
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
save_task_run(TaskRun(
|
save_task_run(TaskRun(
|
||||||
|
|
@ -151,7 +169,7 @@ async def run_claude_task(task: Task, run_id: str):
|
||||||
run_id=run_id,
|
run_id=run_id,
|
||||||
status="failed",
|
status="failed",
|
||||||
error="Task timeout (>5 minutes)",
|
error="Task timeout (>5 minutes)",
|
||||||
started_at=datetime.now().isoformat(),
|
started_at=started,
|
||||||
completed_at=datetime.now().isoformat()
|
completed_at=datetime.now().isoformat()
|
||||||
))
|
))
|
||||||
update_task_status(task.id, "failed")
|
update_task_status(task.id, "failed")
|
||||||
|
|
@ -161,7 +179,7 @@ async def run_claude_task(task: Task, run_id: str):
|
||||||
run_id=run_id,
|
run_id=run_id,
|
||||||
status="failed",
|
status="failed",
|
||||||
error=str(e),
|
error=str(e),
|
||||||
started_at=datetime.now().isoformat(),
|
started_at=started,
|
||||||
completed_at=datetime.now().isoformat()
|
completed_at=datetime.now().isoformat()
|
||||||
))
|
))
|
||||||
update_task_status(task.id, "failed")
|
update_task_status(task.id, "failed")
|
||||||
|
|
@ -294,6 +312,19 @@ async def startup():
|
||||||
init_db()
|
init_db()
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
|
# Load saved token into environment on startup
|
||||||
|
if TOKEN_FILE.exists():
|
||||||
|
try:
|
||||||
|
saved = json.loads(TOKEN_FILE.read_text())
|
||||||
|
if saved.get("type") == "oauth_token" and saved.get("token"):
|
||||||
|
os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = saved["token"]
|
||||||
|
logger.info("Loaded saved OAuth token")
|
||||||
|
elif saved.get("type") == "api_key" and saved.get("token"):
|
||||||
|
os.environ["ANTHROPIC_API_KEY"] = saved["token"]
|
||||||
|
logger.info("Loaded saved API key")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to load saved token: {e}")
|
||||||
|
|
||||||
# Schedule existing tasks
|
# Schedule existing tasks
|
||||||
for task in get_all_tasks():
|
for task in get_all_tasks():
|
||||||
if task.enabled:
|
if task.enabled:
|
||||||
|
|
@ -432,8 +463,6 @@ async def system_info() -> Dict:
|
||||||
@app.get("/api/system/usage")
|
@app.get("/api/system/usage")
|
||||||
async def usage_stats() -> Dict:
|
async def usage_stats() -> Dict:
|
||||||
"""Get Claude API usage stats from session files if available"""
|
"""Get Claude API usage stats from session files if available"""
|
||||||
import glob, json as _json
|
|
||||||
|
|
||||||
usage = {
|
usage = {
|
||||||
"models_used": [],
|
"models_used": [],
|
||||||
"session_count": 0,
|
"session_count": 0,
|
||||||
|
|
@ -443,19 +472,16 @@ async def usage_stats() -> Dict:
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Claude Code stores session info under ~/.claude
|
|
||||||
claude_dir = Path("/root/.claude")
|
claude_dir = Path("/root/.claude")
|
||||||
sessions = list(claude_dir.glob("**/session*.json")) + list(claude_dir.glob("**/*.jsonl"))
|
sessions = list(claude_dir.glob("**/session*.json")) + list(claude_dir.glob("**/*.jsonl"))
|
||||||
usage["session_count"] = len(sessions)
|
usage["session_count"] = len(sessions)
|
||||||
|
|
||||||
# Try to parse any usage metadata
|
|
||||||
stats_file = claude_dir / "usage_stats.json"
|
stats_file = claude_dir / "usage_stats.json"
|
||||||
if stats_file.exists():
|
if stats_file.exists():
|
||||||
with open(stats_file) as f:
|
with open(stats_file) as f:
|
||||||
saved = _json.load(f)
|
saved = json.load(f)
|
||||||
usage.update(saved)
|
usage.update(saved)
|
||||||
else:
|
else:
|
||||||
# Estimate reset time: Anthropic resets usage monthly
|
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
if now.day <= 1:
|
if now.day <= 1:
|
||||||
reset = now.replace(day=1, hour=0, minute=0, second=0)
|
reset = now.replace(day=1, hour=0, minute=0, second=0)
|
||||||
|
|
@ -467,7 +493,6 @@ async def usage_stats() -> Dict:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
usage["error"] = str(e)
|
usage["error"] = str(e)
|
||||||
|
|
||||||
# Add run stats for context
|
|
||||||
conn = sqlite3.connect(DB_PATH)
|
conn = sqlite3.connect(DB_PATH)
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
cursor.execute("SELECT COUNT(*), MIN(started_at), MAX(started_at) FROM task_runs")
|
cursor.execute("SELECT COUNT(*), MIN(started_at), MAX(started_at) FROM task_runs")
|
||||||
|
|
@ -480,203 +505,136 @@ async def usage_stats() -> Dict:
|
||||||
return usage
|
return usage
|
||||||
|
|
||||||
|
|
||||||
# Auth state stored in memory (persists while container runs)
|
# ============ Auth Endpoints ============
|
||||||
_auth_process = None
|
|
||||||
_auth_url = None
|
|
||||||
_auth_status = "unknown" # unknown | logged_in | logged_out | pending
|
|
||||||
|
|
||||||
|
|
||||||
async def _check_claude_auth() -> str:
|
|
||||||
"""Check if claude is authenticated by running claude auth status"""
|
|
||||||
global _auth_status
|
|
||||||
try:
|
|
||||||
proc = await asyncio.create_subprocess_exec(
|
|
||||||
"claude", "auth", "status",
|
|
||||||
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
|
||||||
)
|
|
||||||
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
|
|
||||||
output = stdout.decode(errors="replace").strip()
|
|
||||||
logger.info(f"claude auth status output: {output}")
|
|
||||||
try:
|
|
||||||
import json as _json
|
|
||||||
data = _json.loads(output)
|
|
||||||
if data.get("loggedIn"):
|
|
||||||
_auth_status = "logged_in"
|
|
||||||
return "logged_in"
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
_auth_status = "logged_out"
|
|
||||||
return "logged_out"
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Auth check failed: {e}")
|
|
||||||
_auth_status = "unknown"
|
|
||||||
return "unknown"
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/auth/status")
|
@app.get("/api/auth/status")
|
||||||
async def auth_status():
|
async def auth_status():
|
||||||
"""Check Claude auth status using claude auth status JSON"""
|
"""Check Claude auth status"""
|
||||||
global _auth_status, _auth_url
|
|
||||||
account = None
|
account = None
|
||||||
auth_method = None
|
auth_method = None
|
||||||
|
status = "logged_out"
|
||||||
|
has_saved_token = TOKEN_FILE.exists()
|
||||||
|
token_type = None
|
||||||
|
|
||||||
|
if has_saved_token:
|
||||||
|
try:
|
||||||
|
saved = json.loads(TOKEN_FILE.read_text())
|
||||||
|
token_type = saved.get("type")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
env = _get_claude_env()
|
||||||
proc = await asyncio.create_subprocess_exec(
|
proc = await asyncio.create_subprocess_exec(
|
||||||
"claude", "auth", "status",
|
"claude", "auth", "status", "--json",
|
||||||
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
||||||
|
env=env
|
||||||
)
|
)
|
||||||
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
|
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
|
||||||
output = stdout.decode(errors="replace").strip()
|
output = stdout.decode(errors="replace").strip()
|
||||||
import json as _json
|
logger.info(f"claude auth status: {output}")
|
||||||
data = _json.loads(output)
|
data = json.loads(output)
|
||||||
if data.get("loggedIn"):
|
if data.get("loggedIn"):
|
||||||
_auth_status = "logged_in"
|
status = "logged_in"
|
||||||
account = data.get("email") or data.get("account", {}).get("emailAddress")
|
account = data.get("email") or data.get("account", {}).get("emailAddress")
|
||||||
auth_method = data.get("authMethod")
|
auth_method = data.get("authMethod")
|
||||||
else:
|
|
||||||
_auth_status = "logged_out"
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Auth status check failed: {e}")
|
logger.error(f"Auth status check failed: {e}")
|
||||||
# Fall back to existing status
|
|
||||||
pass
|
|
||||||
return {
|
return {
|
||||||
"status": _auth_status,
|
"status": status,
|
||||||
"account": account,
|
"account": account,
|
||||||
"auth_method": auth_method,
|
"auth_method": auth_method,
|
||||||
"auth_url": _auth_url if _auth_status == "pending" else None
|
"has_saved_token": has_saved_token,
|
||||||
|
"token_type": token_type
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/auth/login")
|
class TokenSubmit(BaseModel):
|
||||||
async def auth_login():
|
token: str
|
||||||
"""Initiate Claude OAuth login - returns the URL to open in browser"""
|
token_type: str = "oauth_token" # "oauth_token" or "api_key"
|
||||||
global _auth_process, _auth_url, _auth_status
|
|
||||||
|
|
||||||
# Kill any existing auth process
|
|
||||||
if _auth_process and _auth_process.returncode is None:
|
|
||||||
try:
|
|
||||||
_auth_process.kill()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
_auth_status = "pending"
|
@app.post("/api/auth/token")
|
||||||
_auth_url = None
|
async def auth_set_token(payload: TokenSubmit):
|
||||||
|
"""Save an auth token (OAuth setup token or API key)"""
|
||||||
|
token = payload.token.strip()
|
||||||
|
token_type = payload.token_type
|
||||||
|
|
||||||
|
if not token:
|
||||||
|
return {"status": "error", "message": "Token cannot be empty"}
|
||||||
|
|
||||||
|
# Auto-detect token type
|
||||||
|
if token.startswith("sk-ant-oat"):
|
||||||
|
token_type = "oauth_token"
|
||||||
|
elif token.startswith("sk-ant-api"):
|
||||||
|
token_type = "api_key"
|
||||||
|
|
||||||
|
# Save to file for persistence
|
||||||
|
TOKEN_FILE.write_text(json.dumps({
|
||||||
|
"type": token_type,
|
||||||
|
"token": token,
|
||||||
|
"saved_at": datetime.now().isoformat()
|
||||||
|
}))
|
||||||
|
|
||||||
|
# Set in current process environment
|
||||||
|
if token_type == "oauth_token":
|
||||||
|
os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = token
|
||||||
|
os.environ.pop("ANTHROPIC_API_KEY", None)
|
||||||
|
elif token_type == "api_key":
|
||||||
|
os.environ["ANTHROPIC_API_KEY"] = token
|
||||||
|
os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
|
||||||
|
|
||||||
|
# Verify auth works
|
||||||
try:
|
try:
|
||||||
# Run claude auth login --claudeai with stdin PIPE so we can send the code later
|
env = _get_claude_env()
|
||||||
proc = await asyncio.create_subprocess_exec(
|
proc = await asyncio.create_subprocess_exec(
|
||||||
"claude", "auth", "login", "--claudeai",
|
"claude", "auth", "status", "--json",
|
||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
||||||
stderr=asyncio.subprocess.STDOUT,
|
env=env
|
||||||
stdin=asyncio.subprocess.PIPE
|
|
||||||
)
|
)
|
||||||
_auth_process = proc
|
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
|
||||||
|
output = stdout.decode(errors="replace").strip()
|
||||||
# Read output looking for the OAuth URL
|
data = json.loads(output)
|
||||||
url = None
|
if data.get("loggedIn"):
|
||||||
try:
|
|
||||||
async def _read_url():
|
|
||||||
collected = ""
|
|
||||||
while True:
|
|
||||||
line = await proc.stdout.readline()
|
|
||||||
if not line:
|
|
||||||
break
|
|
||||||
text = line.decode(errors="replace")
|
|
||||||
collected += text
|
|
||||||
logger.info(f"claude auth: {text.strip()}")
|
|
||||||
for part in text.split():
|
|
||||||
cleaned = part.strip().rstrip("\\n")
|
|
||||||
if cleaned.startswith("https://") and ("oauth" in cleaned or "claude.com" in cleaned):
|
|
||||||
return cleaned
|
|
||||||
logger.info(f"Full auth output: {collected}")
|
|
||||||
return None
|
|
||||||
url = await asyncio.wait_for(_read_url(), timeout=15)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
logger.warning("Timed out waiting for auth URL")
|
|
||||||
|
|
||||||
if url:
|
|
||||||
_auth_url = url
|
|
||||||
return {
|
return {
|
||||||
"status": "pending",
|
"status": "logged_in",
|
||||||
"auth_url": url,
|
"message": "Token saved and verified!",
|
||||||
"message": "Open this URL, authorize, then paste the code you receive back here."
|
"account": data.get("email"),
|
||||||
|
"auth_method": data.get("authMethod")
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
return {
|
return {
|
||||||
"status": "error",
|
"status": "token_saved",
|
||||||
"message": "Could not extract login URL. Try: docker exec -it claude-persistent-agent claude auth login --claudeai"
|
"message": "Token saved but auth status shows not logged in. The token may still work for API calls.",
|
||||||
|
"raw": output
|
||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Auth login failed: {e}")
|
return {
|
||||||
return {"status": "error", "message": str(e)}
|
"status": "token_saved",
|
||||||
|
"message": f"Token saved. Could not verify: {e}"
|
||||||
|
}
|
||||||
class AuthCode(BaseModel):
|
|
||||||
code: str
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/auth/code")
|
|
||||||
async def auth_submit_code(payload: AuthCode):
|
|
||||||
"""Submit the OAuth authorization code from the browser callback"""
|
|
||||||
global _auth_process, _auth_status
|
|
||||||
|
|
||||||
if not _auth_process or _auth_process.returncode is not None:
|
|
||||||
return {"status": "error", "message": "No active login session. Click 'Login' first."}
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Send the code to the waiting claude auth login process via stdin
|
|
||||||
code = payload.code.strip() + "\n"
|
|
||||||
_auth_process.stdin.write(code.encode())
|
|
||||||
await _auth_process.stdin.drain()
|
|
||||||
_auth_process.stdin.close()
|
|
||||||
|
|
||||||
# Wait for the process to finish
|
|
||||||
try:
|
|
||||||
stdout, _ = await asyncio.wait_for(_auth_process.communicate(), timeout=30)
|
|
||||||
output = stdout.decode(errors="replace") if stdout else ""
|
|
||||||
logger.info(f"Auth code result: {output}")
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
logger.warning("Auth process timed out after submitting code")
|
|
||||||
|
|
||||||
# Check if login succeeded
|
|
||||||
check_proc = await asyncio.create_subprocess_exec(
|
|
||||||
"claude", "auth", "status",
|
|
||||||
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
|
||||||
)
|
|
||||||
check_stdout, _ = await asyncio.wait_for(check_proc.communicate(), timeout=10)
|
|
||||||
check_output = check_stdout.decode(errors="replace").strip()
|
|
||||||
|
|
||||||
try:
|
|
||||||
import json as _json
|
|
||||||
data = _json.loads(check_output)
|
|
||||||
if data.get("loggedIn"):
|
|
||||||
_auth_status = "logged_in"
|
|
||||||
return {"status": "logged_in", "message": "Successfully logged in!", "account": data.get("email")}
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
_auth_status = "logged_out"
|
|
||||||
return {"status": "error", "message": f"Login may have failed. Auth status: {check_output}"}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Auth code submission failed: {e}")
|
|
||||||
return {"status": "error", "message": str(e)}
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/auth/logout")
|
@app.post("/api/auth/logout")
|
||||||
async def auth_logout():
|
async def auth_logout():
|
||||||
"""Log out of Claude"""
|
"""Clear saved auth token"""
|
||||||
global _auth_status
|
if TOKEN_FILE.exists():
|
||||||
|
TOKEN_FILE.unlink()
|
||||||
|
os.environ.pop("CLAUDE_CODE_OAUTH_TOKEN", None)
|
||||||
|
os.environ.pop("ANTHROPIC_API_KEY", None)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
proc = await asyncio.create_subprocess_exec(
|
proc = await asyncio.create_subprocess_exec(
|
||||||
"claude", "auth", "logout",
|
"claude", "auth", "logout",
|
||||||
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||||||
)
|
)
|
||||||
await asyncio.wait_for(proc.communicate(), timeout=10)
|
await asyncio.wait_for(proc.communicate(), timeout=10)
|
||||||
_auth_status = "logged_out"
|
except Exception:
|
||||||
return {"status": "logged_out"}
|
pass
|
||||||
except Exception as e:
|
|
||||||
return {"status": "error", "message": str(e)}
|
return {"status": "logged_out"}
|
||||||
|
|
||||||
|
|
||||||
# ============ MCP Server Management ============
|
# ============ MCP Server Management ============
|
||||||
|
|
@ -693,21 +651,17 @@ async def list_mcp_servers():
|
||||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
|
||||||
output = stdout.decode(errors="replace").strip()
|
output = stdout.decode(errors="replace").strip()
|
||||||
|
|
||||||
# Parse the output — claude mcp list shows servers in a table or JSON
|
|
||||||
servers = []
|
servers = []
|
||||||
if "No MCP servers configured" in output:
|
if "No MCP servers configured" in output:
|
||||||
return {"servers": [], "raw": output}
|
return {"servers": [], "raw": output}
|
||||||
|
|
||||||
# Try JSON parse first
|
|
||||||
try:
|
try:
|
||||||
import json as _json
|
data = json.loads(output)
|
||||||
data = _json.loads(output)
|
|
||||||
if isinstance(data, list):
|
if isinstance(data, list):
|
||||||
servers = data
|
servers = data
|
||||||
elif isinstance(data, dict):
|
elif isinstance(data, dict):
|
||||||
servers = list(data.values()) if data else []
|
servers = list(data.values()) if data else []
|
||||||
except Exception:
|
except Exception:
|
||||||
# Parse text output line by line
|
|
||||||
for line in output.split("\n"):
|
for line in output.split("\n"):
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if line and not line.startswith("-") and not line.startswith("Name"):
|
if line and not line.startswith("-") and not line.startswith("Name"):
|
||||||
|
|
|
||||||
Reference in a new issue