Add mcp-gateway/homeassistant-mcp/homeassistant_mcp.py
This commit is contained in:
parent
14b311c287
commit
544d01640b
1 changed files with 589 additions and 0 deletions
589
mcp-gateway/homeassistant-mcp/homeassistant_mcp.py
Normal file
589
mcp-gateway/homeassistant-mcp/homeassistant_mcp.py
Normal file
|
|
@ -0,0 +1,589 @@
|
|||
"""
|
||||
Home Assistant MCP Server
|
||||
=========================
|
||||
MCP server for controlling and monitoring Home Assistant via the REST API.
|
||||
Provides tools for managing entities, services, automations, scenes, and more.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Optional, Any
|
||||
|
||||
import httpx
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
HASS_URL = os.environ.get("HASS_URL", "https://haos.wilddragoncore.online")
|
||||
HASS_TOKEN = os.environ.get("HASS_TOKEN", "")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCP Server
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
mcp = FastMCP("homeassistant_mcp")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HTTP helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _headers() -> dict:
|
||||
return {
|
||||
"Authorization": f"Bearer {HASS_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
|
||||
async def _get(path: str, params: Optional[dict] = None) -> Any:
|
||||
async with httpx.AsyncClient(verify=False, timeout=30) as client:
|
||||
resp = await client.get(
|
||||
f"{HASS_URL}/api/{path}", headers=_headers(), params=params
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
|
||||
async def _post(path: str, payload: Any = None) -> Any:
|
||||
async with httpx.AsyncClient(verify=False, timeout=30) as client:
|
||||
resp = await client.post(
|
||||
f"{HASS_URL}/api/{path}", headers=_headers(), json=payload
|
||||
)
|
||||
resp.raise_for_status()
|
||||
try:
|
||||
return resp.json()
|
||||
except Exception:
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
def _handle_error(e: Exception) -> str:
|
||||
if isinstance(e, httpx.HTTPStatusError):
|
||||
status = e.response.status_code
|
||||
try:
|
||||
body = e.response.json()
|
||||
msg = body.get("message", str(body))
|
||||
except Exception:
|
||||
msg = e.response.text[:500]
|
||||
return f"Error {status}: {msg}"
|
||||
return f"Error: {str(e)}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Input Models
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class EmptyInput(BaseModel):
|
||||
pass
|
||||
|
||||
|
||||
class EntityIdInput(BaseModel):
|
||||
entity_id: str = Field(description="Entity ID (e.g. light.living_room)")
|
||||
|
||||
|
||||
class ServiceCallInput(BaseModel):
|
||||
domain: str = Field(description="Service domain (e.g. light, switch, climate)")
|
||||
service: str = Field(description="Service name (e.g. turn_on, turn_off, toggle)")
|
||||
entity_id: Optional[str] = Field(
|
||||
default=None, description="Target entity ID"
|
||||
)
|
||||
service_data: Optional[dict] = Field(
|
||||
default=None,
|
||||
description="Additional service data (e.g. brightness, temperature)",
|
||||
)
|
||||
|
||||
|
||||
class EntitiesByDomainInput(BaseModel):
|
||||
domain: str = Field(
|
||||
description="Entity domain to filter by (e.g. light, switch, sensor, climate, media_player)"
|
||||
)
|
||||
|
||||
|
||||
class SetStateInput(BaseModel):
|
||||
entity_id: str = Field(description="Entity ID to update")
|
||||
state: str = Field(description="New state value")
|
||||
attributes: Optional[dict] = Field(
|
||||
default=None, description="Optional attributes to set"
|
||||
)
|
||||
|
||||
|
||||
class AutomationInput(BaseModel):
|
||||
entity_id: str = Field(
|
||||
description="Automation entity ID (e.g. automation.morning_routine)"
|
||||
)
|
||||
|
||||
|
||||
class EventInput(BaseModel):
|
||||
event_type: str = Field(description="Event type to fire")
|
||||
event_data: Optional[dict] = Field(
|
||||
default=None, description="Optional event data payload"
|
||||
)
|
||||
|
||||
|
||||
class HistoryInput(BaseModel):
|
||||
entity_id: str = Field(description="Entity ID to get history for")
|
||||
hours: int = Field(default=24, description="Number of hours of history (max 72)")
|
||||
|
||||
|
||||
class LogbookInput(BaseModel):
|
||||
hours: int = Field(
|
||||
default=24, description="Number of hours of logbook entries (max 72)"
|
||||
)
|
||||
entity_id: Optional[str] = Field(
|
||||
default=None, description="Optional entity ID to filter by"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tools — System Info
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_get_config",
|
||||
description="Get Home Assistant configuration and system info including version, location, units, and components.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_get_config(params: EmptyInput) -> str:
|
||||
try:
|
||||
config = await _get("")
|
||||
return json.dumps(
|
||||
{
|
||||
"version": config.get("version"),
|
||||
"location_name": config.get("location_name"),
|
||||
"latitude": config.get("latitude"),
|
||||
"longitude": config.get("longitude"),
|
||||
"elevation": config.get("elevation"),
|
||||
"unit_system": config.get("unit_system"),
|
||||
"time_zone": config.get("time_zone"),
|
||||
"state": config.get("state"),
|
||||
"components": sorted(config.get("components", []))[:50],
|
||||
},
|
||||
indent=2,
|
||||
)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_check_api",
|
||||
description="Check if Home Assistant API is reachable and responding.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_check_api(params: EmptyInput) -> str:
|
||||
try:
|
||||
async with httpx.AsyncClient(verify=False, timeout=10) as client:
|
||||
resp = await client.get(f"{HASS_URL}/api/", headers=_headers())
|
||||
resp.raise_for_status()
|
||||
return json.dumps({"status": "connected", "message": resp.json().get("message", "API running")})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tools — Entity Management
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_list_entities",
|
||||
description="List all entities, optionally filtered by domain. Returns entity_id, state, and friendly name.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_list_entities(params: EntitiesByDomainInput) -> str:
|
||||
try:
|
||||
states = await _get("states")
|
||||
filtered = [
|
||||
{
|
||||
"entity_id": s["entity_id"],
|
||||
"state": s["state"],
|
||||
"friendly_name": s.get("attributes", {}).get("friendly_name", ""),
|
||||
}
|
||||
for s in states
|
||||
if s["entity_id"].startswith(f"{params.domain}.")
|
||||
]
|
||||
return json.dumps(
|
||||
{"domain": params.domain, "count": len(filtered), "entities": filtered},
|
||||
indent=2,
|
||||
)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_get_entity_state",
|
||||
description="Get the full state and attributes of a specific entity.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_get_entity_state(params: EntityIdInput) -> str:
|
||||
try:
|
||||
state = await _get(f"states/{params.entity_id}")
|
||||
return json.dumps(
|
||||
{
|
||||
"entity_id": state["entity_id"],
|
||||
"state": state["state"],
|
||||
"attributes": state.get("attributes", {}),
|
||||
"last_changed": state.get("last_changed"),
|
||||
"last_updated": state.get("last_updated"),
|
||||
},
|
||||
indent=2,
|
||||
)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_get_all_domains",
|
||||
description="Get a summary of all entity domains and their counts.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_get_all_domains(params: EmptyInput) -> str:
|
||||
try:
|
||||
states = await _get("states")
|
||||
domains: dict[str, int] = {}
|
||||
for s in states:
|
||||
domain = s["entity_id"].split(".")[0]
|
||||
domains[domain] = domains.get(domain, 0) + 1
|
||||
sorted_domains = dict(sorted(domains.items(), key=lambda x: -x[1]))
|
||||
return json.dumps(
|
||||
{"total_entities": len(states), "domains": sorted_domains}, indent=2
|
||||
)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tools — Service Calls (actions)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_call_service",
|
||||
description="Call any Home Assistant service. Use for turning on/off lights, switches, setting climate, playing media, etc.",
|
||||
annotations={"readOnlyHint": False, "destructiveHint": False},
|
||||
)
|
||||
async def hass_call_service(params: ServiceCallInput) -> str:
|
||||
try:
|
||||
payload = params.service_data or {}
|
||||
if params.entity_id:
|
||||
payload["entity_id"] = params.entity_id
|
||||
|
||||
result = await _post(f"services/{params.domain}/{params.service}", payload)
|
||||
changed = []
|
||||
if isinstance(result, list):
|
||||
changed = [
|
||||
{"entity_id": s["entity_id"], "state": s["state"]}
|
||||
for s in result[:10]
|
||||
]
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "ok",
|
||||
"service": f"{params.domain}.{params.service}",
|
||||
"affected_entities": changed,
|
||||
},
|
||||
indent=2,
|
||||
)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_turn_on",
|
||||
description="Turn on an entity (light, switch, fan, etc.).",
|
||||
annotations={"readOnlyHint": False},
|
||||
)
|
||||
async def hass_turn_on(params: EntityIdInput) -> str:
|
||||
try:
|
||||
domain = params.entity_id.split(".")[0]
|
||||
result = await _post(
|
||||
f"services/{domain}/turn_on", {"entity_id": params.entity_id}
|
||||
)
|
||||
return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "turn_on"})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_turn_off",
|
||||
description="Turn off an entity (light, switch, fan, etc.).",
|
||||
annotations={"readOnlyHint": False},
|
||||
)
|
||||
async def hass_turn_off(params: EntityIdInput) -> str:
|
||||
try:
|
||||
domain = params.entity_id.split(".")[0]
|
||||
result = await _post(
|
||||
f"services/{domain}/turn_off", {"entity_id": params.entity_id}
|
||||
)
|
||||
return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "turn_off"})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_toggle",
|
||||
description="Toggle an entity (if on, turn off; if off, turn on).",
|
||||
annotations={"readOnlyHint": False},
|
||||
)
|
||||
async def hass_toggle(params: EntityIdInput) -> str:
|
||||
try:
|
||||
domain = params.entity_id.split(".")[0]
|
||||
result = await _post(
|
||||
f"services/{domain}/toggle", {"entity_id": params.entity_id}
|
||||
)
|
||||
return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "toggle"})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tools — Automations & Scenes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_list_automations",
|
||||
description="List all automations with their current state (on/off) and last triggered time.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_list_automations(params: EmptyInput) -> str:
|
||||
try:
|
||||
states = await _get("states")
|
||||
automations = [
|
||||
{
|
||||
"entity_id": s["entity_id"],
|
||||
"state": s["state"],
|
||||
"friendly_name": s.get("attributes", {}).get("friendly_name", ""),
|
||||
"last_triggered": s.get("attributes", {}).get("last_triggered"),
|
||||
}
|
||||
for s in states
|
||||
if s["entity_id"].startswith("automation.")
|
||||
]
|
||||
return json.dumps({"count": len(automations), "automations": automations}, indent=2)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_trigger_automation",
|
||||
description="Manually trigger an automation.",
|
||||
annotations={"readOnlyHint": False},
|
||||
)
|
||||
async def hass_trigger_automation(params: AutomationInput) -> str:
|
||||
try:
|
||||
await _post(
|
||||
"services/automation/trigger", {"entity_id": params.entity_id}
|
||||
)
|
||||
return json.dumps({"status": "ok", "triggered": params.entity_id})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_list_scenes",
|
||||
description="List all available scenes.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_list_scenes(params: EmptyInput) -> str:
|
||||
try:
|
||||
states = await _get("states")
|
||||
scenes = [
|
||||
{
|
||||
"entity_id": s["entity_id"],
|
||||
"friendly_name": s.get("attributes", {}).get("friendly_name", ""),
|
||||
}
|
||||
for s in states
|
||||
if s["entity_id"].startswith("scene.")
|
||||
]
|
||||
return json.dumps({"count": len(scenes), "scenes": scenes}, indent=2)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_activate_scene",
|
||||
description="Activate a scene.",
|
||||
annotations={"readOnlyHint": False},
|
||||
)
|
||||
async def hass_activate_scene(params: EntityIdInput) -> str:
|
||||
try:
|
||||
await _post("services/scene/turn_on", {"entity_id": params.entity_id})
|
||||
return json.dumps({"status": "ok", "activated": params.entity_id})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tools — History & Logbook
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_get_history",
|
||||
description="Get state history for an entity over the last N hours.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_get_history(params: HistoryInput) -> str:
|
||||
try:
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
end = datetime.now(timezone.utc)
|
||||
start = end - timedelta(hours=min(params.hours, 72))
|
||||
ts = start.strftime("%Y-%m-%dT%H:%M:%S+00:00")
|
||||
|
||||
async with httpx.AsyncClient(verify=False, timeout=30) as client:
|
||||
resp = await client.get(
|
||||
f"{HASS_URL}/api/history/period/{ts}",
|
||||
headers=_headers(),
|
||||
params={"filter_entity_id": params.entity_id, "minimal_response": ""},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
if not data or not data[0]:
|
||||
return json.dumps({"entity_id": params.entity_id, "history": []})
|
||||
|
||||
entries = [
|
||||
{
|
||||
"state": h.get("state"),
|
||||
"last_changed": h.get("last_changed"),
|
||||
}
|
||||
for h in data[0][:100]
|
||||
]
|
||||
return json.dumps(
|
||||
{"entity_id": params.entity_id, "count": len(entries), "history": entries},
|
||||
indent=2,
|
||||
)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_get_logbook",
|
||||
description="Get logbook entries for the last N hours, optionally filtered by entity.",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_get_logbook(params: LogbookInput) -> str:
|
||||
try:
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
end = datetime.now(timezone.utc)
|
||||
start = end - timedelta(hours=min(params.hours, 72))
|
||||
ts = start.strftime("%Y-%m-%dT%H:%M:%S+00:00")
|
||||
|
||||
query_params = {}
|
||||
if params.entity_id:
|
||||
query_params["entity"] = params.entity_id
|
||||
|
||||
async with httpx.AsyncClient(verify=False, timeout=30) as client:
|
||||
resp = await client.get(
|
||||
f"{HASS_URL}/api/logbook/{ts}",
|
||||
headers=_headers(),
|
||||
params=query_params,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
entries = [
|
||||
{
|
||||
"when": e.get("when"),
|
||||
"name": e.get("name"),
|
||||
"message": e.get("message", ""),
|
||||
"entity_id": e.get("entity_id", ""),
|
||||
"state": e.get("state", ""),
|
||||
}
|
||||
for e in data[:100]
|
||||
]
|
||||
return json.dumps({"count": len(entries), "entries": entries}, indent=2)
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tools — Services Discovery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_list_services",
|
||||
description="List available services for a specific domain (e.g. light, climate).",
|
||||
annotations={"readOnlyHint": True},
|
||||
)
|
||||
async def hass_list_services(params: EntitiesByDomainInput) -> str:
|
||||
try:
|
||||
services = await _get("services")
|
||||
for svc in services:
|
||||
if svc.get("domain") == params.domain:
|
||||
service_list = []
|
||||
for name, info in svc.get("services", {}).items():
|
||||
service_list.append(
|
||||
{
|
||||
"service": f"{params.domain}.{name}",
|
||||
"description": info.get("description", ""),
|
||||
}
|
||||
)
|
||||
return json.dumps(
|
||||
{"domain": params.domain, "count": len(service_list), "services": service_list},
|
||||
indent=2,
|
||||
)
|
||||
return json.dumps({"error": f"Domain '{params.domain}' not found"})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tools — Events
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_fire_event",
|
||||
description="Fire a custom event on the event bus.",
|
||||
annotations={"readOnlyHint": False, "destructiveHint": False},
|
||||
)
|
||||
async def hass_fire_event(params: EventInput) -> str:
|
||||
try:
|
||||
result = await _post(f"events/{params.event_type}", params.event_data)
|
||||
return json.dumps({"status": "ok", "event_type": params.event_type, "result": result})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tools — Notifications
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class NotifyInput(BaseModel):
|
||||
message: str = Field(description="Notification message text")
|
||||
title: Optional[str] = Field(default=None, description="Optional notification title")
|
||||
target: str = Field(
|
||||
default="notify",
|
||||
description="Notification service target (e.g. 'notify' for default, 'mobile_app_phone' for specific device)",
|
||||
)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="hass_send_notification",
|
||||
description="Send a notification via Home Assistant's notification services.",
|
||||
annotations={"readOnlyHint": False},
|
||||
)
|
||||
async def hass_send_notification(params: NotifyInput) -> str:
|
||||
try:
|
||||
payload: dict[str, Any] = {"message": params.message}
|
||||
if params.title:
|
||||
payload["title"] = params.title
|
||||
await _post(f"services/notify/{params.target}", payload)
|
||||
return json.dumps({"status": "ok", "sent_to": params.target})
|
||||
except Exception as e:
|
||||
return _handle_error(e)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Entry point
|
||||
# =========================================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
mcp.run()
|
||||
Loading…
Reference in a new issue