"""
Puppeteer MCP Server
====================
MCP server providing headless browser automation via Pyppeteer.
Supports navigation, screenshots, page content extraction,
form filling, clicking elements, JavaScript evaluation, and PDF generation.
"""

import asyncio
import base64
import json
import os
from typing import Optional, List, Dict, Any

from mcp.server.fastmcp import FastMCP

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Comma-separated Chromium command-line flags. Entries are stripped and blank
# ones dropped so that values such as "" or "--a, --b," do not hand empty or
# padded arguments to the browser.
# NOTE: because the value is split on ",", a flag that itself contains a comma
# cannot be expressed through this variable.
CHROMIUM_ARGS = [
    arg.strip()
    for arg in os.environ.get(
        "CHROMIUM_ARGS",
        "--no-sandbox,--disable-setuid-sandbox,--disable-dev-shm-usage,--disable-gpu",
    ).split(",")
    if arg.strip()
]
DEFAULT_VIEWPORT_WIDTH = int(os.environ.get("VIEWPORT_WIDTH", "1280"))
DEFAULT_VIEWPORT_HEIGHT = int(os.environ.get("VIEWPORT_HEIGHT", "720"))
# Passed as the `timeout` argument of page.goto() by the navigate tool.
DEFAULT_TIMEOUT = int(os.environ.get("PAGE_TIMEOUT", "30000"))

# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------

mcp = FastMCP("puppeteer_mcp")

# ---------------------------------------------------------------------------
# Browser management
# ---------------------------------------------------------------------------

# Lazily created browser shared by every tool call.
_browser = None
# Open pages keyed by the caller-chosen page_id.
_pages: Dict[str, Any] = {}


def _browser_is_alive() -> bool:
    """Return True when a browser was launched and its process is still running."""
    if _browser is None:
        return False
    process = _browser.process
    if not process:
        return False
    # poll() returns None while the child process has not exited; checking it
    # lets a crashed browser be detected instead of being reused forever.
    return process.poll() is None


async def _get_browser():
    """Get or create the browser instance.

    A new headless browser is launched when none exists yet or when the
    previous browser process is gone. Launching a replacement also drops every
    cached page, because those handles belong to the old browser.
    """
    global _browser
    if not _browser_is_alive():
        # Imported lazily so the module can be loaded without starting (or
        # even importing) pyppeteer until a tool actually needs a browser.
        from pyppeteer import launch

        _pages.clear()
        _browser = await launch(
            headless=True,
            args=CHROMIUM_ARGS,
            handleSIGINT=False,
            handleSIGTERM=False,
            handleSIGHUP=False,
        )
    return _browser


async def _get_page(page_id: str = "default") -> Any:
    """Get or create a page by ID.

    Args:
        page_id: Key under which the page is cached in ``_pages``.

    Returns:
        The cached page for ``page_id``; a new page with the default viewport
        is created when there is no entry or the cached page is closed.
    """
    # Resolve the browser first: a relaunch empties the page cache.
    browser = await _get_browser()
    page = _pages.get(page_id)
    if page is None or page.isClosed():
        page = await browser.newPage()
        await page.setViewport({
            "width": DEFAULT_VIEWPORT_WIDTH,
            "height": DEFAULT_VIEWPORT_HEIGHT,
        })
        _pages[page_id] = page
    return page
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------

# Upper bound on the number of characters returned by get_page_content.
_MAX_CONTENT_CHARS = 50000


@mcp.tool()
async def navigate(
    url: str,
    page_id: str = "default",
    wait_until: str = "networkidle2",
) -> Dict[str, Any]:
    """
    Navigate to a URL in the browser.

    Args:
        url: The URL to navigate to
        page_id: Identifier for the browser tab/page (default: "default")
        wait_until: When to consider navigation complete:
            'load', 'domcontentloaded', 'networkidle0', 'networkidle2'
    """
    page = await _get_page(page_id)
    # Errors raised by goto() are not caught here and propagate to the caller.
    response = await page.goto(url, waitUntil=wait_until, timeout=DEFAULT_TIMEOUT)
    return {
        "status": "navigated",
        "url": page.url,
        # goto() may yield no response object, in which case no code is reported.
        "status_code": response.status if response else None,
        "title": await page.title(),
    }


@mcp.tool()
async def screenshot(
    page_id: str = "default",
    full_page: bool = False,
    selector: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Take a screenshot of the current page or a specific element.

    Args:
        page_id: Page identifier (default: "default")
        full_page: Whether to capture the full scrollable page
        selector: Optional CSS selector to screenshot a specific element
    """
    page = await _get_page(page_id)

    if selector:
        # Element capture: full_page is not used in this branch.
        element = await page.querySelector(selector)
        if not element:
            return {"error": f"Element not found: {selector}"}
        screenshot_bytes = await element.screenshot()
    else:
        options: Dict[str, Any] = {"encoding": "binary", "fullPage": full_page}
        screenshot_bytes = await page.screenshot(options)

    b64 = base64.b64encode(screenshot_bytes).decode("utf-8")
    return {
        "status": "screenshot_taken",
        "format": "png",
        "base64": b64,
        "url": page.url,
    }


@mcp.tool()
async def get_page_content(
    page_id: str = "default",
    content_type: str = "text",
) -> Dict[str, Any]:
    """
    Get the content of the current page.

    Args:
        page_id: Page identifier (default: "default")
        content_type: 'text' for visible text, 'html' for full HTML source
    """
    page = await _get_page(page_id)

    if content_type == "html":
        content = await page.content()
    else:
        # Guard against documents without a <body>: dereferencing a null
        # document.body would throw inside the browser.
        content = await page.evaluate(
            "() => document.body ? document.body.innerText : ''"
        )

    # Normalise a missing result to an empty string so slicing/len are safe.
    content = content or ""

    return {
        "url": page.url,
        "title": await page.title(),
        "content_type": content_type,
        "content": content[:_MAX_CONTENT_CHARS],  # Limit to 50k chars
        "truncated": len(content) > _MAX_CONTENT_CHARS,
    }


@mcp.tool()
async def click(
    selector: str,
    page_id: str = "default",
) -> Dict[str, Any]:
    """
    Click an element on the page.

    Args:
        selector: CSS selector for the element to click
        page_id: Page identifier (default: "default")
    """
    page = await _get_page(page_id)
    try:
        await page.waitForSelector(selector, timeout=5000)
        await page.click(selector)
        await asyncio.sleep(0.5)  # Brief wait for any navigation/updates
        return {
            "status": "clicked",
            "selector": selector,
            "url": page.url,
        }
    except Exception as e:
        # Failures are reported to the caller as an error payload, not raised.
        return {"error": f"Failed to click '{selector}': {str(e)}"}


@mcp.tool()
async def type_text(
    selector: str,
    text: str,
    page_id: str = "default",
    delay: int = 50,
) -> Dict[str, Any]:
    """
    Type text into an input field.

    Args:
        selector: CSS selector for the input element
        text: Text to type
        page_id: Page identifier (default: "default")
        delay: Delay between key presses in ms (default: 50)
    """
    page = await _get_page(page_id)
    try:
        await page.waitForSelector(selector, timeout=5000)
        await page.type(selector, text, delay=delay)
        return {
            "status": "typed",
            "selector": selector,
            # Only the length is echoed back, not the typed text itself.
            "text_length": len(text),
        }
    except Exception as e:
        return {"error": f"Failed to type into '{selector}': {str(e)}"}


@mcp.tool()
async def evaluate_javascript(
    script: str,
    page_id: str = "default",
) -> Dict[str, Any]:
    """
    Execute JavaScript code in the browser context.

    Args:
        script: JavaScript code to evaluate
        page_id: Page identifier (default: "default")
    """
    page = await _get_page(page_id)
    try:
        # NOTE(review): the script is passed to the page unmodified; this tool
        # is only as trustworthy as whoever is allowed to call it.
        result = await page.evaluate(script)
        return {
            "status": "evaluated",
            "result": result,
        }
    except Exception as e:
        return {"error": f"JS evaluation failed: {str(e)}"}


@mcp.tool()
async def generate_pdf(
    page_id: str = "default",
    format: str = "A4",
    landscape: bool = False,
    print_background: bool = True,
) -> Dict[str, Any]:
    """
    Generate a PDF of the current page.

    Args:
        page_id: Page identifier (default: "default")
        format: Paper format: 'A4', 'Letter', 'Legal', etc.
        landscape: Whether to use landscape orientation
        print_background: Whether to print background graphics
    """
    page = await _get_page(page_id)
    try:
        pdf_bytes = await page.pdf({
            "format": format,
            "landscape": landscape,
            "printBackground": print_background,
        })
        b64 = base64.b64encode(pdf_bytes).decode("utf-8")
        return {
            "status": "pdf_generated",
            "format": format,
            "base64": b64,
            "url": page.url,
        }
    except Exception as e:
        return {"error": f"PDF generation failed: {str(e)}"}


@mcp.tool()
async def wait_for_selector(
    selector: str,
    page_id: str = "default",
    timeout: int = 10000,
    visible: bool = True,
) -> Dict[str, Any]:
    """
    Wait for an element to appear on the page.

    Args:
        selector: CSS selector to wait for
        page_id: Page identifier (default: "default")
        timeout: Maximum wait time in ms (default: 10000)
        visible: Whether element must be visible (default: True)
    """
    page = await _get_page(page_id)
    try:
        await page.waitForSelector(
            selector, timeout=timeout, visible=visible
        )
        return {"status": "found", "selector": selector}
    except Exception as e:
        return {"error": f"Timeout waiting for '{selector}': {str(e)}"}


@mcp.tool()
async def list_pages() -> Dict[str, Any]:
    """List all open browser pages/tabs."""
    result = {}
    # Iterate over a snapshot: this loop awaits, so another tool call may add
    # or remove entries in _pages while it is suspended, which would otherwise
    # raise "dictionary changed size during iteration".
    for pid, page in list(_pages.items()):
        if not page.isClosed():
            result[pid] = {
                "url": page.url,
                "title": await page.title(),
            }
    return {"pages": result, "count": len(result)}


@mcp.tool()
async def close_page(page_id: str = "default") -> Dict[str, Any]:
    """
    Close a browser page/tab.

    Args:
        page_id: Page identifier to close (default: "default")
    """
    # Remove the entry before awaiting so that a concurrent close of the same
    # page_id sees "not found" instead of failing on a second delete, and so
    # that a failing close() does not leave a stale handle in the cache.
    page = _pages.pop(page_id, None)
    if page is None:
        return {"error": f"Page '{page_id}' not found"}
    if not page.isClosed():
        await page.close()
    return {"status": "closed", "page_id": page_id}