Remove mcp-gateway/openai_adapter.py

This commit is contained in:
Zac Gaetano 2026-03-31 15:33:14 -04:00
parent dfddbf153b
commit 3c723fc055

View file

@ -1,230 +0,0 @@
"""
OpenAI-Compatible Adapter for MCP Gateway
==========================================
Wraps the MCP Gateway to provide OpenAI-compatible /v1/chat/completions endpoint.
Allows Open-UI and other OpenAI-compatible clients to use MCP tools.
"""
import asyncio
import json
import logging
from typing import Optional, Any
from datetime import datetime
import uuid
import httpx
from starlette.requests import Request
from starlette.responses import JSONResponse, StreamingResponse
logger = logging.getLogger("openai-adapter")
class OpenAIAdapter:
"""Converts between OpenAI API format and MCP protocol"""
def __init__(self, mcp_gateway_url: str = "http://localhost:4444/mcp"):
self.mcp_gateway_url = mcp_gateway_url
self.model_name = "mcp-gateway"
async def get_mcp_tools(self, session_id: Optional[str] = None) -> list[dict]:
"""Fetch available tools from MCP gateway"""
try:
async with httpx.AsyncClient(timeout=10) as client:
payload = {
"jsonrpc": "2.0",
"method": "tools/list",
"params": {},
"id": 1
}
headers = {"Content-Type": "application/json"}
if session_id:
headers["Mcp-Session-Id"] = session_id
resp = await client.post(self.mcp_gateway_url, json=payload, headers=headers)
data = resp.json()
tools = []
if "result" in data and "tools" in data["result"]:
for tool in data["result"]["tools"]:
# Convert MCP tool to OpenAI function format
tools.append(self._convert_mcp_tool_to_openai(tool))
return tools
except Exception as e:
logger.error(f"Error fetching MCP tools: {e}")
return []
def _convert_mcp_tool_to_openai(self, mcp_tool: dict) -> dict:
"""Convert MCP tool definition to OpenAI function schema"""
return {
"type": "function",
"function": {
"name": mcp_tool.get("name", "unknown_tool"),
"description": mcp_tool.get("description", ""),
"parameters": {
"type": "object",
"properties": mcp_tool.get("inputSchema", {}).get("properties", {}),
"required": mcp_tool.get("inputSchema", {}).get("required", [])
}
}
}
async def call_mcp_tool(self, tool_name: str, tool_input: dict, session_id: Optional[str] = None) -> dict:
"""Call a tool via MCP gateway"""
try:
async with httpx.AsyncClient(timeout=30) as client:
payload = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": json.dumps(tool_input)
},
"id": 1
}
headers = {"Content-Type": "application/json"}
if session_id:
headers["Mcp-Session-Id"] = session_id
resp = await client.post(self.mcp_gateway_url, json=payload, headers=headers)
data = resp.json()
if "result" in data:
return {
"success": True,
"content": data["result"].get("content", []),
"isError": data["result"].get("isError", False)
}
else:
return {
"success": False,
"error": data.get("error", {}).get("message", "Unknown error")
}
except Exception as e:
logger.error(f"Error calling MCP tool {tool_name}: {e}")
return {
"success": False,
"error": str(e)
}
async def chat_completions(self, request: Request) -> Any:
"""Handle OpenAI-compatible chat completions request"""
try:
body = await request.json()
messages = body.get("messages", [])
model = body.get("model", self.model_name)
stream = body.get("stream", False)
tools = body.get("tools", [])
tool_choice = body.get("tool_choice", "auto")
# Get available MCP tools if not provided
if not tools:
mcp_tools = await self.get_mcp_tools()
tools = mcp_tools
# Build MCP request
system_prompt = self._build_system_prompt(tools)
user_message = messages[-1].get("content", "") if messages else ""
# For now, return a simple response that lists available tools
# In a full implementation, you'd use an LLM to process the conversation
# and call tools as needed
response = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion",
"created": int(datetime.now().timestamp()),
"model": model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": self._format_tool_response(tools)
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": len(user_message.split()),
"completion_tokens": 0,
"total_tokens": len(user_message.split())
}
}
if stream:
return self._stream_response(response)
else:
return JSONResponse(response)
except Exception as e:
logger.error(f"Error in chat_completions: {e}")
return JSONResponse(
{"error": {"message": str(e)}},
status_code=500
)
def _build_system_prompt(self, tools: list) -> str:
"""Build a system prompt that explains available tools"""
tool_descriptions = []
for tool in tools:
if "function" in tool:
func = tool["function"]
tool_descriptions.append(f"- {func['name']}: {func.get('description', '')}")
return f"""You are an AI assistant with access to the following tools:
{chr(10).join(tool_descriptions)}
Use these tools to help the user accomplish their tasks."""
def _format_tool_response(self, tools: list) -> str:
"""Format available tools for display"""
lines = ["Available tools from MCP Gateway:"]
for tool in tools:
if "function" in tool:
func = tool["function"]
lines.append(f"- **{func['name']}**: {func.get('description', 'No description')}")
return "\n".join(lines)
def _stream_response(self, response: dict) -> StreamingResponse:
"""Convert response to streaming format"""
async def generate():
# Send initial chunk
choice = response["choices"][0]
chunk = {
"id": response["id"],
"object": "chat.completion.chunk",
"created": response["created"],
"model": response["model"],
"choices": [
{
"index": choice["index"],
"delta": {"role": "assistant", "content": choice["message"]["content"]},
"finish_reason": None
}
]
}
yield f"data: {json.dumps(chunk)}\n\n"
# Send final chunk
final_chunk = {
"id": response["id"],
"object": "chat.completion.chunk",
"created": response["created"],
"model": response["model"],
"choices": [
{
"index": choice["index"],
"delta": {},
"finish_reason": choice["finish_reason"]
}
]
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache"}
)