# mcp-servers/gateway-proxy/openai_routes_fixed.py

"""
OpenAI-Compatible Routes for MCP Gateway (FIXED)
Properly converts MCP tool schemas to OpenAI function format.
Fixes schema issues that prevent tool discovery in OpenUI.
"""
import json
import logging
import uuid
import hashlib
from datetime import datetime
from typing import Optional, Any
import httpx
from starlette.requests import Request
from starlette.responses import JSONResponse, StreamingResponse
logger = logging.getLogger("mcp-gateway.openai")
def _simplify_schema(schema: dict) -> dict:
    """
    Convert a complex JSON schema to an OpenAI function parameter schema.

    Handles:
    - type as list (["string", "null"]) -> "string"
    - anyOf/oneOf -> simplified to the first non-null option
    - Removes unsupported keywords (only an allowlist of fields is copied)
    """
    if not isinstance(schema, dict):
        return {}
    simplified = {}
    # Handle type field - if it's a list, prefer the first non-null type
    if "type" in schema:
        type_val = schema["type"]
        if isinstance(type_val, list):
            simplified["type"] = next((t for t in type_val if t != "null"), "string")
        else:
            simplified["type"] = type_val
    # Copy allowed fields; anything else (allOf, $ref, format, ...) is dropped
    for key in ("description", "enum", "default", "pattern", "minimum", "maximum", "minLength", "maxLength"):
        if key in schema:
            simplified[key] = schema[key]
    # Recurse into object properties
    if "properties" in schema and isinstance(schema["properties"], dict):
        simplified["properties"] = {
            k: _simplify_schema(v) if isinstance(v, dict) else v
            for k, v in schema["properties"].items()
        }
    # Handle items for arrays
    if "items" in schema:
        items = schema["items"]
        if isinstance(items, dict):
            simplified["items"] = _simplify_schema(items)
        elif isinstance(items, list):
            # Tuple validation is unsupported by OpenAI - collapse to strings
            simplified["items"] = {"type": "string"}
    # Handle anyOf/oneOf - prefer the first NON-null option so Optional[X]
    # schemas emitted as anyOf [{"type": "null"}, {"type": "integer"}] resolve
    # to the real type instead of the null branch (the previous behavior of
    # blindly taking the first option broke such tools).
    for key in ("anyOf", "oneOf"):
        if key in schema and isinstance(schema[key], list):
            valid_options = [opt for opt in schema[key] if isinstance(opt, dict)]
            if valid_options:
                chosen = next(
                    (opt for opt in valid_options if opt.get("type") != "null"),
                    valid_options[0],
                )
                simplified.update(_simplify_schema(chosen))
            break  # Process only one combinator
    # Set a sensible default type if none was derived
    if "type" not in simplified and "properties" in simplified:
        simplified["type"] = "object"
    elif "type" not in simplified:
        simplified["type"] = "string"
    return simplified
def convert_mcp_tool_to_openai(mcp_tool: dict) -> dict:
    """
    Convert an MCP tool definition into an OpenAI function schema.

    MCP format:
        {
            "name": "string",
            "description": "string",
            "inputSchema": {"type": "object", "properties": {...}, "required": [...]}
        }

    OpenAI format:
        {
            "type": "function",
            "function": {
                "name": "string",
                "description": "string",
                "parameters": {"type": "object", "properties": {...}, "required": [...]}
            }
        }
    """
    try:
        tool_name = mcp_tool.get("name", "unknown_tool")
        tool_desc = mcp_tool.get("description", "")
        # Pull properties/required out of the input schema, tolerating junk
        raw_schema = mcp_tool.get("inputSchema", {})
        if not isinstance(raw_schema, dict):
            raw_schema = {}
        raw_props = raw_schema.get("properties", {})
        required_fields = raw_schema.get("required", [])
        # Simplify each property schema; non-dict entries degrade to string
        params = {
            field: _simplify_schema(spec) if isinstance(spec, dict) else {"type": "string"}
            for field, spec in raw_props.items()
        }
        return {
            "type": "function",
            "function": {
                "name": tool_name,
                "description": tool_desc,
                "parameters": {
                    "type": "object",
                    "properties": params,
                    "required": required_fields if isinstance(required_fields, list) else [],
                },
            },
        }
    except Exception as e:
        logger.error(f"Error converting tool {mcp_tool.get('name', 'unknown')}: {e}")
        # Fall back to a minimal valid function so discovery doesn't break
        return {
            "type": "function",
            "function": {
                "name": mcp_tool.get("name", "unknown_tool"),
                "description": f"Tool (conversion error: {str(e)[:50]})",
                "parameters": {
                    "type": "object",
                    "properties": {},
                    "required": [],
                },
            },
        }
async def list_models(request: Request) -> JSONResponse:
    """List available models from MCP Gateway (OpenAI compatible)."""
    try:
        # Single synthetic model entry representing the gateway itself
        permission = {
            "id": "modelperm-1",
            "object": "model_permission",
            "created": int(datetime.now().timestamp()),
            "allow_create_engine": False,
            "allow_sampling": True,
            "allow_logprobs": False,
            "allow_search_indices": False,
            "allow_view": True,
            "allow_fine_tuning": False,
            "organization": "*",
            "group_id": None,
            "is_blocking": False,
        }
        model_entry = {
            "id": "mcp-gateway",
            "object": "model",
            "owned_by": "mcp-gateway",
            "permission": [permission],
            "created": 1677649963,
            "parent_model": None,
            "root": "mcp-gateway",
            "root_owner": "mcp-gateway",
        }
        return JSONResponse({"object": "list", "data": [model_entry]})
    except Exception as e:
        logger.error(f"Error listing models: {e}")
        return JSONResponse({"error": {"message": str(e)}}, status_code=500)
async def tools(request: Request, tool_definitions: dict) -> JSONResponse:
    """
    GET /v1/tools
    Return all available tools in OpenAI function schema format.
    """
    try:
        converted = []
        # Convert each MCP tool; a bad definition is skipped, not fatal
        for definition in tool_definitions.values():
            try:
                converted.append(convert_mcp_tool_to_openai(definition))
            except Exception as e:
                logger.warning(f"Skipping tool due to conversion error: {e}")
        return JSONResponse({"object": "list", "data": converted})
    except Exception as e:
        logger.error(f"Error listing tools: {e}")
        return JSONResponse({"error": {"message": str(e)}}, status_code=500)
async def chat_completions(request: Request, tool_definitions: dict) -> JSONResponse | StreamingResponse:
    """
    POST /v1/chat/completions
    OpenAI-compatible chat completions endpoint with tools support.
    """
    try:
        body = await request.json()
        messages = body.get("messages", [])
        model = body.get("model", "mcp-gateway")
        stream = body.get("stream", False)

        # Convert MCP tools to OpenAI function schemas; skip broken ones
        converted_tools = []
        for definition in tool_definitions.values():
            try:
                converted_tools.append(convert_mcp_tool_to_openai(definition))
            except Exception as e:
                logger.warning(f"Skipping tool: {e}")

        # Most recent user message drives the (stub) token accounting
        user_message = next(
            (m.get("content", "") for m in reversed(messages) if m.get("role") == "user"),
            None,
        )
        if not user_message:
            return JSONResponse(
                {"error": {"message": "No user message found"}},
                status_code=400,
            )

        prompt_tokens = len(user_message.split())
        response = {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion",
            "created": int(datetime.now().timestamp()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": f"Available {len(converted_tools)} tools from MCP Gateway",
                    },
                    "finish_reason": "stop",
                }
            ],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": 10,
                "total_tokens": prompt_tokens + 10,
            },
        }

        # Mirror OpenAI shape: empty tool_calls when the caller supplied tools
        if body.get("tools"):
            response["choices"][0]["message"]["tool_calls"] = []

        if stream:
            return StreamingResponse(
                _stream_response(response),
                media_type="text/event-stream",
                headers={"Cache-Control": "no-cache"},
            )
        return JSONResponse(response)
    except Exception as e:
        logger.error(f"Error in chat_completions: {e}")
        return JSONResponse({"error": {"message": str(e)}}, status_code=500)
async def _stream_response(response: dict):
    """Yield SSE chunks for a chat.completion: content chunk, stop chunk, [DONE]."""
    choice = response["choices"][0]

    def _envelope(delta: dict, finish_reason):
        # Common chat.completion.chunk wrapper shared by both chunks
        return {
            "id": response["id"],
            "object": "chat.completion.chunk",
            "created": response["created"],
            "model": response["model"],
            "choices": [
                {
                    "index": choice["index"],
                    "delta": delta,
                    "finish_reason": finish_reason,
                }
            ],
        }

    # Initial chunk carries the role plus the entire content at once
    initial = _envelope(
        {"role": "assistant", "content": choice["message"]["content"]},
        None,
    )
    yield f"data: {json.dumps(initial)}\n\n"
    # Final chunk signals completion, followed by the SSE terminator
    final = _envelope({}, "stop")
    yield f"data: {json.dumps(final)}\n\n"
    yield "data: [DONE]\n\n"