# mcp-servers/memory-bank-mcp/memory_bank_mcp.py

"""
Memory Bank MCP Server
======================
MCP server providing persistent memory storage for LLM conversations.
Stores and retrieves key-value memories with metadata, tags, and
semantic search capabilities. Backed by a local JSON file store.
"""
import json
import os
import time
import hashlib
from pathlib import Path
from typing import Optional, List, Any, Dict
from datetime import datetime, timezone
from mcp.server.fastmcp import FastMCP
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
MEMORY_DIR = os.environ.get("MEMORY_DIR", "/data/memories")  # directory holding memory JSON files and the index
MAX_MEMORIES = int(os.environ.get("MAX_MEMORIES", "10000"))  # configured capacity, exposed via get_memory_stats
# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
mcp = FastMCP("memory_bank_mcp")  # server instance; tools below register via @mcp.tool()
# ---------------------------------------------------------------------------
# Storage helpers
# ---------------------------------------------------------------------------
def _ensure_dir():
    """Create the memory directory (and any parents) if it is missing."""
    os.makedirs(MEMORY_DIR, exist_ok=True)
def _index_path() -> str:
    """Path of the JSON index file inside the memory directory."""
    return str(Path(MEMORY_DIR) / "_index.json")
def _load_index() -> Dict[str, Any]:
    """Load the memory index, tolerating a missing or corrupt index file.

    Returns:
        A dict guaranteed to contain the top-level keys "memories" and
        "tags"; a fresh empty index when the file is absent or unreadable.
    """
    _ensure_dir()
    idx_path = _index_path()
    if os.path.exists(idx_path):
        try:
            with open(idx_path, "r", encoding="utf-8") as f:
                index = json.load(f)
        except (OSError, json.JSONDecodeError):
            # Robustness: a corrupt/unreadable index must not take every
            # tool down; fall back to an empty index (memory files on disk
            # are untouched and can be re-indexed by subsequent stores).
            return {"memories": {}, "tags": {}}
        # Guarantee the expected top-level keys even for partial/old files.
        index.setdefault("memories", {})
        index.setdefault("tags", {})
        return index
    return {"memories": {}, "tags": {}}
def _save_index(index: Dict[str, Any]):
    """Atomically persist the memory index.

    Writes to a sibling temp file and os.replace()s it into place so that a
    crash mid-write cannot leave a truncated/corrupt index behind.

    Args:
        index: The full index dict ({"memories": ..., "tags": ...}).
    """
    _ensure_dir()
    final_path = _index_path()
    tmp_path = final_path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(index, f, indent=2)
    # Atomic on both POSIX and Windows for same-directory renames.
    os.replace(tmp_path, final_path)
def _memory_path(memory_id: str) -> str:
    """Return the JSON file path for *memory_id* inside MEMORY_DIR.

    Security: memory keys arrive from tool callers and are used as file
    names, so reject anything that could escape MEMORY_DIR (path
    separators, parent references, empty ids).

    Raises:
        ValueError: if the id is empty or contains path components.
    """
    if (
        not memory_id
        or memory_id in (".", "..")
        or os.path.basename(memory_id) != memory_id
    ):
        raise ValueError(f"Invalid memory id: {memory_id!r}")
    return os.path.join(MEMORY_DIR, f"{memory_id}.json")
def _generate_id(content: str) -> str:
"""Generate a deterministic ID from content."""
return hashlib.sha256(content.encode()).hexdigest()[:16]
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@mcp.tool()
async def store_memory(
    key: str,
    content: str,
    tags: Optional[List[str]] = None,
    category: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Store a memory with a key, content, optional tags, category, and metadata.
    If a memory with the same key exists, it will be updated: created_at and
    access_count are preserved, and tag-index entries for tags the memory no
    longer carries are removed.
    Args:
        key: Unique key/name for this memory
        content: The content to remember
        tags: Optional list of tags for categorization
        category: Optional category (e.g., 'user_preference', 'fact', 'context')
        metadata: Optional additional metadata dict
    Returns:
        {"status": "stored", ...} on success, or {"error": ...} when the
        configured MAX_MEMORIES capacity would be exceeded by a new key.
    """
    index = _load_index()
    is_update = key in index["memories"]
    # Fix: enforce the configured capacity (MAX_MEMORIES was previously
    # read from the environment but never checked). Updates to existing
    # keys are always allowed.
    if not is_update and len(index["memories"]) >= MAX_MEMORIES:
        return {"error": f"Memory limit of {MAX_MEMORIES} reached"}
    now = datetime.now(timezone.utc).isoformat()
    memory = {
        "key": key,
        "content": content,
        "tags": tags or [],
        "category": category or "general",
        "metadata": metadata or {},
        "created_at": now,
        "updated_at": now,
        "access_count": 0,
    }
    if is_update:
        # Preserve creation time and access count from the existing record.
        existing = _load_memory_file(key)
        if existing:
            memory["created_at"] = existing.get("created_at", now)
            memory["access_count"] = existing.get("access_count", 0)
        # Fix: unlink this key from tags it no longer carries, so the tag
        # index does not accumulate stale entries across updates.
        for tag in index["memories"][key].get("tags", []):
            if tag not in memory["tags"] and tag in index["tags"]:
                if key in index["tags"][tag]:
                    index["tags"][tag].remove(key)
                if not index["tags"][tag]:
                    del index["tags"][tag]
    # Persist the full record to its own JSON file.
    _ensure_dir()
    with open(_memory_path(key), "w") as f:
        json.dump(memory, f, indent=2)
    # Record a lightweight summary in the index.
    index["memories"][key] = {
        "category": memory["category"],
        "tags": memory["tags"],
        "updated_at": now,
    }
    # Forward tag index: tag -> list of keys carrying it.
    for tag in memory["tags"]:
        if tag not in index["tags"]:
            index["tags"][tag] = []
        if key not in index["tags"][tag]:
            index["tags"][tag].append(key)
    _save_index(index)
    return {"status": "stored", "key": key, "updated_at": now}
def _load_memory_file(key: str) -> Optional[Dict[str, Any]]:
    """Load a single memory record by key.

    Returns:
        The parsed record dict, or None when the file is absent or cannot
        be read/parsed (a corrupt file is treated like a missing one rather
        than crashing every tool that touches it).
    """
    path = _memory_path(key)
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            return None
    return None
@mcp.tool()
async def recall_memory(key: str) -> Dict[str, Any]:
    """
    Retrieve a specific memory by its key.
    Args:
        key: The key of the memory to retrieve
    Returns:
        The full memory record, or {"error": ...} if the key is unknown.
    """
    record = _load_memory_file(key)
    if not record:
        return {"error": f"Memory '{key}' not found"}
    # Bump usage bookkeeping and persist the refreshed record.
    record["access_count"] = record.get("access_count", 0) + 1
    record["last_accessed"] = datetime.now(timezone.utc).isoformat()
    with open(_memory_path(key), "w") as fh:
        json.dump(record, fh, indent=2)
    return record
@mcp.tool()
async def search_memories(
    query: Optional[str] = None,
    tags: Optional[List[str]] = None,
    category: Optional[str] = None,
    limit: int = 20,
) -> Dict[str, Any]:
    """
    Search memories by text query, tags, or category.
    Args:
        query: Optional text to search for in memory content, keys, and tags
        tags: Optional list of tags to filter by (memories must have ALL tags)
        category: Optional category to filter by
        limit: Maximum results to return (default 20)
    Returns:
        {"total": <matches before limit>, "results": [...]} with content
        previews truncated to 200 characters.
    """
    index = _load_index()
    results = []
    # Start from every key, then narrow by tag intersection and category.
    candidate_keys = set(index["memories"].keys())
    if tags:
        for tag in tags:
            candidate_keys &= set(index["tags"].get(tag, []))
    if category:
        candidate_keys = {
            k for k in candidate_keys
            if index["memories"].get(k, {}).get("category") == category
        }
    # Hoisted out of the loop (was recomputed per memory).
    query_lower = query.lower() if query else None
    for key in candidate_keys:
        memory = _load_memory_file(key)
        if not memory:
            continue
        content = memory.get("content", "")
        if query_lower is not None:
            if (
                query_lower not in content.lower()
                and query_lower not in memory.get("key", "").lower()
                and not any(query_lower in t.lower() for t in memory.get("tags", []))
            ):
                continue
        # Fix: use .get for "key"/"content" like the rest of the function —
        # direct indexing raised KeyError on records missing those fields.
        snippet = content[:200] + ("..." if len(content) > 200 else "")
        results.append({
            "key": memory.get("key", key),
            "content": snippet,
            "category": memory.get("category"),
            "tags": memory.get("tags", []),
            "updated_at": memory.get("updated_at"),
            "access_count": memory.get("access_count", 0),
        })
    # Most recently updated first.
    results.sort(key=lambda r: r.get("updated_at", ""), reverse=True)
    return {
        "total": len(results),
        "results": results[:limit],
    }
@mcp.tool()
async def delete_memory(key: str) -> Dict[str, Any]:
    """
    Delete a specific memory by key.
    Args:
        key: The key of the memory to delete
    Returns:
        {"status": "deleted", ...} or {"error": ...} if the key is unknown.
    """
    index = _load_index()
    entry = index["memories"].get(key)
    if entry is None:
        return {"error": f"Memory '{key}' not found"}
    # Unlink the key from every tag it was filed under, pruning tags that
    # end up with no members.
    for tag in entry.get("tags", []):
        keys_for_tag = index["tags"].get(tag)
        if keys_for_tag and key in keys_for_tag:
            keys_for_tag.remove(key)
            if not keys_for_tag:
                del index["tags"][tag]
    del index["memories"][key]
    _save_index(index)
    # Remove the backing JSON file as well, if it exists.
    file_path = _memory_path(key)
    if os.path.exists(file_path):
        os.remove(file_path)
    return {"status": "deleted", "key": key}
@mcp.tool()
async def list_memories(
    category: Optional[str] = None,
    limit: int = 50,
    offset: int = 0,
) -> Dict[str, Any]:
    """
    List all stored memories with optional category filter.
    Args:
        category: Optional category to filter by
        limit: Maximum results (default 50)
        offset: Skip first N results (default 0)
    Returns:
        {"total", "offset", "limit", "memories"} where "total" counts all
        matches before pagination.
    """
    index = _load_index()
    # Summarize each index entry, optionally restricted to one category.
    summaries = [
        {
            "key": key,
            "category": info.get("category"),
            "tags": info.get("tags", []),
            "updated_at": info.get("updated_at"),
        }
        for key, info in index["memories"].items()
        if not category or info.get("category") == category
    ]
    # Newest first, then apply pagination.
    summaries.sort(key=lambda item: item.get("updated_at", ""), reverse=True)
    return {
        "total": len(summaries),
        "offset": offset,
        "limit": limit,
        "memories": summaries[offset:offset + limit],
    }
@mcp.tool()
async def get_memory_stats() -> Dict[str, Any]:
    """Get statistics about the memory bank.

    Returns totals for memories and tags, a per-category breakdown, and
    the configured MAX_MEMORIES cap.
    """
    index = _load_index()
    # Tally how many memories fall into each category.
    category_counts: Dict[str, int] = {}
    for entry in index["memories"].values():
        label = entry.get("category", "general")
        if label in category_counts:
            category_counts[label] += 1
        else:
            category_counts[label] = 1
    return {
        "total_memories": len(index["memories"]),
        "total_tags": len(index["tags"]),
        "categories": category_counts,
        "max_memories": MAX_MEMORIES,
    }
@mcp.tool()
async def bulk_store_memories(
    memories: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """
    Store multiple memories at once.
    Items missing the required 'key' or 'content' fields produce a per-item
    error entry in the results instead of aborting the whole batch.
    Args:
        memories: List of memory objects, each with 'key', 'content', and
            optional 'tags', 'category', 'metadata'
    Returns:
        {"status": "bulk_stored", "count": <items processed>, "results": [...]}
    """
    results = []
    for i, mem in enumerate(memories):
        # Fix: validate required fields — a malformed item previously raised
        # KeyError and lost the remainder of the batch.
        if "key" not in mem or "content" not in mem:
            results.append({
                "error": f"Item {i} is missing required 'key'/'content'",
            })
            continue
        result = await store_memory(
            key=mem["key"],
            content=mem["content"],
            tags=mem.get("tags"),
            category=mem.get("category"),
            metadata=mem.get("metadata"),
        )
        results.append(result)
    return {
        "status": "bulk_stored",
        "count": len(results),
        "results": results,
    }