diff --git a/mcp-gateway/openai_adapter.py b/mcp-gateway/openai_adapter.py deleted file mode 100644 index f86e6f7..0000000 --- a/mcp-gateway/openai_adapter.py +++ /dev/null @@ -1,230 +0,0 @@ -""" -OpenAI-Compatible Adapter for MCP Gateway -========================================== -Wraps the MCP Gateway to provide OpenAI-compatible /v1/chat/completions endpoint. -Allows Open-UI and other OpenAI-compatible clients to use MCP tools. -""" - -import asyncio -import json -import logging -from typing import Optional, Any -from datetime import datetime -import uuid - -import httpx -from starlette.requests import Request -from starlette.responses import JSONResponse, StreamingResponse - -logger = logging.getLogger("openai-adapter") - -class OpenAIAdapter: - """Converts between OpenAI API format and MCP protocol""" - - def __init__(self, mcp_gateway_url: str = "http://localhost:4444/mcp"): - self.mcp_gateway_url = mcp_gateway_url - self.model_name = "mcp-gateway" - - async def get_mcp_tools(self, session_id: Optional[str] = None) -> list[dict]: - """Fetch available tools from MCP gateway""" - try: - async with httpx.AsyncClient(timeout=10) as client: - payload = { - "jsonrpc": "2.0", - "method": "tools/list", - "params": {}, - "id": 1 - } - headers = {"Content-Type": "application/json"} - if session_id: - headers["Mcp-Session-Id"] = session_id - - resp = await client.post(self.mcp_gateway_url, json=payload, headers=headers) - data = resp.json() - - tools = [] - if "result" in data and "tools" in data["result"]: - for tool in data["result"]["tools"]: - # Convert MCP tool to OpenAI function format - tools.append(self._convert_mcp_tool_to_openai(tool)) - - return tools - except Exception as e: - logger.error(f"Error fetching MCP tools: {e}") - return [] - - def _convert_mcp_tool_to_openai(self, mcp_tool: dict) -> dict: - """Convert MCP tool definition to OpenAI function schema""" - return { - "type": "function", - "function": { - "name": mcp_tool.get("name", "unknown_tool"), - "description": mcp_tool.get("description", ""), - "parameters": { - "type": "object", - "properties": mcp_tool.get("inputSchema", {}).get("properties", {}), - "required": mcp_tool.get("inputSchema", {}).get("required", []) - } - } - } - - async def call_mcp_tool(self, tool_name: str, tool_input: dict, session_id: Optional[str] = None) -> dict: - """Call a tool via MCP gateway""" - try: - async with httpx.AsyncClient(timeout=30) as client: - payload = { - "jsonrpc": "2.0", - "method": "tools/call", - "params": { - "name": tool_name, - "arguments": json.dumps(tool_input) - }, - "id": 1 - } - headers = {"Content-Type": "application/json"} - if session_id: - headers["Mcp-Session-Id"] = session_id - - resp = await client.post(self.mcp_gateway_url, json=payload, headers=headers) - data = resp.json() - - if "result" in data: - return { - "success": True, - "content": data["result"].get("content", []), - "isError": data["result"].get("isError", False) - } - else: - return { - "success": False, - "error": data.get("error", {}).get("message", "Unknown error") - } - except Exception as e: - logger.error(f"Error calling MCP tool {tool_name}: {e}") - return { - "success": False, - "error": str(e) - } - - async def chat_completions(self, request: Request) -> Any: - """Handle OpenAI-compatible chat completions request""" - try: - body = await request.json() - messages = body.get("messages", []) - model = body.get("model", self.model_name) - stream = body.get("stream", False) - tools = body.get("tools", []) - tool_choice = body.get("tool_choice", "auto") - - # Get available MCP tools if not provided - if not tools: - mcp_tools = await self.get_mcp_tools() - tools = mcp_tools - - # Build MCP request - system_prompt = self._build_system_prompt(tools) - user_message = messages[-1].get("content", "") if messages else "" - - # For now, return a simple response that lists available tools - # In a full implementation, you'd use an LLM to process the conversation - # and call tools as needed - - response = { - "id": f"chatcmpl-{uuid.uuid4().hex[:8]}", - "object": "chat.completion", - "created": int(datetime.now().timestamp()), - "model": model, - "choices": [ - { - "index": 0, - "message": { - "role": "assistant", - "content": self._format_tool_response(tools) - }, - "finish_reason": "stop" - } - ], - "usage": { - "prompt_tokens": len(user_message.split()), - "completion_tokens": 0, - "total_tokens": len(user_message.split()) - } - } - - if stream: - return self._stream_response(response) - else: - return JSONResponse(response) - - except Exception as e: - logger.error(f"Error in chat_completions: {e}") - return JSONResponse( - {"error": {"message": str(e)}}, - status_code=500 - ) - - def _build_system_prompt(self, tools: list) -> str: - """Build a system prompt that explains available tools""" - tool_descriptions = [] - for tool in tools: - if "function" in tool: - func = tool["function"] - tool_descriptions.append(f"- {func['name']}: {func.get('description', '')}") - - return f"""You are an AI assistant with access to the following tools: - -{chr(10).join(tool_descriptions)} - -Use these tools to help the user accomplish their tasks.""" - - def _format_tool_response(self, tools: list) -> str: - """Format available tools for display""" - lines = ["Available tools from MCP Gateway:"] - for tool in tools: - if "function" in tool: - func = tool["function"] - lines.append(f"- **{func['name']}**: {func.get('description', 'No description')}") - return "\n".join(lines) - - def _stream_response(self, response: dict) -> StreamingResponse: - """Convert response to streaming format""" - async def generate(): - # Send initial chunk - choice = response["choices"][0] - chunk = { - "id": response["id"], - "object": "chat.completion.chunk", - "created": response["created"], - "model": response["model"], - "choices": [ - { - "index": choice["index"], - "delta": {"role": "assistant", "content": choice["message"]["content"]}, - "finish_reason": None - } - ] - } - yield f"data: {json.dumps(chunk)}\n\n" - - # Send final chunk - final_chunk = { - "id": response["id"], - "object": "chat.completion.chunk", - "created": response["created"], - "model": response["model"], - "choices": [ - { - "index": choice["index"], - "delta": {}, - "finish_reason": choice["finish_reason"] - } - ] - } - yield f"data: {json.dumps(final_chunk)}\n\n" - yield "data: [DONE]\n\n" - - return StreamingResponse( - generate(), - media_type="text/event-stream", - headers={"Cache-Control": "no-cache"} - )