1321 lines
42 KiB
Python
1321 lines
42 KiB
Python
|
|
"""
|
||
|
|
TrueNAS MCP Server
|
||
|
|
==================
|
||
|
|
MCP server for managing TrueNAS SCALE systems via the REST API v2.0.
|
||
|
|
Provides tools for monitoring system health, storage pools, datasets,
|
||
|
|
services, shares, apps, VMs, disks, and alerts.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import urllib.parse
|
||
|
|
from typing import Optional, List, Any, Dict
|
||
|
|
|
||
|
|
import httpx
|
||
|
|
from mcp.server.fastmcp import FastMCP
|
||
|
|
from pydantic import BaseModel, Field, ConfigDict, field_validator
|
||
|
|
from enum import Enum
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Configuration
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
TRUENAS_URL = os.environ.get("TRUENAS_URL", "")
|
||
|
|
TRUENAS_API_KEY = os.environ.get("TRUENAS_API_KEY", "")
|
||
|
|
BASE = f"{TRUENAS_URL}/api/v2.0"
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# MCP Server
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
mcp = FastMCP("truenas_mcp")
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Shared HTTP helpers
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
def _headers() -> dict:
|
||
|
|
return {
|
||
|
|
"Authorization": f"Bearer {TRUENAS_API_KEY}",
|
||
|
|
"Content-Type": "application/json",
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
async def _get(path: str, params: Optional[dict] = None) -> Any:
|
||
|
|
"""HTTP GET against the TrueNAS REST API."""
|
||
|
|
async with httpx.AsyncClient(verify=False, timeout=30) as client:
|
||
|
|
resp = await client.get(f"{BASE}/{path}", headers=_headers(), params=params)
|
||
|
|
resp.raise_for_status()
|
||
|
|
return resp.json()
|
||
|
|
|
||
|
|
|
||
|
|
async def _post(path: str, payload: Any = None) -> Any:
|
||
|
|
"""HTTP POST against the TrueNAS REST API."""
|
||
|
|
async with httpx.AsyncClient(verify=False, timeout=60) as client:
|
||
|
|
resp = await client.post(
|
||
|
|
f"{BASE}/{path}", headers=_headers(), json=payload
|
||
|
|
)
|
||
|
|
resp.raise_for_status()
|
||
|
|
return resp.json()
|
||
|
|
|
||
|
|
|
||
|
|
async def _put(path: str, payload: Any = None) -> Any:
|
||
|
|
"""HTTP PUT against the TrueNAS REST API."""
|
||
|
|
async with httpx.AsyncClient(verify=False, timeout=60) as client:
|
||
|
|
resp = await client.put(
|
||
|
|
f"{BASE}/{path}", headers=_headers(), json=payload
|
||
|
|
)
|
||
|
|
resp.raise_for_status()
|
||
|
|
return resp.json()
|
||
|
|
|
||
|
|
|
||
|
|
async def _delete(path: str) -> Any:
|
||
|
|
"""HTTP DELETE against the TrueNAS REST API."""
|
||
|
|
async with httpx.AsyncClient(verify=False, timeout=30) as client:
|
||
|
|
resp = await client.delete(f"{BASE}/{path}", headers=_headers())
|
||
|
|
resp.raise_for_status()
|
||
|
|
try:
|
||
|
|
return resp.json()
|
||
|
|
except Exception:
|
||
|
|
return {"status": "deleted"}
|
||
|
|
|
||
|
|
|
||
|
|
def _handle_api_error(e: Exception) -> str:
|
||
|
|
if isinstance(e, httpx.HTTPStatusError):
|
||
|
|
status = e.response.status_code
|
||
|
|
try:
|
||
|
|
body = e.response.json()
|
||
|
|
msg = body.get("message", str(body))
|
||
|
|
except Exception:
|
||
|
|
msg = e.response.text[:500]
|
||
|
|
if status == 404:
|
||
|
|
return f"Error 404: Resource not found. {msg}"
|
||
|
|
elif status == 403:
|
||
|
|
return f"Error 403: Permission denied. {msg}"
|
||
|
|
elif status == 405:
|
||
|
|
return f"Error 405: Method not allowed for this endpoint. {msg}"
|
||
|
|
elif status == 422:
|
||
|
|
return f"Error 422: Validation error. {msg}"
|
||
|
|
elif status == 429:
|
||
|
|
return "Error 429: Rate limit exceeded. Wait and retry."
|
||
|
|
return f"Error {status}: {msg}"
|
||
|
|
elif isinstance(e, httpx.TimeoutException):
|
||
|
|
return "Error: Request timed out. The server may be busy."
|
||
|
|
return f"Error: {type(e).__name__}: {e}"
|
||
|
|
|
||
|
|
|
||
|
|
def _bytes_human(n: int) -> str:
|
||
|
|
"""Convert bytes to human-readable string."""
|
||
|
|
for unit in ("B", "KiB", "MiB", "GiB", "TiB", "PiB"):
|
||
|
|
if abs(n) < 1024:
|
||
|
|
return f"{n:.1f} {unit}"
|
||
|
|
n /= 1024
|
||
|
|
return f"{n:.1f} EiB"
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Input models
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
class EmptyInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid")
|
||
|
|
|
||
|
|
|
||
|
|
class IdInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid")
|
||
|
|
id: int = Field(..., description="Numeric ID of the resource")
|
||
|
|
|
||
|
|
|
||
|
|
class NameInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
|
||
|
|
name: str = Field(..., description="Name of the resource", min_length=1)
|
||
|
|
|
||
|
|
|
||
|
|
class DatasetIdInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
|
||
|
|
dataset_id: str = Field(
|
||
|
|
...,
|
||
|
|
description="Full dataset path, e.g. 'NVME/SMB' or 'Storage'. URL-encoded automatically.",
|
||
|
|
min_length=1,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
class ListDatasetsInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid")
|
||
|
|
pool: Optional[str] = Field(
|
||
|
|
default=None,
|
||
|
|
description="Optional pool name to filter datasets (e.g. 'NVME', 'Storage'). If omitted, returns all datasets.",
|
||
|
|
)
|
||
|
|
limit: int = Field(default=50, description="Max datasets to return", ge=1, le=500)
|
||
|
|
offset: int = Field(default=0, description="Offset for pagination", ge=0)
|
||
|
|
|
||
|
|
|
||
|
|
class ServiceActionInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
|
||
|
|
service: str = Field(
|
||
|
|
...,
|
||
|
|
description="Service name, e.g. 'cifs', 'nfs', 'ssh', 'ftp', 'iscsitarget', 'snmp'",
|
||
|
|
min_length=1,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
class CreateDatasetInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
|
||
|
|
name: str = Field(
|
||
|
|
...,
|
||
|
|
description="Full dataset name including pool, e.g. 'NVME/my_new_dataset'",
|
||
|
|
min_length=1,
|
||
|
|
)
|
||
|
|
comments: Optional[str] = Field(default=None, description="Optional description/comments")
|
||
|
|
compression: Optional[str] = Field(
|
||
|
|
default=None,
|
||
|
|
description="Compression algorithm: 'LZ4', 'GZIP', 'ZLE', 'LZJB', 'ZSTD', or 'OFF'",
|
||
|
|
)
|
||
|
|
quota: Optional[int] = Field(
|
||
|
|
default=None,
|
||
|
|
description="Quota in bytes. 0 means no quota.",
|
||
|
|
ge=0,
|
||
|
|
)
|
||
|
|
readonly: Optional[bool] = Field(default=None, description="Set dataset read-only")
|
||
|
|
|
||
|
|
|
||
|
|
class CreateSnapshotInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
|
||
|
|
dataset: str = Field(
|
||
|
|
...,
|
||
|
|
description="Dataset name to snapshot, e.g. 'NVME/SMB'",
|
||
|
|
min_length=1,
|
||
|
|
)
|
||
|
|
name: str = Field(
|
||
|
|
...,
|
||
|
|
description="Snapshot name suffix, e.g. 'manual-backup-2024'",
|
||
|
|
min_length=1,
|
||
|
|
)
|
||
|
|
recursive: bool = Field(default=False, description="Include child datasets")
|
||
|
|
|
||
|
|
|
||
|
|
class ListSnapshotTasksInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid")
|
||
|
|
|
||
|
|
|
||
|
|
class AppActionInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
|
||
|
|
app_name: str = Field(
|
||
|
|
...,
|
||
|
|
description="Name of the app, e.g. 'plex', 'nextcloud', 'sonarr'",
|
||
|
|
min_length=1,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
class VMActionInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid")
|
||
|
|
vm_id: int = Field(..., description="Numeric VM ID")
|
||
|
|
|
||
|
|
|
||
|
|
class CreateSMBShareInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
|
||
|
|
path: str = Field(..., description="Filesystem path to share, e.g. '/mnt/NVME/myshare'", min_length=1)
|
||
|
|
name: str = Field(..., description="Share name visible on the network", min_length=1)
|
||
|
|
comment: Optional[str] = Field(default=None, description="Optional description")
|
||
|
|
readonly: bool = Field(default=False, description="Share is read-only")
|
||
|
|
guest_ok: bool = Field(default=False, description="Allow guest access (no auth)")
|
||
|
|
|
||
|
|
|
||
|
|
class DirListInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
|
||
|
|
path: str = Field(
|
||
|
|
...,
|
||
|
|
description="Absolute path to list, e.g. '/mnt/NVME/SMB'",
|
||
|
|
min_length=1,
|
||
|
|
)
|
||
|
|
limit: int = Field(default=100, description="Max entries to return", ge=1, le=1000)
|
||
|
|
|
||
|
|
|
||
|
|
class CronJobInput(BaseModel):
|
||
|
|
model_config = ConfigDict(extra="forbid")
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — System
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_system_info",
|
||
|
|
annotations={
|
||
|
|
"title": "System Information",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_system_info(params: EmptyInput) -> str:
|
||
|
|
"""Get TrueNAS system information including version, CPU, memory, uptime, and hostname."""
|
||
|
|
try:
|
||
|
|
info = await _get("system/info")
|
||
|
|
mem_gb = info.get("physmem", 0) / (1024**3)
|
||
|
|
return json.dumps(
|
||
|
|
{
|
||
|
|
"version": info.get("version"),
|
||
|
|
"hostname": info.get("hostname"),
|
||
|
|
"cpu_model": info.get("model"),
|
||
|
|
"cores": info.get("cores"),
|
||
|
|
"physical_cores": info.get("physical_cores"),
|
||
|
|
"memory_gb": round(mem_gb, 1),
|
||
|
|
"uptime": info.get("uptime"),
|
||
|
|
"timezone": info.get("timezone"),
|
||
|
|
"system_manufacturer": info.get("system_manufacturer"),
|
||
|
|
"system_product": info.get("system_product"),
|
||
|
|
"ecc_memory": info.get("ecc_memory"),
|
||
|
|
"loadavg_1m": round(info.get("loadavg", [0])[0], 2),
|
||
|
|
"loadavg_5m": round(info.get("loadavg", [0, 0])[1], 2),
|
||
|
|
"loadavg_15m": round(info.get("loadavg", [0, 0, 0])[2], 2),
|
||
|
|
},
|
||
|
|
indent=2,
|
||
|
|
)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Alerts
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_alerts",
|
||
|
|
annotations={
|
||
|
|
"title": "List Active Alerts",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_alerts(params: EmptyInput) -> str:
|
||
|
|
"""List all active system alerts with severity levels and messages."""
|
||
|
|
try:
|
||
|
|
alerts = await _get("alert/list")
|
||
|
|
if not alerts:
|
||
|
|
return json.dumps({"count": 0, "alerts": [], "message": "No active alerts."})
|
||
|
|
result = []
|
||
|
|
for a in alerts:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": a.get("id"),
|
||
|
|
"level": a.get("level"),
|
||
|
|
"source": a.get("source"),
|
||
|
|
"klass": a.get("klass"),
|
||
|
|
"message": a.get("formatted"),
|
||
|
|
"dismissed": a.get("dismissed", False),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "alerts": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Storage Pools
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_pools",
|
||
|
|
annotations={
|
||
|
|
"title": "List Storage Pools",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_pools(params: EmptyInput) -> str:
|
||
|
|
"""List all ZFS storage pools with status, health, and capacity info."""
|
||
|
|
try:
|
||
|
|
pools = await _get("pool")
|
||
|
|
result = []
|
||
|
|
for p in pools:
|
||
|
|
topology = p.get("topology", {})
|
||
|
|
data_vdevs = topology.get("data", [])
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": p.get("id"),
|
||
|
|
"name": p.get("name"),
|
||
|
|
"status": p.get("status"),
|
||
|
|
"healthy": p.get("healthy"),
|
||
|
|
"is_decrypted": p.get("is_decrypted"),
|
||
|
|
"path": p.get("path"),
|
||
|
|
"data_vdevs": len(data_vdevs),
|
||
|
|
"scan": p.get("scan"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "pools": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_get_pool",
|
||
|
|
annotations={
|
||
|
|
"title": "Get Pool Details",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_get_pool(params: IdInput) -> str:
|
||
|
|
"""Get detailed information about a specific storage pool by ID, including topology and disk layout."""
|
||
|
|
try:
|
||
|
|
p = await _get(f"pool/id/{params.id}")
|
||
|
|
return json.dumps(p, indent=2, default=str)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Datasets
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_datasets",
|
||
|
|
annotations={
|
||
|
|
"title": "List Datasets",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_datasets(params: ListDatasetsInput) -> str:
|
||
|
|
"""List ZFS datasets with usage, compression, and quota information. Optionally filter by pool name."""
|
||
|
|
try:
|
||
|
|
datasets = await _get("pool/dataset")
|
||
|
|
if params.pool:
|
||
|
|
datasets = [d for d in datasets if d.get("pool") == params.pool or d.get("name", "").startswith(params.pool + "/") or d.get("name") == params.pool]
|
||
|
|
|
||
|
|
total = len(datasets)
|
||
|
|
page = datasets[params.offset : params.offset + params.limit]
|
||
|
|
result = []
|
||
|
|
for d in page:
|
||
|
|
used_raw = d.get("used", {}).get("rawvalue", 0)
|
||
|
|
avail_raw = d.get("available", {}).get("rawvalue", 0)
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": d.get("id"),
|
||
|
|
"name": d.get("name"),
|
||
|
|
"pool": d.get("pool"),
|
||
|
|
"type": d.get("type"),
|
||
|
|
"used": _bytes_human(int(used_raw)),
|
||
|
|
"available": _bytes_human(int(avail_raw)),
|
||
|
|
"compression": d.get("compression", {}).get("value"),
|
||
|
|
"readonly": d.get("readonly", {}).get("value"),
|
||
|
|
"mountpoint": d.get("mountpoint"),
|
||
|
|
"quota": d.get("quota", {}).get("rawvalue"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps(
|
||
|
|
{
|
||
|
|
"total": total,
|
||
|
|
"count": len(result),
|
||
|
|
"offset": params.offset,
|
||
|
|
"has_more": total > params.offset + len(result),
|
||
|
|
"datasets": result,
|
||
|
|
},
|
||
|
|
indent=2,
|
||
|
|
)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_get_dataset",
|
||
|
|
annotations={
|
||
|
|
"title": "Get Dataset Details",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_get_dataset(params: DatasetIdInput) -> str:
|
||
|
|
"""Get detailed info about a specific dataset by its full path (e.g. 'NVME/SMB')."""
|
||
|
|
try:
|
||
|
|
encoded = urllib.parse.quote(params.dataset_id, safe="")
|
||
|
|
d = await _get(f"pool/dataset/id/{encoded}")
|
||
|
|
return json.dumps(d, indent=2, default=str)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_create_dataset",
|
||
|
|
annotations={
|
||
|
|
"title": "Create Dataset",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": False,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_create_dataset(params: CreateDatasetInput) -> str:
|
||
|
|
"""Create a new ZFS dataset. Provide the full name including pool (e.g. 'NVME/my_dataset')."""
|
||
|
|
try:
|
||
|
|
payload: Dict[str, Any] = {"name": params.name}
|
||
|
|
if params.comments:
|
||
|
|
payload["comments"] = params.comments
|
||
|
|
if params.compression:
|
||
|
|
payload["compression"] = params.compression.upper()
|
||
|
|
if params.quota is not None:
|
||
|
|
payload["quota"] = params.quota
|
||
|
|
if params.readonly is not None:
|
||
|
|
payload["readonly"] = "ON" if params.readonly else "OFF"
|
||
|
|
|
||
|
|
result = await _post("pool/dataset", payload)
|
||
|
|
return json.dumps(
|
||
|
|
{"status": "created", "dataset": result.get("name"), "id": result.get("id")},
|
||
|
|
indent=2,
|
||
|
|
)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Snapshots
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_create_snapshot",
|
||
|
|
annotations={
|
||
|
|
"title": "Create ZFS Snapshot",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": False,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_create_snapshot(params: CreateSnapshotInput) -> str:
|
||
|
|
"""Create a manual ZFS snapshot of a dataset."""
|
||
|
|
try:
|
||
|
|
payload = {
|
||
|
|
"dataset": params.dataset,
|
||
|
|
"name": params.name,
|
||
|
|
"recursive": params.recursive,
|
||
|
|
}
|
||
|
|
result = await _post("zfs/snapshot", payload)
|
||
|
|
return json.dumps({"status": "created", "snapshot": result}, indent=2, default=str)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_snapshot_tasks",
|
||
|
|
annotations={
|
||
|
|
"title": "List Snapshot Tasks",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_snapshot_tasks(params: ListSnapshotTasksInput) -> str:
|
||
|
|
"""List all configured periodic snapshot tasks."""
|
||
|
|
try:
|
||
|
|
tasks = await _get("pool/snapshottask")
|
||
|
|
result = []
|
||
|
|
for t in tasks:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": t.get("id"),
|
||
|
|
"dataset": t.get("dataset"),
|
||
|
|
"recursive": t.get("recursive"),
|
||
|
|
"lifetime_value": t.get("lifetime_value"),
|
||
|
|
"lifetime_unit": t.get("lifetime_unit"),
|
||
|
|
"enabled": t.get("enabled"),
|
||
|
|
"naming_schema": t.get("naming_schema"),
|
||
|
|
"schedule": t.get("schedule"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "tasks": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Disks
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_disks",
|
||
|
|
annotations={
|
||
|
|
"title": "List Physical Disks",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_disks(params: EmptyInput) -> str:
|
||
|
|
"""List all physical disks with model, size, serial, temperature, and type info."""
|
||
|
|
try:
|
||
|
|
disks = await _get("disk")
|
||
|
|
result = []
|
||
|
|
for d in disks:
|
||
|
|
size_bytes = d.get("size", 0) or 0
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"name": d.get("name"),
|
||
|
|
"model": d.get("model"),
|
||
|
|
"serial": d.get("serial"),
|
||
|
|
"size": _bytes_human(size_bytes),
|
||
|
|
"size_bytes": size_bytes,
|
||
|
|
"type": d.get("type"),
|
||
|
|
"rotationrate": d.get("rotationrate"),
|
||
|
|
"hddstandby": d.get("hddstandby"),
|
||
|
|
"pool": d.get("pool"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "disks": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Services
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_services",
|
||
|
|
annotations={
|
||
|
|
"title": "List Services",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_services(params: EmptyInput) -> str:
|
||
|
|
"""List all system services with their running state and auto-start configuration."""
|
||
|
|
try:
|
||
|
|
services = await _get("service")
|
||
|
|
result = []
|
||
|
|
for s in services:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": s.get("id"),
|
||
|
|
"service": s.get("service"),
|
||
|
|
"state": s.get("state"),
|
||
|
|
"enable": s.get("enable"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "services": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_start_service",
|
||
|
|
annotations={
|
||
|
|
"title": "Start Service",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_start_service(params: ServiceActionInput) -> str:
|
||
|
|
"""Start a system service by name (e.g. 'cifs', 'nfs', 'ssh')."""
|
||
|
|
try:
|
||
|
|
result = await _post(f"service/start", {"service": params.service})
|
||
|
|
return json.dumps({"status": "started", "service": params.service, "result": result})
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_stop_service",
|
||
|
|
annotations={
|
||
|
|
"title": "Stop Service",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_stop_service(params: ServiceActionInput) -> str:
|
||
|
|
"""Stop a system service by name (e.g. 'cifs', 'nfs', 'ssh')."""
|
||
|
|
try:
|
||
|
|
result = await _post(f"service/stop", {"service": params.service})
|
||
|
|
return json.dumps({"status": "stopped", "service": params.service, "result": result})
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_restart_service",
|
||
|
|
annotations={
|
||
|
|
"title": "Restart Service",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": False,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_restart_service(params: ServiceActionInput) -> str:
|
||
|
|
"""Restart a system service by name."""
|
||
|
|
try:
|
||
|
|
result = await _post(f"service/restart", {"service": params.service})
|
||
|
|
return json.dumps({"status": "restarted", "service": params.service, "result": result})
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — SMB Shares
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_smb_shares",
|
||
|
|
annotations={
|
||
|
|
"title": "List SMB Shares",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_smb_shares(params: EmptyInput) -> str:
|
||
|
|
"""List all SMB/CIFS file shares with their paths and configuration."""
|
||
|
|
try:
|
||
|
|
shares = await _get("sharing/smb")
|
||
|
|
result = []
|
||
|
|
for s in shares:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": s.get("id"),
|
||
|
|
"name": s.get("name"),
|
||
|
|
"path": s.get("path"),
|
||
|
|
"comment": s.get("comment"),
|
||
|
|
"enabled": s.get("enabled"),
|
||
|
|
"readonly": s.get("ro"),
|
||
|
|
"guest_ok": s.get("guestok"),
|
||
|
|
"browsable": s.get("browsable"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "shares": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_create_smb_share",
|
||
|
|
annotations={
|
||
|
|
"title": "Create SMB Share",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": False,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_create_smb_share(params: CreateSMBShareInput) -> str:
|
||
|
|
"""Create a new SMB/CIFS file share."""
|
||
|
|
try:
|
||
|
|
payload: Dict[str, Any] = {
|
||
|
|
"path": params.path,
|
||
|
|
"name": params.name,
|
||
|
|
"ro": params.readonly,
|
||
|
|
"guestok": params.guest_ok,
|
||
|
|
}
|
||
|
|
if params.comment:
|
||
|
|
payload["comment"] = params.comment
|
||
|
|
result = await _post("sharing/smb", payload)
|
||
|
|
return json.dumps(
|
||
|
|
{"status": "created", "share": result.get("name"), "id": result.get("id")},
|
||
|
|
indent=2,
|
||
|
|
)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — NFS Shares
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_nfs_shares",
|
||
|
|
annotations={
|
||
|
|
"title": "List NFS Shares",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_nfs_shares(params: EmptyInput) -> str:
|
||
|
|
"""List all NFS exports with their paths and access configuration."""
|
||
|
|
try:
|
||
|
|
shares = await _get("sharing/nfs")
|
||
|
|
result = []
|
||
|
|
for s in shares:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": s.get("id"),
|
||
|
|
"path": s.get("path"),
|
||
|
|
"comment": s.get("comment"),
|
||
|
|
"enabled": s.get("enabled"),
|
||
|
|
"readonly": s.get("ro"),
|
||
|
|
"networks": s.get("networks"),
|
||
|
|
"hosts": s.get("hosts"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "shares": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Apps
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_apps",
|
||
|
|
annotations={
|
||
|
|
"title": "List Installed Apps",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_apps(params: EmptyInput) -> str:
|
||
|
|
"""List all installed TrueNAS apps (Docker-based) with their running state."""
|
||
|
|
try:
|
||
|
|
apps = await _get("app")
|
||
|
|
result = []
|
||
|
|
for a in apps:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"name": a.get("name"),
|
||
|
|
"state": a.get("state"),
|
||
|
|
"version": a.get("version"),
|
||
|
|
"human_version": a.get("human_version"),
|
||
|
|
"update_available": a.get("update_available", False),
|
||
|
|
"portal": a.get("portals"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "apps": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_start_app",
|
||
|
|
annotations={
|
||
|
|
"title": "Start App",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_start_app(params: AppActionInput) -> str:
|
||
|
|
"""Start a TrueNAS app by name."""
|
||
|
|
try:
|
||
|
|
result = await _post(f"app/id/{params.app_name}/start")
|
||
|
|
return json.dumps({"status": "starting", "app": params.app_name, "job_id": result})
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_stop_app",
|
||
|
|
annotations={
|
||
|
|
"title": "Stop App",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_stop_app(params: AppActionInput) -> str:
|
||
|
|
"""Stop a TrueNAS app by name."""
|
||
|
|
try:
|
||
|
|
result = await _post(f"app/id/{params.app_name}/stop")
|
||
|
|
return json.dumps({"status": "stopping", "app": params.app_name, "job_id": result})
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — VMs
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_vms",
|
||
|
|
annotations={
|
||
|
|
"title": "List Virtual Machines",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_vms(params: EmptyInput) -> str:
|
||
|
|
"""List all virtual machines with status, CPU, and memory configuration."""
|
||
|
|
try:
|
||
|
|
vms = await _get("vm")
|
||
|
|
result = []
|
||
|
|
for v in vms:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": v.get("id"),
|
||
|
|
"name": v.get("name"),
|
||
|
|
"status": v.get("status", {}).get("state"),
|
||
|
|
"vcpus": v.get("vcpus"),
|
||
|
|
"memory_mb": v.get("memory"),
|
||
|
|
"autostart": v.get("autostart"),
|
||
|
|
"description": v.get("description"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "vms": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_start_vm",
|
||
|
|
annotations={
|
||
|
|
"title": "Start VM",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_start_vm(params: VMActionInput) -> str:
|
||
|
|
"""Start a virtual machine by its numeric ID."""
|
||
|
|
try:
|
||
|
|
result = await _post(f"vm/id/{params.vm_id}/start")
|
||
|
|
return json.dumps({"status": "starting", "vm_id": params.vm_id, "result": result})
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_stop_vm",
|
||
|
|
annotations={
|
||
|
|
"title": "Stop VM",
|
||
|
|
"readOnlyHint": False,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_stop_vm(params: VMActionInput) -> str:
|
||
|
|
"""Stop a virtual machine by its numeric ID."""
|
||
|
|
try:
|
||
|
|
result = await _post(f"vm/id/{params.vm_id}/stop")
|
||
|
|
return json.dumps({"status": "stopping", "vm_id": params.vm_id, "result": result})
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Network
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_network_summary",
|
||
|
|
annotations={
|
||
|
|
"title": "Network Summary",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_network_summary(params: EmptyInput) -> str:
|
||
|
|
"""Get network configuration summary including IPs, gateways, DNS, and interfaces."""
|
||
|
|
try:
|
||
|
|
summary = await _get("network/general/summary")
|
||
|
|
return json.dumps(summary, indent=2, default=str)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_interfaces",
|
||
|
|
annotations={
|
||
|
|
"title": "List Network Interfaces",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_interfaces(params: EmptyInput) -> str:
|
||
|
|
"""List all network interfaces with their addresses and link state."""
|
||
|
|
try:
|
||
|
|
ifaces = await _get("interface")
|
||
|
|
result = []
|
||
|
|
for i in ifaces:
|
||
|
|
aliases = []
|
||
|
|
for a in i.get("aliases", []):
|
||
|
|
aliases.append(f"{a.get('address')}/{a.get('netmask')}")
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": i.get("id"),
|
||
|
|
"name": i.get("name"),
|
||
|
|
"type": i.get("type"),
|
||
|
|
"state": i.get("state"),
|
||
|
|
"aliases": aliases,
|
||
|
|
"mtu": i.get("mtu"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "interfaces": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Users & Groups
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_users",
|
||
|
|
annotations={
|
||
|
|
"title": "List Users",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_users(params: EmptyInput) -> str:
|
||
|
|
"""List all system users."""
|
||
|
|
try:
|
||
|
|
users = await _get("user")
|
||
|
|
result = []
|
||
|
|
for u in users:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": u.get("id"),
|
||
|
|
"username": u.get("username"),
|
||
|
|
"full_name": u.get("full_name"),
|
||
|
|
"uid": u.get("uid"),
|
||
|
|
"builtin": u.get("builtin"),
|
||
|
|
"locked": u.get("locked"),
|
||
|
|
"shell": u.get("shell"),
|
||
|
|
"home": u.get("home"),
|
||
|
|
"group": u.get("group", {}).get("bsdgrp_group") if isinstance(u.get("group"), dict) else u.get("group"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "users": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_groups",
|
||
|
|
annotations={
|
||
|
|
"title": "List Groups",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_groups(params: EmptyInput) -> str:
|
||
|
|
"""List all system groups."""
|
||
|
|
try:
|
||
|
|
groups = await _get("group")
|
||
|
|
result = []
|
||
|
|
for g in groups:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": g.get("id"),
|
||
|
|
"group": g.get("group"),
|
||
|
|
"gid": g.get("gid"),
|
||
|
|
"builtin": g.get("builtin"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "groups": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Filesystem
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_directory",
|
||
|
|
annotations={
|
||
|
|
"title": "List Directory Contents",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_directory(params: DirListInput) -> str:
|
||
|
|
"""List files and directories at a given path on the TrueNAS filesystem."""
|
||
|
|
try:
|
||
|
|
payload = {"path": params.path, "limit": params.limit}
|
||
|
|
entries = await _post("filesystem/listdir", payload)
|
||
|
|
result = []
|
||
|
|
for e in entries[:params.limit]:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"name": e.get("name"),
|
||
|
|
"path": e.get("path"),
|
||
|
|
"type": e.get("type"),
|
||
|
|
"size": e.get("size"),
|
||
|
|
"uid": e.get("uid"),
|
||
|
|
"gid": e.get("gid"),
|
||
|
|
"mode": e.get("mode"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"path": params.path, "count": len(result), "entries": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Cron Jobs
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_cronjobs",
|
||
|
|
annotations={
|
||
|
|
"title": "List Cron Jobs",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_cronjobs(params: CronJobInput) -> str:
|
||
|
|
"""List all configured cron jobs."""
|
||
|
|
try:
|
||
|
|
jobs = await _get("cronjob")
|
||
|
|
result = []
|
||
|
|
for j in jobs:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": j.get("id"),
|
||
|
|
"description": j.get("description"),
|
||
|
|
"command": j.get("command"),
|
||
|
|
"user": j.get("user"),
|
||
|
|
"schedule": j.get("schedule"),
|
||
|
|
"enabled": j.get("enabled"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "cronjobs": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Replication
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_replications",
|
||
|
|
annotations={
|
||
|
|
"title": "List Replication Tasks",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_replications(params: EmptyInput) -> str:
|
||
|
|
"""List all configured replication tasks with source, target, and schedule."""
|
||
|
|
try:
|
||
|
|
tasks = await _get("replication")
|
||
|
|
result = []
|
||
|
|
for t in tasks:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": t.get("id"),
|
||
|
|
"name": t.get("name"),
|
||
|
|
"direction": t.get("direction"),
|
||
|
|
"transport": t.get("transport"),
|
||
|
|
"source_datasets": t.get("source_datasets"),
|
||
|
|
"target_dataset": t.get("target_dataset"),
|
||
|
|
"enabled": t.get("enabled"),
|
||
|
|
"state": t.get("state"),
|
||
|
|
"schedule": t.get("schedule"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "replications": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Cloud Sync
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_cloudsync",
|
||
|
|
annotations={
|
||
|
|
"title": "List Cloud Sync Tasks",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_cloudsync(params: EmptyInput) -> str:
|
||
|
|
"""List all cloud sync tasks with their configuration and status."""
|
||
|
|
try:
|
||
|
|
tasks = await _get("cloudsync")
|
||
|
|
result = []
|
||
|
|
for t in tasks:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": t.get("id"),
|
||
|
|
"description": t.get("description"),
|
||
|
|
"direction": t.get("direction"),
|
||
|
|
"path": t.get("path"),
|
||
|
|
"enabled": t.get("enabled"),
|
||
|
|
"schedule": t.get("schedule"),
|
||
|
|
"job": t.get("job"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "tasks": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Boot
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_boot_state",
|
||
|
|
annotations={
|
||
|
|
"title": "Boot Pool State",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_boot_state(params: EmptyInput) -> str:
|
||
|
|
"""Get the state of the boot pool (health, devices, capacity)."""
|
||
|
|
try:
|
||
|
|
state = await _get("boot/get_state")
|
||
|
|
return json.dumps(state, indent=2, default=str)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — iSCSI
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_iscsi_targets",
|
||
|
|
annotations={
|
||
|
|
"title": "List iSCSI Targets",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_iscsi_targets(params: EmptyInput) -> str:
|
||
|
|
"""List all configured iSCSI targets."""
|
||
|
|
try:
|
||
|
|
targets = await _get("iscsi/target")
|
||
|
|
return json.dumps({"count": len(targets), "targets": targets}, indent=2, default=str)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_iscsi_extents",
|
||
|
|
annotations={
|
||
|
|
"title": "List iSCSI Extents",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_iscsi_extents(params: EmptyInput) -> str:
|
||
|
|
"""List all configured iSCSI extents (block storage backing)."""
|
||
|
|
try:
|
||
|
|
extents = await _get("iscsi/extent")
|
||
|
|
return json.dumps({"count": len(extents), "extents": extents}, indent=2, default=str)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# TOOLS — Certificates
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
|
||
|
|
@mcp.tool(
|
||
|
|
name="truenas_list_certificates",
|
||
|
|
annotations={
|
||
|
|
"title": "List Certificates",
|
||
|
|
"readOnlyHint": True,
|
||
|
|
"destructiveHint": False,
|
||
|
|
"idempotentHint": True,
|
||
|
|
"openWorldHint": False,
|
||
|
|
},
|
||
|
|
)
|
||
|
|
async def truenas_list_certificates(params: EmptyInput) -> str:
|
||
|
|
"""List all SSL/TLS certificates configured on the system."""
|
||
|
|
try:
|
||
|
|
certs = await _get("certificate")
|
||
|
|
result = []
|
||
|
|
for c in certs:
|
||
|
|
result.append(
|
||
|
|
{
|
||
|
|
"id": c.get("id"),
|
||
|
|
"name": c.get("name"),
|
||
|
|
"issuer": c.get("issuer"),
|
||
|
|
"common": c.get("common"),
|
||
|
|
"from": c.get("from"),
|
||
|
|
"until": c.get("until"),
|
||
|
|
"type": c.get("cert_type"),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return json.dumps({"count": len(result), "certificates": result}, indent=2)
|
||
|
|
except Exception as e:
|
||
|
|
return _handle_api_error(e)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================================
|
||
|
|
# Entry point
|
||
|
|
# =========================================================================
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
mcp.run()
|