""" LinkedIn MCP Server =================== Uses LinkedIn REST API with OAuth 2.0. Token is obtained via a one-time browser redirect (on any machine, not the server). Once authorized, the token is stored in /data/linkedin_token.json and refreshed automatically. """ import asyncio import hashlib import json import logging import os import secrets import time from pathlib import Path import httpx from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse from starlette.routing import Route import uvicorn logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger("linkedin-mcp") # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- CLIENT_ID = os.environ.get("LINKEDIN_CLIENT_ID", "") CLIENT_SECRET = os.environ.get("LINKEDIN_CLIENT_SECRET", "") REDIRECT_URI = os.environ.get("LINKEDIN_REDIRECT_URI", "http://localhost:8500/linkedin/callback") TOKEN_FILE = Path(os.environ.get("TOKEN_FILE", "/data/linkedin_token.json")) PORT = int(os.environ.get("PORT", "8500")) TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True) # In-memory state store for OAuth PKCE _oauth_states: dict[str, float] = {} # --------------------------------------------------------------------------- # Token storage # --------------------------------------------------------------------------- def load_token() -> dict | None: if TOKEN_FILE.exists(): try: return json.loads(TOKEN_FILE.read_text()) except Exception: return None return None def save_token(token: dict): TOKEN_FILE.write_text(json.dumps(token, indent=2)) logger.info("LinkedIn token saved.") def token_is_valid(token: dict) -> bool: if not token: return False expires_at = token.get("expires_at", 0) return time.time() < expires_at - 60 async def refresh_token_if_needed(token: dict) -> dict | None: """Attempt to refresh using refresh_token if available.""" refresh_tok = token.get("refresh_token") if not refresh_tok: return None async with httpx.AsyncClient() as client: resp = await client.post( "https://www.linkedin.com/oauth/v2/accessToken", data={ "grant_type": "refresh_token", "refresh_token": refresh_tok, "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, }, ) if resp.status_code == 200: data = resp.json() data["expires_at"] = time.time() + data.get("expires_in", 3600) # Preserve old refresh token if new one not provided if "refresh_token" not in data: data["refresh_token"] = refresh_tok save_token(data) return data logger.warning(f"Token refresh failed: {resp.status_code} {resp.text}") return None async def get_valid_token() -> str | None: token = load_token() if not token: return None if token_is_valid(token): return token["access_token"] refreshed = await refresh_token_if_needed(token) if refreshed and token_is_valid(refreshed): return refreshed["access_token"] return None # --------------------------------------------------------------------------- # LinkedIn API helpers # --------------------------------------------------------------------------- async def li_get(path: str, params: dict = None) -> dict: access_token = await get_valid_token() if not access_token: return {"error": "Not authenticated. Visit /linkedin/auth to authorize."} headers = { "Authorization": f"Bearer {access_token}", "LinkedIn-Version": "202401", "X-Restli-Protocol-Version": "2.0.0", } async with httpx.AsyncClient() as client: resp = await client.get( f"https://api.linkedin.com{path}", headers=headers, params=params or {}, timeout=15, ) if resp.status_code == 200: return resp.json() return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text} async def li_post(path: str, body: dict) -> dict: access_token = await get_valid_token() if not access_token: return {"error": "Not authenticated. Visit /linkedin/auth to authorize."} headers = { "Authorization": f"Bearer {access_token}", "LinkedIn-Version": "202401", "X-Restli-Protocol-Version": "2.0.0", "Content-Type": "application/json", } async with httpx.AsyncClient() as client: resp = await client.post( f"https://api.linkedin.com{path}", headers=headers, json=body, timeout=15, ) if resp.status_code in (200, 201): try: return resp.json() except Exception: return {"status": "success"} return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text} # --------------------------------------------------------------------------- # OAuth flow endpoints # --------------------------------------------------------------------------- async def auth_start(request: Request): """Redirect user to LinkedIn OAuth consent screen.""" if not CLIENT_ID: return HTMLResponse("

Error: LINKEDIN_CLIENT_ID not configured.

", status_code=500) state = secrets.token_urlsafe(16) _oauth_states[state] = time.time() + 600 # expires in 10 min params = { "response_type": "code", "client_id": CLIENT_ID, "redirect_uri": REDIRECT_URI, "state": state, "scope": "openid profile email w_member_social", } from urllib.parse import urlencode url = "https://www.linkedin.com/oauth/v2/authorization?" + urlencode(params) return RedirectResponse(url) async def auth_callback(request: Request): """Handle OAuth callback, exchange code for token.""" code = request.query_params.get("code") state = request.query_params.get("state") error = request.query_params.get("error") if error: return HTMLResponse(f"

LinkedIn auth error: {error}

{request.query_params.get('error_description', '')}

") # Validate state expected_expiry = _oauth_states.pop(state, None) if not expected_expiry or time.time() > expected_expiry: return HTMLResponse("

Invalid or expired state. Please try again.

") async with httpx.AsyncClient() as client: resp = await client.post( "https://www.linkedin.com/oauth/v2/accessToken", data={ "grant_type": "authorization_code", "code": code, "redirect_uri": REDIRECT_URI, "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, }, ) if resp.status_code != 200: return HTMLResponse(f"

Token exchange failed

{resp.text}
") token = resp.json() token["expires_at"] = time.time() + token.get("expires_in", 3600) save_token(token) return HTMLResponse("""

✅ LinkedIn Connected!

Your LinkedIn account has been authorized successfully.

You can close this tab. The MCP server is now ready to use.

""") async def auth_status(request: Request): """Show current auth status.""" token = load_token() if not token: return HTMLResponse(f"""

❌ Not Authorized

LinkedIn has not been connected yet.

Connect LinkedIn """) valid = token_is_valid(token) expires = time.strftime('%Y-%m-%d %H:%M UTC', time.gmtime(token.get("expires_at", 0))) status_color = "#00aa00" if valid else "#cc0000" status_text = "✅ Token Valid" if valid else "⚠️ Token Expired" return HTMLResponse(f"""

{status_text}

Token expires: {expires}

Re-authorize """) # --------------------------------------------------------------------------- # MCP Tool handlers (JSON-RPC over HTTP) # --------------------------------------------------------------------------- TOOLS = [ { "name": "linkedin_get_profile", "description": "Get the authenticated user's LinkedIn profile (name, headline, industry, location).", "inputSchema": {"type": "object", "properties": {}, "required": []}, }, { "name": "linkedin_search_people", "description": "Search for LinkedIn members by keywords.", "inputSchema": { "type": "object", "properties": { "keywords": {"type": "string", "description": "Search keywords (name, title, company)"}, "count": {"type": "integer", "description": "Number of results (max 10)", "default": 5}, }, "required": ["keywords"], }, }, { "name": "linkedin_create_post", "description": "Create a LinkedIn post on behalf of the authenticated user.", "inputSchema": { "type": "object", "properties": { "text": {"type": "string", "description": "The text content of the post"}, "visibility": { "type": "string", "description": "Who can see the post: PUBLIC or CONNECTIONS", "default": "PUBLIC", }, }, "required": ["text"], }, }, { "name": "linkedin_get_connections", "description": "Get a list of the authenticated user's 1st-degree connections.", "inputSchema": { "type": "object", "properties": { "count": {"type": "integer", "description": "Number of connections to retrieve (max 50)", "default": 10} }, "required": [], }, }, ] async def tool_get_profile(_args: dict) -> dict: data = await li_get("/v2/userinfo") if "error" in data: return data return { "name": data.get("name", ""), "given_name": data.get("given_name", ""), "family_name": data.get("family_name", ""), "email": data.get("email", ""), "picture": data.get("picture", ""), "locale": data.get("locale", ""), } async def tool_search_people(args: dict) -> dict: keywords = args.get("keywords", "") count = min(int(args.get("count", 5)), 10) data = await li_get("/v2/search", params={ "q": "people", "keywords": keywords, "count": count, }) return data async def tool_create_post(args: dict) -> dict: text = args.get("text", "") visibility = args.get("visibility", "PUBLIC").upper() # First get author URN profile = await li_get("/v2/userinfo") if "error" in profile: return profile # LinkedIn uses sub as person ID person_id = profile.get("sub", "") author_urn = f"urn:li:person:{person_id}" body = { "author": author_urn, "lifecycleState": "PUBLISHED", "specificContent": { "com.linkedin.ugc.ShareContent": { "shareCommentary": {"text": text}, "shareMediaCategory": "NONE", } }, "visibility": { "com.linkedin.ugc.MemberNetworkVisibility": visibility }, } result = await li_post("/v2/ugcPosts", body) return result async def tool_get_connections(args: dict) -> dict: count = min(int(args.get("count", 10)), 50) data = await li_get("/v2/connections", params={"q": "viewer", "count": count}) return data TOOL_HANDLERS = { "linkedin_get_profile": tool_get_profile, "linkedin_search_people": tool_search_people, "linkedin_create_post": tool_create_post, "linkedin_get_connections": tool_get_connections, } # --------------------------------------------------------------------------- # MCP HTTP endpoint # --------------------------------------------------------------------------- async def handle_mcp(request: Request): if request.method == "GET": # SSE initialize return JSONResponse({"jsonrpc": "2.0", "result": {"protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "linkedin", "version": "1.0.0"}}, "id": 0}) body = await request.json() method = body.get("method") req_id = body.get("id") params = body.get("params", {}) if method == "initialize": return JSONResponse({"jsonrpc": "2.0", "result": { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "linkedin", "version": "1.0.0"}, }, "id": req_id}) elif method == "tools/list": return JSONResponse({"jsonrpc": "2.0", "result": {"tools": TOOLS}, "id": req_id}) elif method == "tools/call": tool_name = params.get("name") tool_args = params.get("arguments", {}) handler = TOOL_HANDLERS.get(tool_name) if not handler: return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}, "id": req_id}) try: result = await handler(tool_args) return JSONResponse({"jsonrpc": "2.0", "result": {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}, "id": req_id}) except Exception as e: return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}, "id": req_id}) elif method == "ping": return JSONResponse({"jsonrpc": "2.0", "result": {}, "id": req_id}) return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Method not found: {method}"}, "id": req_id}) async def health(request: Request): token = load_token() valid = token_is_valid(token) if token else False return JSONResponse({ "status": "ok", "authenticated": valid, "auth_url": "/linkedin/auth", }) # --------------------------------------------------------------------------- # App # --------------------------------------------------------------------------- routes = [ Route("/mcp", handle_mcp, methods=["GET", "POST"]), Route("/health", health, methods=["GET"]), Route("/linkedin/auth", auth_start, methods=["GET"]), Route("/linkedin/callback", auth_callback, methods=["GET"]), Route("/linkedin/status", auth_status, methods=["GET"]), ] app = Starlette(routes=routes) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")