mcp-servers/gateway-proxy/user_routes.py
zgaetano a1a6ef137a security: require auth on all admin/dashboard/user routes
/admin, /dashboard, /dashboard/status, and all /users/* and /keys/*
endpoints were publicly accessible with no authentication, exposing
user management, API key generation, and backend topology to anyone.

- /dashboard and /dashboard/status now require Bearer token
- /admin (user management UI) now requires Bearer token
- All /users/* and /keys/revoke routes now require Bearer token
- /health scrubbed of sensitive fields (token counts, client counts)
- /linkedin/* left public (required for OAuth callback flow)

Auth checks use GATEWAY_STATIC_API_KEY or valid OAuth access tokens,
consistent with the existing /mcp and /status endpoints.
2026-03-31 23:32:15 -04:00

220 lines
7.3 KiB
Python

"""
User Management Routes for MCP Gateway
======================================
REST endpoints for user, API key, and MCP access control management.
All routes require a valid Bearer token (static API key or OAuth token).
"""
import json
import os
import time
import hashlib
from starlette.requests import Request
from starlette.responses import JSONResponse
from user_management import user_manager
# Static gateway API key, read from the environment once at import time.
# Defaults to an empty string when GATEWAY_STATIC_API_KEY is unset.
STATIC_API_KEY = os.getenv("GATEWAY_STATIC_API_KEY", "")
def _hash(token: str) -> str:
return hashlib.sha256(token.encode()).hexdigest()
def _require_auth(request: Request):
    """Check the request's Bearer token against the accepted credentials.

    A token is accepted when it equals ``STATIC_API_KEY`` (if one is
    configured), or when its SHA-256 hash maps to an entry in
    ``gateway_proxy.ACCESS_TOKENS`` whose ``expires_at`` is not in the past.

    Returns:
        None if authorized, or a 401 JSONResponse if not.
    """
    import hmac  # function-scope import: only this check needs it

    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        return JSONResponse({"error": "unauthorized"}, status_code=401)
    token = auth_header[7:]  # drop the "Bearer " prefix

    # Constant-time comparison: a plain == can leak, through timing, how much
    # of the key matched. Compare bytes because compare_digest rejects
    # non-ASCII str arguments.
    if STATIC_API_KEY and hmac.compare_digest(
        token.encode(), STATIC_API_KEY.encode()
    ):
        return None

    # Import the live token store from gateway_proxy lazily to validate OAuth
    # tokens. If the module cannot be imported, only the static key is usable.
    try:
        from gateway_proxy import ACCESS_TOKENS
    except ImportError:
        pass
    else:
        info = ACCESS_TOKENS.get(_hash(token))
        if info and info["expires_at"] >= time.time():
            return None
    return JSONResponse({"error": "unauthorized"}, status_code=401)
async def create_user(request: Request) -> JSONResponse:
    """POST /users — Create a new user.

    Expects a JSON object body with a required ``username`` and optional
    ``email`` and ``description``; each value is whitespace-stripped.

    Returns:
        201 with a summary of the created user on success.
        400 if the body is not valid JSON, is not a JSON object, a field is
            not a string, or ``username`` is empty.
        409 if ``user_manager.create_user`` raises ``ValueError``.
        500 on any other exception.
        The response from ``_require_auth`` if the request is not authorized.
    """
    if (err := _require_auth(request)):
        return err
    try:
        # Parse separately: JSON decode errors are ValueError subclasses, so
        # without this guard a malformed body would be reported as a 409.
        try:
            body = await request.json()
        except ValueError:
            return JSONResponse({'error': 'invalid JSON body'}, status_code=400)
        if not isinstance(body, dict):
            return JSONResponse({'error': 'JSON object required'}, status_code=400)

        # Reject non-string values up front instead of failing on .strip().
        fields = {}
        for name in ('username', 'email', 'description'):
            value = body.get(name, '')
            if not isinstance(value, str):
                return JSONResponse(
                    {'error': f'{name} must be a string'}, status_code=400
                )
            fields[name] = value.strip()
        username = fields['username']
        email = fields['email']
        description = fields['description']

        if not username:
            return JSONResponse({'error': 'username required'}, status_code=400)
        user = user_manager.create_user(username, email=email, description=description)
        return JSONResponse({
            'status': 'created',
            'user': {
                'username': username,
                'user_id': user['user_id'],
                'email': email,
                'created_at': user['created_at']
            }
        }, status_code=201)
    except ValueError as e:
        return JSONResponse({'error': str(e)}, status_code=409)
    except Exception as e:
        return JSONResponse({'error': str(e)}, status_code=500)
async def list_users(request: Request) -> JSONResponse:
    """GET /users — List all users.

    Responds with the list returned by ``user_manager.list_users()`` and its
    length. Any exception is reported as a 500 carrying the exception text.
    An unauthorized request gets the response from ``_require_auth``.
    """
    denied = _require_auth(request)
    if denied:
        return denied
    try:
        all_users = user_manager.list_users()
        payload = {
            'status': 'ok',
            'count': len(all_users),
            'users': all_users,
        }
        return JSONResponse(payload)
    except Exception as exc:
        return JSONResponse({'error': str(exc)}, status_code=500)
async def get_user(request: Request) -> JSONResponse:
    """GET /users/{username} — Get user details.

    Looks the user up by the ``username`` path parameter among the entries
    from ``user_manager.list_users()``. Responds 404 when there is no match;
    otherwise returns the user record with the result of
    ``user_manager.get_user_keys``. Any exception is reported as a 500.
    """
    denied = _require_auth(request)
    if denied:
        return denied
    try:
        wanted = request.path_params.get('username')
        # Index every listed user by name; later duplicates overwrite earlier ones.
        by_name = {}
        for entry in user_manager.list_users():
            by_name[entry['username']] = entry
        if wanted in by_name:
            record = by_name[wanted]
            key_list = user_manager.get_user_keys(wanted)
            payload = {
                'status': 'ok',
                'user': record,
                'api_keys': key_list,
            }
            return JSONResponse(payload)
        return JSONResponse({'error': 'user not found'}, status_code=404)
    except Exception as exc:
        return JSONResponse({'error': str(exc)}, status_code=500)
async def delete_user(request: Request) -> JSONResponse:
    """DELETE /users/{username} — Delete a user.

    Passes the ``username`` path parameter to ``user_manager.delete_user``.
    A ``ValueError`` from the manager is reported as 404, any other
    exception as 500. An unauthorized request gets the response from
    ``_require_auth``.
    """
    denied = _require_auth(request)
    if denied:
        return denied
    try:
        target = request.path_params.get('username')
        user_manager.delete_user(target)
        confirmation = {'status': 'deleted', 'username': target}
        return JSONResponse(confirmation)
    except ValueError as exc:
        return JSONResponse({'error': str(exc)}, status_code=404)
    except Exception as exc:
        return JSONResponse({'error': str(exc)}, status_code=500)
async def toggle_user(request: Request) -> JSONResponse:
    """PATCH /users/{username}/enable — Enable/disable a user.

    Expects a JSON object body; ``enabled`` defaults to True when absent and
    is passed to ``user_manager.toggle_user`` unchanged.

    Returns:
        200 echoing ``username`` and ``enabled`` on success.
        400 if the body is not valid JSON or is not a JSON object.
        404 if ``user_manager.toggle_user`` raises ``ValueError``.
        500 on any other exception.
        The response from ``_require_auth`` if the request is not authorized.
    """
    if (err := _require_auth(request)):
        return err
    try:
        username = request.path_params.get('username')
        # Parse separately: JSON decode errors are ValueError subclasses, so
        # without this guard a malformed body would be reported as a 404.
        try:
            body = await request.json()
        except ValueError:
            return JSONResponse({'error': 'invalid JSON body'}, status_code=400)
        if not isinstance(body, dict):
            return JSONResponse({'error': 'JSON object required'}, status_code=400)
        enabled = body.get('enabled', True)
        user_manager.toggle_user(username, enabled)
        return JSONResponse({
            'status': 'ok',
            'username': username,
            'enabled': enabled
        })
    except ValueError as e:
        return JSONResponse({'error': str(e)}, status_code=404)
    except Exception as e:
        return JSONResponse({'error': str(e)}, status_code=500)
async def generate_api_key(request: Request) -> JSONResponse:
    """POST /users/{username}/keys — Generate a new API key.

    Expects a JSON object body with optional ``key_name`` (default ``''``)
    and ``ttl_days`` (default None), both forwarded to
    ``user_manager.generate_api_key``.

    Returns:
        201 containing the generated key on success.
        400 if the body is not valid JSON or is not a JSON object.
        404 if ``user_manager.generate_api_key`` raises ``ValueError``.
        500 on any other exception.
        The response from ``_require_auth`` if the request is not authorized.
    """
    if (err := _require_auth(request)):
        return err
    try:
        username = request.path_params.get('username')
        # Parse separately: JSON decode errors are ValueError subclasses, so
        # without this guard a malformed body would be reported as a 404.
        try:
            body = await request.json()
        except ValueError:
            return JSONResponse({'error': 'invalid JSON body'}, status_code=400)
        if not isinstance(body, dict):
            return JSONResponse({'error': 'JSON object required'}, status_code=400)
        key_name = body.get('key_name', '')
        ttl_days = body.get('ttl_days')
        api_key = user_manager.generate_api_key(username, key_name=key_name, ttl_days=ttl_days)
        return JSONResponse({
            'status': 'created',
            'username': username,
            'api_key': api_key,
            'note': 'Save this key immediately — it will not be shown again'
        }, status_code=201)
    except ValueError as e:
        return JSONResponse({'error': str(e)}, status_code=404)
    except Exception as e:
        return JSONResponse({'error': str(e)}, status_code=500)
async def revoke_api_key(request: Request) -> JSONResponse:
    """DELETE /keys/{key_hash} — Revoke an API key.

    The ``key_hash`` path parameter is currently ignored: the client must
    send the full key as ``api_key`` in a JSON object body, and that value is
    passed to ``user_manager.revoke_api_key``.

    Returns:
        200 with ``{'status': 'revoked'}`` on success.
        400 if the body is not valid JSON, is not a JSON object, or has no
            ``api_key``.
        404 if ``user_manager.revoke_api_key`` raises ``ValueError``.
        500 on any other exception.
        The response from ``_require_auth`` if the request is not authorized.
    """
    if (err := _require_auth(request)):
        return err
    try:
        # Parse separately: JSON decode errors are ValueError subclasses, so
        # without this guard a malformed body would be reported as a 404.
        try:
            body = await request.json()
        except ValueError:
            return JSONResponse({'error': 'invalid JSON body'}, status_code=400)
        if not isinstance(body, dict):
            return JSONResponse({'error': 'JSON object required'}, status_code=400)
        # In practice we'd need to reconstruct the hash or pass the full key;
        # for now the client is expected to pass the full key in the body.
        api_key = body.get('api_key')
        if not api_key:
            return JSONResponse({'error': 'api_key required'}, status_code=400)
        user_manager.revoke_api_key(api_key)
        return JSONResponse({'status': 'revoked'})
    except ValueError as e:
        return JSONResponse({'error': str(e)}, status_code=404)
    except Exception as e:
        return JSONResponse({'error': str(e)}, status_code=500)
async def set_mcp_access(request: Request) -> JSONResponse:
    """PUT /users/{username}/mcp-access — Configure MCP access for a user.

    Expects a JSON object body with optional ``allowed_mcps`` and
    ``blocked_mcps`` (both None when absent), forwarded to
    ``user_manager.set_mcp_access``. The user list is then re-read so the
    response reflects the stored ``mcp_allowed`` / ``mcp_blocked`` values.

    Returns:
        200 with the user's resulting access lists on success.
        400 if the body is not valid JSON or is not a JSON object.
        404 if ``user_manager.set_mcp_access`` raises ``ValueError``.
        500 on any other exception.
        The response from ``_require_auth`` if the request is not authorized.
    """
    if (err := _require_auth(request)):
        return err
    try:
        username = request.path_params.get('username')
        # Parse separately: JSON decode errors are ValueError subclasses, so
        # without this guard a malformed body would be reported as a 404.
        try:
            body = await request.json()
        except ValueError:
            return JSONResponse({'error': 'invalid JSON body'}, status_code=400)
        if not isinstance(body, dict):
            return JSONResponse({'error': 'JSON object required'}, status_code=400)
        allowed_mcps = body.get('allowed_mcps')
        blocked_mcps = body.get('blocked_mcps')
        user_manager.set_mcp_access(
            username,
            allowed_mcps=allowed_mcps,
            blocked_mcps=blocked_mcps
        )
        # Re-read the user; fall back to empty lists if it is no longer listed.
        users = {u['username']: u for u in user_manager.list_users()}
        user = users.get(username, {})
        return JSONResponse({
            'status': 'updated',
            'username': username,
            'mcp_allowed': user.get('mcp_allowed', []),
            'mcp_blocked': user.get('mcp_blocked', [])
        })
    except ValueError as e:
        return JSONResponse({'error': str(e)}, status_code=404)
    except Exception as e:
        return JSONResponse({'error': str(e)}, status_code=500)