Add linkedin-mcp/server.py

This commit is contained in:
zgaetano 2026-03-31 15:33:44 -04:00
parent 9cc1e0ea51
commit c4c9df329a

441
linkedin-mcp/server.py Normal file
View file

@ -0,0 +1,441 @@
"""
LinkedIn MCP Server
===================
Uses LinkedIn REST API with OAuth 2.0.
Token is obtained via a one-time browser redirect (on any machine, not the server).
Once authorized, the token is stored in /data/linkedin_token.json and refreshed automatically.
"""
import asyncio
import hashlib
import json
import logging
import os
import secrets
import time
from pathlib import Path
import httpx
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse
from starlette.routing import Route
import uvicorn
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("linkedin-mcp")
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
CLIENT_ID = os.environ.get("LINKEDIN_CLIENT_ID", "")
CLIENT_SECRET = os.environ.get("LINKEDIN_CLIENT_SECRET", "")
REDIRECT_URI = os.environ.get("LINKEDIN_REDIRECT_URI", "http://localhost:8500/linkedin/callback")
TOKEN_FILE = Path(os.environ.get("TOKEN_FILE", "/data/linkedin_token.json"))
PORT = int(os.environ.get("PORT", "8500"))
TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)
# In-memory state store for OAuth PKCE
_oauth_states: dict[str, float] = {}
# ---------------------------------------------------------------------------
# Token storage
# ---------------------------------------------------------------------------
def load_token() -> dict | None:
if TOKEN_FILE.exists():
try:
return json.loads(TOKEN_FILE.read_text())
except Exception:
return None
return None
def save_token(token: dict):
TOKEN_FILE.write_text(json.dumps(token, indent=2))
logger.info("LinkedIn token saved.")
def token_is_valid(token: dict) -> bool:
if not token:
return False
expires_at = token.get("expires_at", 0)
return time.time() < expires_at - 60
async def refresh_token_if_needed(token: dict) -> dict | None:
"""Attempt to refresh using refresh_token if available."""
refresh_tok = token.get("refresh_token")
if not refresh_tok:
return None
async with httpx.AsyncClient() as client:
resp = await client.post(
"https://www.linkedin.com/oauth/v2/accessToken",
data={
"grant_type": "refresh_token",
"refresh_token": refresh_tok,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
},
)
if resp.status_code == 200:
data = resp.json()
data["expires_at"] = time.time() + data.get("expires_in", 3600)
# Preserve old refresh token if new one not provided
if "refresh_token" not in data:
data["refresh_token"] = refresh_tok
save_token(data)
return data
logger.warning(f"Token refresh failed: {resp.status_code} {resp.text}")
return None
async def get_valid_token() -> str | None:
token = load_token()
if not token:
return None
if token_is_valid(token):
return token["access_token"]
refreshed = await refresh_token_if_needed(token)
if refreshed and token_is_valid(refreshed):
return refreshed["access_token"]
return None
# ---------------------------------------------------------------------------
# LinkedIn API helpers
# ---------------------------------------------------------------------------
async def li_get(path: str, params: dict = None) -> dict:
access_token = await get_valid_token()
if not access_token:
return {"error": "Not authenticated. Visit /linkedin/auth to authorize."}
headers = {
"Authorization": f"Bearer {access_token}",
"LinkedIn-Version": "202401",
"X-Restli-Protocol-Version": "2.0.0",
}
async with httpx.AsyncClient() as client:
resp = await client.get(
f"https://api.linkedin.com{path}",
headers=headers,
params=params or {},
timeout=15,
)
if resp.status_code == 200:
return resp.json()
return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text}
async def li_post(path: str, body: dict) -> dict:
access_token = await get_valid_token()
if not access_token:
return {"error": "Not authenticated. Visit /linkedin/auth to authorize."}
headers = {
"Authorization": f"Bearer {access_token}",
"LinkedIn-Version": "202401",
"X-Restli-Protocol-Version": "2.0.0",
"Content-Type": "application/json",
}
async with httpx.AsyncClient() as client:
resp = await client.post(
f"https://api.linkedin.com{path}",
headers=headers,
json=body,
timeout=15,
)
if resp.status_code in (200, 201):
try:
return resp.json()
except Exception:
return {"status": "success"}
return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text}
# ---------------------------------------------------------------------------
# OAuth flow endpoints
# ---------------------------------------------------------------------------
async def auth_start(request: Request):
"""Redirect user to LinkedIn OAuth consent screen."""
if not CLIENT_ID:
return HTMLResponse("<h2>Error: LINKEDIN_CLIENT_ID not configured.</h2>", status_code=500)
state = secrets.token_urlsafe(16)
_oauth_states[state] = time.time() + 600 # expires in 10 min
params = {
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"state": state,
"scope": "openid profile email w_member_social",
}
from urllib.parse import urlencode
url = "https://www.linkedin.com/oauth/v2/authorization?" + urlencode(params)
return RedirectResponse(url)
async def auth_callback(request: Request):
"""Handle OAuth callback, exchange code for token."""
code = request.query_params.get("code")
state = request.query_params.get("state")
error = request.query_params.get("error")
if error:
return HTMLResponse(f"<h2>LinkedIn auth error: {error}</h2><p>{request.query_params.get('error_description', '')}</p>")
# Validate state
expected_expiry = _oauth_states.pop(state, None)
if not expected_expiry or time.time() > expected_expiry:
return HTMLResponse("<h2>Invalid or expired state. Please try again.</h2>")
async with httpx.AsyncClient() as client:
resp = await client.post(
"https://www.linkedin.com/oauth/v2/accessToken",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
},
)
if resp.status_code != 200:
return HTMLResponse(f"<h2>Token exchange failed</h2><pre>{resp.text}</pre>")
token = resp.json()
token["expires_at"] = time.time() + token.get("expires_in", 3600)
save_token(token)
return HTMLResponse("""
<html><body style="font-family:sans-serif;max-width:600px;margin:60px auto;text-align:center">
<h2 style="color:#0a66c2"> LinkedIn Connected!</h2>
<p>Your LinkedIn account has been authorized successfully.</p>
<p>You can close this tab. The MCP server is now ready to use.</p>
</body></html>
""")
async def auth_status(request: Request):
"""Show current auth status."""
token = load_token()
if not token:
return HTMLResponse(f"""
<html><body style="font-family:sans-serif;max-width:600px;margin:60px auto;text-align:center">
<h2 style="color:#cc0000"> Not Authorized</h2>
<p>LinkedIn has not been connected yet.</p>
<a href="/linkedin/auth" style="background:#0a66c2;color:white;padding:12px 24px;border-radius:6px;text-decoration:none;font-size:16px">
Connect LinkedIn
</a>
</body></html>
""")
valid = token_is_valid(token)
expires = time.strftime('%Y-%m-%d %H:%M UTC', time.gmtime(token.get("expires_at", 0)))
status_color = "#00aa00" if valid else "#cc0000"
status_text = "✅ Token Valid" if valid else "⚠️ Token Expired"
return HTMLResponse(f"""
<html><body style="font-family:sans-serif;max-width:600px;margin:60px auto;text-align:center">
<h2 style="color:{status_color}">{status_text}</h2>
<p>Token expires: {expires}</p>
<a href="/linkedin/auth" style="background:#0a66c2;color:white;padding:12px 24px;border-radius:6px;text-decoration:none;font-size:16px">
Re-authorize
</a>
</body></html>
""")
# ---------------------------------------------------------------------------
# MCP Tool handlers (JSON-RPC over HTTP)
# ---------------------------------------------------------------------------
TOOLS = [
{
"name": "linkedin_get_profile",
"description": "Get the authenticated user's LinkedIn profile (name, headline, industry, location).",
"inputSchema": {"type": "object", "properties": {}, "required": []},
},
{
"name": "linkedin_search_people",
"description": "Search for LinkedIn members by keywords.",
"inputSchema": {
"type": "object",
"properties": {
"keywords": {"type": "string", "description": "Search keywords (name, title, company)"},
"count": {"type": "integer", "description": "Number of results (max 10)", "default": 5},
},
"required": ["keywords"],
},
},
{
"name": "linkedin_create_post",
"description": "Create a LinkedIn post on behalf of the authenticated user.",
"inputSchema": {
"type": "object",
"properties": {
"text": {"type": "string", "description": "The text content of the post"},
"visibility": {
"type": "string",
"description": "Who can see the post: PUBLIC or CONNECTIONS",
"default": "PUBLIC",
},
},
"required": ["text"],
},
},
{
"name": "linkedin_get_connections",
"description": "Get a list of the authenticated user's 1st-degree connections.",
"inputSchema": {
"type": "object",
"properties": {
"count": {"type": "integer", "description": "Number of connections to retrieve (max 50)", "default": 10}
},
"required": [],
},
},
]
async def tool_get_profile(_args: dict) -> dict:
data = await li_get("/v2/userinfo")
if "error" in data:
return data
return {
"name": data.get("name", ""),
"given_name": data.get("given_name", ""),
"family_name": data.get("family_name", ""),
"email": data.get("email", ""),
"picture": data.get("picture", ""),
"locale": data.get("locale", ""),
}
async def tool_search_people(args: dict) -> dict:
keywords = args.get("keywords", "")
count = min(int(args.get("count", 5)), 10)
data = await li_get("/v2/search", params={
"q": "people",
"keywords": keywords,
"count": count,
})
return data
async def tool_create_post(args: dict) -> dict:
text = args.get("text", "")
visibility = args.get("visibility", "PUBLIC").upper()
# First get author URN
profile = await li_get("/v2/userinfo")
if "error" in profile:
return profile
# LinkedIn uses sub as person ID
person_id = profile.get("sub", "")
author_urn = f"urn:li:person:{person_id}"
body = {
"author": author_urn,
"lifecycleState": "PUBLISHED",
"specificContent": {
"com.linkedin.ugc.ShareContent": {
"shareCommentary": {"text": text},
"shareMediaCategory": "NONE",
}
},
"visibility": {
"com.linkedin.ugc.MemberNetworkVisibility": visibility
},
}
result = await li_post("/v2/ugcPosts", body)
return result
async def tool_get_connections(args: dict) -> dict:
count = min(int(args.get("count", 10)), 50)
data = await li_get("/v2/connections", params={"q": "viewer", "count": count})
return data
TOOL_HANDLERS = {
"linkedin_get_profile": tool_get_profile,
"linkedin_search_people": tool_search_people,
"linkedin_create_post": tool_create_post,
"linkedin_get_connections": tool_get_connections,
}
# ---------------------------------------------------------------------------
# MCP HTTP endpoint
# ---------------------------------------------------------------------------
async def handle_mcp(request: Request):
if request.method == "GET":
# SSE initialize
return JSONResponse({"jsonrpc": "2.0", "result": {"protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "linkedin", "version": "1.0.0"}}, "id": 0})
body = await request.json()
method = body.get("method")
req_id = body.get("id")
params = body.get("params", {})
if method == "initialize":
return JSONResponse({"jsonrpc": "2.0", "result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "linkedin", "version": "1.0.0"},
}, "id": req_id})
elif method == "tools/list":
return JSONResponse({"jsonrpc": "2.0", "result": {"tools": TOOLS}, "id": req_id})
elif method == "tools/call":
tool_name = params.get("name")
tool_args = params.get("arguments", {})
handler = TOOL_HANDLERS.get(tool_name)
if not handler:
return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}, "id": req_id})
try:
result = await handler(tool_args)
return JSONResponse({"jsonrpc": "2.0", "result": {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}, "id": req_id})
except Exception as e:
return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}, "id": req_id})
elif method == "ping":
return JSONResponse({"jsonrpc": "2.0", "result": {}, "id": req_id})
return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Method not found: {method}"}, "id": req_id})
async def health(request: Request):
token = load_token()
valid = token_is_valid(token) if token else False
return JSONResponse({
"status": "ok",
"authenticated": valid,
"auth_url": "/linkedin/auth",
})
# ---------------------------------------------------------------------------
# App
# ---------------------------------------------------------------------------
routes = [
Route("/mcp", handle_mcp, methods=["GET", "POST"]),
Route("/health", health, methods=["GET"]),
Route("/linkedin/auth", auth_start, methods=["GET"]),
Route("/linkedin/callback", auth_callback, methods=["GET"]),
Route("/linkedin/status", auth_status, methods=["GET"]),
]
app = Starlette(routes=routes)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")