""" User Management Routes for MCP Gateway ====================================== REST endpoints for user, API key, and MCP access control management. """ import json from starlette.requests import Request from starlette.responses import JSONResponse from user_management import user_manager async def create_user(request: Request) -> JSONResponse: """POST /users — Create a new user.""" try: body = await request.json() username = body.get('username', '').strip() email = body.get('email', '').strip() description = body.get('description', '').strip() if not username: return JSONResponse({'error': 'username required'}, status_code=400) user = user_manager.create_user(username, email=email, description=description) return JSONResponse({ 'status': 'created', 'user': { 'username': username, 'user_id': user['user_id'], 'email': email, 'created_at': user['created_at'] } }, status_code=201) except ValueError as e: return JSONResponse({'error': str(e)}, status_code=409) except Exception as e: return JSONResponse({'error': str(e)}, status_code=500) async def list_users(request: Request) -> JSONResponse: """GET /users — List all users.""" try: users = user_manager.list_users() return JSONResponse({ 'status': 'ok', 'count': len(users), 'users': users }) except Exception as e: return JSONResponse({'error': str(e)}, status_code=500) async def get_user(request: Request) -> JSONResponse: """GET /users/{username} — Get user details.""" try: username = request.path_params.get('username') users = {u['username']: u for u in user_manager.list_users()} if username not in users: return JSONResponse({'error': 'user not found'}, status_code=404) user = users[username] keys = user_manager.get_user_keys(username) return JSONResponse({ 'status': 'ok', 'user': user, 'api_keys': keys }) except Exception as e: return JSONResponse({'error': str(e)}, status_code=500) async def delete_user(request: Request) -> JSONResponse: """DELETE /users/{username} — Delete a user.""" try: username = request.path_params.get('username') user_manager.delete_user(username) return JSONResponse({'status': 'deleted', 'username': username}) except ValueError as e: return JSONResponse({'error': str(e)}, status_code=404) except Exception as e: return JSONResponse({'error': str(e)}, status_code=500) async def toggle_user(request: Request) -> JSONResponse: """PATCH /users/{username}/enable — Enable/disable a user.""" try: username = request.path_params.get('username') body = await request.json() enabled = body.get('enabled', True) user_manager.toggle_user(username, enabled) return JSONResponse({ 'status': 'ok', 'username': username, 'enabled': enabled }) except ValueError as e: return JSONResponse({'error': str(e)}, status_code=404) except Exception as e: return JSONResponse({'error': str(e)}, status_code=500) async def generate_api_key(request: Request) -> JSONResponse: """POST /users/{username}/keys — Generate a new API key.""" try: username = request.path_params.get('username') body = await request.json() key_name = body.get('key_name', '') ttl_days = body.get('ttl_days') api_key = user_manager.generate_api_key(username, key_name=key_name, ttl_days=ttl_days) return JSONResponse({ 'status': 'created', 'username': username, 'api_key': api_key, 'note': 'Save this key immediately — it will not be shown again' }, status_code=201) except ValueError as e: return JSONResponse({'error': str(e)}, status_code=404) except Exception as e: return JSONResponse({'error': str(e)}, status_code=500) async def revoke_api_key(request: Request) -> JSONResponse: """DELETE /keys/{key_hash} — Revoke an API key.""" try: key_hash = request.path_params.get('key_hash') # In practice, we'd need to reconstruct the hash or pass the full key # For now, we'll expect the client to pass the full key body = await request.json() api_key = body.get('api_key') if not api_key: return JSONResponse({'error': 'api_key required'}, status_code=400) user_manager.revoke_api_key(api_key) return JSONResponse({'status': 'revoked'}) except ValueError as e: return JSONResponse({'error': str(e)}, status_code=404) except Exception as e: return JSONResponse({'error': str(e)}, status_code=500) async def set_mcp_access(request: Request) -> JSONResponse: """PUT /users/{username}/mcp-access — Configure MCP access for a user.""" try: username = request.path_params.get('username') body = await request.json() allowed_mcps = body.get('allowed_mcps') blocked_mcps = body.get('blocked_mcps') user_manager.set_mcp_access( username, allowed_mcps=allowed_mcps, blocked_mcps=blocked_mcps ) users = {u['username']: u for u in user_manager.list_users()} user = users.get(username, {}) return JSONResponse({ 'status': 'updated', 'username': username, 'mcp_allowed': user.get('mcp_allowed', []), 'mcp_blocked': user.get('mcp_blocked', []) }) except ValueError as e: return JSONResponse({'error': str(e)}, status_code=404) except Exception as e: return JSONResponse({'error': str(e)}, status_code=500)