231 lines
8.4 KiB
Python
231 lines
8.4 KiB
Python
|
|
"""
|
||
|
|
OpenAI-Compatible Adapter for MCP Gateway
|
||
|
|
==========================================
|
||
|
|
Wraps the MCP Gateway to provide OpenAI-compatible /v1/chat/completions endpoint.
|
||
|
|
Allows Open-UI and other OpenAI-compatible clients to use MCP tools.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
from typing import Optional, Any
|
||
|
|
from datetime import datetime
|
||
|
|
import uuid
|
||
|
|
|
||
|
|
import httpx
|
||
|
|
from starlette.requests import Request
|
||
|
|
from starlette.responses import JSONResponse, StreamingResponse
|
||
|
|
|
||
|
|
# Module-level logger; records are emitted under the "openai-adapter" name.
logger = logging.getLogger("openai-adapter")
|
||
|
|
|
||
|
|
class OpenAIAdapter:
    """Convert between the OpenAI chat-completions format and MCP JSON-RPC.

    The adapter talks to an MCP gateway over HTTP (JSON-RPC 2.0 ``tools/list``
    and ``tools/call``) and exposes the results in the shapes OpenAI-compatible
    clients expect.

    Note: ``chat_completions`` is currently a placeholder. It does not run an
    LLM or invoke tools; it answers every request with a listing of the
    available tools.
    """

    def __init__(self, mcp_gateway_url: str = "http://localhost:4444/mcp"):
        """Store the gateway endpoint and the model name reported to clients.

        Args:
            mcp_gateway_url: URL that JSON-RPC requests are POSTed to.
        """
        self.mcp_gateway_url = mcp_gateway_url
        self.model_name = "mcp-gateway"

    async def _post_rpc(
        self,
        method: str,
        params: dict,
        session_id: Optional[str],
        timeout: float,
    ) -> dict:
        """POST one JSON-RPC 2.0 request to the gateway and return the decoded body.

        Args:
            method: JSON-RPC method name, e.g. ``"tools/list"``.
            params: JSON-RPC ``params`` object.
            session_id: Sent as the ``Mcp-Session-Id`` header when truthy.
            timeout: httpx client timeout in seconds.

        Raises:
            Exception: Transport and JSON decoding errors propagate; callers
                decide how to report them.
        """
        payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
        headers = {"Content-Type": "application/json"}
        if session_id:
            headers["Mcp-Session-Id"] = session_id

        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(self.mcp_gateway_url, json=payload, headers=headers)
            return resp.json()

    async def get_mcp_tools(self, session_id: Optional[str] = None) -> list[dict]:
        """Fetch the gateway's tools, converted to OpenAI function-tool dicts.

        Args:
            session_id: Optional MCP session id forwarded to the gateway.

        Returns:
            One OpenAI tool dict per MCP tool. Best effort: any failure is
            logged and an empty list is returned instead of raising.
        """
        try:
            data = await self._post_rpc("tools/list", {}, session_id, timeout=10)
            result = data.get("result") or {}
            return [
                self._convert_mcp_tool_to_openai(tool)
                for tool in result.get("tools") or []
            ]
        except Exception as e:
            logger.error("Error fetching MCP tools: %s", e)
            return []

    def _convert_mcp_tool_to_openai(self, mcp_tool: dict) -> dict:
        """Convert one MCP tool definition to an OpenAI function-tool dict.

        Only ``properties`` and ``required`` are carried over from the tool's
        ``inputSchema``. A missing or ``null`` schema, property map or required
        list is treated as empty, so one sloppy tool definition cannot abort
        the conversion of the whole tool list.
        """
        schema = mcp_tool.get("inputSchema")
        if not isinstance(schema, dict):
            schema = {}

        return {
            "type": "function",
            "function": {
                "name": mcp_tool.get("name", "unknown_tool"),
                "description": mcp_tool.get("description") or "",
                "parameters": {
                    "type": "object",
                    "properties": schema.get("properties") or {},
                    "required": schema.get("required") or [],
                },
            },
        }

    async def call_mcp_tool(
        self,
        tool_name: str,
        tool_input: dict,
        session_id: Optional[str] = None,
    ) -> dict:
        """Invoke a tool through the gateway via ``tools/call``.

        Args:
            tool_name: Name of the MCP tool to call.
            tool_input: Tool arguments; ``None`` is sent as an empty object.
            session_id: Optional MCP session id forwarded to the gateway.

        Returns:
            ``{"success": True, "content": [...], "isError": bool}`` when the
            gateway returns a JSON-RPC result, otherwise
            ``{"success": False, "error": "<message>"}``. Never raises.
        """
        try:
            # MCP expects `arguments` to be a JSON object, not a JSON-encoded
            # string, so the dict is passed through as-is.
            params = {"name": tool_name, "arguments": tool_input or {}}
            data = await self._post_rpc("tools/call", params, session_id, timeout=30)

            if "result" in data:
                result = data["result"]
                return {
                    "success": True,
                    "content": result.get("content", []),
                    "isError": result.get("isError", False),
                }
            return {
                "success": False,
                "error": data.get("error", {}).get("message", "Unknown error"),
            }
        except Exception as e:
            logger.error("Error calling MCP tool %s: %s", tool_name, e)
            return {"success": False, "error": str(e)}

    @staticmethod
    def _last_message_text(messages: list) -> str:
        """Return the plain text of the last chat message, or ``""``.

        OpenAI message ``content`` may be a string, ``None`` (e.g. assistant
        tool-call turns) or a list of typed parts; only ``"text"`` parts
        contribute to the result.
        """
        if not messages:
            return ""
        last = messages[-1]
        content = last.get("content", "") if isinstance(last, dict) else ""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            return " ".join(
                str(part.get("text") or "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return ""

    async def chat_completions(self, request: "Request") -> Any:
        """Handle an OpenAI-compatible ``/v1/chat/completions`` request.

        Placeholder behaviour: no model is run and no tool is called. The
        assistant message simply lists the tools from the request body or,
        when the body has none, from the MCP gateway.

        Returns:
            A ``JSONResponse`` with a ``chat.completion`` object, a
            ``StreamingResponse`` of SSE chunks when the body sets ``stream``,
            or a 500 ``JSONResponse`` carrying the error message on failure.
        """
        try:
            body = await request.json()
            messages = body.get("messages", [])
            model = body.get("model", self.model_name)
            stream = body.get("stream", False)
            tools = body.get("tools", [])

            # Fall back to the gateway's tools when the client supplied none.
            if not tools:
                tools = await self.get_mcp_tools()

            content = self._format_tool_response(tools)

            # Whitespace word counts are a rough stand-in for real token counts.
            prompt_tokens = len(self._last_message_text(messages).split())
            completion_tokens = len(content.split())

            response = {
                "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
                "object": "chat.completion",
                "created": int(datetime.now().timestamp()),
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": content},
                        "finish_reason": "stop",
                    }
                ],
                "usage": {
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens,
                    "total_tokens": prompt_tokens + completion_tokens,
                },
            }

            if stream:
                return self._stream_response(response)
            return JSONResponse(response)

        except Exception as e:
            logger.error("Error in chat_completions: %s", e)
            return JSONResponse({"error": {"message": str(e)}}, status_code=500)

    @staticmethod
    def _function_specs(tools: list) -> list[dict]:
        """Return the ``function`` dict of every well-formed tool entry."""
        return [
            tool["function"]
            for tool in tools
            if isinstance(tool, dict) and isinstance(tool.get("function"), dict)
        ]

    def _build_system_prompt(self, tools: list) -> str:
        """Build a system prompt listing each tool as ``- name: description``."""
        listing = "\n".join(
            f"- {func.get('name', 'unknown_tool')}: {func.get('description', '')}"
            for func in self._function_specs(tools)
        )
        return (
            "You are an AI assistant with access to the following tools:\n\n"
            f"{listing}\n\n"
            "Use these tools to help the user accomplish their tasks."
        )

    def _format_tool_response(self, tools: list) -> str:
        """Format the tools as a Markdown bullet list for display to the user."""
        lines = ["Available tools from MCP Gateway:"]
        lines.extend(
            # `or` so that an empty description also gets the fallback text.
            f"- **{func.get('name', 'unknown_tool')}**: "
            f"{func.get('description') or 'No description'}"
            for func in self._function_specs(tools)
        )
        return "\n".join(lines)

    def _stream_response(self, response: dict) -> "StreamingResponse":
        """Wrap a finished ``chat.completion`` dict as an SSE stream.

        The stream has three events: one chunk carrying the whole assistant
        message, one empty-delta chunk carrying the finish reason, and the
        ``[DONE]`` sentinel.
        """

        async def generate():
            choice = response["choices"][0]

            def chunk(delta: dict, finish_reason: Any) -> str:
                # Both chunks share the same envelope; only delta/finish differ.
                event = {
                    "id": response["id"],
                    "object": "chat.completion.chunk",
                    "created": response["created"],
                    "model": response["model"],
                    "choices": [
                        {
                            "index": choice["index"],
                            "delta": delta,
                            "finish_reason": finish_reason,
                        }
                    ],
                }
                return f"data: {json.dumps(event)}\n\n"

            yield chunk(
                {"role": "assistant", "content": choice["message"]["content"]},
                None,
            )
            yield chunk({}, choice["finish_reason"])
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            generate(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache"},
        )
|