From 9f1b98a5e6a0d938883f8ab7a5aea9ea6d79b444 Mon Sep 17 00:00:00 2001 From: zgaetano Date: Tue, 31 Mar 2026 15:33:45 -0400 Subject: [PATCH] Add truenas-mcp/truenas_mcp.py --- truenas-mcp/truenas_mcp.py | 1320 ++++++++++++++++++++++++++++++++++++ 1 file changed, 1320 insertions(+) create mode 100644 truenas-mcp/truenas_mcp.py diff --git a/truenas-mcp/truenas_mcp.py b/truenas-mcp/truenas_mcp.py new file mode 100644 index 0000000..2794104 --- /dev/null +++ b/truenas-mcp/truenas_mcp.py @@ -0,0 +1,1320 @@ +""" +TrueNAS MCP Server +================== +MCP server for managing TrueNAS SCALE systems via the REST API v2.0. +Provides tools for monitoring system health, storage pools, datasets, +services, shares, apps, VMs, disks, and alerts. +""" + +import json +import os +import urllib.parse +from typing import Optional, List, Any, Dict + +import httpx +from mcp.server.fastmcp import FastMCP +from pydantic import BaseModel, Field, ConfigDict, field_validator +from enum import Enum + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +TRUENAS_URL = os.environ.get("TRUENAS_URL", "") +TRUENAS_API_KEY = os.environ.get("TRUENAS_API_KEY", "") +BASE = f"{TRUENAS_URL}/api/v2.0" + +# --------------------------------------------------------------------------- +# MCP Server +# --------------------------------------------------------------------------- + +mcp = FastMCP("truenas_mcp") + +# --------------------------------------------------------------------------- +# Shared HTTP helpers +# --------------------------------------------------------------------------- + + +def _headers() -> dict: + return { + "Authorization": f"Bearer {TRUENAS_API_KEY}", + "Content-Type": "application/json", + } + + +async def _get(path: str, params: Optional[dict] = None) -> Any: + """HTTP GET against the TrueNAS REST API.""" + async with httpx.AsyncClient(verify=False, timeout=30) as client: + resp = await client.get(f"{BASE}/{path}", headers=_headers(), params=params) + resp.raise_for_status() + return resp.json() + + +async def _post(path: str, payload: Any = None) -> Any: + """HTTP POST against the TrueNAS REST API.""" + async with httpx.AsyncClient(verify=False, timeout=60) as client: + resp = await client.post( + f"{BASE}/{path}", headers=_headers(), json=payload + ) + resp.raise_for_status() + return resp.json() + + +async def _put(path: str, payload: Any = None) -> Any: + """HTTP PUT against the TrueNAS REST API.""" + async with httpx.AsyncClient(verify=False, timeout=60) as client: + resp = await client.put( + f"{BASE}/{path}", headers=_headers(), json=payload + ) + resp.raise_for_status() + return resp.json() + + +async def _delete(path: str) -> Any: + """HTTP DELETE against the TrueNAS REST API.""" + async with httpx.AsyncClient(verify=False, timeout=30) as client: + resp = await client.delete(f"{BASE}/{path}", headers=_headers()) + resp.raise_for_status() + try: + return resp.json() + except Exception: + return {"status": "deleted"} + + +def _handle_api_error(e: Exception) -> str: + if isinstance(e, httpx.HTTPStatusError): + status = e.response.status_code + try: + body = e.response.json() + msg = body.get("message", str(body)) + except Exception: + msg = e.response.text[:500] + if status == 404: + return f"Error 404: Resource not found. {msg}" + elif status == 403: + return f"Error 403: Permission denied. {msg}" + elif status == 405: + return f"Error 405: Method not allowed for this endpoint. {msg}" + elif status == 422: + return f"Error 422: Validation error. {msg}" + elif status == 429: + return "Error 429: Rate limit exceeded. Wait and retry." + return f"Error {status}: {msg}" + elif isinstance(e, httpx.TimeoutException): + return "Error: Request timed out. The server may be busy." + return f"Error: {type(e).__name__}: {e}" + + +def _bytes_human(n: int) -> str: + """Convert bytes to human-readable string.""" + for unit in ("B", "KiB", "MiB", "GiB", "TiB", "PiB"): + if abs(n) < 1024: + return f"{n:.1f} {unit}" + n /= 1024 + return f"{n:.1f} EiB" + + +# --------------------------------------------------------------------------- +# Input models +# --------------------------------------------------------------------------- + + +class EmptyInput(BaseModel): + model_config = ConfigDict(extra="forbid") + + +class IdInput(BaseModel): + model_config = ConfigDict(extra="forbid") + id: int = Field(..., description="Numeric ID of the resource") + + +class NameInput(BaseModel): + model_config = ConfigDict(extra="forbid", str_strip_whitespace=True) + name: str = Field(..., description="Name of the resource", min_length=1) + + +class DatasetIdInput(BaseModel): + model_config = ConfigDict(extra="forbid", str_strip_whitespace=True) + dataset_id: str = Field( + ..., + description="Full dataset path, e.g. 'NVME/SMB' or 'Storage'. URL-encoded automatically.", + min_length=1, + ) + + +class ListDatasetsInput(BaseModel): + model_config = ConfigDict(extra="forbid") + pool: Optional[str] = Field( + default=None, + description="Optional pool name to filter datasets (e.g. 'NVME', 'Storage'). If omitted, returns all datasets.", + ) + limit: int = Field(default=50, description="Max datasets to return", ge=1, le=500) + offset: int = Field(default=0, description="Offset for pagination", ge=0) + + +class ServiceActionInput(BaseModel): + model_config = ConfigDict(extra="forbid", str_strip_whitespace=True) + service: str = Field( + ..., + description="Service name, e.g. 'cifs', 'nfs', 'ssh', 'ftp', 'iscsitarget', 'snmp'", + min_length=1, + ) + + +class CreateDatasetInput(BaseModel): + model_config = ConfigDict(extra="forbid", str_strip_whitespace=True) + name: str = Field( + ..., + description="Full dataset name including pool, e.g. 'NVME/my_new_dataset'", + min_length=1, + ) + comments: Optional[str] = Field(default=None, description="Optional description/comments") + compression: Optional[str] = Field( + default=None, + description="Compression algorithm: 'LZ4', 'GZIP', 'ZLE', 'LZJB', 'ZSTD', or 'OFF'", + ) + quota: Optional[int] = Field( + default=None, + description="Quota in bytes. 0 means no quota.", + ge=0, + ) + readonly: Optional[bool] = Field(default=None, description="Set dataset read-only") + + +class CreateSnapshotInput(BaseModel): + model_config = ConfigDict(extra="forbid", str_strip_whitespace=True) + dataset: str = Field( + ..., + description="Dataset name to snapshot, e.g. 'NVME/SMB'", + min_length=1, + ) + name: str = Field( + ..., + description="Snapshot name suffix, e.g. 'manual-backup-2024'", + min_length=1, + ) + recursive: bool = Field(default=False, description="Include child datasets") + + +class ListSnapshotTasksInput(BaseModel): + model_config = ConfigDict(extra="forbid") + + +class AppActionInput(BaseModel): + model_config = ConfigDict(extra="forbid", str_strip_whitespace=True) + app_name: str = Field( + ..., + description="Name of the app, e.g. 'plex', 'nextcloud', 'sonarr'", + min_length=1, + ) + + +class VMActionInput(BaseModel): + model_config = ConfigDict(extra="forbid") + vm_id: int = Field(..., description="Numeric VM ID") + + +class CreateSMBShareInput(BaseModel): + model_config = ConfigDict(extra="forbid", str_strip_whitespace=True) + path: str = Field(..., description="Filesystem path to share, e.g. '/mnt/NVME/myshare'", min_length=1) + name: str = Field(..., description="Share name visible on the network", min_length=1) + comment: Optional[str] = Field(default=None, description="Optional description") + readonly: bool = Field(default=False, description="Share is read-only") + guest_ok: bool = Field(default=False, description="Allow guest access (no auth)") + + +class DirListInput(BaseModel): + model_config = ConfigDict(extra="forbid", str_strip_whitespace=True) + path: str = Field( + ..., + description="Absolute path to list, e.g. '/mnt/NVME/SMB'", + min_length=1, + ) + limit: int = Field(default=100, description="Max entries to return", ge=1, le=1000) + + +class CronJobInput(BaseModel): + model_config = ConfigDict(extra="forbid") + + +# ========================================================================= +# TOOLS — System +# ========================================================================= + + +@mcp.tool( + name="truenas_system_info", + annotations={ + "title": "System Information", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_system_info(params: EmptyInput) -> str: + """Get TrueNAS system information including version, CPU, memory, uptime, and hostname.""" + try: + info = await _get("system/info") + mem_gb = info.get("physmem", 0) / (1024**3) + return json.dumps( + { + "version": info.get("version"), + "hostname": info.get("hostname"), + "cpu_model": info.get("model"), + "cores": info.get("cores"), + "physical_cores": info.get("physical_cores"), + "memory_gb": round(mem_gb, 1), + "uptime": info.get("uptime"), + "timezone": info.get("timezone"), + "system_manufacturer": info.get("system_manufacturer"), + "system_product": info.get("system_product"), + "ecc_memory": info.get("ecc_memory"), + "loadavg_1m": round(info.get("loadavg", [0])[0], 2), + "loadavg_5m": round(info.get("loadavg", [0, 0])[1], 2), + "loadavg_15m": round(info.get("loadavg", [0, 0, 0])[2], 2), + }, + indent=2, + ) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Alerts +# ========================================================================= + + +@mcp.tool( + name="truenas_list_alerts", + annotations={ + "title": "List Active Alerts", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_alerts(params: EmptyInput) -> str: + """List all active system alerts with severity levels and messages.""" + try: + alerts = await _get("alert/list") + if not alerts: + return json.dumps({"count": 0, "alerts": [], "message": "No active alerts."}) + result = [] + for a in alerts: + result.append( + { + "id": a.get("id"), + "level": a.get("level"), + "source": a.get("source"), + "klass": a.get("klass"), + "message": a.get("formatted"), + "dismissed": a.get("dismissed", False), + } + ) + return json.dumps({"count": len(result), "alerts": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Storage Pools +# ========================================================================= + + +@mcp.tool( + name="truenas_list_pools", + annotations={ + "title": "List Storage Pools", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_pools(params: EmptyInput) -> str: + """List all ZFS storage pools with status, health, and capacity info.""" + try: + pools = await _get("pool") + result = [] + for p in pools: + topology = p.get("topology", {}) + data_vdevs = topology.get("data", []) + result.append( + { + "id": p.get("id"), + "name": p.get("name"), + "status": p.get("status"), + "healthy": p.get("healthy"), + "is_decrypted": p.get("is_decrypted"), + "path": p.get("path"), + "data_vdevs": len(data_vdevs), + "scan": p.get("scan"), + } + ) + return json.dumps({"count": len(result), "pools": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_get_pool", + annotations={ + "title": "Get Pool Details", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_get_pool(params: IdInput) -> str: + """Get detailed information about a specific storage pool by ID, including topology and disk layout.""" + try: + p = await _get(f"pool/id/{params.id}") + return json.dumps(p, indent=2, default=str) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Datasets +# ========================================================================= + + +@mcp.tool( + name="truenas_list_datasets", + annotations={ + "title": "List Datasets", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_datasets(params: ListDatasetsInput) -> str: + """List ZFS datasets with usage, compression, and quota information. Optionally filter by pool name.""" + try: + datasets = await _get("pool/dataset") + if params.pool: + datasets = [d for d in datasets if d.get("pool") == params.pool or d.get("name", "").startswith(params.pool + "/") or d.get("name") == params.pool] + + total = len(datasets) + page = datasets[params.offset : params.offset + params.limit] + result = [] + for d in page: + used_raw = d.get("used", {}).get("rawvalue", 0) + avail_raw = d.get("available", {}).get("rawvalue", 0) + result.append( + { + "id": d.get("id"), + "name": d.get("name"), + "pool": d.get("pool"), + "type": d.get("type"), + "used": _bytes_human(int(used_raw)), + "available": _bytes_human(int(avail_raw)), + "compression": d.get("compression", {}).get("value"), + "readonly": d.get("readonly", {}).get("value"), + "mountpoint": d.get("mountpoint"), + "quota": d.get("quota", {}).get("rawvalue"), + } + ) + return json.dumps( + { + "total": total, + "count": len(result), + "offset": params.offset, + "has_more": total > params.offset + len(result), + "datasets": result, + }, + indent=2, + ) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_get_dataset", + annotations={ + "title": "Get Dataset Details", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_get_dataset(params: DatasetIdInput) -> str: + """Get detailed info about a specific dataset by its full path (e.g. 'NVME/SMB').""" + try: + encoded = urllib.parse.quote(params.dataset_id, safe="") + d = await _get(f"pool/dataset/id/{encoded}") + return json.dumps(d, indent=2, default=str) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_create_dataset", + annotations={ + "title": "Create Dataset", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": False, + "openWorldHint": False, + }, +) +async def truenas_create_dataset(params: CreateDatasetInput) -> str: + """Create a new ZFS dataset. Provide the full name including pool (e.g. 'NVME/my_dataset').""" + try: + payload: Dict[str, Any] = {"name": params.name} + if params.comments: + payload["comments"] = params.comments + if params.compression: + payload["compression"] = params.compression.upper() + if params.quota is not None: + payload["quota"] = params.quota + if params.readonly is not None: + payload["readonly"] = "ON" if params.readonly else "OFF" + + result = await _post("pool/dataset", payload) + return json.dumps( + {"status": "created", "dataset": result.get("name"), "id": result.get("id")}, + indent=2, + ) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Snapshots +# ========================================================================= + + +@mcp.tool( + name="truenas_create_snapshot", + annotations={ + "title": "Create ZFS Snapshot", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": False, + "openWorldHint": False, + }, +) +async def truenas_create_snapshot(params: CreateSnapshotInput) -> str: + """Create a manual ZFS snapshot of a dataset.""" + try: + payload = { + "dataset": params.dataset, + "name": params.name, + "recursive": params.recursive, + } + result = await _post("zfs/snapshot", payload) + return json.dumps({"status": "created", "snapshot": result}, indent=2, default=str) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_list_snapshot_tasks", + annotations={ + "title": "List Snapshot Tasks", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_snapshot_tasks(params: ListSnapshotTasksInput) -> str: + """List all configured periodic snapshot tasks.""" + try: + tasks = await _get("pool/snapshottask") + result = [] + for t in tasks: + result.append( + { + "id": t.get("id"), + "dataset": t.get("dataset"), + "recursive": t.get("recursive"), + "lifetime_value": t.get("lifetime_value"), + "lifetime_unit": t.get("lifetime_unit"), + "enabled": t.get("enabled"), + "naming_schema": t.get("naming_schema"), + "schedule": t.get("schedule"), + } + ) + return json.dumps({"count": len(result), "tasks": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Disks +# ========================================================================= + + +@mcp.tool( + name="truenas_list_disks", + annotations={ + "title": "List Physical Disks", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_disks(params: EmptyInput) -> str: + """List all physical disks with model, size, serial, temperature, and type info.""" + try: + disks = await _get("disk") + result = [] + for d in disks: + size_bytes = d.get("size", 0) or 0 + result.append( + { + "name": d.get("name"), + "model": d.get("model"), + "serial": d.get("serial"), + "size": _bytes_human(size_bytes), + "size_bytes": size_bytes, + "type": d.get("type"), + "rotationrate": d.get("rotationrate"), + "hddstandby": d.get("hddstandby"), + "pool": d.get("pool"), + } + ) + return json.dumps({"count": len(result), "disks": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Services +# ========================================================================= + + +@mcp.tool( + name="truenas_list_services", + annotations={ + "title": "List Services", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_services(params: EmptyInput) -> str: + """List all system services with their running state and auto-start configuration.""" + try: + services = await _get("service") + result = [] + for s in services: + result.append( + { + "id": s.get("id"), + "service": s.get("service"), + "state": s.get("state"), + "enable": s.get("enable"), + } + ) + return json.dumps({"count": len(result), "services": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_start_service", + annotations={ + "title": "Start Service", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_start_service(params: ServiceActionInput) -> str: + """Start a system service by name (e.g. 'cifs', 'nfs', 'ssh').""" + try: + result = await _post(f"service/start", {"service": params.service}) + return json.dumps({"status": "started", "service": params.service, "result": result}) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_stop_service", + annotations={ + "title": "Stop Service", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_stop_service(params: ServiceActionInput) -> str: + """Stop a system service by name (e.g. 'cifs', 'nfs', 'ssh').""" + try: + result = await _post(f"service/stop", {"service": params.service}) + return json.dumps({"status": "stopped", "service": params.service, "result": result}) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_restart_service", + annotations={ + "title": "Restart Service", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": False, + "openWorldHint": False, + }, +) +async def truenas_restart_service(params: ServiceActionInput) -> str: + """Restart a system service by name.""" + try: + result = await _post(f"service/restart", {"service": params.service}) + return json.dumps({"status": "restarted", "service": params.service, "result": result}) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — SMB Shares +# ========================================================================= + + +@mcp.tool( + name="truenas_list_smb_shares", + annotations={ + "title": "List SMB Shares", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_smb_shares(params: EmptyInput) -> str: + """List all SMB/CIFS file shares with their paths and configuration.""" + try: + shares = await _get("sharing/smb") + result = [] + for s in shares: + result.append( + { + "id": s.get("id"), + "name": s.get("name"), + "path": s.get("path"), + "comment": s.get("comment"), + "enabled": s.get("enabled"), + "readonly": s.get("ro"), + "guest_ok": s.get("guestok"), + "browsable": s.get("browsable"), + } + ) + return json.dumps({"count": len(result), "shares": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_create_smb_share", + annotations={ + "title": "Create SMB Share", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": False, + "openWorldHint": False, + }, +) +async def truenas_create_smb_share(params: CreateSMBShareInput) -> str: + """Create a new SMB/CIFS file share.""" + try: + payload: Dict[str, Any] = { + "path": params.path, + "name": params.name, + "ro": params.readonly, + "guestok": params.guest_ok, + } + if params.comment: + payload["comment"] = params.comment + result = await _post("sharing/smb", payload) + return json.dumps( + {"status": "created", "share": result.get("name"), "id": result.get("id")}, + indent=2, + ) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — NFS Shares +# ========================================================================= + + +@mcp.tool( + name="truenas_list_nfs_shares", + annotations={ + "title": "List NFS Shares", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_nfs_shares(params: EmptyInput) -> str: + """List all NFS exports with their paths and access configuration.""" + try: + shares = await _get("sharing/nfs") + result = [] + for s in shares: + result.append( + { + "id": s.get("id"), + "path": s.get("path"), + "comment": s.get("comment"), + "enabled": s.get("enabled"), + "readonly": s.get("ro"), + "networks": s.get("networks"), + "hosts": s.get("hosts"), + } + ) + return json.dumps({"count": len(result), "shares": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Apps +# ========================================================================= + + +@mcp.tool( + name="truenas_list_apps", + annotations={ + "title": "List Installed Apps", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_apps(params: EmptyInput) -> str: + """List all installed TrueNAS apps (Docker-based) with their running state.""" + try: + apps = await _get("app") + result = [] + for a in apps: + result.append( + { + "name": a.get("name"), + "state": a.get("state"), + "version": a.get("version"), + "human_version": a.get("human_version"), + "update_available": a.get("update_available", False), + "portal": a.get("portals"), + } + ) + return json.dumps({"count": len(result), "apps": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_start_app", + annotations={ + "title": "Start App", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_start_app(params: AppActionInput) -> str: + """Start a TrueNAS app by name.""" + try: + result = await _post(f"app/id/{params.app_name}/start") + return json.dumps({"status": "starting", "app": params.app_name, "job_id": result}) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_stop_app", + annotations={ + "title": "Stop App", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_stop_app(params: AppActionInput) -> str: + """Stop a TrueNAS app by name.""" + try: + result = await _post(f"app/id/{params.app_name}/stop") + return json.dumps({"status": "stopping", "app": params.app_name, "job_id": result}) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — VMs +# ========================================================================= + + +@mcp.tool( + name="truenas_list_vms", + annotations={ + "title": "List Virtual Machines", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_vms(params: EmptyInput) -> str: + """List all virtual machines with status, CPU, and memory configuration.""" + try: + vms = await _get("vm") + result = [] + for v in vms: + result.append( + { + "id": v.get("id"), + "name": v.get("name"), + "status": v.get("status", {}).get("state"), + "vcpus": v.get("vcpus"), + "memory_mb": v.get("memory"), + "autostart": v.get("autostart"), + "description": v.get("description"), + } + ) + return json.dumps({"count": len(result), "vms": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_start_vm", + annotations={ + "title": "Start VM", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_start_vm(params: VMActionInput) -> str: + """Start a virtual machine by its numeric ID.""" + try: + result = await _post(f"vm/id/{params.vm_id}/start") + return json.dumps({"status": "starting", "vm_id": params.vm_id, "result": result}) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_stop_vm", + annotations={ + "title": "Stop VM", + "readOnlyHint": False, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_stop_vm(params: VMActionInput) -> str: + """Stop a virtual machine by its numeric ID.""" + try: + result = await _post(f"vm/id/{params.vm_id}/stop") + return json.dumps({"status": "stopping", "vm_id": params.vm_id, "result": result}) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Network +# ========================================================================= + + +@mcp.tool( + name="truenas_network_summary", + annotations={ + "title": "Network Summary", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_network_summary(params: EmptyInput) -> str: + """Get network configuration summary including IPs, gateways, DNS, and interfaces.""" + try: + summary = await _get("network/general/summary") + return json.dumps(summary, indent=2, default=str) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_list_interfaces", + annotations={ + "title": "List Network Interfaces", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_interfaces(params: EmptyInput) -> str: + """List all network interfaces with their addresses and link state.""" + try: + ifaces = await _get("interface") + result = [] + for i in ifaces: + aliases = [] + for a in i.get("aliases", []): + aliases.append(f"{a.get('address')}/{a.get('netmask')}") + result.append( + { + "id": i.get("id"), + "name": i.get("name"), + "type": i.get("type"), + "state": i.get("state"), + "aliases": aliases, + "mtu": i.get("mtu"), + } + ) + return json.dumps({"count": len(result), "interfaces": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Users & Groups +# ========================================================================= + + +@mcp.tool( + name="truenas_list_users", + annotations={ + "title": "List Users", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_users(params: EmptyInput) -> str: + """List all system users.""" + try: + users = await _get("user") + result = [] + for u in users: + result.append( + { + "id": u.get("id"), + "username": u.get("username"), + "full_name": u.get("full_name"), + "uid": u.get("uid"), + "builtin": u.get("builtin"), + "locked": u.get("locked"), + "shell": u.get("shell"), + "home": u.get("home"), + "group": u.get("group", {}).get("bsdgrp_group") if isinstance(u.get("group"), dict) else u.get("group"), + } + ) + return json.dumps({"count": len(result), "users": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_list_groups", + annotations={ + "title": "List Groups", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_groups(params: EmptyInput) -> str: + """List all system groups.""" + try: + groups = await _get("group") + result = [] + for g in groups: + result.append( + { + "id": g.get("id"), + "group": g.get("group"), + "gid": g.get("gid"), + "builtin": g.get("builtin"), + } + ) + return json.dumps({"count": len(result), "groups": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Filesystem +# ========================================================================= + + +@mcp.tool( + name="truenas_list_directory", + annotations={ + "title": "List Directory Contents", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_directory(params: DirListInput) -> str: + """List files and directories at a given path on the TrueNAS filesystem.""" + try: + payload = {"path": params.path, "limit": params.limit} + entries = await _post("filesystem/listdir", payload) + result = [] + for e in entries[:params.limit]: + result.append( + { + "name": e.get("name"), + "path": e.get("path"), + "type": e.get("type"), + "size": e.get("size"), + "uid": e.get("uid"), + "gid": e.get("gid"), + "mode": e.get("mode"), + } + ) + return json.dumps({"path": params.path, "count": len(result), "entries": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Cron Jobs +# ========================================================================= + + +@mcp.tool( + name="truenas_list_cronjobs", + annotations={ + "title": "List Cron Jobs", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_cronjobs(params: CronJobInput) -> str: + """List all configured cron jobs.""" + try: + jobs = await _get("cronjob") + result = [] + for j in jobs: + result.append( + { + "id": j.get("id"), + "description": j.get("description"), + "command": j.get("command"), + "user": j.get("user"), + "schedule": j.get("schedule"), + "enabled": j.get("enabled"), + } + ) + return json.dumps({"count": len(result), "cronjobs": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Replication +# ========================================================================= + + +@mcp.tool( + name="truenas_list_replications", + annotations={ + "title": "List Replication Tasks", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_replications(params: EmptyInput) -> str: + """List all configured replication tasks with source, target, and schedule.""" + try: + tasks = await _get("replication") + result = [] + for t in tasks: + result.append( + { + "id": t.get("id"), + "name": t.get("name"), + "direction": t.get("direction"), + "transport": t.get("transport"), + "source_datasets": t.get("source_datasets"), + "target_dataset": t.get("target_dataset"), + "enabled": t.get("enabled"), + "state": t.get("state"), + "schedule": t.get("schedule"), + } + ) + return json.dumps({"count": len(result), "replications": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Cloud Sync +# ========================================================================= + + +@mcp.tool( + name="truenas_list_cloudsync", + annotations={ + "title": "List Cloud Sync Tasks", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_cloudsync(params: EmptyInput) -> str: + """List all cloud sync tasks with their configuration and status.""" + try: + tasks = await _get("cloudsync") + result = [] + for t in tasks: + result.append( + { + "id": t.get("id"), + "description": t.get("description"), + "direction": t.get("direction"), + "path": t.get("path"), + "enabled": t.get("enabled"), + "schedule": t.get("schedule"), + "job": t.get("job"), + } + ) + return json.dumps({"count": len(result), "tasks": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Boot +# ========================================================================= + + +@mcp.tool( + name="truenas_boot_state", + annotations={ + "title": "Boot Pool State", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_boot_state(params: EmptyInput) -> str: + """Get the state of the boot pool (health, devices, capacity).""" + try: + state = await _get("boot/get_state") + return json.dumps(state, indent=2, default=str) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — iSCSI +# ========================================================================= + + +@mcp.tool( + name="truenas_list_iscsi_targets", + annotations={ + "title": "List iSCSI Targets", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_iscsi_targets(params: EmptyInput) -> str: + """List all configured iSCSI targets.""" + try: + targets = await _get("iscsi/target") + return json.dumps({"count": len(targets), "targets": targets}, indent=2, default=str) + except Exception as e: + return _handle_api_error(e) + + +@mcp.tool( + name="truenas_list_iscsi_extents", + annotations={ + "title": "List iSCSI Extents", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_iscsi_extents(params: EmptyInput) -> str: + """List all configured iSCSI extents (block storage backing).""" + try: + extents = await _get("iscsi/extent") + return json.dumps({"count": len(extents), "extents": extents}, indent=2, default=str) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# TOOLS — Certificates +# ========================================================================= + + +@mcp.tool( + name="truenas_list_certificates", + annotations={ + "title": "List Certificates", + "readOnlyHint": True, + "destructiveHint": False, + "idempotentHint": True, + "openWorldHint": False, + }, +) +async def truenas_list_certificates(params: EmptyInput) -> str: + """List all SSL/TLS certificates configured on the system.""" + try: + certs = await _get("certificate") + result = [] + for c in certs: + result.append( + { + "id": c.get("id"), + "name": c.get("name"), + "issuer": c.get("issuer"), + "common": c.get("common"), + "from": c.get("from"), + "until": c.get("until"), + "type": c.get("cert_type"), + } + ) + return json.dumps({"count": len(result), "certificates": result}, indent=2) + except Exception as e: + return _handle_api_error(e) + + +# ========================================================================= +# Entry point +# ========================================================================= + +if __name__ == "__main__": + mcp.run()