From dfddbf153b2b356f3780003cb27375f06c42fcee Mon Sep 17 00:00:00 2001 From: zgaetano Date: Tue, 31 Mar 2026 15:33:14 -0400 Subject: [PATCH] Remove mcp-gateway/linkedin-mcp/server.py --- mcp-gateway/linkedin-mcp/server.py | 441 ----------------------------- 1 file changed, 441 deletions(-) delete mode 100644 mcp-gateway/linkedin-mcp/server.py diff --git a/mcp-gateway/linkedin-mcp/server.py b/mcp-gateway/linkedin-mcp/server.py deleted file mode 100644 index 1463de6..0000000 --- a/mcp-gateway/linkedin-mcp/server.py +++ /dev/null @@ -1,441 +0,0 @@ -""" -LinkedIn MCP Server -=================== -Uses LinkedIn REST API with OAuth 2.0. -Token is obtained via a one-time browser redirect (on any machine, not the server). -Once authorized, the token is stored in /data/linkedin_token.json and refreshed automatically. -""" - -import asyncio -import hashlib -import json -import logging -import os -import secrets -import time -from pathlib import Path - -import httpx -from starlette.applications import Starlette -from starlette.requests import Request -from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse -from starlette.routing import Route -import uvicorn - -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") -logger = logging.getLogger("linkedin-mcp") - -# --------------------------------------------------------------------------- -# Config -# --------------------------------------------------------------------------- - -CLIENT_ID = os.environ.get("LINKEDIN_CLIENT_ID", "") -CLIENT_SECRET = os.environ.get("LINKEDIN_CLIENT_SECRET", "") -REDIRECT_URI = os.environ.get("LINKEDIN_REDIRECT_URI", "http://localhost:8500/linkedin/callback") -TOKEN_FILE = Path(os.environ.get("TOKEN_FILE", "/data/linkedin_token.json")) -PORT = int(os.environ.get("PORT", "8500")) - -TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True) - -# In-memory state store for OAuth PKCE -_oauth_states: dict[str, float] = {} - -# --------------------------------------------------------------------------- -# Token storage -# --------------------------------------------------------------------------- - -def load_token() -> dict | None: - if TOKEN_FILE.exists(): - try: - return json.loads(TOKEN_FILE.read_text()) - except Exception: - return None - return None - - -def save_token(token: dict): - TOKEN_FILE.write_text(json.dumps(token, indent=2)) - logger.info("LinkedIn token saved.") - - -def token_is_valid(token: dict) -> bool: - if not token: - return False - expires_at = token.get("expires_at", 0) - return time.time() < expires_at - 60 - - -async def refresh_token_if_needed(token: dict) -> dict | None: - """Attempt to refresh using refresh_token if available.""" - refresh_tok = token.get("refresh_token") - if not refresh_tok: - return None - async with httpx.AsyncClient() as client: - resp = await client.post( - "https://www.linkedin.com/oauth/v2/accessToken", - data={ - "grant_type": "refresh_token", - "refresh_token": refresh_tok, - "client_id": CLIENT_ID, - "client_secret": CLIENT_SECRET, - }, - ) - if resp.status_code == 200: - data = resp.json() - data["expires_at"] = time.time() + data.get("expires_in", 3600) - # Preserve old refresh token if new one not provided - if "refresh_token" not in data: - data["refresh_token"] = refresh_tok - save_token(data) - return data - logger.warning(f"Token refresh failed: {resp.status_code} {resp.text}") - return None - - -async def get_valid_token() -> str | None: - token = load_token() - if not token: - return None - if token_is_valid(token): - return token["access_token"] - refreshed = await refresh_token_if_needed(token) - if refreshed and token_is_valid(refreshed): - return refreshed["access_token"] - return None - - -# --------------------------------------------------------------------------- -# LinkedIn API helpers -# --------------------------------------------------------------------------- - -async def li_get(path: str, params: dict = None) -> dict: - access_token = await get_valid_token() - if not access_token: - return {"error": "Not authenticated. Visit /linkedin/auth to authorize."} - headers = { - "Authorization": f"Bearer {access_token}", - "LinkedIn-Version": "202401", - "X-Restli-Protocol-Version": "2.0.0", - } - async with httpx.AsyncClient() as client: - resp = await client.get( - f"https://api.linkedin.com{path}", - headers=headers, - params=params or {}, - timeout=15, - ) - if resp.status_code == 200: - return resp.json() - return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text} - - -async def li_post(path: str, body: dict) -> dict: - access_token = await get_valid_token() - if not access_token: - return {"error": "Not authenticated. Visit /linkedin/auth to authorize."} - headers = { - "Authorization": f"Bearer {access_token}", - "LinkedIn-Version": "202401", - "X-Restli-Protocol-Version": "2.0.0", - "Content-Type": "application/json", - } - async with httpx.AsyncClient() as client: - resp = await client.post( - f"https://api.linkedin.com{path}", - headers=headers, - json=body, - timeout=15, - ) - if resp.status_code in (200, 201): - try: - return resp.json() - except Exception: - return {"status": "success"} - return {"error": f"LinkedIn API error {resp.status_code}", "detail": resp.text} - - -# --------------------------------------------------------------------------- -# OAuth flow endpoints -# --------------------------------------------------------------------------- - -async def auth_start(request: Request): - """Redirect user to LinkedIn OAuth consent screen.""" - if not CLIENT_ID: - return HTMLResponse("

Error: LINKEDIN_CLIENT_ID not configured.

", status_code=500) - - state = secrets.token_urlsafe(16) - _oauth_states[state] = time.time() + 600 # expires in 10 min - - params = { - "response_type": "code", - "client_id": CLIENT_ID, - "redirect_uri": REDIRECT_URI, - "state": state, - "scope": "openid profile email w_member_social", - } - from urllib.parse import urlencode - url = "https://www.linkedin.com/oauth/v2/authorization?" + urlencode(params) - return RedirectResponse(url) - - -async def auth_callback(request: Request): - """Handle OAuth callback, exchange code for token.""" - code = request.query_params.get("code") - state = request.query_params.get("state") - error = request.query_params.get("error") - - if error: - return HTMLResponse(f"

LinkedIn auth error: {error}

{request.query_params.get('error_description', '')}

") - - # Validate state - expected_expiry = _oauth_states.pop(state, None) - if not expected_expiry or time.time() > expected_expiry: - return HTMLResponse("

Invalid or expired state. Please try again.

") - - async with httpx.AsyncClient() as client: - resp = await client.post( - "https://www.linkedin.com/oauth/v2/accessToken", - data={ - "grant_type": "authorization_code", - "code": code, - "redirect_uri": REDIRECT_URI, - "client_id": CLIENT_ID, - "client_secret": CLIENT_SECRET, - }, - ) - - if resp.status_code != 200: - return HTMLResponse(f"

Token exchange failed

{resp.text}
") - - token = resp.json() - token["expires_at"] = time.time() + token.get("expires_in", 3600) - save_token(token) - - return HTMLResponse(""" - -

✅ LinkedIn Connected!

-

Your LinkedIn account has been authorized successfully.

-

You can close this tab. The MCP server is now ready to use.

- - """) - - -async def auth_status(request: Request): - """Show current auth status.""" - token = load_token() - if not token: - return HTMLResponse(f""" - -

❌ Not Authorized

-

LinkedIn has not been connected yet.

- - Connect LinkedIn - - - """) - - valid = token_is_valid(token) - expires = time.strftime('%Y-%m-%d %H:%M UTC', time.gmtime(token.get("expires_at", 0))) - status_color = "#00aa00" if valid else "#cc0000" - status_text = "✅ Token Valid" if valid else "⚠️ Token Expired" - - return HTMLResponse(f""" - -

{status_text}

-

Token expires: {expires}

- - Re-authorize - - - """) - - -# --------------------------------------------------------------------------- -# MCP Tool handlers (JSON-RPC over HTTP) -# --------------------------------------------------------------------------- - -TOOLS = [ - { - "name": "linkedin_get_profile", - "description": "Get the authenticated user's LinkedIn profile (name, headline, industry, location).", - "inputSchema": {"type": "object", "properties": {}, "required": []}, - }, - { - "name": "linkedin_search_people", - "description": "Search for LinkedIn members by keywords.", - "inputSchema": { - "type": "object", - "properties": { - "keywords": {"type": "string", "description": "Search keywords (name, title, company)"}, - "count": {"type": "integer", "description": "Number of results (max 10)", "default": 5}, - }, - "required": ["keywords"], - }, - }, - { - "name": "linkedin_create_post", - "description": "Create a LinkedIn post on behalf of the authenticated user.", - "inputSchema": { - "type": "object", - "properties": { - "text": {"type": "string", "description": "The text content of the post"}, - "visibility": { - "type": "string", - "description": "Who can see the post: PUBLIC or CONNECTIONS", - "default": "PUBLIC", - }, - }, - "required": ["text"], - }, - }, - { - "name": "linkedin_get_connections", - "description": "Get a list of the authenticated user's 1st-degree connections.", - "inputSchema": { - "type": "object", - "properties": { - "count": {"type": "integer", "description": "Number of connections to retrieve (max 50)", "default": 10} - }, - "required": [], - }, - }, -] - - -async def tool_get_profile(_args: dict) -> dict: - data = await li_get("/v2/userinfo") - if "error" in data: - return data - return { - "name": data.get("name", ""), - "given_name": data.get("given_name", ""), - "family_name": data.get("family_name", ""), - "email": data.get("email", ""), - "picture": data.get("picture", ""), - "locale": data.get("locale", ""), - } - - -async def tool_search_people(args: dict) -> dict: - keywords = args.get("keywords", "") - count = min(int(args.get("count", 5)), 10) - data = await li_get("/v2/search", params={ - "q": "people", - "keywords": keywords, - "count": count, - }) - return data - - -async def tool_create_post(args: dict) -> dict: - text = args.get("text", "") - visibility = args.get("visibility", "PUBLIC").upper() - - # First get author URN - profile = await li_get("/v2/userinfo") - if "error" in profile: - return profile - - # LinkedIn uses sub as person ID - person_id = profile.get("sub", "") - author_urn = f"urn:li:person:{person_id}" - - body = { - "author": author_urn, - "lifecycleState": "PUBLISHED", - "specificContent": { - "com.linkedin.ugc.ShareContent": { - "shareCommentary": {"text": text}, - "shareMediaCategory": "NONE", - } - }, - "visibility": { - "com.linkedin.ugc.MemberNetworkVisibility": visibility - }, - } - result = await li_post("/v2/ugcPosts", body) - return result - - -async def tool_get_connections(args: dict) -> dict: - count = min(int(args.get("count", 10)), 50) - data = await li_get("/v2/connections", params={"q": "viewer", "count": count}) - return data - - -TOOL_HANDLERS = { - "linkedin_get_profile": tool_get_profile, - "linkedin_search_people": tool_search_people, - "linkedin_create_post": tool_create_post, - "linkedin_get_connections": tool_get_connections, -} - - -# --------------------------------------------------------------------------- -# MCP HTTP endpoint -# --------------------------------------------------------------------------- - -async def handle_mcp(request: Request): - if request.method == "GET": - # SSE initialize - return JSONResponse({"jsonrpc": "2.0", "result": {"protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "linkedin", "version": "1.0.0"}}, "id": 0}) - - body = await request.json() - method = body.get("method") - req_id = body.get("id") - params = body.get("params", {}) - - if method == "initialize": - return JSONResponse({"jsonrpc": "2.0", "result": { - "protocolVersion": "2024-11-05", - "capabilities": {"tools": {}}, - "serverInfo": {"name": "linkedin", "version": "1.0.0"}, - }, "id": req_id}) - - elif method == "tools/list": - return JSONResponse({"jsonrpc": "2.0", "result": {"tools": TOOLS}, "id": req_id}) - - elif method == "tools/call": - tool_name = params.get("name") - tool_args = params.get("arguments", {}) - handler = TOOL_HANDLERS.get(tool_name) - if not handler: - return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}, "id": req_id}) - try: - result = await handler(tool_args) - return JSONResponse({"jsonrpc": "2.0", "result": {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}, "id": req_id}) - except Exception as e: - return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}, "id": req_id}) - - elif method == "ping": - return JSONResponse({"jsonrpc": "2.0", "result": {}, "id": req_id}) - - return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32601, "message": f"Method not found: {method}"}, "id": req_id}) - - -async def health(request: Request): - token = load_token() - valid = token_is_valid(token) if token else False - return JSONResponse({ - "status": "ok", - "authenticated": valid, - "auth_url": "/linkedin/auth", - }) - - -# --------------------------------------------------------------------------- -# App -# --------------------------------------------------------------------------- - -routes = [ - Route("/mcp", handle_mcp, methods=["GET", "POST"]), - Route("/health", health, methods=["GET"]), - Route("/linkedin/auth", auth_start, methods=["GET"]), - Route("/linkedin/callback", auth_callback, methods=["GET"]), - Route("/linkedin/status", auth_status, methods=["GET"]), -] - -app = Starlette(routes=routes) - -if __name__ == "__main__": - uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")