diff --git a/linkedin-mcp/server.py b/linkedin-mcp/server.py new file mode 100644 index 0000000..1463de6 --- /dev/null +++ b/linkedin-mcp/server.py @@ -0,0 +1,441 @@ +""" +LinkedIn MCP Server +=================== +Uses LinkedIn REST API with OAuth 2.0. +Token is obtained via a one-time browser redirect (on any machine, not the server). +Once authorized, the token is stored in /data/linkedin_token.json and refreshed automatically. +""" + +import asyncio +import hashlib +import json +import logging +import os +import secrets +import time +from pathlib import Path + +import httpx +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse +from starlette.routing import Route +import uvicorn + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger("linkedin-mcp") + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + +CLIENT_ID = os.environ.get("LINKEDIN_CLIENT_ID", "") +CLIENT_SECRET = os.environ.get("LINKEDIN_CLIENT_SECRET", "") +REDIRECT_URI = os.environ.get("LINKEDIN_REDIRECT_URI", "http://localhost:8500/linkedin/callback") +TOKEN_FILE = Path(os.environ.get("TOKEN_FILE", "/data/linkedin_token.json")) +PORT = int(os.environ.get("PORT", "8500")) + +TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True) + +# In-memory state store for OAuth PKCE +_oauth_states: dict[str, float] = {} + +# --------------------------------------------------------------------------- +# Token storage +# --------------------------------------------------------------------------- + +def load_token() -> dict | None: + if TOKEN_FILE.exists(): + try: + return json.loads(TOKEN_FILE.read_text()) + except Exception: + return None + return None + + +def save_token(token: dict): + TOKEN_FILE.write_text(json.dumps(token, indent=2)) + logger.info("LinkedIn token saved.") + + +def token_is_valid(token: dict) -> bool: + if not token: + return False + expires_at = token.get("expires_at", 0) + return time.time() < expires_at - 60 + + +async def refresh_token_if_needed(token: dict) -> dict | None: + """Attempt to refresh using refresh_token if available.""" + refresh_tok = token.get("refresh_token") + if not refresh_tok: + return None + async with httpx.AsyncClient() as client: + resp = await client.post( + "https://www.linkedin.com/oauth/v2/accessToken", + data={ + "grant_type": "refresh_token", + "refresh_token": refresh_tok, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + }, + ) + if resp.status_code == 200: + data = resp.json() + data["expires_at"] = time.time() + data.get("expires_in", 3600) + # Preserve old refresh token if new one not provided + if "refresh_token" not in data: + data["refresh_token"] = refresh_tok + save_token(data) + return data + logger.warning(f"Token refresh failed: {resp.status_code} {resp.text}") + return None + + +async def get_valid_token() -> str | None: + token = load_token() + if not token: + return None + if token_is_valid(token): + return token["access_token"] + refreshed = await refresh_token_if_needed(token) + if refreshed and token_is_valid(refreshed): + return refreshed["access_token"] + return None + + +# --------------------------------------------------------------------------- +# LinkedIn API helpers +# --------------------------------------------------------------------------- + +async def li_get(path: str, params: dict = None) -> dict: + access_token = await get_valid_token() + if not access_token: + return {"error": "Not authenticated. Visit /linkedin/auth to authorize."} + headers = { + "Authorization": f"Bearer {access_token}", + "LinkedIn-Version": "202401", + "X-Restli-Protocol-Version": "2.0.0", + } + async with httpx.AsyncClient() as client: + resp = await client.get( + f"https://api.linkedin.com{path}", + headers=headers, + params=params or {}, + timeout=15, + ) + if resp.status_code == 200: + return resp.json() + return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text} + + +async def li_post(path: str, body: dict) -> dict: + access_token = await get_valid_token() + if not access_token: + return {"error": "Not authenticated. Visit /linkedin/auth to authorize."} + headers = { + "Authorization": f"Bearer {access_token}", + "LinkedIn-Version": "202401", + "X-Restli-Protocol-Version": "2.0.0", + "Content-Type": "application/json", + } + async with httpx.AsyncClient() as client: + resp = await client.post( + f"https://api.linkedin.com{path}", + headers=headers, + json=body, + timeout=15, + ) + if resp.status_code in (200, 201): + try: + return resp.json() + except Exception: + return {"status": "success"} + return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text} + + +# --------------------------------------------------------------------------- +# OAuth flow endpoints +# --------------------------------------------------------------------------- + +async def auth_start(request: Request): + """Redirect user to LinkedIn OAuth consent screen.""" + if not CLIENT_ID: + return HTMLResponse("
{request.query_params.get('error_description', '')}
") + + # Validate state + expected_expiry = _oauth_states.pop(state, None) + if not expected_expiry or time.time() > expected_expiry: + return HTMLResponse("{resp.text}")
+
+ token = resp.json()
+ token["expires_at"] = time.time() + token.get("expires_in", 3600)
+ save_token(token)
+
+ return HTMLResponse("""
+
+ Your LinkedIn account has been authorized successfully.
+You can close this tab. The MCP server is now ready to use.
+ + """) + + +async def auth_status(request: Request): + """Show current auth status.""" + token = load_token() + if not token: + return HTMLResponse(f""" + +LinkedIn has not been connected yet.
+ + Connect LinkedIn + + + """) + + valid = token_is_valid(token) + expires = time.strftime('%Y-%m-%d %H:%M UTC', time.gmtime(token.get("expires_at", 0))) + status_color = "#00aa00" if valid else "#cc0000" + status_text = "✅ Token Valid" if valid else "⚠️ Token Expired" + + return HTMLResponse(f""" + +Token expires: {expires}
+ + Re-authorize + + + """) + + +# --------------------------------------------------------------------------- +# MCP Tool handlers (JSON-RPC over HTTP) +# --------------------------------------------------------------------------- + +TOOLS = [ + { + "name": "linkedin_get_profile", + "description": "Get the authenticated user's LinkedIn profile (name, headline, industry, location).", + "inputSchema": {"type": "object", "properties": {}, "required": []}, + }, + { + "name": "linkedin_search_people", + "description": "Search for LinkedIn members by keywords.", + "inputSchema": { + "type": "object", + "properties": { + "keywords": {"type": "string", "description": "Search keywords (name, title, company)"}, + "count": {"type": "integer", "description": "Number of results (max 10)", "default": 5}, + }, + "required": ["keywords"], + }, + }, + { + "name": "linkedin_create_post", + "description": "Create a LinkedIn post on behalf of the authenticated user.", + "inputSchema": { + "type": "object", + "properties": { + "text": {"type": "string", "description": "The text content of the post"}, + "visibility": { + "type": "string", + "description": "Who can see the post: PUBLIC or CONNECTIONS", + "default": "PUBLIC", + }, + }, + "required": ["text"], + }, + }, + { + "name": "linkedin_get_connections", + "description": "Get a list of the authenticated user's 1st-degree connections.", + "inputSchema": { + "type": "object", + "properties": { + "count": {"type": "integer", "description": "Number of connections to retrieve (max 50)", "default": 10} + }, + "required": [], + }, + }, +] + + +async def tool_get_profile(_args: dict) -> dict: + data = await li_get("/v2/userinfo") + if "error" in data: + return data + return { + "name": data.get("name", ""), + "given_name": data.get("given_name", ""), + "family_name": data.get("family_name", ""), + "email": data.get("email", ""), + "picture": data.get("picture", ""), + "locale": data.get("locale", ""), + } + + +async def tool_search_people(args: dict) -> dict: + keywords = args.get("keywords", "") + count = min(int(args.get("count", 5)), 10) + data = await li_get("/v2/search", params={ + "q": "people", + "keywords": keywords, + "count": count, + }) + return data + + +async def tool_create_post(args: dict) -> dict: + text = args.get("text", "") + visibility = args.get("visibility", "PUBLIC").upper() + + # First get author URN + profile = await li_get("/v2/userinfo") + if "error" in profile: + return profile + + # LinkedIn uses sub as person ID + person_id = profile.get("sub", "") + author_urn = f"urn:li:person:{person_id}" + + body = { + "author": author_urn, + "lifecycleState": "PUBLISHED", + "specificContent": { + "com.linkedin.ugc.ShareContent": { + "shareCommentary": {"text": text}, + "shareMediaCategory": "NONE", + } + }, + "visibility": { + "com.linkedin.ugc.MemberNetworkVisibility": visibility + }, + } + result = await li_post("/v2/ugcPosts", body) + return result + + +async def tool_get_connections(args: dict) -> dict: + count = min(int(args.get("count", 10)), 50) + data = await li_get("/v2/connections", params={"q": "viewer", "count": count}) + return data + + +TOOL_HANDLERS = { + "linkedin_get_profile": tool_get_profile, + "linkedin_search_people": tool_search_people, + "linkedin_create_post": tool_create_post, + "linkedin_get_connections": tool_get_connections, +} + + +# --------------------------------------------------------------------------- +# MCP HTTP endpoint +# --------------------------------------------------------------------------- + +async def handle_mcp(request: Request): + if request.method == "GET": + # SSE initialize + return JSONResponse({"jsonrpc": "2.0", "result": {"protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "linkedin", "version": "1.0.0"}}, "id": 0}) + + body = await request.json() + method = body.get("method") + req_id = body.get("id") + params = body.get("params", {}) + + if method == "initialize": + return JSONResponse({"jsonrpc": "2.0", "result": { + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "linkedin", "version": "1.0.0"}, + }, "id": req_id}) + + elif method == "tools/list": + return JSONResponse({"jsonrpc": "2.0", "result": {"tools": TOOLS}, "id": req_id}) + + elif method == "tools/call": + tool_name = params.get("name") + tool_args = params.get("arguments", {}) + handler = TOOL_HANDLERS.get(tool_name) + if not handler: + return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}, "id": req_id}) + try: + result = await handler(tool_args) + return JSONResponse({"jsonrpc": "2.0", "result": {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}, "id": req_id}) + except Exception as e: + return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}, "id": req_id}) + + elif method == "ping": + return JSONResponse({"jsonrpc": "2.0", "result": {}, "id": req_id}) + + return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Method not found: {method}"}, "id": req_id}) + + +async def health(request: Request): + token = load_token() + valid = token_is_valid(token) if token else False + return JSONResponse({ + "status": "ok", + "authenticated": valid, + "auth_url": "/linkedin/auth", + }) + + +# --------------------------------------------------------------------------- +# App +# --------------------------------------------------------------------------- + +routes = [ + Route("/mcp", handle_mcp, methods=["GET", "POST"]), + Route("/health", health, methods=["GET"]), + Route("/linkedin/auth", auth_start, methods=["GET"]), + Route("/linkedin/callback", auth_callback, methods=["GET"]), + Route("/linkedin/status", auth_status, methods=["GET"]), +] + +app = Starlette(routes=routes) + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")