441 lines
15 KiB
Python
441 lines
15 KiB
Python
"""
|
|
LinkedIn MCP Server
|
|
===================
|
|
Uses LinkedIn REST API with OAuth 2.0.
|
|
Token is obtained via a one-time browser redirect (on any machine, not the server).
|
|
Once authorized, the token is stored in /data/linkedin_token.json and refreshed automatically.
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import secrets
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from starlette.applications import Starlette
|
|
from starlette.requests import Request
|
|
from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse
|
|
from starlette.routing import Route
|
|
import uvicorn
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
logger = logging.getLogger("linkedin-mcp")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
CLIENT_ID = os.environ.get("LINKEDIN_CLIENT_ID", "")
|
|
CLIENT_SECRET = os.environ.get("LINKEDIN_CLIENT_SECRET", "")
|
|
REDIRECT_URI = os.environ.get("LINKEDIN_REDIRECT_URI", "http://localhost:8500/linkedin/callback")
|
|
TOKEN_FILE = Path(os.environ.get("TOKEN_FILE", "/data/linkedin_token.json"))
|
|
PORT = int(os.environ.get("PORT", "8500"))
|
|
|
|
TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# In-memory state store for OAuth PKCE
|
|
_oauth_states: dict[str, float] = {}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Token storage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_token() -> dict | None:
|
|
if TOKEN_FILE.exists():
|
|
try:
|
|
return json.loads(TOKEN_FILE.read_text())
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
def save_token(token: dict):
|
|
TOKEN_FILE.write_text(json.dumps(token, indent=2))
|
|
logger.info("LinkedIn token saved.")
|
|
|
|
|
|
def token_is_valid(token: dict) -> bool:
|
|
if not token:
|
|
return False
|
|
expires_at = token.get("expires_at", 0)
|
|
return time.time() < expires_at - 60
|
|
|
|
|
|
async def refresh_token_if_needed(token: dict) -> dict | None:
|
|
"""Attempt to refresh using refresh_token if available."""
|
|
refresh_tok = token.get("refresh_token")
|
|
if not refresh_tok:
|
|
return None
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
"https://www.linkedin.com/oauth/v2/accessToken",
|
|
data={
|
|
"grant_type": "refresh_token",
|
|
"refresh_token": refresh_tok,
|
|
"client_id": CLIENT_ID,
|
|
"client_secret": CLIENT_SECRET,
|
|
},
|
|
)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
data["expires_at"] = time.time() + data.get("expires_in", 3600)
|
|
# Preserve old refresh token if new one not provided
|
|
if "refresh_token" not in data:
|
|
data["refresh_token"] = refresh_tok
|
|
save_token(data)
|
|
return data
|
|
logger.warning(f"Token refresh failed: {resp.status_code} {resp.text}")
|
|
return None
|
|
|
|
|
|
async def get_valid_token() -> str | None:
|
|
token = load_token()
|
|
if not token:
|
|
return None
|
|
if token_is_valid(token):
|
|
return token["access_token"]
|
|
refreshed = await refresh_token_if_needed(token)
|
|
if refreshed and token_is_valid(refreshed):
|
|
return refreshed["access_token"]
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LinkedIn API helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def li_get(path: str, params: dict = None) -> dict:
|
|
access_token = await get_valid_token()
|
|
if not access_token:
|
|
return {"error": "Not authenticated. Visit /linkedin/auth to authorize."}
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"LinkedIn-Version": "202401",
|
|
"X-Restli-Protocol-Version": "2.0.0",
|
|
}
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.get(
|
|
f"https://api.linkedin.com{path}",
|
|
headers=headers,
|
|
params=params or {},
|
|
timeout=15,
|
|
)
|
|
if resp.status_code == 200:
|
|
return resp.json()
|
|
return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text}
|
|
|
|
|
|
async def li_post(path: str, body: dict) -> dict:
|
|
access_token = await get_valid_token()
|
|
if not access_token:
|
|
return {"error": "Not authenticated. Visit /linkedin/auth to authorize."}
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"LinkedIn-Version": "202401",
|
|
"X-Restli-Protocol-Version": "2.0.0",
|
|
"Content-Type": "application/json",
|
|
}
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
f"https://api.linkedin.com{path}",
|
|
headers=headers,
|
|
json=body,
|
|
timeout=15,
|
|
)
|
|
if resp.status_code in (200, 201):
|
|
try:
|
|
return resp.json()
|
|
except Exception:
|
|
return {"status": "success"}
|
|
return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OAuth flow endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def auth_start(request: Request):
|
|
"""Redirect user to LinkedIn OAuth consent screen."""
|
|
if not CLIENT_ID:
|
|
return HTMLResponse("<h2>Error: LINKEDIN_CLIENT_ID not configured.</h2>", status_code=500)
|
|
|
|
state = secrets.token_urlsafe(16)
|
|
_oauth_states[state] = time.time() + 600 # expires in 10 min
|
|
|
|
params = {
|
|
"response_type": "code",
|
|
"client_id": CLIENT_ID,
|
|
"redirect_uri": REDIRECT_URI,
|
|
"state": state,
|
|
"scope": "openid profile email w_member_social",
|
|
}
|
|
from urllib.parse import urlencode
|
|
url = "https://www.linkedin.com/oauth/v2/authorization?" + urlencode(params)
|
|
return RedirectResponse(url)
|
|
|
|
|
|
async def auth_callback(request: Request):
|
|
"""Handle OAuth callback, exchange code for token."""
|
|
code = request.query_params.get("code")
|
|
state = request.query_params.get("state")
|
|
error = request.query_params.get("error")
|
|
|
|
if error:
|
|
return HTMLResponse(f"<h2>LinkedIn auth error: {error}</h2><p>{request.query_params.get('error_description', '')}</p>")
|
|
|
|
# Validate state
|
|
expected_expiry = _oauth_states.pop(state, None)
|
|
if not expected_expiry or time.time() > expected_expiry:
|
|
return HTMLResponse("<h2>Invalid or expired state. Please try again.</h2>")
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
"https://www.linkedin.com/oauth/v2/accessToken",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"redirect_uri": REDIRECT_URI,
|
|
"client_id": CLIENT_ID,
|
|
"client_secret": CLIENT_SECRET,
|
|
},
|
|
)
|
|
|
|
if resp.status_code != 200:
|
|
return HTMLResponse(f"<h2>Token exchange failed</h2><pre>{resp.text}</pre>")
|
|
|
|
token = resp.json()
|
|
token["expires_at"] = time.time() + token.get("expires_in", 3600)
|
|
save_token(token)
|
|
|
|
return HTMLResponse("""
|
|
<html><body style="font-family:sans-serif;max-width:600px;margin:60px auto;text-align:center">
|
|
<h2 style="color:#0a66c2">✅ LinkedIn Connected!</h2>
|
|
<p>Your LinkedIn account has been authorized successfully.</p>
|
|
<p>You can close this tab. The MCP server is now ready to use.</p>
|
|
</body></html>
|
|
""")
|
|
|
|
|
|
async def auth_status(request: Request):
|
|
"""Show current auth status."""
|
|
token = load_token()
|
|
if not token:
|
|
return HTMLResponse(f"""
|
|
<html><body style="font-family:sans-serif;max-width:600px;margin:60px auto;text-align:center">
|
|
<h2 style="color:#cc0000">❌ Not Authorized</h2>
|
|
<p>LinkedIn has not been connected yet.</p>
|
|
<a href="/linkedin/auth" style="background:#0a66c2;color:white;padding:12px 24px;border-radius:6px;text-decoration:none;font-size:16px">
|
|
Connect LinkedIn
|
|
</a>
|
|
</body></html>
|
|
""")
|
|
|
|
valid = token_is_valid(token)
|
|
expires = time.strftime('%Y-%m-%d %H:%M UTC', time.gmtime(token.get("expires_at", 0)))
|
|
status_color = "#00aa00" if valid else "#cc0000"
|
|
status_text = "✅ Token Valid" if valid else "⚠️ Token Expired"
|
|
|
|
return HTMLResponse(f"""
|
|
<html><body style="font-family:sans-serif;max-width:600px;margin:60px auto;text-align:center">
|
|
<h2 style="color:{status_color}">{status_text}</h2>
|
|
<p>Token expires: {expires}</p>
|
|
<a href="/linkedin/auth" style="background:#0a66c2;color:white;padding:12px 24px;border-radius:6px;text-decoration:none;font-size:16px">
|
|
Re-authorize
|
|
</a>
|
|
</body></html>
|
|
""")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MCP Tool handlers (JSON-RPC over HTTP)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
TOOLS = [
|
|
{
|
|
"name": "linkedin_get_profile",
|
|
"description": "Get the authenticated user's LinkedIn profile (name, headline, industry, location).",
|
|
"inputSchema": {"type": "object", "properties": {}, "required": []},
|
|
},
|
|
{
|
|
"name": "linkedin_search_people",
|
|
"description": "Search for LinkedIn members by keywords.",
|
|
"inputSchema": {
|
|
"type": "object",
|
|
"properties": {
|
|
"keywords": {"type": "string", "description": "Search keywords (name, title, company)"},
|
|
"count": {"type": "integer", "description": "Number of results (max 10)", "default": 5},
|
|
},
|
|
"required": ["keywords"],
|
|
},
|
|
},
|
|
{
|
|
"name": "linkedin_create_post",
|
|
"description": "Create a LinkedIn post on behalf of the authenticated user.",
|
|
"inputSchema": {
|
|
"type": "object",
|
|
"properties": {
|
|
"text": {"type": "string", "description": "The text content of the post"},
|
|
"visibility": {
|
|
"type": "string",
|
|
"description": "Who can see the post: PUBLIC or CONNECTIONS",
|
|
"default": "PUBLIC",
|
|
},
|
|
},
|
|
"required": ["text"],
|
|
},
|
|
},
|
|
{
|
|
"name": "linkedin_get_connections",
|
|
"description": "Get a list of the authenticated user's 1st-degree connections.",
|
|
"inputSchema": {
|
|
"type": "object",
|
|
"properties": {
|
|
"count": {"type": "integer", "description": "Number of connections to retrieve (max 50)", "default": 10}
|
|
},
|
|
"required": [],
|
|
},
|
|
},
|
|
]
|
|
|
|
|
|
async def tool_get_profile(_args: dict) -> dict:
|
|
data = await li_get("/v2/userinfo")
|
|
if "error" in data:
|
|
return data
|
|
return {
|
|
"name": data.get("name", ""),
|
|
"given_name": data.get("given_name", ""),
|
|
"family_name": data.get("family_name", ""),
|
|
"email": data.get("email", ""),
|
|
"picture": data.get("picture", ""),
|
|
"locale": data.get("locale", ""),
|
|
}
|
|
|
|
|
|
async def tool_search_people(args: dict) -> dict:
|
|
keywords = args.get("keywords", "")
|
|
count = min(int(args.get("count", 5)), 10)
|
|
data = await li_get("/v2/search", params={
|
|
"q": "people",
|
|
"keywords": keywords,
|
|
"count": count,
|
|
})
|
|
return data
|
|
|
|
|
|
async def tool_create_post(args: dict) -> dict:
|
|
text = args.get("text", "")
|
|
visibility = args.get("visibility", "PUBLIC").upper()
|
|
|
|
# First get author URN
|
|
profile = await li_get("/v2/userinfo")
|
|
if "error" in profile:
|
|
return profile
|
|
|
|
# LinkedIn uses sub as person ID
|
|
person_id = profile.get("sub", "")
|
|
author_urn = f"urn:li:person:{person_id}"
|
|
|
|
body = {
|
|
"author": author_urn,
|
|
"lifecycleState": "PUBLISHED",
|
|
"specificContent": {
|
|
"com.linkedin.ugc.ShareContent": {
|
|
"shareCommentary": {"text": text},
|
|
"shareMediaCategory": "NONE",
|
|
}
|
|
},
|
|
"visibility": {
|
|
"com.linkedin.ugc.MemberNetworkVisibility": visibility
|
|
},
|
|
}
|
|
result = await li_post("/v2/ugcPosts", body)
|
|
return result
|
|
|
|
|
|
async def tool_get_connections(args: dict) -> dict:
|
|
count = min(int(args.get("count", 10)), 50)
|
|
data = await li_get("/v2/connections", params={"q": "viewer", "count": count})
|
|
return data
|
|
|
|
|
|
TOOL_HANDLERS = {
|
|
"linkedin_get_profile": tool_get_profile,
|
|
"linkedin_search_people": tool_search_people,
|
|
"linkedin_create_post": tool_create_post,
|
|
"linkedin_get_connections": tool_get_connections,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MCP HTTP endpoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def handle_mcp(request: Request):
|
|
if request.method == "GET":
|
|
# SSE initialize
|
|
return JSONResponse({"jsonrpc": "2.0", "result": {"protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "linkedin", "version": "1.0.0"}}, "id": 0})
|
|
|
|
body = await request.json()
|
|
method = body.get("method")
|
|
req_id = body.get("id")
|
|
params = body.get("params", {})
|
|
|
|
if method == "initialize":
|
|
return JSONResponse({"jsonrpc": "2.0", "result": {
|
|
"protocolVersion": "2024-11-05",
|
|
"capabilities": {"tools": {}},
|
|
"serverInfo": {"name": "linkedin", "version": "1.0.0"},
|
|
}, "id": req_id})
|
|
|
|
elif method == "tools/list":
|
|
return JSONResponse({"jsonrpc": "2.0", "result": {"tools": TOOLS}, "id": req_id})
|
|
|
|
elif method == "tools/call":
|
|
tool_name = params.get("name")
|
|
tool_args = params.get("arguments", {})
|
|
handler = TOOL_HANDLERS.get(tool_name)
|
|
if not handler:
|
|
return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}, "id": req_id})
|
|
try:
|
|
result = await handler(tool_args)
|
|
return JSONResponse({"jsonrpc": "2.0", "result": {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}, "id": req_id})
|
|
except Exception as e:
|
|
return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}, "id": req_id})
|
|
|
|
elif method == "ping":
|
|
return JSONResponse({"jsonrpc": "2.0", "result": {}, "id": req_id})
|
|
|
|
return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Method not found: {method}"}, "id": req_id})
|
|
|
|
|
|
async def health(request: Request):
|
|
token = load_token()
|
|
valid = token_is_valid(token) if token else False
|
|
return JSONResponse({
|
|
"status": "ok",
|
|
"authenticated": valid,
|
|
"auth_url": "/linkedin/auth",
|
|
})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# App
|
|
# ---------------------------------------------------------------------------
|
|
|
|
routes = [
|
|
Route("/mcp", handle_mcp, methods=["GET", "POST"]),
|
|
Route("/health", health, methods=["GET"]),
|
|
Route("/linkedin/auth", auth_start, methods=["GET"]),
|
|
Route("/linkedin/callback", auth_callback, methods=["GET"]),
|
|
Route("/linkedin/status", auth_status, methods=["GET"]),
|
|
]
|
|
|
|
app = Starlette(routes=routes)
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")
|