diff --git a/mcp-gateway/openai_adapter.py b/mcp-gateway/openai_adapter.py new file mode 100644 index 0000000..f86e6f7 --- /dev/null +++ b/mcp-gateway/openai_adapter.py @@ -0,0 +1,230 @@ +""" +OpenAI-Compatible Adapter for MCP Gateway +========================================== +Wraps the MCP Gateway to provide OpenAI-compatible /v1/chat/completions endpoint. +Allows Open-UI and other OpenAI-compatible clients to use MCP tools. +""" + +import asyncio +import json +import logging +from typing import Optional, Any +from datetime import datetime +import uuid + +import httpx +from starlette.requests import Request +from starlette.responses import JSONResponse, StreamingResponse + +logger = logging.getLogger("openai-adapter") + +class OpenAIAdapter: + """Converts between OpenAI API format and MCP protocol""" + + def __init__(self, mcp_gateway_url: str = "http://localhost:4444/mcp"): + self.mcp_gateway_url = mcp_gateway_url + self.model_name = "mcp-gateway" + + async def get_mcp_tools(self, session_id: Optional[str] = None) -> list[dict]: + """Fetch available tools from MCP gateway""" + try: + async with httpx.AsyncClient(timeout=10) as client: + payload = { + "jsonrpc": "2.0", + "method": "tools/list", + "params": {}, + "id": 1 + } + headers = {"Content-Type": "application/json"} + if session_id: + headers["Mcp-Session-Id"] = session_id + + resp = await client.post(self.mcp_gateway_url, json=payload, headers=headers) + data = resp.json() + + tools = [] + if "result" in data and "tools" in data["result"]: + for tool in data["result"]["tools"]: + # Convert MCP tool to OpenAI function format + tools.append(self._convert_mcp_tool_to_openai(tool)) + + return tools + except Exception as e: + logger.error(f"Error fetching MCP tools: {e}") + return [] + + def _convert_mcp_tool_to_openai(self, mcp_tool: dict) -> dict: + """Convert MCP tool definition to OpenAI function schema""" + return { + "type": "function", + "function": { + "name": mcp_tool.get("name", "unknown_tool"), + "description": mcp_tool.get("description", ""), + "parameters": { + "type": "object", + "properties": mcp_tool.get("inputSchema", {}).get("properties", {}), + "required": mcp_tool.get("inputSchema", {}).get("required", []) + } + } + } + + async def call_mcp_tool(self, tool_name: str, tool_input: dict, session_id: Optional[str] = None) -> dict: + """Call a tool via MCP gateway""" + try: + async with httpx.AsyncClient(timeout=30) as client: + payload = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": tool_name, + "arguments": json.dumps(tool_input) + }, + "id": 1 + } + headers = {"Content-Type": "application/json"} + if session_id: + headers["Mcp-Session-Id"] = session_id + + resp = await client.post(self.mcp_gateway_url, json=payload, headers=headers) + data = resp.json() + + if "result" in data: + return { + "success": True, + "content": data["result"].get("content", []), + "isError": data["result"].get("isError", False) + } + else: + return { + "success": False, + "error": data.get("error", {}).get("message", "Unknown error") + } + except Exception as e: + logger.error(f"Error calling MCP tool {tool_name}: {e}") + return { + "success": False, + "error": str(e) + } + + async def chat_completions(self, request: Request) -> Any: + """Handle OpenAI-compatible chat completions request""" + try: + body = await request.json() + messages = body.get("messages", []) + model = body.get("model", self.model_name) + stream = body.get("stream", False) + tools = body.get("tools", []) + tool_choice = body.get("tool_choice", "auto") + + # Get available MCP tools if not provided + if not tools: + mcp_tools = await self.get_mcp_tools() + tools = mcp_tools + + # Build MCP request + system_prompt = self._build_system_prompt(tools) + user_message = messages[-1].get("content", "") if messages else "" + + # For now, return a simple response that lists available tools + # In a full implementation, you'd use an LLM to process the conversation + # and call tools as needed + + response = { + "id": f"chatcmpl-{uuid.uuid4().hex[:8]}", + "object": "chat.completion", + "created": int(datetime.now().timestamp()), + "model": model, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": self._format_tool_response(tools) + }, + "finish_reason": "stop" + } + ], + "usage": { + "prompt_tokens": len(user_message.split()), + "completion_tokens": 0, + "total_tokens": len(user_message.split()) + } + } + + if stream: + return self._stream_response(response) + else: + return JSONResponse(response) + + except Exception as e: + logger.error(f"Error in chat_completions: {e}") + return JSONResponse( + {"error": {"message": str(e)}}, + status_code=500 + ) + + def _build_system_prompt(self, tools: list) -> str: + """Build a system prompt that explains available tools""" + tool_descriptions = [] + for tool in tools: + if "function" in tool: + func = tool["function"] + tool_descriptions.append(f"- {func['name']}: {func.get('description', '')}") + + return f"""You are an AI assistant with access to the following tools: + +{chr(10).join(tool_descriptions)} + +Use these tools to help the user accomplish their tasks.""" + + def _format_tool_response(self, tools: list) -> str: + """Format available tools for display""" + lines = ["Available tools from MCP Gateway:"] + for tool in tools: + if "function" in tool: + func = tool["function"] + lines.append(f"- **{func['name']}**: {func.get('description', 'No description')}") + return "\n".join(lines) + + def _stream_response(self, response: dict) -> StreamingResponse: + """Convert response to streaming format""" + async def generate(): + # Send initial chunk + choice = response["choices"][0] + chunk = { + "id": response["id"], + "object": "chat.completion.chunk", + "created": response["created"], + "model": response["model"], + "choices": [ + { + "index": choice["index"], + "delta": {"role": "assistant", "content": choice["message"]["content"]}, + "finish_reason": None + } + ] + } + yield f"data: {json.dumps(chunk)}\n\n" + + # Send final chunk + final_chunk = { + "id": response["id"], + "object": "chat.completion.chunk", + "created": response["created"], + "model": response["model"], + "choices": [ + { + "index": choice["index"], + "delta": {}, + "finish_reason": choice["finish_reason"] + } + ] + } + yield f"data: {json.dumps(final_chunk)}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse( + generate(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache"} + )