mcp-servers/puppeteer-mcp/puppeteer_mcp.py
Zac Gaetano 39fff1e44a Add Memory Bank, Puppeteer, Sequential Thinking, and Docker MCP servers
New MCP servers added to the gateway stack:
- memory-bank-mcp (port 8700): Persistent key-value memory storage with tags, categories, and search
- puppeteer-mcp (port 8800): Headless browser automation via Pyppeteer (navigate, screenshot, click, JS eval, PDF gen)
- sequential-thinking-mcp (port 8900): Structured step-by-step reasoning with branching hypotheses and synthesis
- docker-mcp (port 9000): Docker container/image/network/volume management via Docker socket

All servers follow the existing Python/FastMCP pattern with streamable-http transport.
docker-compose.yml updated with service definitions and gateway backend routes.
2026-03-31 23:02:47 -04:00

335 lines
9.2 KiB
Python
Executable file

"""
Puppeteer MCP Server
====================
MCP server providing headless browser automation via Pyppeteer.
Supports navigation, screenshots, page content extraction, form filling,
clicking elements, JavaScript evaluation, and PDF generation.
"""
import asyncio
import base64
import json
import os
from typing import Optional, List, Dict, Any
from mcp.server.fastmcp import FastMCP
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Chromium launch flags, read from the comma-separated CHROMIUM_ARGS variable.
# Whitespace around each flag is stripped and empty entries are dropped, so a
# value such as "--no-sandbox, --disable-gpu," or "" never hands Chromium a
# blank or space-prefixed argument.
CHROMIUM_ARGS = [
    arg.strip()
    for arg in os.environ.get(
        "CHROMIUM_ARGS",
        "--no-sandbox,--disable-setuid-sandbox,--disable-dev-shm-usage,--disable-gpu",
    ).split(",")
    if arg.strip()
]
# Viewport size, overridable through VIEWPORT_WIDTH / VIEWPORT_HEIGHT.
DEFAULT_VIEWPORT_WIDTH = int(os.environ.get("VIEWPORT_WIDTH", "1280"))
DEFAULT_VIEWPORT_HEIGHT = int(os.environ.get("VIEWPORT_HEIGHT", "720"))
# Page timeout, overridable through PAGE_TIMEOUT (presumably milliseconds,
# matching the "ms" timeouts documented on the tools in this file).
DEFAULT_TIMEOUT = int(os.environ.get("PAGE_TIMEOUT", "30000"))
# ---------------------------------------------------------------------------
# MCP server instance
# ---------------------------------------------------------------------------
# The @mcp.tool() decorators in this module register their functions here.
mcp = FastMCP("puppeteer_mcp")
# ---------------------------------------------------------------------------
# Browser management
# ---------------------------------------------------------------------------
# Shared browser handle; stays None until _get_browser() launches one.
_browser: Optional[Any] = None
# Page objects keyed by the caller-supplied page_id string.
_pages: Dict[str, Any] = {}
async def _get_browser():
    """Return the shared browser, launching a new one when none is usable.

    A browser counts as unusable when it has never been launched, when it has
    no attached process, or when that process has already exited. In any of
    those cases a fresh headless browser is launched with ``CHROMIUM_ARGS`` and
    stored in the module-level ``_browser``.

    Returns:
        The browser object produced by ``pyppeteer.launch``.
    """
    global _browser
    proc = _browser.process if _browser is not None else None
    # A plain truthiness test on the process handle stays true after Chromium
    # has crashed or been killed, so ask the handle whether it has exited.
    # NOTE(review): assumes ``process`` is a subprocess.Popen, as pyppeteer
    # documents for browsers created through launch().
    if proc is None or proc.poll() is not None:
        # pyppeteer is imported on first use rather than at module import time.
        from pyppeteer import launch

        # Page handles created by a previous browser cannot be driven through
        # the new one; forget them so they are recreated on demand.
        _pages.clear()
        _browser = await launch(
            headless=True,
            args=CHROMIUM_ARGS,
            # Do not let pyppeteer install its own signal handlers.
            handleSIGINT=False,
            handleSIGTERM=False,
            handleSIGHUP=False,
        )
    return _browser
async def _get_page(page_id: str = "default") -> Any:
    """Return the page registered under *page_id*, opening it if necessary.

    A new page is opened (and sized to the default viewport) when no page is
    registered under the id or when the registered page reports itself closed.
    """
    browser = await _get_browser()
    cached = _pages.get(page_id)
    if cached is None or cached.isClosed():
        fresh = await browser.newPage()
        viewport = {
            "width": DEFAULT_VIEWPORT_WIDTH,
            "height": DEFAULT_VIEWPORT_HEIGHT,
        }
        await fresh.setViewport(viewport)
        _pages[page_id] = fresh
    return _pages[page_id]
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@mcp.tool()
async def navigate(
    url: str,
    page_id: str = "default",
    wait_until: str = "networkidle2",
) -> Dict[str, Any]:
    """
    Navigate to a URL in the browser.

    Args:
        url: The URL to navigate to
        page_id: Identifier for the browser tab/page (default: "default")
        wait_until: When to consider navigation complete: 'load', 'domcontentloaded', 'networkidle0', 'networkidle2'

    Returns:
        On success a dict with "status", the final "url", the HTTP
        "status_code" (None when no response object is available) and the page
        "title". On failure a dict with a single "error" message.
    """
    page = await _get_page(page_id)
    try:
        response = await page.goto(
            url, waitUntil=wait_until, timeout=DEFAULT_TIMEOUT
        )
        return {
            "status": "navigated",
            "url": page.url,
            "status_code": response.status if response else None,
            "title": await page.title(),
        }
    except Exception as e:
        # Report timeouts, unreachable hosts and bad arguments the same way
        # the other tools in this module do, instead of raising.
        return {"error": f"Failed to navigate to '{url}': {e}"}
@mcp.tool()
async def screenshot(
    page_id: str = "default",
    full_page: bool = False,
    selector: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Take a screenshot of the current page or a specific element.

    Args:
        page_id: Page identifier (default: "default")
        full_page: Whether to capture the full scrollable page
            (ignored when a selector is given)
        selector: Optional CSS selector to screenshot a specific element

    Returns:
        On success a dict with "status", "format", the image as a "base64"
        string and the page "url". On failure a dict with an "error" message.
    """
    page = await _get_page(page_id)
    try:
        if selector:
            element = await page.querySelector(selector)
            if not element:
                return {"error": f"Element not found: {selector}"}
            screenshot_bytes = await element.screenshot()
        else:
            screenshot_bytes = await page.screenshot(
                {"encoding": "binary", "fullPage": full_page}
            )
    except Exception as e:
        # Report capture failures the same way the other tools in this
        # module do, instead of raising.
        return {"error": f"Screenshot failed: {e}"}
    b64 = base64.b64encode(screenshot_bytes).decode("utf-8")
    return {
        "status": "screenshot_taken",
        "format": "png",
        "base64": b64,
        "url": page.url,
    }
@mcp.tool()
async def get_page_content(
    page_id: str = "default",
    content_type: str = "text",
) -> Dict[str, Any]:
    """
    Get the content of the current page.

    Args:
        page_id: Page identifier (default: "default")
        content_type: 'text' for visible text, 'html' for full HTML source
            (any value other than 'html' is treated as 'text')

    Returns:
        On success a dict with "url", "title", "content_type", the "content"
        limited to 50,000 characters, and a "truncated" flag. On failure a
        dict with an "error" message.
    """
    page = await _get_page(page_id)
    try:
        if content_type == "html":
            content = await page.content()
        else:
            # Documents without a <body> (e.g. standalone SVG/XML) have a null
            # document.body; return empty text instead of a JS TypeError.
            content = await page.evaluate(
                "() => document.body ? document.body.innerText : ''"
            )
        return {
            "url": page.url,
            "title": await page.title(),
            "content_type": content_type,
            "content": content[:50000],  # Limit to 50k chars
            "truncated": len(content) > 50000,
        }
    except Exception as e:
        # Same error-dict convention as the other tools in this module.
        return {"error": f"Failed to get page content: {e}"}
@mcp.tool()
async def click(
    selector: str,
    page_id: str = "default",
) -> Dict[str, Any]:
    """
    Click the first element matching a CSS selector.

    Args:
        selector: CSS selector for the element to click
        page_id: Page identifier (default: "default")
    """
    target = await _get_page(page_id)
    try:
        # Wait up to 5 s for the element before clicking it.
        await target.waitForSelector(selector, timeout=5000)
        await target.click(selector)
        # Short pause so any navigation or DOM update can start.
        await asyncio.sleep(0.5)
        outcome = {
            "status": "clicked",
            "selector": selector,
            "url": target.url,
        }
    except Exception as exc:
        outcome = {"error": f"Failed to click '{selector}': {exc!s}"}
    return outcome
@mcp.tool()
async def type_text(
    selector: str,
    text: str,
    page_id: str = "default",
    delay: int = 50,
) -> Dict[str, Any]:
    """
    Type text into the element matching a CSS selector.

    Args:
        selector: CSS selector for the input element
        text: Text to type
        page_id: Page identifier (default: "default")
        delay: Delay between key presses in ms (default: 50)
    """
    target = await _get_page(page_id)
    try:
        # Wait up to 5 s for the input before typing into it.
        await target.waitForSelector(selector, timeout=5000)
        await target.type(selector, text, delay=delay)
        # Only the length is reported back, not the typed text itself.
        outcome = {
            "status": "typed",
            "selector": selector,
            "text_length": len(text),
        }
    except Exception as exc:
        outcome = {"error": f"Failed to type into '{selector}': {exc!s}"}
    return outcome
@mcp.tool()
async def evaluate_javascript(
    script: str,
    page_id: str = "default",
) -> Dict[str, Any]:
    """
    Run JavaScript in the page and return its result.

    Args:
        script: JavaScript code to evaluate
        page_id: Page identifier (default: "default")
    """
    target = await _get_page(page_id)
    # NOTE: the caller-supplied script is passed to the page unmodified.
    try:
        value = await target.evaluate(script)
    except Exception as exc:
        return {"error": f"JS evaluation failed: {exc!s}"}
    return {
        "status": "evaluated",
        "result": value,
    }
@mcp.tool()
async def generate_pdf(
    page_id: str = "default",
    format: str = "A4",
    landscape: bool = False,
    print_background: bool = True,
) -> Dict[str, Any]:
    """
    Render the current page to a PDF returned as base64.

    Args:
        page_id: Page identifier (default: "default")
        format: Paper format: 'A4', 'Letter', 'Legal', etc.
        landscape: Whether to use landscape orientation
        print_background: Whether to print background graphics
    """
    target = await _get_page(page_id)
    try:
        pdf_options = {
            "format": format,
            "landscape": landscape,
            "printBackground": print_background,
        }
        raw_pdf = await target.pdf(pdf_options)
        encoded = base64.b64encode(raw_pdf).decode("utf-8")
        outcome = {
            "status": "pdf_generated",
            "format": format,
            "base64": encoded,
            "url": target.url,
        }
    except Exception as exc:
        outcome = {"error": f"PDF generation failed: {exc!s}"}
    return outcome
@mcp.tool()
async def wait_for_selector(
    selector: str,
    page_id: str = "default",
    timeout: int = 10000,
    visible: bool = True,
) -> Dict[str, Any]:
    """
    Block until an element matching the selector is present on the page.

    Args:
        selector: CSS selector to wait for
        page_id: Page identifier (default: "default")
        timeout: Maximum wait time in ms (default: 10000)
        visible: Whether element must be visible (default: True)
    """
    target = await _get_page(page_id)
    try:
        await target.waitForSelector(selector, timeout=timeout, visible=visible)
    except Exception as exc:
        # Every failure is reported with the same "Timeout waiting" prefix.
        return {"error": f"Timeout waiting for '{selector}': {exc!s}"}
    return {"status": "found", "selector": selector}
@mcp.tool()
async def list_pages() -> Dict[str, Any]:
    """
    List all open browser pages/tabs.

    Returns:
        A dict with "pages" (page_id -> {"url", "title"} for every registered
        page that is not closed) and "count" (number of pages listed).
    """
    result = {}
    # Iterate over a snapshot: awaiting page.title() yields to the event loop,
    # and another tool call adding or removing a page meanwhile would raise
    # "dictionary changed size during iteration" on the live dict.
    for pid, page in list(_pages.items()):
        if not page.isClosed():
            result[pid] = {
                "url": page.url,
                "title": await page.title(),
            }
    return {"pages": result, "count": len(result)}
@mcp.tool()
async def close_page(page_id: str = "default") -> Dict[str, Any]:
    """
    Close a browser page/tab and remove it from the page registry.

    Args:
        page_id: Page identifier to close (default: "default")
    """
    if page_id not in _pages:
        return {"error": f"Page '{page_id}' not found"}
    target = _pages[page_id]
    # A page that already reports itself closed is only removed from the registry.
    if not target.isClosed():
        await target.close()
    del _pages[page_id]
    return {"status": "closed", "page_id": page_id}