""" User Management System for MCP Gateway ====================================== Handles: - User creation and deletion - API key generation and revocation - Per-user MCP access control (allow/block lists) - Persistent storage with JSON """ import json import os import secrets import time import hashlib from datetime import datetime from pathlib import Path from typing import Optional class UserManager: """Manages users, API keys, and MCP access permissions.""" def __init__(self, storage_path: str = "/data/users.json"): self.storage_path = storage_path self.users = {} self.api_keys = {} # api_key -> {user_id, created_at, revoked} self.load_from_disk() def load_from_disk(self): """Load users and API keys from persistent storage.""" if os.path.exists(self.storage_path): try: with open(self.storage_path, 'r') as f: data = json.load(f) self.users = data.get('users', {}) self.api_keys = data.get('api_keys', {}) except Exception as e: print(f"Warning: Failed to load users from {self.storage_path}: {e}") self.users = {} self.api_keys = {} else: # Create storage directory os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True) self.save_to_disk() def save_to_disk(self): """Persist users and API keys to storage.""" try: os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True) with open(self.storage_path, 'w') as f: json.dump({ 'users': self.users, 'api_keys': self.api_keys, 'last_updated': datetime.utcnow().isoformat() }, f, indent=2) except Exception as e: print(f"Error saving users to disk: {e}") def create_user(self, username: str, email: str = "", description: str = "") -> dict: """Create a new user.""" if username in self.users: raise ValueError(f"User '{username}' already exists") user_id = hashlib.sha256(f"{username}{time.time()}".encode()).hexdigest()[:16] self.users[username] = { 'user_id': user_id, 'email': email, 'description': description, 'created_at': datetime.utcnow().isoformat(), 'last_used': None, 'api_keys': [], # List of API key hashes 'mcp_allowed': list(os.environ.get('MCP_BACKEND_', '').split(',')) or [], # Default: allow all 'mcp_blocked': [], # MCPs to explicitly block 'enabled': True, 'metadata': {} } self.save_to_disk() return self.users[username] def delete_user(self, username: str): """Delete a user and all their API keys.""" if username not in self.users: raise ValueError(f"User '{username}' not found") # Revoke all API keys for this user user_id = self.users[username]['user_id'] keys_to_remove = [k for k, v in self.api_keys.items() if v['user_id'] == user_id] for key in keys_to_remove: del self.api_keys[key] del self.users[username] self.save_to_disk() def generate_api_key(self, username: str, key_name: str = "", ttl_days: Optional[int] = None) -> str: """Generate a new API key for a user.""" if username not in self.users: raise ValueError(f"User '{username}' not found") # Generate a random key api_key = f"mcpgw_{secrets.token_urlsafe(32)}" # Store key metadata (hash the key so we don't store plain text) key_hash = hashlib.sha256(api_key.encode()).hexdigest() expires_at = None if ttl_days: expires_at = (datetime.utcnow().timestamp() + (ttl_days * 86400)) self.api_keys[key_hash] = { 'user_id': self.users[username]['user_id'], 'username': username, 'key_name': key_name or f"key-{int(time.time())}", 'created_at': datetime.utcnow().isoformat(), 'expires_at': expires_at, 'last_used': None, 'revoked': False, 'revoked_at': None } # Track key in user record self.users[username]['api_keys'].append(key_hash) self.save_to_disk() return api_key # Return the actual key (not hash) def revoke_api_key(self, api_key: str): """Revoke an API key.""" key_hash = hashlib.sha256(api_key.encode()).hexdigest() if key_hash not in self.api_keys: raise ValueError("API key not found") self.api_keys[key_hash]['revoked'] = True self.api_keys[key_hash]['revoked_at'] = datetime.utcnow().isoformat() self.save_to_disk() def validate_api_key(self, api_key: str) -> Optional[dict]: """Validate an API key and return user info if valid.""" key_hash = hashlib.sha256(api_key.encode()).hexdigest() if key_hash not in self.api_keys: return None key_info = self.api_keys[key_hash] # Check if revoked if key_info['revoked']: return None # Check if expired if key_info['expires_at'] and key_info['expires_at'] < time.time(): return None # Update last_used key_info['last_used'] = datetime.utcnow().isoformat() # Get user info username = key_info['username'] if username not in self.users: return None user = self.users[username] # Check if user is enabled if not user['enabled']: return None self.save_to_disk() return { 'user_id': user['user_id'], 'username': username, 'email': user['email'], 'mcp_allowed': user['mcp_allowed'], 'mcp_blocked': user['mcp_blocked'], 'key_name': key_info['key_name'] } def set_mcp_access(self, username: str, allowed_mcps: list = None, blocked_mcps: list = None): """Configure which MCPs a user can access.""" if username not in self.users: raise ValueError(f"User '{username}' not found") if allowed_mcps is not None: self.users[username]['mcp_allowed'] = allowed_mcps if blocked_mcps is not None: self.users[username]['mcp_blocked'] = blocked_mcps self.save_to_disk() def can_access_mcp(self, username: str, mcp_name: str) -> bool: """Check if a user can access a specific MCP.""" if username not in self.users: return False user = self.users[username] # Check if blocked if mcp_name in user['mcp_blocked']: return False # Check if allowed (empty list = allow all) if user['mcp_allowed'] and mcp_name not in user['mcp_allowed']: return False return True def list_users(self) -> list: """Get all users with sanitized info (no keys).""" return [ { 'username': username, 'user_id': user['user_id'], 'email': user['email'], 'description': user['description'], 'created_at': user['created_at'], 'enabled': user['enabled'], 'api_key_count': len([k for k in user['api_keys'] if not self.api_keys[k]['revoked']]), 'mcp_allowed': user['mcp_allowed'], 'mcp_blocked': user['mcp_blocked'], 'metadata': user['metadata'] } for username, user in self.users.items() ] def get_user_keys(self, username: str) -> list: """Get all API keys for a user (sanitized, no secrets).""" if username not in self.users: return [] user = self.users[username] keys = [] for key_hash in user['api_keys']: if key_hash in self.api_keys: key_info = self.api_keys[key_hash] keys.append({ 'key_name': key_info['key_name'], 'key_hash': key_hash[:8] + '****', # Show first 8 chars 'created_at': key_info['created_at'], 'expires_at': key_info['expires_at'], 'last_used': key_info['last_used'], 'revoked': key_info['revoked'], 'revoked_at': key_info['revoked_at'] }) return keys def toggle_user(self, username: str, enabled: bool): """Enable or disable a user.""" if username not in self.users: raise ValueError(f"User '{username}' not found") self.users[username]['enabled'] = enabled self.save_to_disk() # Global instance user_manager = UserManager(storage_path=os.environ.get("USERS_DB_PATH", "/data/users.json"))