mcp-servers/truenas-mcp/truenas_mcp.py

1321 lines
42 KiB
Python
Raw Normal View History

2026-03-31 15:33:45 -04:00
"""
TrueNAS MCP Server
==================
MCP server for managing TrueNAS SCALE systems via the REST API v2.0.
Provides tools for monitoring system health, storage pools, datasets,
services, shares, apps, VMs, disks, and alerts.
"""
import json
import os
import urllib.parse
from typing import Optional, List, Any, Dict
import httpx
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field, ConfigDict, field_validator
from enum import Enum
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Connection settings are read once from the environment at import time.
# TRUENAS_URL is the scheme + host of the appliance (e.g. "https://nas.local");
# TRUENAS_API_KEY is sent as a bearer token on every request.
TRUENAS_URL = os.environ.get("TRUENAS_URL", "")
TRUENAS_API_KEY = os.environ.get("TRUENAS_API_KEY", "")
# Strip trailing slashes so "https://nas.local/" does not yield
# "https://nas.local//api/v2.0".
BASE = f"{TRUENAS_URL.rstrip('/')}/api/v2.0"
# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
# Single FastMCP server instance; every tool below registers itself on it
# through the @mcp.tool(...) decorator, and the entry point calls mcp.run().
mcp = FastMCP("truenas_mcp")
# ---------------------------------------------------------------------------
# Shared HTTP helpers
# ---------------------------------------------------------------------------
def _headers() -> dict:
    """Build the HTTP headers sent with every API request.

    Returns:
        A dict carrying the bearer token built from TRUENAS_API_KEY and a
        JSON content type.
    """
    request_headers = {}
    request_headers["Authorization"] = f"Bearer {TRUENAS_API_KEY}"
    request_headers["Content-Type"] = "application/json"
    return request_headers
async def _get(path: str, params: Optional[dict] = None) -> Any:
    """HTTP GET against the TrueNAS REST API.

    Args:
        path: Endpoint path relative to BASE (no leading slash).
        params: Optional query-string parameters.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: via raise_for_status() on a 4xx/5xx response.
    """
    url = f"{BASE}/{path}"
    # NOTE(review): verify=False disables TLS certificate verification.
    async with httpx.AsyncClient(verify=False, timeout=30) as http:
        response = await http.get(url, headers=_headers(), params=params)
        response.raise_for_status()
        decoded = response.json()
        return decoded
async def _post(path: str, payload: Any = None) -> Any:
    """HTTP POST against the TrueNAS REST API.

    Args:
        path: Endpoint path relative to BASE (no leading slash).
        payload: JSON-serialisable request body, or None for no body.

    Returns:
        The decoded JSON body, or None when the server answered successfully
        with an empty body.

    Raises:
        httpx.HTTPStatusError: via raise_for_status() on a 4xx/5xx response.
    """
    # NOTE(review): verify=False disables TLS certificate verification.
    async with httpx.AsyncClient(verify=False, timeout=60) as client:
        resp = await client.post(
            f"{BASE}/{path}", headers=_headers(), json=payload
        )
        resp.raise_for_status()
        # A successful request with no body must not be reported as a failure
        # just because there is nothing to JSON-decode (mirrors _delete).
        if not resp.content:
            return None
        return resp.json()
async def _put(path: str, payload: Any = None) -> Any:
    """HTTP PUT against the TrueNAS REST API.

    Args:
        path: Endpoint path relative to BASE (no leading slash).
        payload: JSON-serialisable request body, or None for no body.

    Returns:
        The decoded JSON body, or None when the server answered successfully
        with an empty body.

    Raises:
        httpx.HTTPStatusError: via raise_for_status() on a 4xx/5xx response.
    """
    # NOTE(review): verify=False disables TLS certificate verification.
    async with httpx.AsyncClient(verify=False, timeout=60) as client:
        resp = await client.put(
            f"{BASE}/{path}", headers=_headers(), json=payload
        )
        resp.raise_for_status()
        # A successful request with no body must not be reported as a failure
        # just because there is nothing to JSON-decode (mirrors _delete).
        if not resp.content:
            return None
        return resp.json()
async def _delete(path: str) -> Any:
    """HTTP DELETE against the TrueNAS REST API.

    Args:
        path: Endpoint path relative to BASE (no leading slash).

    Returns:
        The decoded JSON body, or {"status": "deleted"} when the body cannot
        be decoded as JSON.

    Raises:
        httpx.HTTPStatusError: via raise_for_status() on a 4xx/5xx response.
    """
    target = f"{BASE}/{path}"
    # NOTE(review): verify=False disables TLS certificate verification.
    async with httpx.AsyncClient(verify=False, timeout=30) as http:
        response = await http.delete(target, headers=_headers())
        response.raise_for_status()
        try:
            outcome = response.json()
        except Exception:
            # Best effort: the delete succeeded but returned no usable JSON.
            outcome = {"status": "deleted"}
        return outcome
def _handle_api_error(e: Exception) -> str:
    """Turn an exception raised during an API call into a readable message.

    Args:
        e: The exception caught by a tool.

    Returns:
        A string starting with "Error". HTTP status errors include the status
        code and the server's message; timeouts and all other exceptions get a
        generic description.
    """
    if not isinstance(e, httpx.HTTPStatusError):
        if isinstance(e, httpx.TimeoutException):
            return "Error: Request timed out. The server may be busy."
        return f"Error: {type(e).__name__}: {e}"

    response = e.response
    code = response.status_code
    try:
        parsed = response.json()
        detail = parsed.get("message", str(parsed))
    except Exception:
        # Body is not a JSON object: fall back to a truncated raw body.
        detail = response.text[:500]

    # 429 is the only status whose message omits the server detail.
    if code == 429:
        return "Error 429: Rate limit exceeded. Wait and retry."

    explanations = {
        404: "Resource not found.",
        403: "Permission denied.",
        405: "Method not allowed for this endpoint.",
        422: "Validation error.",
    }
    explanation = explanations.get(code)
    if explanation is None:
        return f"Error {code}: {detail}"
    return f"Error {code}: {explanation} {detail}"
def _bytes_human(n: int) -> str:
"""Convert bytes to human-readable string."""
for unit in ("B", "KiB", "MiB", "GiB", "TiB", "PiB"):
if abs(n) < 1024:
return f"{n:.1f} {unit}"
n /= 1024
return f"{n:.1f} EiB"
# ---------------------------------------------------------------------------
# Input models
# ---------------------------------------------------------------------------
# Input model for tools that take no arguments.
# extra="forbid" makes validation reject any unexpected field.
class EmptyInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
# Input model for tools that address a resource by numeric ID
# (used by truenas_get_pool). Unknown fields are rejected.
class IdInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    id: int = Field(..., description="Numeric ID of the resource")
# Input model carrying a single non-empty resource name; surrounding
# whitespace is stripped before validation.
# NOTE(review): no tool in the visible source references this model.
class NameInput(BaseModel):
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
    name: str = Field(..., description="Name of the resource", min_length=1)
# Input model for truenas_get_dataset: the dataset's full path.
# The tool percent-encodes the value before placing it in the URL.
class DatasetIdInput(BaseModel):
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
    dataset_id: str = Field(
        ...,
        description="Full dataset path, e.g. 'NVME/SMB' or 'Storage'. URL-encoded automatically.",
        min_length=1,
    )
# Input model for truenas_list_datasets: optional pool filter plus
# limit/offset pagination (limit 1..500, offset >= 0).
class ListDatasetsInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    pool: Optional[str] = Field(
        default=None,
        description="Optional pool name to filter datasets (e.g. 'NVME', 'Storage'). If omitted, returns all datasets.",
    )
    limit: int = Field(default=50, description="Max datasets to return", ge=1, le=500)
    offset: int = Field(default=0, description="Offset for pagination", ge=0)
# Input model for the start/stop/restart service tools: the service name,
# which the tools forward unchanged in the request body.
class ServiceActionInput(BaseModel):
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
    service: str = Field(
        ...,
        description="Service name, e.g. 'cifs', 'nfs', 'ssh', 'ftp', 'iscsitarget', 'snmp'",
        min_length=1,
    )
# Input model for truenas_create_dataset. Only `name` is required; the tool
# adds each optional field to the request only when it is set.
class CreateDatasetInput(BaseModel):
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
    name: str = Field(
        ...,
        description="Full dataset name including pool, e.g. 'NVME/my_new_dataset'",
        min_length=1,
    )
    comments: Optional[str] = Field(default=None, description="Optional description/comments")
    # Not validated against the listed algorithms here; the tool upper-cases it.
    compression: Optional[str] = Field(
        default=None,
        description="Compression algorithm: 'LZ4', 'GZIP', 'ZLE', 'LZJB', 'ZSTD', or 'OFF'",
    )
    quota: Optional[int] = Field(
        default=None,
        description="Quota in bytes. 0 means no quota.",
        ge=0,
    )
    # The tool translates this boolean to "ON"/"OFF" in the request body.
    readonly: Optional[bool] = Field(default=None, description="Set dataset read-only")
# Input model for truenas_create_snapshot: target dataset, snapshot name and
# whether child datasets are included.
class CreateSnapshotInput(BaseModel):
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
    dataset: str = Field(
        ...,
        description="Dataset name to snapshot, e.g. 'NVME/SMB'",
        min_length=1,
    )
    name: str = Field(
        ...,
        description="Snapshot name suffix, e.g. 'manual-backup-2024'",
        min_length=1,
    )
    recursive: bool = Field(default=False, description="Include child datasets")
# Argument-less input model for truenas_list_snapshot_tasks
# (same shape as EmptyInput: no fields, unknown fields rejected).
class ListSnapshotTasksInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
# Input model for the start/stop app tools: the app's name, which the tools
# place into the request URL path.
class AppActionInput(BaseModel):
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
    app_name: str = Field(
        ...,
        description="Name of the app, e.g. 'plex', 'nextcloud', 'sonarr'",
        min_length=1,
    )
# Input model for the start/stop VM tools: the VM's numeric ID.
class VMActionInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    vm_id: int = Field(..., description="Numeric VM ID")
# Input model for truenas_create_smb_share. The tool sends `readonly` as
# "ro" and `guest_ok` as "guestok" in the request body.
class CreateSMBShareInput(BaseModel):
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
    path: str = Field(..., description="Filesystem path to share, e.g. '/mnt/NVME/myshare'", min_length=1)
    name: str = Field(..., description="Share name visible on the network", min_length=1)
    comment: Optional[str] = Field(default=None, description="Optional description")
    readonly: bool = Field(default=False, description="Share is read-only")
    guest_ok: bool = Field(default=False, description="Allow guest access (no auth)")
# Input model for truenas_list_directory: the path to list and a cap on the
# number of entries returned (1..1000).
class DirListInput(BaseModel):
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
    path: str = Field(
        ...,
        description="Absolute path to list, e.g. '/mnt/NVME/SMB'",
        min_length=1,
    )
    limit: int = Field(default=100, description="Max entries to return", ge=1, le=1000)
# Argument-less input model for truenas_list_cronjobs
# (same shape as EmptyInput: no fields, unknown fields rejected).
class CronJobInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
# =========================================================================
# TOOLS — System
# =========================================================================
@mcp.tool(
    name="truenas_system_info",
    annotations={
        "title": "System Information",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_system_info(params: EmptyInput) -> str:
    """Get TrueNAS system information including version, CPU, memory, uptime, and hostname."""
    # Returns a JSON string with selected fields of GET system/info, or an
    # error message produced by _handle_api_error.
    try:
        info = await _get("system/info")
        # A missing or null "physmem" is reported as 0.0 GB instead of
        # failing the whole call.
        mem_gb = (info.get("physmem") or 0) / (1024**3)
        # Read the load averages once and pad to three entries so a missing,
        # null or short list yields zeros rather than an exception.
        load = list(info.get("loadavg") or [])[:3]
        load += [0] * (3 - len(load))
        load_1m, load_5m, load_15m = load
        return json.dumps(
            {
                "version": info.get("version"),
                "hostname": info.get("hostname"),
                "cpu_model": info.get("model"),
                "cores": info.get("cores"),
                "physical_cores": info.get("physical_cores"),
                "memory_gb": round(mem_gb, 1),
                "uptime": info.get("uptime"),
                "timezone": info.get("timezone"),
                "system_manufacturer": info.get("system_manufacturer"),
                "system_product": info.get("system_product"),
                "ecc_memory": info.get("ecc_memory"),
                "loadavg_1m": round(load_1m, 2),
                "loadavg_5m": round(load_5m, 2),
                "loadavg_15m": round(load_15m, 2),
            },
            indent=2,
        )
    except Exception as e:
        return _handle_api_error(e)
# =========================================================================
# TOOLS — Alerts
# =========================================================================
@mcp.tool(
    name="truenas_list_alerts",
    annotations={
        "title": "List Active Alerts",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_alerts(params: EmptyInput) -> str:
    """List all active system alerts with severity levels and messages."""
    # Returns a JSON string {"count", "alerts"} built from GET alert/list, or
    # an error message produced by _handle_api_error.
    try:
        raw_alerts = await _get("alert/list")
        if not raw_alerts:
            # Empty result gets an explicit message and compact JSON.
            return json.dumps({"count": 0, "alerts": [], "message": "No active alerts."})
        summaries = [
            {
                "id": alert.get("id"),
                "level": alert.get("level"),
                "source": alert.get("source"),
                "klass": alert.get("klass"),
                "message": alert.get("formatted"),
                "dismissed": alert.get("dismissed", False),
            }
            for alert in raw_alerts
        ]
        envelope = {"count": len(summaries), "alerts": summaries}
        return json.dumps(envelope, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Storage Pools
# =========================================================================
@mcp.tool(
    name="truenas_list_pools",
    annotations={
        "title": "List Storage Pools",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_pools(params: EmptyInput) -> str:
    """List all ZFS storage pools with status, health, and capacity info."""
    # Returns a JSON string {"count", "pools"} built from GET pool, or an
    # error message produced by _handle_api_error.
    try:
        pools = await _get("pool")
        result = []
        for p in pools:
            # "topology" (or its "data" list) may be present but null; treat
            # that as "no data vdevs" so one such pool does not abort the
            # whole listing with an AttributeError/TypeError.
            topology = p.get("topology") or {}
            data_vdevs = topology.get("data") or []
            result.append(
                {
                    "id": p.get("id"),
                    "name": p.get("name"),
                    "status": p.get("status"),
                    "healthy": p.get("healthy"),
                    "is_decrypted": p.get("is_decrypted"),
                    "path": p.get("path"),
                    "data_vdevs": len(data_vdevs),
                    "scan": p.get("scan"),
                }
            )
        return json.dumps({"count": len(result), "pools": result}, indent=2)
    except Exception as e:
        return _handle_api_error(e)
@mcp.tool(
    name="truenas_get_pool",
    annotations={
        "title": "Get Pool Details",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_get_pool(params: IdInput) -> str:
    """Get detailed information about a specific storage pool by ID, including topology and disk layout."""
    # Returns the raw GET pool/id/<id> response as a JSON string, or an error
    # message produced by _handle_api_error.
    try:
        endpoint = f"pool/id/{params.id}"
        pool_details = await _get(endpoint)
        # default=str stringifies any value json cannot serialise natively.
        return json.dumps(pool_details, indent=2, default=str)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Datasets
# =========================================================================
def _dataset_prop(dataset: Dict[str, Any], key: str, field: str, default: Any = None) -> Any:
    """Return dataset[key][field], or *default* when the property object or the field is missing or null."""
    prop = dataset.get(key)
    if not isinstance(prop, dict):
        return default
    value = prop.get(field)
    return default if value is None else value


@mcp.tool(
    name="truenas_list_datasets",
    annotations={
        "title": "List Datasets",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_datasets(params: ListDatasetsInput) -> str:
    """List ZFS datasets with usage, compression, and quota information. Optionally filter by pool name."""
    # Returns a JSON string with "total", "count", "offset", "has_more" and
    # "datasets", or an error message produced by _handle_api_error.
    # Filtering and pagination are applied client-side to GET pool/dataset.
    try:
        datasets = await _get("pool/dataset")
        if params.pool:
            pool = params.pool
            prefix = pool + "/"
            # Keep datasets whose "pool" matches, or whose name is the pool
            # itself or lies below it. A null name is treated as "".
            datasets = [
                d
                for d in datasets
                if d.get("pool") == pool
                or (d.get("name") or "").startswith(prefix)
                or d.get("name") == pool
            ]
        total = len(datasets)
        page = datasets[params.offset : params.offset + params.limit]
        result = []
        for d in page:
            # A null/missing property object or rawvalue counts as 0 bytes so
            # one incomplete dataset does not abort the whole listing.
            used_raw = _dataset_prop(d, "used", "rawvalue", 0)
            avail_raw = _dataset_prop(d, "available", "rawvalue", 0)
            result.append(
                {
                    "id": d.get("id"),
                    "name": d.get("name"),
                    "pool": d.get("pool"),
                    "type": d.get("type"),
                    "used": _bytes_human(int(used_raw)),
                    "available": _bytes_human(int(avail_raw)),
                    "compression": _dataset_prop(d, "compression", "value"),
                    "readonly": _dataset_prop(d, "readonly", "value"),
                    "mountpoint": d.get("mountpoint"),
                    "quota": _dataset_prop(d, "quota", "rawvalue"),
                }
            )
        return json.dumps(
            {
                "total": total,
                "count": len(result),
                "offset": params.offset,
                "has_more": total > params.offset + len(result),
                "datasets": result,
            },
            indent=2,
        )
    except Exception as e:
        return _handle_api_error(e)
@mcp.tool(
    name="truenas_get_dataset",
    annotations={
        "title": "Get Dataset Details",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_get_dataset(params: DatasetIdInput) -> str:
    """Get detailed info about a specific dataset by its full path (e.g. 'NVME/SMB')."""
    # Returns the raw GET pool/dataset/id/<id> response as a JSON string, or
    # an error message produced by _handle_api_error.
    try:
        # safe="" also encodes "/" so the full dataset path is one URL segment.
        quoted_id = urllib.parse.quote(params.dataset_id, safe="")
        details = await _get("pool/dataset/id/" + quoted_id)
        return json.dumps(details, indent=2, default=str)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_create_dataset",
    annotations={
        "title": "Create Dataset",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False,
    },
)
async def truenas_create_dataset(params: CreateDatasetInput) -> str:
    """Create a new ZFS dataset. Provide the full name including pool (e.g. 'NVME/my_dataset')."""
    # Returns a JSON string {"status", "dataset", "id"} taken from the
    # POST pool/dataset response, or an error message from _handle_api_error.
    try:
        request_body: Dict[str, Any] = {"name": params.name}
        # Optional settings are sent only when the caller supplied them.
        if params.comments:
            request_body.update(comments=params.comments)
        if params.compression:
            request_body.update(compression=params.compression.upper())
        if params.quota is not None:
            request_body.update(quota=params.quota)
        if params.readonly is not None:
            readonly_flag = "OFF"
            if params.readonly:
                readonly_flag = "ON"
            request_body.update(readonly=readonly_flag)
        created = await _post("pool/dataset", request_body)
        summary = {
            "status": "created",
            "dataset": created.get("name"),
            "id": created.get("id"),
        }
        return json.dumps(summary, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Snapshots
# =========================================================================
@mcp.tool(
    name="truenas_create_snapshot",
    annotations={
        "title": "Create ZFS Snapshot",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False,
    },
)
async def truenas_create_snapshot(params: CreateSnapshotInput) -> str:
    """Create a manual ZFS snapshot of a dataset."""
    # Returns a JSON string {"status", "snapshot"} wrapping the
    # POST zfs/snapshot response, or an error message from _handle_api_error.
    try:
        request_body = dict(
            dataset=params.dataset,
            name=params.name,
            recursive=params.recursive,
        )
        snapshot = await _post("zfs/snapshot", request_body)
        envelope = {"status": "created", "snapshot": snapshot}
        return json.dumps(envelope, indent=2, default=str)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_list_snapshot_tasks",
    annotations={
        "title": "List Snapshot Tasks",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_snapshot_tasks(params: ListSnapshotTasksInput) -> str:
    """List all configured periodic snapshot tasks."""
    # Returns a JSON string {"count", "tasks"} built from
    # GET pool/snapshottask, or an error message from _handle_api_error.
    try:
        # Fields copied verbatim from each task, in output order.
        wanted = (
            "id",
            "dataset",
            "recursive",
            "lifetime_value",
            "lifetime_unit",
            "enabled",
            "naming_schema",
            "schedule",
        )
        raw_tasks = await _get("pool/snapshottask")
        summaries = [{key: task.get(key) for key in wanted} for task in raw_tasks]
        return json.dumps({"count": len(summaries), "tasks": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Disks
# =========================================================================
@mcp.tool(
    name="truenas_list_disks",
    annotations={
        "title": "List Physical Disks",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_disks(params: EmptyInput) -> str:
    """List all physical disks with model, size, serial, rotation rate, standby setting, pool, and type info."""
    # The description above is what MCP clients see; it previously promised
    # temperature data, which this tool never returns.
    # Returns a JSON string {"count", "disks"} built from GET disk, or an
    # error message produced by _handle_api_error.
    try:
        disks = await _get("disk")
        result = []
        for d in disks:
            # A missing or null size is reported as 0 bytes.
            size_bytes = d.get("size") or 0
            result.append(
                {
                    "name": d.get("name"),
                    "model": d.get("model"),
                    "serial": d.get("serial"),
                    "size": _bytes_human(size_bytes),
                    "size_bytes": size_bytes,
                    "type": d.get("type"),
                    "rotationrate": d.get("rotationrate"),
                    "hddstandby": d.get("hddstandby"),
                    "pool": d.get("pool"),
                }
            )
        return json.dumps({"count": len(result), "disks": result}, indent=2)
    except Exception as e:
        return _handle_api_error(e)
# =========================================================================
# TOOLS — Services
# =========================================================================
@mcp.tool(
    name="truenas_list_services",
    annotations={
        "title": "List Services",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_services(params: EmptyInput) -> str:
    """List all system services with their running state and auto-start configuration."""
    # Returns a JSON string {"count", "services"} built from GET service, or
    # an error message produced by _handle_api_error.
    try:
        wanted = ("id", "service", "state", "enable")
        raw_services = await _get("service")
        summaries = [{key: svc.get(key) for key in wanted} for svc in raw_services]
        return json.dumps({"count": len(summaries), "services": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_start_service",
    annotations={
        "title": "Start Service",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_start_service(params: ServiceActionInput) -> str:
    """Start a system service by name (e.g. 'cifs', 'nfs', 'ssh')."""
    # Returns a JSON string {"status", "service", "result"} where "result" is
    # the POST service/start response, or an error from _handle_api_error.
    try:
        # Plain string literal: the path has no placeholders to interpolate.
        result = await _post("service/start", {"service": params.service})
        return json.dumps({"status": "started", "service": params.service, "result": result})
    except Exception as e:
        return _handle_api_error(e)
@mcp.tool(
    name="truenas_stop_service",
    annotations={
        "title": "Stop Service",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_stop_service(params: ServiceActionInput) -> str:
    """Stop a system service by name (e.g. 'cifs', 'nfs', 'ssh')."""
    # Returns a JSON string {"status", "service", "result"} where "result" is
    # the POST service/stop response, or an error from _handle_api_error.
    try:
        # Plain string literal: the path has no placeholders to interpolate.
        result = await _post("service/stop", {"service": params.service})
        return json.dumps({"status": "stopped", "service": params.service, "result": result})
    except Exception as e:
        return _handle_api_error(e)
@mcp.tool(
    name="truenas_restart_service",
    annotations={
        "title": "Restart Service",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False,
    },
)
async def truenas_restart_service(params: ServiceActionInput) -> str:
    """Restart a system service by name."""
    # Returns a JSON string {"status", "service", "result"} where "result" is
    # the POST service/restart response, or an error from _handle_api_error.
    try:
        # Plain string literal: the path has no placeholders to interpolate.
        result = await _post("service/restart", {"service": params.service})
        return json.dumps({"status": "restarted", "service": params.service, "result": result})
    except Exception as e:
        return _handle_api_error(e)
# =========================================================================
# TOOLS — SMB Shares
# =========================================================================
@mcp.tool(
    name="truenas_list_smb_shares",
    annotations={
        "title": "List SMB Shares",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_smb_shares(params: EmptyInput) -> str:
    """List all SMB/CIFS file shares with their paths and configuration."""
    # Returns a JSON string {"count", "shares"} built from GET sharing/smb,
    # or an error message produced by _handle_api_error.
    try:
        # (output key, API key) pairs; "ro" and "guestok" are renamed.
        field_map = (
            ("id", "id"),
            ("name", "name"),
            ("path", "path"),
            ("comment", "comment"),
            ("enabled", "enabled"),
            ("readonly", "ro"),
            ("guest_ok", "guestok"),
            ("browsable", "browsable"),
        )
        raw_shares = await _get("sharing/smb")
        summaries = [
            {out_key: share.get(api_key) for out_key, api_key in field_map}
            for share in raw_shares
        ]
        return json.dumps({"count": len(summaries), "shares": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_create_smb_share",
    annotations={
        "title": "Create SMB Share",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False,
    },
)
async def truenas_create_smb_share(params: CreateSMBShareInput) -> str:
    """Create a new SMB/CIFS file share."""
    # Returns a JSON string {"status", "share", "id"} taken from the
    # POST sharing/smb response, or an error message from _handle_api_error.
    try:
        request_body: Dict[str, Any] = dict(
            path=params.path,
            name=params.name,
            ro=params.readonly,
            guestok=params.guest_ok,
        )
        # The comment is sent only when non-empty.
        if params.comment:
            request_body.update(comment=params.comment)
        created = await _post("sharing/smb", request_body)
        summary = {
            "status": "created",
            "share": created.get("name"),
            "id": created.get("id"),
        }
        return json.dumps(summary, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — NFS Shares
# =========================================================================
@mcp.tool(
    name="truenas_list_nfs_shares",
    annotations={
        "title": "List NFS Shares",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_nfs_shares(params: EmptyInput) -> str:
    """List all NFS exports with their paths and access configuration."""
    # Returns a JSON string {"count", "shares"} built from GET sharing/nfs,
    # or an error message produced by _handle_api_error.
    try:
        # (output key, API key) pairs; only "ro" is renamed.
        field_map = (
            ("id", "id"),
            ("path", "path"),
            ("comment", "comment"),
            ("enabled", "enabled"),
            ("readonly", "ro"),
            ("networks", "networks"),
            ("hosts", "hosts"),
        )
        raw_exports = await _get("sharing/nfs")
        summaries = [
            {out_key: export.get(api_key) for out_key, api_key in field_map}
            for export in raw_exports
        ]
        return json.dumps({"count": len(summaries), "shares": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Apps
# =========================================================================
@mcp.tool(
    name="truenas_list_apps",
    annotations={
        "title": "List Installed Apps",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_apps(params: EmptyInput) -> str:
    """List all installed TrueNAS apps (Docker-based) with their running state."""
    # Returns a JSON string {"count", "apps"} built from GET app, or an error
    # message produced by _handle_api_error.
    try:
        installed = await _get("app")
        summaries = [
            {
                "name": app.get("name"),
                "state": app.get("state"),
                "version": app.get("version"),
                "human_version": app.get("human_version"),
                "update_available": app.get("update_available", False),
                # The API's "portals" value is exposed under the key "portal".
                "portal": app.get("portals"),
            }
            for app in installed
        ]
        envelope = {"count": len(summaries), "apps": summaries}
        return json.dumps(envelope, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_start_app",
    annotations={
        "title": "Start App",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_start_app(params: AppActionInput) -> str:
    """Start a TrueNAS app by name."""
    # Returns a JSON string {"status", "app", "job_id"} where "job_id" is the
    # POST response, or an error message produced by _handle_api_error.
    try:
        # Percent-encode the caller-supplied name (including "/") so it stays
        # a single path segment and cannot redirect the request to another
        # endpoint; same treatment as the dataset id in truenas_get_dataset.
        encoded = urllib.parse.quote(params.app_name, safe="")
        result = await _post(f"app/id/{encoded}/start")
        return json.dumps({"status": "starting", "app": params.app_name, "job_id": result})
    except Exception as e:
        return _handle_api_error(e)
@mcp.tool(
    name="truenas_stop_app",
    annotations={
        "title": "Stop App",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_stop_app(params: AppActionInput) -> str:
    """Stop a TrueNAS app by name."""
    # Returns a JSON string {"status", "app", "job_id"} where "job_id" is the
    # POST response, or an error message produced by _handle_api_error.
    try:
        # Percent-encode the caller-supplied name (including "/") so it stays
        # a single path segment and cannot redirect the request to another
        # endpoint; same treatment as the dataset id in truenas_get_dataset.
        encoded = urllib.parse.quote(params.app_name, safe="")
        result = await _post(f"app/id/{encoded}/stop")
        return json.dumps({"status": "stopping", "app": params.app_name, "job_id": result})
    except Exception as e:
        return _handle_api_error(e)
# =========================================================================
# TOOLS — VMs
# =========================================================================
@mcp.tool(
    name="truenas_list_vms",
    annotations={
        "title": "List Virtual Machines",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_vms(params: EmptyInput) -> str:
    """List all virtual machines with status, CPU, and memory configuration."""
    # Returns a JSON string {"count", "vms"} built from GET vm, or an error
    # message produced by _handle_api_error.
    try:
        machines = await _get("vm")
        summaries = [
            {
                "id": vm.get("id"),
                "name": vm.get("name"),
                # Only the "state" member of the status object is reported.
                "status": vm.get("status", {}).get("state"),
                "vcpus": vm.get("vcpus"),
                "memory_mb": vm.get("memory"),
                "autostart": vm.get("autostart"),
                "description": vm.get("description"),
            }
            for vm in machines
        ]
        envelope = {"count": len(summaries), "vms": summaries}
        return json.dumps(envelope, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_start_vm",
    annotations={
        "title": "Start VM",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_start_vm(params: VMActionInput) -> str:
    """Start a virtual machine by its numeric ID."""
    # Returns a JSON string {"status", "vm_id", "result"} where "result" is
    # the POST response, or an error message produced by _handle_api_error.
    try:
        vm_id = params.vm_id
        outcome = await _post(f"vm/id/{vm_id}/start")
        reply = {"status": "starting", "vm_id": vm_id, "result": outcome}
        return json.dumps(reply)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_stop_vm",
    annotations={
        "title": "Stop VM",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_stop_vm(params: VMActionInput) -> str:
    """Stop a virtual machine by its numeric ID."""
    # Returns a JSON string {"status", "vm_id", "result"} where "result" is
    # the POST response, or an error message produced by _handle_api_error.
    try:
        vm_id = params.vm_id
        outcome = await _post(f"vm/id/{vm_id}/stop")
        reply = {"status": "stopping", "vm_id": vm_id, "result": outcome}
        return json.dumps(reply)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Network
# =========================================================================
@mcp.tool(
    name="truenas_network_summary",
    annotations={
        "title": "Network Summary",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_network_summary(params: EmptyInput) -> str:
    """Get network configuration summary including IPs, gateways, DNS, and interfaces."""
    # Returns the raw GET network/general/summary response as a JSON string,
    # or an error message produced by _handle_api_error.
    try:
        network_overview = await _get("network/general/summary")
        rendered = json.dumps(network_overview, indent=2, default=str)
        return rendered
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_list_interfaces",
    annotations={
        "title": "List Network Interfaces",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_interfaces(params: EmptyInput) -> str:
    """List all network interfaces with their addresses and link state."""
    # Returns a JSON string {"count", "interfaces"} built from GET interface,
    # or an error message produced by _handle_api_error.
    try:
        raw_interfaces = await _get("interface")
        summaries = []
        for iface in raw_interfaces:
            # Each alias is rendered as "<address>/<netmask>".
            addresses = [
                f"{alias.get('address')}/{alias.get('netmask')}"
                for alias in iface.get("aliases", [])
            ]
            summaries.append(
                {
                    "id": iface.get("id"),
                    "name": iface.get("name"),
                    "type": iface.get("type"),
                    "state": iface.get("state"),
                    "aliases": addresses,
                    "mtu": iface.get("mtu"),
                }
            )
        envelope = {"count": len(summaries), "interfaces": summaries}
        return json.dumps(envelope, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Users & Groups
# =========================================================================
@mcp.tool(
    name="truenas_list_users",
    annotations={
        "title": "List Users",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_users(params: EmptyInput) -> str:
    """List all system users."""
    # Returns a JSON string {"count", "users"} built from GET user, or an
    # error message produced by _handle_api_error.
    try:
        plain_fields = (
            "id",
            "username",
            "full_name",
            "uid",
            "builtin",
            "locked",
            "shell",
            "home",
        )
        accounts = await _get("user")
        summaries = []
        for account in accounts:
            entry = {key: account.get(key) for key in plain_fields}
            # "group" may be an object (report its "bsdgrp_group") or some
            # other value, which is passed through unchanged.
            group_info = account.get("group")
            if isinstance(group_info, dict):
                entry["group"] = group_info.get("bsdgrp_group")
            else:
                entry["group"] = group_info
            summaries.append(entry)
        return json.dumps({"count": len(summaries), "users": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_list_groups",
    annotations={
        "title": "List Groups",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_groups(params: EmptyInput) -> str:
    """List all system groups."""
    # Returns a JSON string {"count", "groups"} built from GET group, or an
    # error message produced by _handle_api_error.
    try:
        wanted = ("id", "group", "gid", "builtin")
        raw_groups = await _get("group")
        summaries = [{key: grp.get(key) for key in wanted} for grp in raw_groups]
        return json.dumps({"count": len(summaries), "groups": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Filesystem
# =========================================================================
@mcp.tool(
    name="truenas_list_directory",
    annotations={
        "title": "List Directory Contents",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_directory(params: DirListInput) -> str:
    """List files and directories at a given path on the TrueNAS filesystem."""
    # Returns a JSON string {"path", "count", "entries"} built from
    # POST filesystem/listdir, or an error message from _handle_api_error.
    try:
        # filesystem.listdir accepts "path", "query-filters" and
        # "query-options"; the row limit belongs inside "query-options"
        # rather than being a top-level body key.
        payload = {"path": params.path, "query-options": {"limit": params.limit}}
        entries = await _post("filesystem/listdir", payload)
        # The slice is kept as a client-side cap in case the server returns
        # more rows than requested.
        result = [
            {
                "name": entry.get("name"),
                "path": entry.get("path"),
                "type": entry.get("type"),
                "size": entry.get("size"),
                "uid": entry.get("uid"),
                "gid": entry.get("gid"),
                "mode": entry.get("mode"),
            }
            for entry in entries[: params.limit]
        ]
        return json.dumps({"path": params.path, "count": len(result), "entries": result}, indent=2)
    except Exception as e:
        return _handle_api_error(e)
# =========================================================================
# TOOLS — Cron Jobs
# =========================================================================
@mcp.tool(
    name="truenas_list_cronjobs",
    annotations={
        "title": "List Cron Jobs",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_cronjobs(params: CronJobInput) -> str:
    """List all configured cron jobs."""
    # Returns a JSON string {"count", "cronjobs"} built from GET cronjob, or
    # an error message produced by _handle_api_error.
    try:
        wanted = ("id", "description", "command", "user", "schedule", "enabled")
        raw_jobs = await _get("cronjob")
        summaries = [{key: job.get(key) for key in wanted} for job in raw_jobs]
        return json.dumps({"count": len(summaries), "cronjobs": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Replication
# =========================================================================
@mcp.tool(
    name="truenas_list_replications",
    annotations={
        "title": "List Replication Tasks",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_replications(params: EmptyInput) -> str:
    """List all configured replication tasks with source, target, and schedule."""
    # Returns a JSON string {"count", "replications"} built from
    # GET replication, or an error message from _handle_api_error.
    try:
        wanted = (
            "id",
            "name",
            "direction",
            "transport",
            "source_datasets",
            "target_dataset",
            "enabled",
            "state",
            "schedule",
        )
        raw_tasks = await _get("replication")
        summaries = [{key: task.get(key) for key in wanted} for task in raw_tasks]
        return json.dumps({"count": len(summaries), "replications": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Cloud Sync
# =========================================================================
@mcp.tool(
    name="truenas_list_cloudsync",
    annotations={
        "title": "List Cloud Sync Tasks",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_cloudsync(params: EmptyInput) -> str:
    """List all cloud sync tasks with their configuration and status."""
    # Returns a JSON string {"count", "tasks"} built from GET cloudsync, or
    # an error message produced by _handle_api_error.
    try:
        wanted = (
            "id",
            "description",
            "direction",
            "path",
            "enabled",
            "schedule",
            "job",
        )
        raw_tasks = await _get("cloudsync")
        summaries = [{key: task.get(key) for key in wanted} for task in raw_tasks]
        return json.dumps({"count": len(summaries), "tasks": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Boot
# =========================================================================
@mcp.tool(
    name="truenas_boot_state",
    annotations={
        "title": "Boot Pool State",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_boot_state(params: EmptyInput) -> str:
    """Get the state of the boot pool (health, devices, capacity)."""
    # Returns the raw GET boot/get_state response as a JSON string, or an
    # error message produced by _handle_api_error.
    try:
        boot_pool = await _get("boot/get_state")
        rendered = json.dumps(boot_pool, indent=2, default=str)
        return rendered
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — iSCSI
# =========================================================================
@mcp.tool(
    name="truenas_list_iscsi_targets",
    annotations={
        "title": "List iSCSI Targets",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_iscsi_targets(params: EmptyInput) -> str:
    """List all configured iSCSI targets."""
    # Returns a JSON string {"count", "targets"} wrapping the unmodified
    # GET iscsi/target response, or an error from _handle_api_error.
    try:
        target_list = await _get("iscsi/target")
        envelope = {"count": len(target_list), "targets": target_list}
        return json.dumps(envelope, indent=2, default=str)
    except Exception as exc:
        return _handle_api_error(exc)
@mcp.tool(
    name="truenas_list_iscsi_extents",
    annotations={
        "title": "List iSCSI Extents",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_iscsi_extents(params: EmptyInput) -> str:
    """List all configured iSCSI extents (block storage backing)."""
    # Returns a JSON string {"count", "extents"} wrapping the unmodified
    # GET iscsi/extent response, or an error from _handle_api_error.
    try:
        extent_list = await _get("iscsi/extent")
        envelope = {"count": len(extent_list), "extents": extent_list}
        return json.dumps(envelope, indent=2, default=str)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# TOOLS — Certificates
# =========================================================================
@mcp.tool(
    name="truenas_list_certificates",
    annotations={
        "title": "List Certificates",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def truenas_list_certificates(params: EmptyInput) -> str:
    """List all SSL/TLS certificates configured on the system."""
    # Returns a JSON string {"count", "certificates"} built from
    # GET certificate, or an error message from _handle_api_error.
    try:
        # (output key, API key) pairs; "cert_type" is exposed as "type".
        field_map = (
            ("id", "id"),
            ("name", "name"),
            ("issuer", "issuer"),
            ("common", "common"),
            ("from", "from"),
            ("until", "until"),
            ("type", "cert_type"),
        )
        raw_certs = await _get("certificate")
        summaries = [
            {out_key: cert.get(api_key) for out_key, api_key in field_map}
            for cert in raw_certs
        ]
        return json.dumps({"count": len(summaries), "certificates": summaries}, indent=2)
    except Exception as exc:
        return _handle_api_error(exc)
# =========================================================================
# Entry point
# =========================================================================
# Start the MCP server only when this file is executed as a script, so
# importing the module merely registers the tools without running the server.
if __name__ == "__main__":
    mcp.run()