Add gateway-proxy/openai_routes.py

zgaetano committed 2026-03-31 15:33:39 -04:00
parent 633ea63bbf
commit 932e97acad

@@ -0,0 +1,162 @@
"""
OpenAI-Compatible Routes for MCP Gateway
Adds /v1/chat/completions and /v1/models endpoints
"""
import json
import logging
import uuid
import hashlib
from datetime import datetime
from typing import Optional
import httpx
from starlette.requests import Request
from starlette.responses import JSONResponse, StreamingResponse
logger = logging.getLogger("mcp-gateway.openai")


async def list_models(request: Request) -> JSONResponse:
    """List available models from MCP Gateway (OpenAI-compatible)."""
    try:
        # Return a single model entry representing the MCP Gateway itself
        return JSONResponse({
            "object": "list",
            "data": [
                {
                    "id": "mcp-gateway",
                    "object": "model",
                    "owned_by": "mcp-gateway",
                    "permission": [
                        {
                            "id": "modelperm-1",
                            "object": "model_permission",
                            "created": int(datetime.now().timestamp()),
                            "allow_create_engine": False,
                            "allow_sampling": True,
                            "allow_logprobs": False,
                            "allow_search_indices": False,
                            "allow_view": True,
                            "allow_fine_tuning": False,
                            "organization": "*",
                            "group_id": None,
                            "is_blocking": False
                        }
                    ],
                    "created": 1677649963,  # static placeholder creation timestamp
                    "parent_model": None,
                    "root": "mcp-gateway",
                    "root_owner": "mcp-gateway"
                }
            ]
        })
    except Exception as e:
        logger.error(f"Error listing models: {e}")
        return JSONResponse(
            {"error": {"message": str(e)}},
            status_code=500
        )


async def chat_completions(request: Request) -> JSONResponse | StreamingResponse:
    """OpenAI-compatible chat completions endpoint."""
    try:
        body = await request.json()
        messages = body.get("messages", [])
        model = body.get("model", "mcp-gateway")
        stream = body.get("stream", False)
        # Accepted for OpenAI compatibility; not currently used by the gateway
        temperature = body.get("temperature", 0.7)

        # Extract the latest user message
        user_message = None
        for msg in reversed(messages):
            if msg.get("role") == "user":
                user_message = msg.get("content", "")
                break

        if not user_message:
            return JSONResponse(
                {"error": {"message": "No user message found"}},
                status_code=400
            )

        # Build response with available tools info
        tool_list = "MCP Gateway is active with tools available from: ERPNext, Wave Finance, TrueNAS, and Home Assistant"
        response = {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion",
            "created": int(datetime.now().timestamp()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": tool_list,
                        "tool_calls": []
                    },
                    "finish_reason": "stop"
                }
            ],
            "usage": {
                # Rough whitespace-split estimates, not real tokenizer counts
                "prompt_tokens": len(user_message.split()),
                "completion_tokens": 50,
                "total_tokens": len(user_message.split()) + 50
            }
        }

        if stream:
            return StreamingResponse(
                _stream_response(response),
                media_type="text/event-stream",
                headers={"Cache-Control": "no-cache"}
            )
        else:
            return JSONResponse(response)
    except Exception as e:
        logger.error(f"Error in chat_completions: {e}")
        return JSONResponse(
            {"error": {"message": str(e)}},
            status_code=500
        )


async def _stream_response(response: dict):
    """Generate streaming response chunks (server-sent events)."""
    choice = response["choices"][0]

    # Send start chunk carrying the full assistant message as a single delta
    chunk = {
        "id": response["id"],
        "object": "chat.completion.chunk",
        "created": response["created"],
        "model": response["model"],
        "choices": [
            {
                "index": choice["index"],
                "delta": {"role": "assistant", "content": choice["message"]["content"]},
                "finish_reason": None
            }
        ]
    }
    yield f"data: {json.dumps(chunk)}\n\n"

    # Send end chunk with an empty delta and a stop reason
    final_chunk = {
        "id": response["id"],
        "object": "chat.completion.chunk",
        "created": response["created"],
        "model": response["model"],
        "choices": [
            {
                "index": choice["index"],
                "delta": {},
                "finish_reason": "stop"
            }
        ]
    }
    yield f"data: {json.dumps(final_chunk)}\n\n"
    yield "data: [DONE]\n\n"