From ff4c78f42177601bd1214c46a744879763b5a361 Mon Sep 17 00:00:00 2001 From: zgaetano Date: Tue, 31 Mar 2026 15:33:11 -0400 Subject: [PATCH] Remove mcp-gateway/gateway-proxy/user_management.py --- mcp-gateway/gateway-proxy/user_management.py | 264 ------------------- 1 file changed, 264 deletions(-) delete mode 100644 mcp-gateway/gateway-proxy/user_management.py diff --git a/mcp-gateway/gateway-proxy/user_management.py b/mcp-gateway/gateway-proxy/user_management.py deleted file mode 100644 index e844c9d..0000000 --- a/mcp-gateway/gateway-proxy/user_management.py +++ /dev/null @@ -1,264 +0,0 @@ -""" -User Management System for MCP Gateway -====================================== -Handles: -- User creation and deletion -- API key generation and revocation -- Per-user MCP access control (allow/block lists) -- Persistent storage with JSON -""" - -import json -import os -import secrets -import time -import hashlib -from datetime import datetime -from pathlib import Path -from typing import Optional - - -class UserManager: - """Manages users, API keys, and MCP access permissions.""" - - def __init__(self, storage_path: str = "/data/users.json"): - self.storage_path = storage_path - self.users = {} - self.api_keys = {} # api_key -> {user_id, created_at, revoked} - self.load_from_disk() - - def load_from_disk(self): - """Load users and API keys from persistent storage.""" - if os.path.exists(self.storage_path): - try: - with open(self.storage_path, 'r') as f: - data = json.load(f) - self.users = data.get('users', {}) - self.api_keys = data.get('api_keys', {}) - except Exception as e: - print(f"Warning: Failed to load users from {self.storage_path}: {e}") - self.users = {} - self.api_keys = {} - else: - # Create storage directory - os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True) - self.save_to_disk() - - def save_to_disk(self): - """Persist users and API keys to storage.""" - try: - os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True) - with open(self.storage_path, 'w') as f: - json.dump({ - 'users': self.users, - 'api_keys': self.api_keys, - 'last_updated': datetime.utcnow().isoformat() - }, f, indent=2) - except Exception as e: - print(f"Error saving users to disk: {e}") - - def create_user(self, username: str, email: str = "", description: str = "") -> dict: - """Create a new user.""" - if username in self.users: - raise ValueError(f"User '{username}' already exists") - - user_id = hashlib.sha256(f"{username}{time.time()}".encode()).hexdigest()[:16] - - self.users[username] = { - 'user_id': user_id, - 'email': email, - 'description': description, - 'created_at': datetime.utcnow().isoformat(), - 'last_used': None, - 'api_keys': [], # List of API key hashes - 'mcp_allowed': list(os.environ.get('MCP_BACKEND_', '').split(',')) or [], # Default: allow all - 'mcp_blocked': [], # MCPs to explicitly block - 'enabled': True, - 'metadata': {} - } - - self.save_to_disk() - return self.users[username] - - def delete_user(self, username: str): - """Delete a user and all their API keys.""" - if username not in self.users: - raise ValueError(f"User '{username}' not found") - - # Revoke all API keys for this user - user_id = self.users[username]['user_id'] - keys_to_remove = [k for k, v in self.api_keys.items() if v['user_id'] == user_id] - for key in keys_to_remove: - del self.api_keys[key] - - del self.users[username] - self.save_to_disk() - - def generate_api_key(self, username: str, key_name: str = "", ttl_days: Optional[int] = None) -> str: - """Generate a new API key for a user.""" - if username not in self.users: - raise ValueError(f"User '{username}' not found") - - # Generate a random key - api_key = f"mcpgw_{secrets.token_urlsafe(32)}" - - # Store key metadata (hash the key so we don't store plain text) - key_hash = hashlib.sha256(api_key.encode()).hexdigest() - - expires_at = None - if ttl_days: - expires_at = (datetime.utcnow().timestamp() + (ttl_days * 86400)) - - self.api_keys[key_hash] = { - 'user_id': self.users[username]['user_id'], - 'username': username, - 'key_name': key_name or f"key-{int(time.time())}", - 'created_at': datetime.utcnow().isoformat(), - 'expires_at': expires_at, - 'last_used': None, - 'revoked': False, - 'revoked_at': None - } - - # Track key in user record - self.users[username]['api_keys'].append(key_hash) - - self.save_to_disk() - return api_key # Return the actual key (not hash) - - def revoke_api_key(self, api_key: str): - """Revoke an API key.""" - key_hash = hashlib.sha256(api_key.encode()).hexdigest() - - if key_hash not in self.api_keys: - raise ValueError("API key not found") - - self.api_keys[key_hash]['revoked'] = True - self.api_keys[key_hash]['revoked_at'] = datetime.utcnow().isoformat() - - self.save_to_disk() - - def validate_api_key(self, api_key: str) -> Optional[dict]: - """Validate an API key and return user info if valid.""" - key_hash = hashlib.sha256(api_key.encode()).hexdigest() - - if key_hash not in self.api_keys: - return None - - key_info = self.api_keys[key_hash] - - # Check if revoked - if key_info['revoked']: - return None - - # Check if expired - if key_info['expires_at'] and key_info['expires_at'] < time.time(): - return None - - # Update last_used - key_info['last_used'] = datetime.utcnow().isoformat() - - # Get user info - username = key_info['username'] - if username not in self.users: - return None - - user = self.users[username] - - # Check if user is enabled - if not user['enabled']: - return None - - self.save_to_disk() - - return { - 'user_id': user['user_id'], - 'username': username, - 'email': user['email'], - 'mcp_allowed': user['mcp_allowed'], - 'mcp_blocked': user['mcp_blocked'], - 'key_name': key_info['key_name'] - } - - def set_mcp_access(self, username: str, allowed_mcps: list = None, blocked_mcps: list = None): - """Configure which MCPs a user can access.""" - if username not in self.users: - raise ValueError(f"User '{username}' not found") - - if allowed_mcps is not None: - self.users[username]['mcp_allowed'] = allowed_mcps - - if blocked_mcps is not None: - self.users[username]['mcp_blocked'] = blocked_mcps - - self.save_to_disk() - - def can_access_mcp(self, username: str, mcp_name: str) -> bool: - """Check if a user can access a specific MCP.""" - if username not in self.users: - return False - - user = self.users[username] - - # Check if blocked - if mcp_name in user['mcp_blocked']: - return False - - # Check if allowed (empty list = allow all) - if user['mcp_allowed'] and mcp_name not in user['mcp_allowed']: - return False - - return True - - def list_users(self) -> list: - """Get all users with sanitized info (no keys).""" - return [ - { - 'username': username, - 'user_id': user['user_id'], - 'email': user['email'], - 'description': user['description'], - 'created_at': user['created_at'], - 'enabled': user['enabled'], - 'api_key_count': len([k for k in user['api_keys'] if not self.api_keys[k]['revoked']]), - 'mcp_allowed': user['mcp_allowed'], - 'mcp_blocked': user['mcp_blocked'], - 'metadata': user['metadata'] - } - for username, user in self.users.items() - ] - - def get_user_keys(self, username: str) -> list: - """Get all API keys for a user (sanitized, no secrets).""" - if username not in self.users: - return [] - - user = self.users[username] - keys = [] - - for key_hash in user['api_keys']: - if key_hash in self.api_keys: - key_info = self.api_keys[key_hash] - keys.append({ - 'key_name': key_info['key_name'], - 'key_hash': key_hash[:8] + '****', # Show first 8 chars - 'created_at': key_info['created_at'], - 'expires_at': key_info['expires_at'], - 'last_used': key_info['last_used'], - 'revoked': key_info['revoked'], - 'revoked_at': key_info['revoked_at'] - }) - - return keys - - def toggle_user(self, username: str, enabled: bool): - """Enable or disable a user.""" - if username not in self.users: - raise ValueError(f"User '{username}' not found") - - self.users[username]['enabled'] = enabled - self.save_to_disk() - - -# Global instance -user_manager = UserManager(storage_path=os.environ.get("USERS_DB_PATH", "/data/users.json"))