diff --git a/homeassistant-mcp/homeassistant_mcp.py b/homeassistant-mcp/homeassistant_mcp.py new file mode 100644 index 0000000..6d5f90f --- /dev/null +++ b/homeassistant-mcp/homeassistant_mcp.py @@ -0,0 +1,589 @@ +""" +Home Assistant MCP Server +========================= +MCP server for controlling and monitoring Home Assistant via the REST API. +Provides tools for managing entities, services, automations, scenes, and more. +""" + +import json +import os +from typing import Optional, Any + +import httpx +from mcp.server.fastmcp import FastMCP +from pydantic import BaseModel, Field + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +HASS_URL = os.environ.get("HASS_URL", "https://haos.wilddragoncore.online") +HASS_TOKEN = os.environ.get("HASS_TOKEN", "") + +# --------------------------------------------------------------------------- +# MCP Server +# --------------------------------------------------------------------------- + +mcp = FastMCP("homeassistant_mcp") + +# --------------------------------------------------------------------------- +# HTTP helpers +# --------------------------------------------------------------------------- + + +def _headers() -> dict: + return { + "Authorization": f"Bearer {HASS_TOKEN}", + "Content-Type": "application/json", + } + + +async def _get(path: str, params: Optional[dict] = None) -> Any: + async with httpx.AsyncClient(verify=False, timeout=30) as client: + resp = await client.get( + f"{HASS_URL}/api/{path}", headers=_headers(), params=params + ) + resp.raise_for_status() + return resp.json() + + +async def _post(path: str, payload: Any = None) -> Any: + async with httpx.AsyncClient(verify=False, timeout=30) as client: + resp = await client.post( + f"{HASS_URL}/api/{path}", headers=_headers(), json=payload + ) + resp.raise_for_status() + try: + return resp.json() + except Exception: + return {"status": "ok"} + + +def _handle_error(e: Exception) -> str: + if isinstance(e, httpx.HTTPStatusError): + status = e.response.status_code + try: + body = e.response.json() + msg = body.get("message", str(body)) + except Exception: + msg = e.response.text[:500] + return f"Error {status}: {msg}" + return f"Error: {str(e)}" + + +# --------------------------------------------------------------------------- +# Input Models +# --------------------------------------------------------------------------- + + +class EmptyInput(BaseModel): + pass + + +class EntityIdInput(BaseModel): + entity_id: str = Field(description="Entity ID (e.g. light.living_room)") + + +class ServiceCallInput(BaseModel): + domain: str = Field(description="Service domain (e.g. light, switch, climate)") + service: str = Field(description="Service name (e.g. turn_on, turn_off, toggle)") + entity_id: Optional[str] = Field( + default=None, description="Target entity ID" + ) + service_data: Optional[dict] = Field( + default=None, + description="Additional service data (e.g. brightness, temperature)", + ) + + +class EntitiesByDomainInput(BaseModel): + domain: str = Field( + description="Entity domain to filter by (e.g. light, switch, sensor, climate, media_player)" + ) + + +class SetStateInput(BaseModel): + entity_id: str = Field(description="Entity ID to update") + state: str = Field(description="New state value") + attributes: Optional[dict] = Field( + default=None, description="Optional attributes to set" + ) + + +class AutomationInput(BaseModel): + entity_id: str = Field( + description="Automation entity ID (e.g. automation.morning_routine)" + ) + + +class EventInput(BaseModel): + event_type: str = Field(description="Event type to fire") + event_data: Optional[dict] = Field( + default=None, description="Optional event data payload" + ) + + +class HistoryInput(BaseModel): + entity_id: str = Field(description="Entity ID to get history for") + hours: int = Field(default=24, description="Number of hours of history (max 72)") + + +class LogbookInput(BaseModel): + hours: int = Field( + default=24, description="Number of hours of logbook entries (max 72)" + ) + entity_id: Optional[str] = Field( + default=None, description="Optional entity ID to filter by" + ) + + +# --------------------------------------------------------------------------- +# Tools — System Info +# --------------------------------------------------------------------------- + + +@mcp.tool( + name="hass_get_config", + description="Get Home Assistant configuration and system info including version, location, units, and components.", + annotations={"readOnlyHint": True}, +) +async def hass_get_config(params: EmptyInput) -> str: + try: + config = await _get("") + return json.dumps( + { + "version": config.get("version"), + "location_name": config.get("location_name"), + "latitude": config.get("latitude"), + "longitude": config.get("longitude"), + "elevation": config.get("elevation"), + "unit_system": config.get("unit_system"), + "time_zone": config.get("time_zone"), + "state": config.get("state"), + "components": sorted(config.get("components", []))[:50], + }, + indent=2, + ) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_check_api", + description="Check if Home Assistant API is reachable and responding.", + annotations={"readOnlyHint": True}, +) +async def hass_check_api(params: EmptyInput) -> str: + try: + async with httpx.AsyncClient(verify=False, timeout=10) as client: + resp = await client.get(f"{HASS_URL}/api/", headers=_headers()) + resp.raise_for_status() + return json.dumps({"status": "connected", "message": resp.json().get("message", "API running")}) + except Exception as e: + return _handle_error(e) + + +# --------------------------------------------------------------------------- +# Tools — Entity Management +# --------------------------------------------------------------------------- + + +@mcp.tool( + name="hass_list_entities", + description="List all entities, optionally filtered by domain. Returns entity_id, state, and friendly name.", + annotations={"readOnlyHint": True}, +) +async def hass_list_entities(params: EntitiesByDomainInput) -> str: + try: + states = await _get("states") + filtered = [ + { + "entity_id": s["entity_id"], + "state": s["state"], + "friendly_name": s.get("attributes", {}).get("friendly_name", ""), + } + for s in states + if s["entity_id"].startswith(f"{params.domain}.") + ] + return json.dumps( + {"domain": params.domain, "count": len(filtered), "entities": filtered}, + indent=2, + ) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_get_entity_state", + description="Get the full state and attributes of a specific entity.", + annotations={"readOnlyHint": True}, +) +async def hass_get_entity_state(params: EntityIdInput) -> str: + try: + state = await _get(f"states/{params.entity_id}") + return json.dumps( + { + "entity_id": state["entity_id"], + "state": state["state"], + "attributes": state.get("attributes", {}), + "last_changed": state.get("last_changed"), + "last_updated": state.get("last_updated"), + }, + indent=2, + ) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_get_all_domains", + description="Get a summary of all entity domains and their counts.", + annotations={"readOnlyHint": True}, +) +async def hass_get_all_domains(params: EmptyInput) -> str: + try: + states = await _get("states") + domains: dict[str, int] = {} + for s in states: + domain = s["entity_id"].split(".")[0] + domains[domain] = domains.get(domain, 0) + 1 + sorted_domains = dict(sorted(domains.items(), key=lambda x: -x[1])) + return json.dumps( + {"total_entities": len(states), "domains": sorted_domains}, indent=2 + ) + except Exception as e: + return _handle_error(e) + + +# --------------------------------------------------------------------------- +# Tools — Service Calls (actions) +# --------------------------------------------------------------------------- + + +@mcp.tool( + name="hass_call_service", + description="Call any Home Assistant service. Use for turning on/off lights, switches, setting climate, playing media, etc.", + annotations={"readOnlyHint": False, "destructiveHint": False}, +) +async def hass_call_service(params: ServiceCallInput) -> str: + try: + payload = params.service_data or {} + if params.entity_id: + payload["entity_id"] = params.entity_id + + result = await _post(f"services/{params.domain}/{params.service}", payload) + changed = [] + if isinstance(result, list): + changed = [ + {"entity_id": s["entity_id"], "state": s["state"]} + for s in result[:10] + ] + return json.dumps( + { + "status": "ok", + "service": f"{params.domain}.{params.service}", + "affected_entities": changed, + }, + indent=2, + ) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_turn_on", + description="Turn on an entity (light, switch, fan, etc.).", + annotations={"readOnlyHint": False}, +) +async def hass_turn_on(params: EntityIdInput) -> str: + try: + domain = params.entity_id.split(".")[0] + result = await _post( + f"services/{domain}/turn_on", {"entity_id": params.entity_id} + ) + return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "turn_on"}) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_turn_off", + description="Turn off an entity (light, switch, fan, etc.).", + annotations={"readOnlyHint": False}, +) +async def hass_turn_off(params: EntityIdInput) -> str: + try: + domain = params.entity_id.split(".")[0] + result = await _post( + f"services/{domain}/turn_off", {"entity_id": params.entity_id} + ) + return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "turn_off"}) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_toggle", + description="Toggle an entity (if on, turn off; if off, turn on).", + annotations={"readOnlyHint": False}, +) +async def hass_toggle(params: EntityIdInput) -> str: + try: + domain = params.entity_id.split(".")[0] + result = await _post( + f"services/{domain}/toggle", {"entity_id": params.entity_id} + ) + return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "toggle"}) + except Exception as e: + return _handle_error(e) + + +# --------------------------------------------------------------------------- +# Tools — Automations & Scenes +# --------------------------------------------------------------------------- + + +@mcp.tool( + name="hass_list_automations", + description="List all automations with their current state (on/off) and last triggered time.", + annotations={"readOnlyHint": True}, +) +async def hass_list_automations(params: EmptyInput) -> str: + try: + states = await _get("states") + automations = [ + { + "entity_id": s["entity_id"], + "state": s["state"], + "friendly_name": s.get("attributes", {}).get("friendly_name", ""), + "last_triggered": s.get("attributes", {}).get("last_triggered"), + } + for s in states + if s["entity_id"].startswith("automation.") + ] + return json.dumps({"count": len(automations), "automations": automations}, indent=2) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_trigger_automation", + description="Manually trigger an automation.", + annotations={"readOnlyHint": False}, +) +async def hass_trigger_automation(params: AutomationInput) -> str: + try: + await _post( + "services/automation/trigger", {"entity_id": params.entity_id} + ) + return json.dumps({"status": "ok", "triggered": params.entity_id}) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_list_scenes", + description="List all available scenes.", + annotations={"readOnlyHint": True}, +) +async def hass_list_scenes(params: EmptyInput) -> str: + try: + states = await _get("states") + scenes = [ + { + "entity_id": s["entity_id"], + "friendly_name": s.get("attributes", {}).get("friendly_name", ""), + } + for s in states + if s["entity_id"].startswith("scene.") + ] + return json.dumps({"count": len(scenes), "scenes": scenes}, indent=2) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_activate_scene", + description="Activate a scene.", + annotations={"readOnlyHint": False}, +) +async def hass_activate_scene(params: EntityIdInput) -> str: + try: + await _post("services/scene/turn_on", {"entity_id": params.entity_id}) + return json.dumps({"status": "ok", "activated": params.entity_id}) + except Exception as e: + return _handle_error(e) + + +# --------------------------------------------------------------------------- +# Tools — History & Logbook +# --------------------------------------------------------------------------- + + +@mcp.tool( + name="hass_get_history", + description="Get state history for an entity over the last N hours.", + annotations={"readOnlyHint": True}, +) +async def hass_get_history(params: HistoryInput) -> str: + try: + from datetime import datetime, timedelta, timezone + + end = datetime.now(timezone.utc) + start = end - timedelta(hours=min(params.hours, 72)) + ts = start.strftime("%Y-%m-%dT%H:%M:%S+00:00") + + async with httpx.AsyncClient(verify=False, timeout=30) as client: + resp = await client.get( + f"{HASS_URL}/api/history/period/{ts}", + headers=_headers(), + params={"filter_entity_id": params.entity_id, "minimal_response": ""}, + ) + resp.raise_for_status() + data = resp.json() + + if not data or not data[0]: + return json.dumps({"entity_id": params.entity_id, "history": []}) + + entries = [ + { + "state": h.get("state"), + "last_changed": h.get("last_changed"), + } + for h in data[0][:100] + ] + return json.dumps( + {"entity_id": params.entity_id, "count": len(entries), "history": entries}, + indent=2, + ) + except Exception as e: + return _handle_error(e) + + +@mcp.tool( + name="hass_get_logbook", + description="Get logbook entries for the last N hours, optionally filtered by entity.", + annotations={"readOnlyHint": True}, +) +async def hass_get_logbook(params: LogbookInput) -> str: + try: + from datetime import datetime, timedelta, timezone + + end = datetime.now(timezone.utc) + start = end - timedelta(hours=min(params.hours, 72)) + ts = start.strftime("%Y-%m-%dT%H:%M:%S+00:00") + + query_params = {} + if params.entity_id: + query_params["entity"] = params.entity_id + + async with httpx.AsyncClient(verify=False, timeout=30) as client: + resp = await client.get( + f"{HASS_URL}/api/logbook/{ts}", + headers=_headers(), + params=query_params, + ) + resp.raise_for_status() + data = resp.json() + + entries = [ + { + "when": e.get("when"), + "name": e.get("name"), + "message": e.get("message", ""), + "entity_id": e.get("entity_id", ""), + "state": e.get("state", ""), + } + for e in data[:100] + ] + return json.dumps({"count": len(entries), "entries": entries}, indent=2) + except Exception as e: + return _handle_error(e) + + +# --------------------------------------------------------------------------- +# Tools — Services Discovery +# --------------------------------------------------------------------------- + + +@mcp.tool( + name="hass_list_services", + description="List available services for a specific domain (e.g. light, climate).", + annotations={"readOnlyHint": True}, +) +async def hass_list_services(params: EntitiesByDomainInput) -> str: + try: + services = await _get("services") + for svc in services: + if svc.get("domain") == params.domain: + service_list = [] + for name, info in svc.get("services", {}).items(): + service_list.append( + { + "service": f"{params.domain}.{name}", + "description": info.get("description", ""), + } + ) + return json.dumps( + {"domain": params.domain, "count": len(service_list), "services": service_list}, + indent=2, + ) + return json.dumps({"error": f"Domain '{params.domain}' not found"}) + except Exception as e: + return _handle_error(e) + + +# --------------------------------------------------------------------------- +# Tools — Events +# --------------------------------------------------------------------------- + + +@mcp.tool( + name="hass_fire_event", + description="Fire a custom event on the event bus.", + annotations={"readOnlyHint": False, "destructiveHint": False}, +) +async def hass_fire_event(params: EventInput) -> str: + try: + result = await _post(f"events/{params.event_type}", params.event_data) + return json.dumps({"status": "ok", "event_type": params.event_type, "result": result}) + except Exception as e: + return _handle_error(e) + + +# --------------------------------------------------------------------------- +# Tools — Notifications +# --------------------------------------------------------------------------- + + +class NotifyInput(BaseModel): + message: str = Field(description="Notification message text") + title: Optional[str] = Field(default=None, description="Optional notification title") + target: str = Field( + default="notify", + description="Notification service target (e.g. 'notify' for default, 'mobile_app_phone' for specific device)", + ) + + +@mcp.tool( + name="hass_send_notification", + description="Send a notification via Home Assistant's notification services.", + annotations={"readOnlyHint": False}, +) +async def hass_send_notification(params: NotifyInput) -> str: + try: + payload: dict[str, Any] = {"message": params.message} + if params.title: + payload["title"] = params.title + await _post(f"services/notify/{params.target}", payload) + return json.dumps({"status": "ok", "sent_to": params.target}) + except Exception as e: + return _handle_error(e) + + +# ========================================================================= +# Entry point +# ========================================================================= + +if __name__ == "__main__": + mcp.run()