From 44f0b9bb018485655c85957cf4971d0affa43786 Mon Sep 17 00:00:00 2001 From: zgaetano Date: Tue, 31 Mar 2026 15:29:39 -0400 Subject: [PATCH] Add mcp-gateway/gateway-proxy/user_management.py --- mcp-gateway/gateway-proxy/user_management.py | 264 +++++++++++++++++++ 1 file changed, 264 insertions(+) create mode 100644 mcp-gateway/gateway-proxy/user_management.py diff --git a/mcp-gateway/gateway-proxy/user_management.py b/mcp-gateway/gateway-proxy/user_management.py new file mode 100644 index 0000000..e844c9d --- /dev/null +++ b/mcp-gateway/gateway-proxy/user_management.py @@ -0,0 +1,264 @@ +""" +User Management System for MCP Gateway +====================================== +Handles: +- User creation and deletion +- API key generation and revocation +- Per-user MCP access control (allow/block lists) +- Persistent storage with JSON +""" + +import json +import os +import secrets +import time +import hashlib +from datetime import datetime +from pathlib import Path +from typing import Optional + + +class UserManager: + """Manages users, API keys, and MCP access permissions.""" + + def __init__(self, storage_path: str = "/data/users.json"): + self.storage_path = storage_path + self.users = {} + self.api_keys = {} # api_key -> {user_id, created_at, revoked} + self.load_from_disk() + + def load_from_disk(self): + """Load users and API keys from persistent storage.""" + if os.path.exists(self.storage_path): + try: + with open(self.storage_path, 'r') as f: + data = json.load(f) + self.users = data.get('users', {}) + self.api_keys = data.get('api_keys', {}) + except Exception as e: + print(f"Warning: Failed to load users from {self.storage_path}: {e}") + self.users = {} + self.api_keys = {} + else: + # Create storage directory + os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True) + self.save_to_disk() + + def save_to_disk(self): + """Persist users and API keys to storage.""" + try: + os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True) + with open(self.storage_path, 'w') as f: + json.dump({ + 'users': self.users, + 'api_keys': self.api_keys, + 'last_updated': datetime.utcnow().isoformat() + }, f, indent=2) + except Exception as e: + print(f"Error saving users to disk: {e}") + + def create_user(self, username: str, email: str = "", description: str = "") -> dict: + """Create a new user.""" + if username in self.users: + raise ValueError(f"User '{username}' already exists") + + user_id = hashlib.sha256(f"{username}{time.time()}".encode()).hexdigest()[:16] + + self.users[username] = { + 'user_id': user_id, + 'email': email, + 'description': description, + 'created_at': datetime.utcnow().isoformat(), + 'last_used': None, + 'api_keys': [], # List of API key hashes + 'mcp_allowed': list(os.environ.get('MCP_BACKEND_', '').split(',')) or [], # Default: allow all + 'mcp_blocked': [], # MCPs to explicitly block + 'enabled': True, + 'metadata': {} + } + + self.save_to_disk() + return self.users[username] + + def delete_user(self, username: str): + """Delete a user and all their API keys.""" + if username not in self.users: + raise ValueError(f"User '{username}' not found") + + # Revoke all API keys for this user + user_id = self.users[username]['user_id'] + keys_to_remove = [k for k, v in self.api_keys.items() if v['user_id'] == user_id] + for key in keys_to_remove: + del self.api_keys[key] + + del self.users[username] + self.save_to_disk() + + def generate_api_key(self, username: str, key_name: str = "", ttl_days: Optional[int] = None) -> str: + """Generate a new API key for a user.""" + if username not in self.users: + raise ValueError(f"User '{username}' not found") + + # Generate a random key + api_key = f"mcpgw_{secrets.token_urlsafe(32)}" + + # Store key metadata (hash the key so we don't store plain text) + key_hash = hashlib.sha256(api_key.encode()).hexdigest() + + expires_at = None + if ttl_days: + expires_at = (datetime.utcnow().timestamp() + (ttl_days * 86400)) + + self.api_keys[key_hash] = { + 'user_id': self.users[username]['user_id'], + 'username': username, + 'key_name': key_name or f"key-{int(time.time())}", + 'created_at': datetime.utcnow().isoformat(), + 'expires_at': expires_at, + 'last_used': None, + 'revoked': False, + 'revoked_at': None + } + + # Track key in user record + self.users[username]['api_keys'].append(key_hash) + + self.save_to_disk() + return api_key # Return the actual key (not hash) + + def revoke_api_key(self, api_key: str): + """Revoke an API key.""" + key_hash = hashlib.sha256(api_key.encode()).hexdigest() + + if key_hash not in self.api_keys: + raise ValueError("API key not found") + + self.api_keys[key_hash]['revoked'] = True + self.api_keys[key_hash]['revoked_at'] = datetime.utcnow().isoformat() + + self.save_to_disk() + + def validate_api_key(self, api_key: str) -> Optional[dict]: + """Validate an API key and return user info if valid.""" + key_hash = hashlib.sha256(api_key.encode()).hexdigest() + + if key_hash not in self.api_keys: + return None + + key_info = self.api_keys[key_hash] + + # Check if revoked + if key_info['revoked']: + return None + + # Check if expired + if key_info['expires_at'] and key_info['expires_at'] < time.time(): + return None + + # Update last_used + key_info['last_used'] = datetime.utcnow().isoformat() + + # Get user info + username = key_info['username'] + if username not in self.users: + return None + + user = self.users[username] + + # Check if user is enabled + if not user['enabled']: + return None + + self.save_to_disk() + + return { + 'user_id': user['user_id'], + 'username': username, + 'email': user['email'], + 'mcp_allowed': user['mcp_allowed'], + 'mcp_blocked': user['mcp_blocked'], + 'key_name': key_info['key_name'] + } + + def set_mcp_access(self, username: str, allowed_mcps: list = None, blocked_mcps: list = None): + """Configure which MCPs a user can access.""" + if username not in self.users: + raise ValueError(f"User '{username}' not found") + + if allowed_mcps is not None: + self.users[username]['mcp_allowed'] = allowed_mcps + + if blocked_mcps is not None: + self.users[username]['mcp_blocked'] = blocked_mcps + + self.save_to_disk() + + def can_access_mcp(self, username: str, mcp_name: str) -> bool: + """Check if a user can access a specific MCP.""" + if username not in self.users: + return False + + user = self.users[username] + + # Check if blocked + if mcp_name in user['mcp_blocked']: + return False + + # Check if allowed (empty list = allow all) + if user['mcp_allowed'] and mcp_name not in user['mcp_allowed']: + return False + + return True + + def list_users(self) -> list: + """Get all users with sanitized info (no keys).""" + return [ + { + 'username': username, + 'user_id': user['user_id'], + 'email': user['email'], + 'description': user['description'], + 'created_at': user['created_at'], + 'enabled': user['enabled'], + 'api_key_count': len([k for k in user['api_keys'] if not self.api_keys[k]['revoked']]), + 'mcp_allowed': user['mcp_allowed'], + 'mcp_blocked': user['mcp_blocked'], + 'metadata': user['metadata'] + } + for username, user in self.users.items() + ] + + def get_user_keys(self, username: str) -> list: + """Get all API keys for a user (sanitized, no secrets).""" + if username not in self.users: + return [] + + user = self.users[username] + keys = [] + + for key_hash in user['api_keys']: + if key_hash in self.api_keys: + key_info = self.api_keys[key_hash] + keys.append({ + 'key_name': key_info['key_name'], + 'key_hash': key_hash[:8] + '****', # Show first 8 chars + 'created_at': key_info['created_at'], + 'expires_at': key_info['expires_at'], + 'last_used': key_info['last_used'], + 'revoked': key_info['revoked'], + 'revoked_at': key_info['revoked_at'] + }) + + return keys + + def toggle_user(self, username: str, enabled: bool): + """Enable or disable a user.""" + if username not in self.users: + raise ValueError(f"User '{username}' not found") + + self.users[username]['enabled'] = enabled + self.save_to_disk() + + +# Global instance +user_manager = UserManager(storage_path=os.environ.get("USERS_DB_PATH", "/data/users.json"))