"""
OpenAI-Compatible Routes for MCP Gateway (FIXED)

Properly converts MCP tool schemas to OpenAI function format.
Fixes schema issues that prevent tool discovery in OpenUI.
"""

import json
import logging
import uuid
import hashlib
from datetime import datetime
from typing import Optional, Any

import httpx
from starlette.requests import Request
from starlette.responses import JSONResponse, StreamingResponse

logger = logging.getLogger("mcp-gateway.openai")

# JSON-schema keywords copied verbatim from a source schema.
_PASSTHROUGH_KEYS = (
    "description",
    "enum",
    "default",
    "pattern",
    "minimum",
    "maximum",
    "minLength",
    "maxLength",
)


def _fallback_schema() -> dict:
    """Return a fresh permissive schema for sub-schemas that cannot be used."""
    return {"type": "string"}


def _simplify_schema(schema: dict) -> dict:
    """
    Convert a complex JSON schema to an OpenAI function parameter schema.

    Handles:
    - type as list (["string", "null"]) -> first non-null entry
    - anyOf/oneOf -> first non-null option, used only to fill in keys the
      enclosing schema does not declare itself
    - nested "properties" (recursively) together with their "required" list
    - "items" for arrays (tuple validation collapses to a string item)
    - drops every keyword not listed in _PASSTHROUGH_KEYS

    Args:
        schema: A JSON-schema fragment. Anything that is not a dict yields {}.

    Returns:
        A new dict; the input is never mutated. The result always carries a
        "type" key when the input was a dict.
    """
    if not isinstance(schema, dict):
        return {}

    simplified: dict[str, Any] = {}

    # A list-valued type keeps its first non-null member.
    if "type" in schema:
        declared = schema["type"]
        if isinstance(declared, list):
            simplified["type"] = next(
                (t for t in declared if t != "null"), "string"
            )
        else:
            simplified["type"] = declared

    for key in _PASSTHROUGH_KEYS:
        if key in schema:
            simplified[key] = schema[key]

    properties = schema.get("properties")
    if isinstance(properties, dict):
        # Non-dict property schemas (e.g. boolean schemas) get the same
        # fallback that convert_mcp_tool_to_openai uses at the top level.
        simplified["properties"] = {
            prop: _simplify_schema(sub) if isinstance(sub, dict) else _fallback_schema()
            for prop, sub in properties.items()
        }
        required = schema.get("required")
        if isinstance(required, list):
            # Keep nested required lists; dropping them makes every nested
            # field look optional to the model.
            simplified["required"] = list(required)
        # Decide "object" before merging combinators so an option's defaulted
        # "string" type cannot displace it.
        simplified.setdefault("type", "object")

    items = schema.get("items")
    if isinstance(items, dict):
        simplified["items"] = _simplify_schema(items)
    elif isinstance(items, list):
        # Tuple validation - simplify to a single item schema.
        simplified["items"] = _fallback_schema()

    for key in ("anyOf", "oneOf"):
        options = schema.get(key)
        if not isinstance(options, list):
            continue
        # Skip {"type": "null"} so Optional[...] style schemas resolve to the
        # real type regardless of option order.
        chosen = next(
            (
                opt
                for opt in options
                if isinstance(opt, dict) and opt.get("type") != "null"
            ),
            None,
        )
        if chosen is None:
            continue
        # The option only fills gaps: keys stated on the enclosing schema
        # (type, description, properties, ...) take precedence.
        for opt_key, opt_value in _simplify_schema(chosen).items():
            simplified.setdefault(opt_key, opt_value)
        break  # Process only one combinator

    simplified.setdefault("type", "string")

    # OpenAI's function-schema validation rejects array parameters that omit
    # "items", so supply the permissive fallback.
    if simplified["type"] == "array" and "items" not in simplified:
        simplified["items"] = _fallback_schema()

    return simplified


def _openai_function(
    name: str, description: str, properties: dict, required: list
) -> dict:
    """Assemble the OpenAI ``{"type": "function", ...}`` tool envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def convert_mcp_tool_to_openai(mcp_tool: dict) -> dict:
    """
    Convert an MCP tool definition to an OpenAI function schema.

    MCP format:
        {
            "name": "string",
            "description": "string",
            "inputSchema": {
                "type": "object",
                "properties": {...},
                "required": [...]
            }
        }

    OpenAI format:
        {
            "type": "function",
            "function": {
                "name": "string",
                "description": "string",
                "parameters": {
                    "type": "object",
                    "properties": {...},
                    "required": [...]
                }
            }
        }

    Args:
        mcp_tool: The MCP tool definition. Missing or wrongly typed fields
            are replaced with empty defaults instead of failing.

    Returns:
        An OpenAI tool dict. This function never raises: on an unexpected
        error it logs and returns a minimal, parameterless function whose
        description carries the first 50 characters of the error.
    """
    if not isinstance(mcp_tool, dict):
        # Guard first: the error path below also needs dict access.
        logger.error(
            "Error converting tool unknown: expected a dict, got %s",
            type(mcp_tool).__name__,
        )
        return _openai_function(
            "unknown_tool",
            "Tool (conversion error: definition is not an object)",
            {},
            [],
        )

    raw_name = mcp_tool.get("name")
    name = raw_name if isinstance(raw_name, str) and raw_name else "unknown_tool"

    try:
        raw_description = mcp_tool.get("description")
        description = raw_description if isinstance(raw_description, str) else ""

        input_schema = mcp_tool.get("inputSchema")
        if not isinstance(input_schema, dict):
            input_schema = {}

        # Explicit nulls ("properties": null) are treated like absent keys.
        properties = input_schema.get("properties")
        if not isinstance(properties, dict):
            properties = {}
        required = input_schema.get("required")
        if not isinstance(required, list):
            required = []

        simplified_properties = {
            prop_name: (
                _simplify_schema(prop_schema)
                if isinstance(prop_schema, dict)
                else _fallback_schema()
            )
            for prop_name, prop_schema in properties.items()
        }
        return _openai_function(name, description, simplified_properties, required)
    except Exception as e:  # conversion must never break tool listing
        logger.error("Error converting tool %s: %s", name, e)
        return _openai_function(
            name, f"Tool (conversion error: {str(e)[:50]})", {}, []
        )


def _convert_tools(tool_definitions: dict) -> list:
    """Convert every MCP tool in the mapping's values to OpenAI format."""
    return [
        convert_mcp_tool_to_openai(tool_def)
        for tool_def in tool_definitions.values()
    ]


def _error_response(message: str, status_code: int) -> JSONResponse:
    """Build the ``{"error": {"message": ...}}`` JSON error body."""
    return JSONResponse({"error": {"message": message}}, status_code=status_code)


async def list_models(request: Request) -> JSONResponse:
    """List available models from MCP Gateway (OpenAI compatible).

    Returns a single static model entry, "mcp-gateway". The request object
    is not inspected.
    """
    try:
        permission = {
            "id": "modelperm-1",
            "object": "model_permission",
            "created": int(datetime.now().timestamp()),
            "allow_create_engine": False,
            "allow_sampling": True,
            "allow_logprobs": False,
            "allow_search_indices": False,
            "allow_view": True,
            "allow_fine_tuning": False,
            "organization": "*",
            "group_id": None,
            "is_blocking": False,
        }
        model = {
            "id": "mcp-gateway",
            "object": "model",
            "owned_by": "mcp-gateway",
            "permission": [permission],
            "created": 1677649963,
            "parent_model": None,
            "root": "mcp-gateway",
            "root_owner": "mcp-gateway",
        }
        return JSONResponse({"object": "list", "data": [model]})
    except Exception as e:
        logger.exception("Error listing models: %s", e)
        return _error_response(str(e), 500)


async def tools(request: Request, tool_definitions: dict) -> JSONResponse:
    """
    GET /v1/tools

    Return all available tools in OpenAI function schema format.

    Args:
        request: The incoming request (not inspected).
        tool_definitions: Mapping whose values are MCP tool definitions.
    """
    try:
        return JSONResponse(
            {"object": "list", "data": _convert_tools(tool_definitions)}
        )
    except Exception as e:
        logger.exception("Error listing tools: %s", e)
        return _error_response(str(e), 500)


def _content_to_text(content: Any) -> str:
    """Flatten a chat message ``content`` value to plain text.

    Accepts a plain string or a list of content parts; for lists, string
    entries and the "text" field of dict entries are joined with spaces.
    Anything else yields an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for part in content:
            if isinstance(part, str):
                pieces.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                pieces.append(part["text"])
        return " ".join(pieces)
    return ""


def _latest_user_text(messages: Any) -> str:
    """Return the text of the most recent ``role == "user"`` message, or ""."""
    if not isinstance(messages, list):
        return ""
    for msg in reversed(messages):
        if isinstance(msg, dict) and msg.get("role") == "user":
            return _content_to_text(msg.get("content", ""))
    return ""


async def chat_completions(
    request: Request, tool_definitions: dict
) -> JSONResponse | StreamingResponse:
    """
    POST /v1/chat/completions

    OpenAI-compatible chat completions endpoint with tools support.

    The reply is a fixed assistant message reporting how many tools the
    gateway exposes. Responses:
    - 400 when the body is not a JSON object or has no non-empty user message
    - 500 (with the exception text) on any unexpected failure
    - a server-sent-event stream when the body's "stream" flag is truthy
    """
    try:
        try:
            body = await request.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; a malformed body is the client's error, not ours.
            return _error_response("Request body must be valid JSON", 400)
        if not isinstance(body, dict):
            return _error_response("Request body must be a JSON object", 400)

        model = body.get("model", "mcp-gateway")
        stream = body.get("stream", False)

        # Convert MCP tools to OpenAI format
        tools_list = _convert_tools(tool_definitions)

        user_message = _latest_user_text(body.get("messages", []))
        if not user_message:
            return _error_response("No user message found", 400)

        # Whitespace word count stands in for a real tokenizer.
        prompt_tokens = len(user_message.split())
        completion_tokens = 10

        assistant_message = {
            "role": "assistant",
            "content": f"Available {len(tools_list)} tools from MCP Gateway",
        }
        # Add tools to response if they were requested
        if body.get("tools"):
            assistant_message["tool_calls"] = []

        response = {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion",
            "created": int(datetime.now().timestamp()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "message": assistant_message,
                    "finish_reason": "stop",
                }
            ],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
        }

        if stream:
            return StreamingResponse(
                _stream_response(response),
                media_type="text/event-stream",
                headers={"Cache-Control": "no-cache"},
            )
        return JSONResponse(response)
    except Exception as e:
        logger.exception("Error in chat_completions: %s", e)
        return _error_response(str(e), 500)


def _completion_chunk(response: dict, delta: dict, finish_reason: Optional[str]) -> str:
    """Serialize one ``chat.completion.chunk`` as a server-sent-event line."""
    chunk = {
        "id": response["id"],
        "object": "chat.completion.chunk",
        "created": response["created"],
        "model": response["model"],
        "choices": [
            {
                "index": response["choices"][0]["index"],
                "delta": delta,
                "finish_reason": finish_reason,
            }
        ],
    }
    return f"data: {json.dumps(chunk)}\n\n"


async def _stream_response(response: dict):
    """Generate streaming response chunks.

    Yields three SSE events for the first choice of ``response``: one chunk
    with the role and full content, one empty-delta chunk with
    finish_reason "stop", and the "[DONE]" terminator.
    """
    content = response["choices"][0]["message"]["content"]

    # Send initial chunk
    yield _completion_chunk(
        response, {"role": "assistant", "content": content}, None
    )
    # Send final chunk
    yield _completion_chunk(response, {}, "stop")
    yield "data: [DONE]\n\n"