""" OpenAI-Compatible Routes for MCP Gateway Adds /v1/chat/completions and /v1/models endpoints """ import json import logging import uuid import hashlib from datetime import datetime from typing import Optional import httpx from starlette.requests import Request from starlette.responses import JSONResponse, StreamingResponse logger = logging.getLogger("mcp-gateway.openai") async def list_models(request: Request) -> JSONResponse: """List available models from MCP Gateway (OpenAI compatible)""" try: # Return a model representing the MCP Gateway return JSONResponse({ "object": "list", "data": [ { "id": "mcp-gateway", "object": "model", "owned_by": "mcp-gateway", "permission": [ { "id": "modelperm-1", "object": "model_permission", "created": int(datetime.now().timestamp()), "allow_create_engine": False, "allow_sampling": True, "allow_logprobs": False, "allow_search_indices": False, "allow_view": True, "allow_fine_tuning": False, "organization": "*", "group_id": None, "is_blocking": False } ], "created": 1677649963, "parent_model": None, "root": "mcp-gateway", "root_owner": "mcp-gateway" } ] }) except Exception as e: logger.error(f"Error listing models: {e}") return JSONResponse( {"error": {"message": str(e)}}, status_code=500 ) async def chat_completions(request: Request) -> JSONResponse | StreamingResponse: """OpenAI-compatible chat completions endpoint""" try: body = await request.json() messages = body.get("messages", []) model = body.get("model", "mcp-gateway") stream = body.get("stream", False) temperature = body.get("temperature", 0.7) # Extract the latest user message user_message = None for msg in reversed(messages): if msg.get("role") == "user": user_message = msg.get("content", "") break if not user_message: return JSONResponse( {"error": {"message": "No user message found"}}, status_code=400 ) # Build response with available tools info tool_list = "MCP Gateway is active with tools available from: ERPNext, Wave Finance, TrueNAS, and Home Assistant" response = { "id": f"chatcmpl-{uuid.uuid4().hex[:8]}", "object": "chat.completion", "created": int(datetime.now().timestamp()), "model": model, "choices": [ { "index": 0, "message": { "role": "assistant", "content": tool_list, "tool_calls": [] }, "finish_reason": "stop" } ], "usage": { "prompt_tokens": len(user_message.split()), "completion_tokens": 50, "total_tokens": len(user_message.split()) + 50 } } if stream: return StreamingResponse( _stream_response(response), media_type="text/event-stream", headers={"Cache-Control": "no-cache"} ) else: return JSONResponse(response) except Exception as e: logger.error(f"Error in chat_completions: {e}") return JSONResponse( {"error": {"message": str(e)}}, status_code=500 ) async def _stream_response(response: dict): """Generate streaming response chunks""" choice = response["choices"][0] # Send start chunk chunk = { "id": response["id"], "object": "chat.completion.chunk", "created": response["created"], "model": response["model"], "choices": [ { "index": choice["index"], "delta": {"role": "assistant", "content": choice["message"]["content"]}, "finish_reason": None } ] } yield f"data: {json.dumps(chunk)}\n\n" # Send end chunk final_chunk = { "id": response["id"], "object": "chat.completion.chunk", "created": response["created"], "model": response["model"], "choices": [ { "index": choice["index"], "delta": {}, "finish_reason": "stop" } ] } yield f"data: {json.dumps(final_chunk)}\n\n" yield "data: [DONE]\n\n"