264 lines
8.8 KiB
Python
264 lines
8.8 KiB
Python
"""
|
|
User Management System for MCP Gateway
|
|
======================================
|
|
Handles:
|
|
- User creation and deletion
|
|
- API key generation and revocation
|
|
- Per-user MCP access control (allow/block lists)
|
|
- Persistent storage with JSON
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import secrets
|
|
import time
|
|
import hashlib
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
class UserManager:
|
|
"""Manages users, API keys, and MCP access permissions."""
|
|
|
|
def __init__(self, storage_path: str = "/data/users.json"):
|
|
self.storage_path = storage_path
|
|
self.users = {}
|
|
self.api_keys = {} # api_key -> {user_id, created_at, revoked}
|
|
self.load_from_disk()
|
|
|
|
def load_from_disk(self):
|
|
"""Load users and API keys from persistent storage."""
|
|
if os.path.exists(self.storage_path):
|
|
try:
|
|
with open(self.storage_path, 'r') as f:
|
|
data = json.load(f)
|
|
self.users = data.get('users', {})
|
|
self.api_keys = data.get('api_keys', {})
|
|
except Exception as e:
|
|
print(f"Warning: Failed to load users from {self.storage_path}: {e}")
|
|
self.users = {}
|
|
self.api_keys = {}
|
|
else:
|
|
# Create storage directory
|
|
os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True)
|
|
self.save_to_disk()
|
|
|
|
def save_to_disk(self):
|
|
"""Persist users and API keys to storage."""
|
|
try:
|
|
os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True)
|
|
with open(self.storage_path, 'w') as f:
|
|
json.dump({
|
|
'users': self.users,
|
|
'api_keys': self.api_keys,
|
|
'last_updated': datetime.utcnow().isoformat()
|
|
}, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error saving users to disk: {e}")
|
|
|
|
def create_user(self, username: str, email: str = "", description: str = "") -> dict:
|
|
"""Create a new user."""
|
|
if username in self.users:
|
|
raise ValueError(f"User '{username}' already exists")
|
|
|
|
user_id = hashlib.sha256(f"{username}{time.time()}".encode()).hexdigest()[:16]
|
|
|
|
self.users[username] = {
|
|
'user_id': user_id,
|
|
'email': email,
|
|
'description': description,
|
|
'created_at': datetime.utcnow().isoformat(),
|
|
'last_used': None,
|
|
'api_keys': [], # List of API key hashes
|
|
'mcp_allowed': list(os.environ.get('MCP_BACKEND_', '').split(',')) or [], # Default: allow all
|
|
'mcp_blocked': [], # MCPs to explicitly block
|
|
'enabled': True,
|
|
'metadata': {}
|
|
}
|
|
|
|
self.save_to_disk()
|
|
return self.users[username]
|
|
|
|
def delete_user(self, username: str):
|
|
"""Delete a user and all their API keys."""
|
|
if username not in self.users:
|
|
raise ValueError(f"User '{username}' not found")
|
|
|
|
# Revoke all API keys for this user
|
|
user_id = self.users[username]['user_id']
|
|
keys_to_remove = [k for k, v in self.api_keys.items() if v['user_id'] == user_id]
|
|
for key in keys_to_remove:
|
|
del self.api_keys[key]
|
|
|
|
del self.users[username]
|
|
self.save_to_disk()
|
|
|
|
def generate_api_key(self, username: str, key_name: str = "", ttl_days: Optional[int] = None) -> str:
|
|
"""Generate a new API key for a user."""
|
|
if username not in self.users:
|
|
raise ValueError(f"User '{username}' not found")
|
|
|
|
# Generate a random key
|
|
api_key = f"mcpgw_{secrets.token_urlsafe(32)}"
|
|
|
|
# Store key metadata (hash the key so we don't store plain text)
|
|
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
|
|
|
|
expires_at = None
|
|
if ttl_days:
|
|
expires_at = (datetime.utcnow().timestamp() + (ttl_days * 86400))
|
|
|
|
self.api_keys[key_hash] = {
|
|
'user_id': self.users[username]['user_id'],
|
|
'username': username,
|
|
'key_name': key_name or f"key-{int(time.time())}",
|
|
'created_at': datetime.utcnow().isoformat(),
|
|
'expires_at': expires_at,
|
|
'last_used': None,
|
|
'revoked': False,
|
|
'revoked_at': None
|
|
}
|
|
|
|
# Track key in user record
|
|
self.users[username]['api_keys'].append(key_hash)
|
|
|
|
self.save_to_disk()
|
|
return api_key # Return the actual key (not hash)
|
|
|
|
def revoke_api_key(self, api_key: str):
|
|
"""Revoke an API key."""
|
|
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
|
|
|
|
if key_hash not in self.api_keys:
|
|
raise ValueError("API key not found")
|
|
|
|
self.api_keys[key_hash]['revoked'] = True
|
|
self.api_keys[key_hash]['revoked_at'] = datetime.utcnow().isoformat()
|
|
|
|
self.save_to_disk()
|
|
|
|
def validate_api_key(self, api_key: str) -> Optional[dict]:
|
|
"""Validate an API key and return user info if valid."""
|
|
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
|
|
|
|
if key_hash not in self.api_keys:
|
|
return None
|
|
|
|
key_info = self.api_keys[key_hash]
|
|
|
|
# Check if revoked
|
|
if key_info['revoked']:
|
|
return None
|
|
|
|
# Check if expired
|
|
if key_info['expires_at'] and key_info['expires_at'] < time.time():
|
|
return None
|
|
|
|
# Update last_used
|
|
key_info['last_used'] = datetime.utcnow().isoformat()
|
|
|
|
# Get user info
|
|
username = key_info['username']
|
|
if username not in self.users:
|
|
return None
|
|
|
|
user = self.users[username]
|
|
|
|
# Check if user is enabled
|
|
if not user['enabled']:
|
|
return None
|
|
|
|
self.save_to_disk()
|
|
|
|
return {
|
|
'user_id': user['user_id'],
|
|
'username': username,
|
|
'email': user['email'],
|
|
'mcp_allowed': user['mcp_allowed'],
|
|
'mcp_blocked': user['mcp_blocked'],
|
|
'key_name': key_info['key_name']
|
|
}
|
|
|
|
def set_mcp_access(self, username: str, allowed_mcps: list = None, blocked_mcps: list = None):
|
|
"""Configure which MCPs a user can access."""
|
|
if username not in self.users:
|
|
raise ValueError(f"User '{username}' not found")
|
|
|
|
if allowed_mcps is not None:
|
|
self.users[username]['mcp_allowed'] = allowed_mcps
|
|
|
|
if blocked_mcps is not None:
|
|
self.users[username]['mcp_blocked'] = blocked_mcps
|
|
|
|
self.save_to_disk()
|
|
|
|
def can_access_mcp(self, username: str, mcp_name: str) -> bool:
|
|
"""Check if a user can access a specific MCP."""
|
|
if username not in self.users:
|
|
return False
|
|
|
|
user = self.users[username]
|
|
|
|
# Check if blocked
|
|
if mcp_name in user['mcp_blocked']:
|
|
return False
|
|
|
|
# Check if allowed (empty list = allow all)
|
|
if user['mcp_allowed'] and mcp_name not in user['mcp_allowed']:
|
|
return False
|
|
|
|
return True
|
|
|
|
def list_users(self) -> list:
|
|
"""Get all users with sanitized info (no keys)."""
|
|
return [
|
|
{
|
|
'username': username,
|
|
'user_id': user['user_id'],
|
|
'email': user['email'],
|
|
'description': user['description'],
|
|
'created_at': user['created_at'],
|
|
'enabled': user['enabled'],
|
|
'api_key_count': len([k for k in user['api_keys'] if not self.api_keys[k]['revoked']]),
|
|
'mcp_allowed': user['mcp_allowed'],
|
|
'mcp_blocked': user['mcp_blocked'],
|
|
'metadata': user['metadata']
|
|
}
|
|
for username, user in self.users.items()
|
|
]
|
|
|
|
def get_user_keys(self, username: str) -> list:
|
|
"""Get all API keys for a user (sanitized, no secrets)."""
|
|
if username not in self.users:
|
|
return []
|
|
|
|
user = self.users[username]
|
|
keys = []
|
|
|
|
for key_hash in user['api_keys']:
|
|
if key_hash in self.api_keys:
|
|
key_info = self.api_keys[key_hash]
|
|
keys.append({
|
|
'key_name': key_info['key_name'],
|
|
'key_hash': key_hash[:8] + '****', # Show first 8 chars
|
|
'created_at': key_info['created_at'],
|
|
'expires_at': key_info['expires_at'],
|
|
'last_used': key_info['last_used'],
|
|
'revoked': key_info['revoked'],
|
|
'revoked_at': key_info['revoked_at']
|
|
})
|
|
|
|
return keys
|
|
|
|
def toggle_user(self, username: str, enabled: bool):
|
|
"""Enable or disable a user."""
|
|
if username not in self.users:
|
|
raise ValueError(f"User '{username}' not found")
|
|
|
|
self.users[username]['enabled'] = enabled
|
|
self.save_to_disk()
|
|
|
|
|
|
# Global instance
|
|
user_manager = UserManager(storage_path=os.environ.get("USERS_DB_PATH", "/data/users.json"))
|