From be3acaf5dec450d38cba2ab21cdc2f89ef943075 Mon Sep 17 00:00:00 2001 From: zgaetano Date: Tue, 31 Mar 2026 15:33:12 -0400 Subject: [PATCH] Remove mcp-gateway/homeassistant-mcp/homeassistant_mcp.py --- .../homeassistant-mcp/homeassistant_mcp.py | 589 ------------------ 1 file changed, 589 deletions(-) delete mode 100644 mcp-gateway/homeassistant-mcp/homeassistant_mcp.py diff --git a/mcp-gateway/homeassistant-mcp/homeassistant_mcp.py b/mcp-gateway/homeassistant-mcp/homeassistant_mcp.py deleted file mode 100644 index 6d5f90f..0000000 --- a/mcp-gateway/homeassistant-mcp/homeassistant_mcp.py +++ /dev/null @@ -1,589 +0,0 @@ -""" -Home Assistant MCP Server -========================= -MCP server for controlling and monitoring Home Assistant via the REST API. -Provides tools for managing entities, services, automations, scenes, and more. -""" - -import json -import os -from typing import Optional, Any - -import httpx -from mcp.server.fastmcp import FastMCP -from pydantic import BaseModel, Field - -# --------------------------------------------------------------------------- -# Configuration -# --------------------------------------------------------------------------- - -HASS_URL = os.environ.get("HASS_URL", "https://haos.wilddragoncore.online") -HASS_TOKEN = os.environ.get("HASS_TOKEN", "") - -# --------------------------------------------------------------------------- -# MCP Server -# --------------------------------------------------------------------------- - -mcp = FastMCP("homeassistant_mcp") - -# --------------------------------------------------------------------------- -# HTTP helpers -# --------------------------------------------------------------------------- - - -def _headers() -> dict: - return { - "Authorization": f"Bearer {HASS_TOKEN}", - "Content-Type": "application/json", - } - - -async def _get(path: str, params: Optional[dict] = None) -> Any: - async with httpx.AsyncClient(verify=False, timeout=30) as client: - resp = await client.get( - f"{HASS_URL}/api/{path}", headers=_headers(), params=params - ) - resp.raise_for_status() - return resp.json() - - -async def _post(path: str, payload: Any = None) -> Any: - async with httpx.AsyncClient(verify=False, timeout=30) as client: - resp = await client.post( - f"{HASS_URL}/api/{path}", headers=_headers(), json=payload - ) - resp.raise_for_status() - try: - return resp.json() - except Exception: - return {"status": "ok"} - - -def _handle_error(e: Exception) -> str: - if isinstance(e, httpx.HTTPStatusError): - status = e.response.status_code - try: - body = e.response.json() - msg = body.get("message", str(body)) - except Exception: - msg = e.response.text[:500] - return f"Error {status}: {msg}" - return f"Error: {str(e)}" - - -# --------------------------------------------------------------------------- -# Input Models -# --------------------------------------------------------------------------- - - -class EmptyInput(BaseModel): - pass - - -class EntityIdInput(BaseModel): - entity_id: str = Field(description="Entity ID (e.g. light.living_room)") - - -class ServiceCallInput(BaseModel): - domain: str = Field(description="Service domain (e.g. light, switch, climate)") - service: str = Field(description="Service name (e.g. turn_on, turn_off, toggle)") - entity_id: Optional[str] = Field( - default=None, description="Target entity ID" - ) - service_data: Optional[dict] = Field( - default=None, - description="Additional service data (e.g. brightness, temperature)", - ) - - -class EntitiesByDomainInput(BaseModel): - domain: str = Field( - description="Entity domain to filter by (e.g. light, switch, sensor, climate, media_player)" - ) - - -class SetStateInput(BaseModel): - entity_id: str = Field(description="Entity ID to update") - state: str = Field(description="New state value") - attributes: Optional[dict] = Field( - default=None, description="Optional attributes to set" - ) - - -class AutomationInput(BaseModel): - entity_id: str = Field( - description="Automation entity ID (e.g. automation.morning_routine)" - ) - - -class EventInput(BaseModel): - event_type: str = Field(description="Event type to fire") - event_data: Optional[dict] = Field( - default=None, description="Optional event data payload" - ) - - -class HistoryInput(BaseModel): - entity_id: str = Field(description="Entity ID to get history for") - hours: int = Field(default=24, description="Number of hours of history (max 72)") - - -class LogbookInput(BaseModel): - hours: int = Field( - default=24, description="Number of hours of logbook entries (max 72)" - ) - entity_id: Optional[str] = Field( - default=None, description="Optional entity ID to filter by" - ) - - -# --------------------------------------------------------------------------- -# Tools — System Info -# --------------------------------------------------------------------------- - - -@mcp.tool( - name="hass_get_config", - description="Get Home Assistant configuration and system info including version, location, units, and components.", - annotations={"readOnlyHint": True}, -) -async def hass_get_config(params: EmptyInput) -> str: - try: - config = await _get("") - return json.dumps( - { - "version": config.get("version"), - "location_name": config.get("location_name"), - "latitude": config.get("latitude"), - "longitude": config.get("longitude"), - "elevation": config.get("elevation"), - "unit_system": config.get("unit_system"), - "time_zone": config.get("time_zone"), - "state": config.get("state"), - "components": sorted(config.get("components", []))[:50], - }, - indent=2, - ) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_check_api", - description="Check if Home Assistant API is reachable and responding.", - annotations={"readOnlyHint": True}, -) -async def hass_check_api(params: EmptyInput) -> str: - try: - async with httpx.AsyncClient(verify=False, timeout=10) as client: - resp = await client.get(f"{HASS_URL}/api/", headers=_headers()) - resp.raise_for_status() - return json.dumps({"status": "connected", "message": resp.json().get("message", "API running")}) - except Exception as e: - return _handle_error(e) - - -# --------------------------------------------------------------------------- -# Tools — Entity Management -# --------------------------------------------------------------------------- - - -@mcp.tool( - name="hass_list_entities", - description="List all entities, optionally filtered by domain. Returns entity_id, state, and friendly name.", - annotations={"readOnlyHint": True}, -) -async def hass_list_entities(params: EntitiesByDomainInput) -> str: - try: - states = await _get("states") - filtered = [ - { - "entity_id": s["entity_id"], - "state": s["state"], - "friendly_name": s.get("attributes", {}).get("friendly_name", ""), - } - for s in states - if s["entity_id"].startswith(f"{params.domain}.") - ] - return json.dumps( - {"domain": params.domain, "count": len(filtered), "entities": filtered}, - indent=2, - ) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_get_entity_state", - description="Get the full state and attributes of a specific entity.", - annotations={"readOnlyHint": True}, -) -async def hass_get_entity_state(params: EntityIdInput) -> str: - try: - state = await _get(f"states/{params.entity_id}") - return json.dumps( - { - "entity_id": state["entity_id"], - "state": state["state"], - "attributes": state.get("attributes", {}), - "last_changed": state.get("last_changed"), - "last_updated": state.get("last_updated"), - }, - indent=2, - ) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_get_all_domains", - description="Get a summary of all entity domains and their counts.", - annotations={"readOnlyHint": True}, -) -async def hass_get_all_domains(params: EmptyInput) -> str: - try: - states = await _get("states") - domains: dict[str, int] = {} - for s in states: - domain = s["entity_id"].split(".")[0] - domains[domain] = domains.get(domain, 0) + 1 - sorted_domains = dict(sorted(domains.items(), key=lambda x: -x[1])) - return json.dumps( - {"total_entities": len(states), "domains": sorted_domains}, indent=2 - ) - except Exception as e: - return _handle_error(e) - - -# --------------------------------------------------------------------------- -# Tools — Service Calls (actions) -# --------------------------------------------------------------------------- - - -@mcp.tool( - name="hass_call_service", - description="Call any Home Assistant service. Use for turning on/off lights, switches, setting climate, playing media, etc.", - annotations={"readOnlyHint": False, "destructiveHint": False}, -) -async def hass_call_service(params: ServiceCallInput) -> str: - try: - payload = params.service_data or {} - if params.entity_id: - payload["entity_id"] = params.entity_id - - result = await _post(f"services/{params.domain}/{params.service}", payload) - changed = [] - if isinstance(result, list): - changed = [ - {"entity_id": s["entity_id"], "state": s["state"]} - for s in result[:10] - ] - return json.dumps( - { - "status": "ok", - "service": f"{params.domain}.{params.service}", - "affected_entities": changed, - }, - indent=2, - ) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_turn_on", - description="Turn on an entity (light, switch, fan, etc.).", - annotations={"readOnlyHint": False}, -) -async def hass_turn_on(params: EntityIdInput) -> str: - try: - domain = params.entity_id.split(".")[0] - result = await _post( - f"services/{domain}/turn_on", {"entity_id": params.entity_id} - ) - return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "turn_on"}) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_turn_off", - description="Turn off an entity (light, switch, fan, etc.).", - annotations={"readOnlyHint": False}, -) -async def hass_turn_off(params: EntityIdInput) -> str: - try: - domain = params.entity_id.split(".")[0] - result = await _post( - f"services/{domain}/turn_off", {"entity_id": params.entity_id} - ) - return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "turn_off"}) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_toggle", - description="Toggle an entity (if on, turn off; if off, turn on).", - annotations={"readOnlyHint": False}, -) -async def hass_toggle(params: EntityIdInput) -> str: - try: - domain = params.entity_id.split(".")[0] - result = await _post( - f"services/{domain}/toggle", {"entity_id": params.entity_id} - ) - return json.dumps({"status": "ok", "entity_id": params.entity_id, "action": "toggle"}) - except Exception as e: - return _handle_error(e) - - -# --------------------------------------------------------------------------- -# Tools — Automations & Scenes -# --------------------------------------------------------------------------- - - -@mcp.tool( - name="hass_list_automations", - description="List all automations with their current state (on/off) and last triggered time.", - annotations={"readOnlyHint": True}, -) -async def hass_list_automations(params: EmptyInput) -> str: - try: - states = await _get("states") - automations = [ - { - "entity_id": s["entity_id"], - "state": s["state"], - "friendly_name": s.get("attributes", {}).get("friendly_name", ""), - "last_triggered": s.get("attributes", {}).get("last_triggered"), - } - for s in states - if s["entity_id"].startswith("automation.") - ] - return json.dumps({"count": len(automations), "automations": automations}, indent=2) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_trigger_automation", - description="Manually trigger an automation.", - annotations={"readOnlyHint": False}, -) -async def hass_trigger_automation(params: AutomationInput) -> str: - try: - await _post( - "services/automation/trigger", {"entity_id": params.entity_id} - ) - return json.dumps({"status": "ok", "triggered": params.entity_id}) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_list_scenes", - description="List all available scenes.", - annotations={"readOnlyHint": True}, -) -async def hass_list_scenes(params: EmptyInput) -> str: - try: - states = await _get("states") - scenes = [ - { - "entity_id": s["entity_id"], - "friendly_name": s.get("attributes", {}).get("friendly_name", ""), - } - for s in states - if s["entity_id"].startswith("scene.") - ] - return json.dumps({"count": len(scenes), "scenes": scenes}, indent=2) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_activate_scene", - description="Activate a scene.", - annotations={"readOnlyHint": False}, -) -async def hass_activate_scene(params: EntityIdInput) -> str: - try: - await _post("services/scene/turn_on", {"entity_id": params.entity_id}) - return json.dumps({"status": "ok", "activated": params.entity_id}) - except Exception as e: - return _handle_error(e) - - -# --------------------------------------------------------------------------- -# Tools — History & Logbook -# --------------------------------------------------------------------------- - - -@mcp.tool( - name="hass_get_history", - description="Get state history for an entity over the last N hours.", - annotations={"readOnlyHint": True}, -) -async def hass_get_history(params: HistoryInput) -> str: - try: - from datetime import datetime, timedelta, timezone - - end = datetime.now(timezone.utc) - start = end - timedelta(hours=min(params.hours, 72)) - ts = start.strftime("%Y-%m-%dT%H:%M:%S+00:00") - - async with httpx.AsyncClient(verify=False, timeout=30) as client: - resp = await client.get( - f"{HASS_URL}/api/history/period/{ts}", - headers=_headers(), - params={"filter_entity_id": params.entity_id, "minimal_response": ""}, - ) - resp.raise_for_status() - data = resp.json() - - if not data or not data[0]: - return json.dumps({"entity_id": params.entity_id, "history": []}) - - entries = [ - { - "state": h.get("state"), - "last_changed": h.get("last_changed"), - } - for h in data[0][:100] - ] - return json.dumps( - {"entity_id": params.entity_id, "count": len(entries), "history": entries}, - indent=2, - ) - except Exception as e: - return _handle_error(e) - - -@mcp.tool( - name="hass_get_logbook", - description="Get logbook entries for the last N hours, optionally filtered by entity.", - annotations={"readOnlyHint": True}, -) -async def hass_get_logbook(params: LogbookInput) -> str: - try: - from datetime import datetime, timedelta, timezone - - end = datetime.now(timezone.utc) - start = end - timedelta(hours=min(params.hours, 72)) - ts = start.strftime("%Y-%m-%dT%H:%M:%S+00:00") - - query_params = {} - if params.entity_id: - query_params["entity"] = params.entity_id - - async with httpx.AsyncClient(verify=False, timeout=30) as client: - resp = await client.get( - f"{HASS_URL}/api/logbook/{ts}", - headers=_headers(), - params=query_params, - ) - resp.raise_for_status() - data = resp.json() - - entries = [ - { - "when": e.get("when"), - "name": e.get("name"), - "message": e.get("message", ""), - "entity_id": e.get("entity_id", ""), - "state": e.get("state", ""), - } - for e in data[:100] - ] - return json.dumps({"count": len(entries), "entries": entries}, indent=2) - except Exception as e: - return _handle_error(e) - - -# --------------------------------------------------------------------------- -# Tools — Services Discovery -# --------------------------------------------------------------------------- - - -@mcp.tool( - name="hass_list_services", - description="List available services for a specific domain (e.g. light, climate).", - annotations={"readOnlyHint": True}, -) -async def hass_list_services(params: EntitiesByDomainInput) -> str: - try: - services = await _get("services") - for svc in services: - if svc.get("domain") == params.domain: - service_list = [] - for name, info in svc.get("services", {}).items(): - service_list.append( - { - "service": f"{params.domain}.{name}", - "description": info.get("description", ""), - } - ) - return json.dumps( - {"domain": params.domain, "count": len(service_list), "services": service_list}, - indent=2, - ) - return json.dumps({"error": f"Domain '{params.domain}' not found"}) - except Exception as e: - return _handle_error(e) - - -# --------------------------------------------------------------------------- -# Tools — Events -# --------------------------------------------------------------------------- - - -@mcp.tool( - name="hass_fire_event", - description="Fire a custom event on the event bus.", - annotations={"readOnlyHint": False, "destructiveHint": False}, -) -async def hass_fire_event(params: EventInput) -> str: - try: - result = await _post(f"events/{params.event_type}", params.event_data) - return json.dumps({"status": "ok", "event_type": params.event_type, "result": result}) - except Exception as e: - return _handle_error(e) - - -# --------------------------------------------------------------------------- -# Tools — Notifications -# --------------------------------------------------------------------------- - - -class NotifyInput(BaseModel): - message: str = Field(description="Notification message text") - title: Optional[str] = Field(default=None, description="Optional notification title") - target: str = Field( - default="notify", - description="Notification service target (e.g. 'notify' for default, 'mobile_app_phone' for specific device)", - ) - - -@mcp.tool( - name="hass_send_notification", - description="Send a notification via Home Assistant's notification services.", - annotations={"readOnlyHint": False}, -) -async def hass_send_notification(params: NotifyInput) -> str: - try: - payload: dict[str, Any] = {"message": params.message} - if params.title: - payload["title"] = params.title - await _post(f"services/notify/{params.target}", payload) - return json.dumps({"status": "ok", "sent_to": params.target}) - except Exception as e: - return _handle_error(e) - - -# ========================================================================= -# Entry point -# ========================================================================= - -if __name__ == "__main__": - mcp.run()