338 lines
11 KiB
Python
338 lines
11 KiB
Python
"""
OpenAI-Compatible Routes for MCP Gateway (FIXED)

Properly converts MCP tool schemas to OpenAI function format.
Fixes schema issues that prevent tool discovery in OpenUI.
"""
|
|
|
|
import json
|
|
import logging
|
|
import uuid
|
|
import hashlib
|
|
from datetime import datetime
|
|
from typing import Optional, Any
|
|
|
|
import httpx
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse, StreamingResponse
|
|
|
|
logger = logging.getLogger("mcp-gateway.openai")
|
|
|
|
|
|
def _simplify_schema(schema: dict) -> dict:
    """
    Convert a complex JSON schema to an OpenAI function parameter schema.

    Handles:
    - type as list (["string", "null"]) -> first non-null entry ("string" if none)
    - anyOf/oneOf -> collapsed to the first non-null option
    - nested ``properties`` / ``items``, simplified recursively
    - Removes keywords outside the supported set

    Args:
        schema: A JSON-schema fragment. Anything that is not a dict yields ``{}``.

    Returns:
        A new dict. For dict input it always carries a ``"type"`` key
        ("object" when properties are present, otherwise "string" by default).
    """
    if not isinstance(schema, dict):
        return {}

    simplified = {}

    # Handle type field - a list such as ["string", "null"] becomes a single type
    if "type" in schema:
        type_val = schema["type"]
        if isinstance(type_val, list):
            # Take first non-null type, default to "string"
            simplified["type"] = next((t for t in type_val if t != "null"), "string")
        else:
            simplified["type"] = type_val

    # Copy allowed scalar/annotation fields verbatim
    for key in ("description", "enum", "default", "pattern",
                "minimum", "maximum", "minLength", "maxLength"):
        if key in schema:
            simplified[key] = schema[key]

    # Handle properties for objects
    properties = schema.get("properties")
    if isinstance(properties, dict):
        simplified["properties"] = {
            # Non-dict property schemas (e.g. boolean schemas) fall back to a
            # plain string, matching convert_mcp_tool_to_openai's fallback.
            prop_name: (
                _simplify_schema(prop_schema)
                if isinstance(prop_schema, dict)
                else {"type": "string"}
            )
            for prop_name, prop_schema in properties.items()
        }
        # Keep the nested object's required list so that information is not lost
        required = schema.get("required")
        if isinstance(required, list):
            simplified["required"] = [name for name in required if isinstance(name, str)]

    # Handle items for arrays
    items = schema.get("items")
    if isinstance(items, dict):
        simplified["items"] = _simplify_schema(items)
    elif isinstance(items, list):
        # Tuple validation - simplify to single item schema
        simplified["items"] = {"type": "string"}

    # Handle anyOf/oneOf - collapse to one option
    for key in ("anyOf", "oneOf"):
        options = schema.get(key)
        if not isinstance(options, list):
            continue
        candidates = [opt for opt in options if isinstance(opt, dict)]
        if not candidates:
            # Nothing usable under this keyword; let the other keyword be tried
            continue
        # Prefer a real type over {"type": "null"}; fall back to the first option
        chosen = next(
            (opt for opt in candidates if opt.get("type") != "null"),
            candidates[0],
        )
        # Explicit fields on the enclosing schema win over the option's fields
        # (the option always carries a type, possibly only a defaulted one).
        for field, value in _simplify_schema(chosen).items():
            simplified.setdefault(field, value)
        break  # Process only one

    # Set default type if none exists
    if "type" not in simplified:
        simplified["type"] = "object" if "properties" in simplified else "string"

    return simplified
|
|
|
|
|
|
def convert_mcp_tool_to_openai(mcp_tool: dict) -> dict:
    """
    Convert MCP tool definition to OpenAI function schema.

    MCP format:
    {
        "name": "string",
        "description": "string",
        "inputSchema": { "type": "object", "properties": {...}, "required": [...] }
    }

    OpenAI format:
    {
        "type": "function",
        "function": {
            "name": "string",
            "description": "string",
            "parameters": {
                "type": "object",
                "properties": {...},
                "required": [...]
            }
        }
    }

    Missing or null ``name`` / ``description`` fall back to "unknown_tool" / "".
    A missing or non-dict ``inputSchema`` / ``properties`` is treated as empty,
    and a non-list ``required`` as ``[]``. If simplifying a property schema
    fails, the error is logged and a parameterless stub is returned.

    Args:
        mcp_tool: The MCP tool definition.

    Returns:
        The tool in OpenAI function-tool format.

    Raises:
        TypeError: If ``mcp_tool`` is not a dict.
    """
    if not isinstance(mcp_tool, dict):
        # Explicit error instead of an incidental AttributeError on .get()
        raise TypeError(
            f"MCP tool definition must be a dict, got {type(mcp_tool).__name__}"
        )

    # `or` also covers keys that are present but null
    name = mcp_tool.get("name") or "unknown_tool"
    description = mcp_tool.get("description") or ""

    # Get the input schema, tolerating null / wrongly-typed sections
    input_schema = mcp_tool.get("inputSchema")
    if not isinstance(input_schema, dict):
        input_schema = {}

    properties = input_schema.get("properties")
    if not isinstance(properties, dict):
        properties = {}

    required = input_schema.get("required")
    if not isinstance(required, list):
        required = []

    try:
        # Simplify each property schema; non-dict schemas become plain strings
        simplified_properties = {
            prop_name: (
                _simplify_schema(prop_schema)
                if isinstance(prop_schema, dict)
                else {"type": "string"}
            )
            for prop_name, prop_schema in properties.items()
        }
    except Exception as e:
        # Deliberate best-effort boundary: a broken schema must not hide the tool
        logger.error(f"Error converting tool {mcp_tool.get('name', 'unknown')}: {e}")
        # Return minimal valid function
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": f"Tool (conversion error: {str(e)[:50]})",
                "parameters": {
                    "type": "object",
                    "properties": {},
                    "required": [],
                },
            },
        }

    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": simplified_properties,
                "required": required,
            },
        },
    }
|
|
|
|
|
|
async def list_models(request: Request) -> JSONResponse:
    """
    List available models from MCP Gateway (OpenAI compatible).

    Returns a fixed list with a single "mcp-gateway" model entry; only the
    permission's "created" timestamp is computed at call time. ``request``
    is not read. Any exception is logged and reported as a 500 JSON error.
    """
    try:
        permission_entry = {
            "id": "modelperm-1",
            "object": "model_permission",
            "created": int(datetime.now().timestamp()),
            "allow_create_engine": False,
            "allow_sampling": True,
            "allow_logprobs": False,
            "allow_search_indices": False,
            "allow_view": True,
            "allow_fine_tuning": False,
            "organization": "*",
            "group_id": None,
            "is_blocking": False,
        }
        model_entry = {
            "id": "mcp-gateway",
            "object": "model",
            "owned_by": "mcp-gateway",
            "permission": [permission_entry],
            "created": 1677649963,
            "parent_model": None,
            "root": "mcp-gateway",
            "root_owner": "mcp-gateway",
        }
        payload = {"object": "list", "data": [model_entry]}
        return JSONResponse(payload)
    except Exception as exc:
        logger.error(f"Error listing models: {exc}")
        error_body = {"error": {"message": str(exc)}}
        return JSONResponse(error_body, status_code=500)
|
|
|
|
|
|
async def tools(request: Request, tool_definitions: dict) -> JSONResponse:
    """
    GET /v1/tools

    Return all available tools in OpenAI function schema format.

    Each value of ``tool_definitions`` is passed through
    ``convert_mcp_tool_to_openai``; a tool whose conversion raises is logged
    and left out. ``request`` is not read. Any other failure is logged and
    answered with a 500 JSON error.
    """
    try:
        converted = []
        for definition in tool_definitions.values():
            try:
                converted.append(convert_mcp_tool_to_openai(definition))
            except Exception as conversion_error:
                # One bad tool must not hide the rest
                logger.warning(f"Skipping tool due to conversion error: {conversion_error}")

        payload = {"object": "list", "data": converted}
        return JSONResponse(payload)
    except Exception as exc:
        logger.error(f"Error listing tools: {exc}")
        error_body = {"error": {"message": str(exc)}}
        return JSONResponse(error_body, status_code=500)
|
|
|
|
|
|
def _extract_message_text(content: Any) -> str:
    """Flatten a chat message ``content`` value (string or list of parts) into text."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        fragments = []
        for part in content:
            if isinstance(part, str):
                fragments.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                # Parts without a string "text" field (e.g. images) are skipped
                fragments.append(part["text"])
        return " ".join(fragments)
    return ""


async def chat_completions(request: Request, tool_definitions: dict) -> JSONResponse | StreamingResponse:
    """
    POST /v1/chat/completions

    OpenAI-compatible chat completions endpoint with tools support.

    Reads ``messages``, ``model``, ``stream`` and ``tools`` from the JSON body
    and answers with a fixed assistant message stating how many MCP tools are
    available. Usage figures are rough: prompt tokens are the whitespace-split
    word count of the latest user message and completion tokens are a constant.

    Args:
        request: Incoming request whose body is the chat-completions payload.
        tool_definitions: Mapping whose values are MCP tool definitions.

    Returns:
        A ``StreamingResponse`` (server-sent events) when ``stream`` is truthy,
        otherwise a ``JSONResponse``. A body that is not a JSON object, or that
        has no user message with text, yields 400; unexpected failures yield 500.
    """
    try:
        try:
            body = await request.json()
        except ValueError:
            # Malformed JSON is a client error, not a server fault
            return JSONResponse(
                {"error": {"message": "Request body must be valid JSON"}},
                status_code=400
            )
        if not isinstance(body, dict):
            return JSONResponse(
                {"error": {"message": "Request body must be a JSON object"}},
                status_code=400
            )

        messages = body.get("messages")
        if not isinstance(messages, list):
            messages = []
        model = body.get("model", "mcp-gateway")
        stream = body.get("stream", False)

        # Convert MCP tools to OpenAI format
        tools_list = []
        for tool_def in tool_definitions.values():
            try:
                tools_list.append(convert_mcp_tool_to_openai(tool_def))
            except Exception as e:
                logger.warning(f"Skipping tool: {e}")
                continue

        # Extract the latest user message; content may be a plain string or a
        # list of content parts, so it is flattened to text first
        user_message = None
        for msg in reversed(messages):
            if isinstance(msg, dict) and msg.get("role") == "user":
                user_message = _extract_message_text(msg.get("content", ""))
                break

        if not user_message:
            return JSONResponse(
                {"error": {"message": "No user message found"}},
                status_code=400
            )

        prompt_tokens = len(user_message.split())

        # Build response
        response = {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion",
            "created": int(datetime.now().timestamp()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": f"Available {len(tools_list)} tools from MCP Gateway",
                    },
                    "finish_reason": "stop"
                }
            ],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": 10,
                "total_tokens": prompt_tokens + 10
            }
        }

        # Attach an (empty) tool_calls list when the client sent tools
        if body.get("tools"):
            response["choices"][0]["message"]["tool_calls"] = []

        if stream:
            return StreamingResponse(
                _stream_response(response),
                media_type="text/event-stream",
                headers={"Cache-Control": "no-cache"}
            )
        return JSONResponse(response)

    except Exception as e:
        logger.error(f"Error in chat_completions: {e}")
        return JSONResponse(
            {"error": {"message": str(e)}},
            status_code=500
        )
|
|
|
|
|
|
async def _stream_response(response: dict):
    """
    Generate streaming response chunks.

    Turns a completed chat-completion dict into three server-sent-event
    strings: a chunk carrying the assistant role and the full content of the
    first choice, a closing chunk with an empty delta and finish_reason
    "stop", and the "[DONE]" terminator.
    """
    first_choice = response["choices"][0]

    # Fields shared by both chunk frames
    envelope = {
        "id": response["id"],
        "object": "chat.completion.chunk",
        "created": response["created"],
        "model": response["model"],
    }
    choice_index = first_choice["index"]

    # Opening frame: the whole message content is sent in a single delta
    opening_delta = {"role": "assistant", "content": first_choice["message"]["content"]}
    opening = {
        **envelope,
        "choices": [
            {"index": choice_index, "delta": opening_delta, "finish_reason": None}
        ],
    }
    yield f"data: {json.dumps(opening)}\n\n"

    # Closing frame: empty delta signals completion
    closing = {
        **envelope,
        "choices": [
            {"index": choice_index, "delta": {}, "finish_reason": "stop"}
        ],
    }
    yield f"data: {json.dumps(closing)}\n\n"

    yield "data: [DONE]\n\n"
|