mcp-servers/mcp-gateway/homeassistant-mcp/homeassistant_mcp.py

590 lines
19 KiB
Python
Raw Normal View History

"""
Home Assistant MCP Server
=========================
MCP server for controlling and monitoring Home Assistant via the REST API.
Provides tools for managing entities, services, automations, scenes, and more.
"""
import json
import os
from typing import Optional, Any
import httpx
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
HASS_URL = os.environ.get("HASS_URL", "https://haos.wilddragoncore.online")
HASS_TOKEN = os.environ.get("HASS_TOKEN", "")
# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
mcp = FastMCP("homeassistant_mcp")
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def _headers() -> dict:
return {
"Authorization": f"Bearer {HASS_TOKEN}",
"Content-Type": "application/json",
}
async def _get(path: str, params: Optional[dict] = None) -> Any:
async with httpx.AsyncClient(verify=False, timeout=30) as client:
resp = await client.get(
f"{HASS_URL}/api/{path}", headers=_headers(), params=params
)
resp.raise_for_status()
return resp.json()
async def _post(path: str, payload: Any = None) -> Any:
async with httpx.AsyncClient(verify=False, timeout=30) as client:
resp = await client.post(
f"{HASS_URL}/api/{path}", headers=_headers(), json=payload
)
resp.raise_for_status()
try:
return resp.json()
except Exception:
return {"status": "ok"}
def _handle_error(e: Exception) -> str:
if isinstance(e, httpx.HTTPStatusError):
status = e.response.status_code
try:
body = e.response.json()
msg = body.get("message", str(body))
except Exception:
msg = e.response.text[:500]
return f"Error {status}: {msg}"
return f"Error: {str(e)}"
# ---------------------------------------------------------------------------
# Input Models
# ---------------------------------------------------------------------------
class EmptyInput(BaseModel):
pass
class EntityIdInput(BaseModel):
entity_id: str = Field(description="Entity ID (e.g. light.living_room)")
class ServiceCallInput(BaseModel):
domain: str = Field(description="Service domain (e.g. light, switch, climate)")
service: str = Field(description="Service name (e.g. turn_on, turn_off, toggle)")
entity_id: Optional[str] = Field(
default=None, description="Target entity ID"
)
service_data: Optional[dict] = Field(
default=None,
description="Additional service data (e.g. brightness, temperature)",
)
class EntitiesByDomainInput(BaseModel):
domain: str = Field(
description="Entity domain to filter by (e.g. light, switch, sensor, climate, media_player)"
)
class SetStateInput(BaseModel):
entity_id: str = Field(description="Entity ID to update")
state: str = Field(description="New state value")
attributes: Optional[dict] = Field(
default=None, description="Optional attributes to set"
)
class AutomationInput(BaseModel):
entity_id: str = Field(
description="Automation entity ID (e.g. automation.morning_routine)"
)
class EventInput(BaseModel):
event_type: str = Field(description="Event type to fire")
event_data: Optional[dict] = Field(
default=None, description="Optional event data payload"
)
class HistoryInput(BaseModel):
entity_id: str = Field(description="Entity ID to get history for")
hours: int = Field(default=24, description="Number of hours of history (max 72)")
class LogbookInput(BaseModel):
hours: int = Field(
default=24, description="Number of hours of logbook entries (max 72)"
)
entity_id: Optional[str] = Field(
default=None, description="Optional entity ID to filter by"
)
# ---------------------------------------------------------------------------
# Tools — System Info
# ---------------------------------------------------------------------------
@mcp.tool(
name="hass_get_config",
description="Get Home Assistant configuration and system info including version, location, units, and components.",
annotations={"readOnlyHint": True},
)
async def hass_get_config(params: EmptyInput) -> str:
try:
config = await _get("")
return json.dumps(
{
"version": config.get("version"),
"location_name": config.get("location_name"),
"latitude": config.get("latitude"),
"longitude": config.get("longitude"),
"elevation": config.get("elevation"),
"unit_system": config.get("unit_system"),
"time_zone": config.get("time_zone"),
"state": config.get("state"),
"components": sorted(config.get("components", []))[:50],
},
indent=2,
)
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_check_api",
description="Check if Home Assistant API is reachable and responding.",
annotations={"readOnlyHint": True},
)
async def hass_check_api(params: EmptyInput) -> str:
try:
async with httpx.AsyncClient(verify=False, timeout=10) as client:
resp = await client.get(f"{HASS_URL}/api/", headers=_headers())
resp.raise_for_status()
return json.dumps({"status": "connected", "message": resp.json().get("message", "API running")})
except Exception as e:
return _handle_error(e)
# ---------------------------------------------------------------------------
# Tools — Entity Management
# ---------------------------------------------------------------------------
@mcp.tool(
name="hass_list_entities",
description="List all entities, optionally filtered by domain. Returns entity_id, state, and friendly name.",
annotations={"readOnlyHint": True},
)
async def hass_list_entities(params: EntitiesByDomainInput) -> str:
try:
states = await _get("states")
filtered = [
{
"entity_id": s["entity_id"],
"state": s["state"],
"friendly_name": s.get("attributes", {}).get("friendly_name", ""),
}
for s in states
if s["entity_id"].startswith(f"{params.domain}.")
]
return json.dumps(
{"domain": params.domain, "count": len(filtered), "entities": filtered},
indent=2,
)
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_get_entity_state",
description="Get the full state and attributes of a specific entity.",
annotations={"readOnlyHint": True},
)
async def hass_get_entity_state(params: EntityIdInput) -> str:
try:
state = await _get(f"states/{params.entity_id}")
return json.dumps(
{
"entity_id": state["entity_id"],
"state": state["state"],
"attributes": state.get("attributes", {}),
"last_changed": state.get("last_changed"),
"last_updated": state.get("last_updated"),
},
indent=2,
)
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_get_all_domains",
description="Get a summary of all entity domains and their counts.",
annotations={"readOnlyHint": True},
)
async def hass_get_all_domains(params: EmptyInput) -> str:
try:
states = await _get("states")
domains: dict[str, int] = {}
for s in states:
domain = s["entity_id"].split(".")[0]
domains[domain] = domains.get(domain, 0) + 1
sorted_domains = dict(sorted(domains.items(), key=lambda x: -x[1]))
return json.dumps(
{"total_entities": len(states), "domains": sorted_domains}, indent=2
)
except Exception as e:
return _handle_error(e)
# ---------------------------------------------------------------------------
# Tools — Service Calls (actions)
# ---------------------------------------------------------------------------
@mcp.tool(
name="hass_call_service",
description="Call any Home Assistant service. Use for turning on/off lights, switches, setting climate, playing media, etc.",
annotations={"readOnlyHint": False, "destructiveHint": False},
)
async def hass_call_service(params: ServiceCallInput) -> str:
try:
payload = params.service_data or {}
if params.entity_id:
payload["entity_id"] = params.entity_id
result = await _post(f"services/{params.domain}/{params.service}", payload)
changed = []
if isinstance(result, list):
changed = [
{"entity_id": s["entity_id"], "state": s["state"]}
for s in result[:10]
]
return json.dumps(
{
"status": "ok",
"service": f"{params.domain}.{params.service}",
"affected_entities": changed,
},
indent=2,
)
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_turn_on",
description="Turn on an entity (light, switch, fan, etc.).",
annotations={"readOnlyHint": False},
)
async def hass_turn_on(params: EntityIdInput) -> str:
try:
domain = params.entity_id.split(".")[0]
result = await _post(
f"services/{domain}/turn_on", {"entity_id": params.entity_id}
)
return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "turn_on"})
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_turn_off",
description="Turn off an entity (light, switch, fan, etc.).",
annotations={"readOnlyHint": False},
)
async def hass_turn_off(params: EntityIdInput) -> str:
try:
domain = params.entity_id.split(".")[0]
result = await _post(
f"services/{domain}/turn_off", {"entity_id": params.entity_id}
)
return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "turn_off"})
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_toggle",
description="Toggle an entity (if on, turn off; if off, turn on).",
annotations={"readOnlyHint": False},
)
async def hass_toggle(params: EntityIdInput) -> str:
try:
domain = params.entity_id.split(".")[0]
result = await _post(
f"services/{domain}/toggle", {"entity_id": params.entity_id}
)
return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "toggle"})
except Exception as e:
return _handle_error(e)
# ---------------------------------------------------------------------------
# Tools — Automations & Scenes
# ---------------------------------------------------------------------------
@mcp.tool(
name="hass_list_automations",
description="List all automations with their current state (on/off) and last triggered time.",
annotations={"readOnlyHint": True},
)
async def hass_list_automations(params: EmptyInput) -> str:
try:
states = await _get("states")
automations = [
{
"entity_id": s["entity_id"],
"state": s["state"],
"friendly_name": s.get("attributes", {}).get("friendly_name", ""),
"last_triggered": s.get("attributes", {}).get("last_triggered"),
}
for s in states
if s["entity_id"].startswith("automation.")
]
return json.dumps({"count": len(automations), "automations": automations}, indent=2)
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_trigger_automation",
description="Manually trigger an automation.",
annotations={"readOnlyHint": False},
)
async def hass_trigger_automation(params: AutomationInput) -> str:
try:
await _post(
"services/automation/trigger", {"entity_id": params.entity_id}
)
return json.dumps({"status": "ok", "triggered": params.entity_id})
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_list_scenes",
description="List all available scenes.",
annotations={"readOnlyHint": True},
)
async def hass_list_scenes(params: EmptyInput) -> str:
try:
states = await _get("states")
scenes = [
{
"entity_id": s["entity_id"],
"friendly_name": s.get("attributes", {}).get("friendly_name", ""),
}
for s in states
if s["entity_id"].startswith("scene.")
]
return json.dumps({"count": len(scenes), "scenes": scenes}, indent=2)
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_activate_scene",
description="Activate a scene.",
annotations={"readOnlyHint": False},
)
async def hass_activate_scene(params: EntityIdInput) -> str:
try:
await _post("services/scene/turn_on", {"entity_id": params.entity_id})
return json.dumps({"status": "ok", "activated": params.entity_id})
except Exception as e:
return _handle_error(e)
# ---------------------------------------------------------------------------
# Tools — History & Logbook
# ---------------------------------------------------------------------------
@mcp.tool(
name="hass_get_history",
description="Get state history for an entity over the last N hours.",
annotations={"readOnlyHint": True},
)
async def hass_get_history(params: HistoryInput) -> str:
try:
from datetime import datetime, timedelta, timezone
end = datetime.now(timezone.utc)
start = end - timedelta(hours=min(params.hours, 72))
ts = start.strftime("%Y-%m-%dT%H:%M:%S+00:00")
async with httpx.AsyncClient(verify=False, timeout=30) as client:
resp = await client.get(
f"{HASS_URL}/api/history/period/{ts}",
headers=_headers(),
params={"filter_entity_id": params.entity_id, "minimal_response": ""},
)
resp.raise_for_status()
data = resp.json()
if not data or not data[0]:
return json.dumps({"entity_id": params.entity_id, "history": []})
entries = [
{
"state": h.get("state"),
"last_changed": h.get("last_changed"),
}
for h in data[0][:100]
]
return json.dumps(
{"entity_id": params.entity_id, "count": len(entries), "history": entries},
indent=2,
)
except Exception as e:
return _handle_error(e)
@mcp.tool(
name="hass_get_logbook",
description="Get logbook entries for the last N hours, optionally filtered by entity.",
annotations={"readOnlyHint": True},
)
async def hass_get_logbook(params: LogbookInput) -> str:
try:
from datetime import datetime, timedelta, timezone
end = datetime.now(timezone.utc)
start = end - timedelta(hours=min(params.hours, 72))
ts = start.strftime("%Y-%m-%dT%H:%M:%S+00:00")
query_params = {}
if params.entity_id:
query_params["entity"] = params.entity_id
async with httpx.AsyncClient(verify=False, timeout=30) as client:
resp = await client.get(
f"{HASS_URL}/api/logbook/{ts}",
headers=_headers(),
params=query_params,
)
resp.raise_for_status()
data = resp.json()
entries = [
{
"when": e.get("when"),
"name": e.get("name"),
"message": e.get("message", ""),
"entity_id": e.get("entity_id", ""),
"state": e.get("state", ""),
}
for e in data[:100]
]
return json.dumps({"count": len(entries), "entries": entries}, indent=2)
except Exception as e:
return _handle_error(e)
# ---------------------------------------------------------------------------
# Tools — Services Discovery
# ---------------------------------------------------------------------------
@mcp.tool(
name="hass_list_services",
description="List available services for a specific domain (e.g. light, climate).",
annotations={"readOnlyHint": True},
)
async def hass_list_services(params: EntitiesByDomainInput) -> str:
try:
services = await _get("services")
for svc in services:
if svc.get("domain") == params.domain:
service_list = []
for name, info in svc.get("services", {}).items():
service_list.append(
{
"service": f"{params.domain}.{name}",
"description": info.get("description", ""),
}
)
return json.dumps(
{"domain": params.domain, "count": len(service_list), "services": service_list},
indent=2,
)
return json.dumps({"error": f"Domain '{params.domain}' not found"})
except Exception as e:
return _handle_error(e)
# ---------------------------------------------------------------------------
# Tools — Events
# ---------------------------------------------------------------------------
@mcp.tool(
name="hass_fire_event",
description="Fire a custom event on the event bus.",
annotations={"readOnlyHint": False, "destructiveHint": False},
)
async def hass_fire_event(params: EventInput) -> str:
try:
result = await _post(f"events/{params.event_type}", params.event_data)
return json.dumps({"status": "ok", "event_type": params.event_type, "result": result})
except Exception as e:
return _handle_error(e)
# ---------------------------------------------------------------------------
# Tools — Notifications
# ---------------------------------------------------------------------------
class NotifyInput(BaseModel):
message: str = Field(description="Notification message text")
title: Optional[str] = Field(default=None, description="Optional notification title")
target: str = Field(
default="notify",
description="Notification service target (e.g. 'notify' for default, 'mobile_app_phone' for specific device)",
)
@mcp.tool(
name="hass_send_notification",
description="Send a notification via Home Assistant's notification services.",
annotations={"readOnlyHint": False},
)
async def hass_send_notification(params: NotifyInput) -> str:
try:
payload: dict[str, Any] = {"message": params.message}
if params.title:
payload["title"] = params.title
await _post(f"services/notify/{params.target}", payload)
return json.dumps({"status": "ok", "sent_to": params.target})
except Exception as e:
return _handle_error(e)
# =========================================================================
# Entry point
# =========================================================================
if __name__ == "__main__":
mcp.run()