feat: social-mcp 初始实现

多平台社交自动化 MCP 服务,首批支持小红书。

- 13 个 MCP 工具:登录管理、内容浏览、发布、互动
- 13 个 REST API 端点,支持 Bearer token 认证和限流
- BrowserManager:串行队列、背压、崩溃恢复
- Cookie 持久化:原子写入、0600 权限
- 安全:DNS rebinding 防御、错误脱敏、深层日志 redact
- Docker 部署支持
- 28 个单元测试全部通过
This commit is contained in:
2026-02-28 22:57:22 +08:00
commit 8da5f40c9f
38 changed files with 11273 additions and 0 deletions
+342
View File
@@ -0,0 +1,342 @@
import http from 'node:http';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import express from 'express';
import { config } from '../config/index.js';
import { BrowserManager, browserManager } from '../browser/manager.js';
import { logger } from '../utils/logger.js';
import {
dnsRebindingGuard,
shutdownGuard,
errorHandler,
bearerAuth,
initBearerToken,
} from './middleware.js';
// ---------------------------------------------------------------------------
// Package version -- read once at module load for the /health endpoint and
// the MCP server info. Uses a static string so we avoid importing JSON
// (which would require `resolveJsonModule` + ESM assertion gymnastics).
// ---------------------------------------------------------------------------
const PACKAGE_VERSION = '0.1.0';
// ---------------------------------------------------------------------------
// PlatformPlugin interface
// ---------------------------------------------------------------------------
/**
* Contract that every platform adapter (Twitter, Xiaohongshu, etc.) must
* implement to plug into the application.
*
* - `registerTools` is the minimum requirement: expose MCP tools.
* - Optional hooks allow the plugin to mount REST routes, perform async
* initialisation, clean up on shutdown, and report its health.
*/
export interface PlatformPlugin {
/** Human-readable name used in logs and health-check output. */
name: string;
/** Register MCP tools on the shared McpServer instance. */
registerTools(server: McpServer, browser: BrowserManager): void;
/** Optionally mount Express routes (e.g. OAuth callbacks, webhooks). */
registerRoutes?(router: express.Router, browser: BrowserManager): void;
/** Async initialisation (cookie restore, feature detection, etc.). */
init?(): Promise<void>;
/** Teardown hook called during graceful shutdown. */
shutdown?(): Promise<void>;
/** Return platform-specific health information. */
healthCheck?(): Promise<{ healthy: boolean; message?: string }>;
}
// ---------------------------------------------------------------------------
// AppServer
// ---------------------------------------------------------------------------
export class AppServer {
// -- Public surface -------------------------------------------------------
/** The underlying Express application -- use for plugin route mounting. */
readonly app: express.Express;
/** The MCP server instance -- use for plugin tool registration. */
readonly mcpServer: McpServer;
// -- Internal state -------------------------------------------------------
private httpServer: http.Server | null = null;
private shuttingDown = false;
private readonly plugins: PlatformPlugin[] = [];
/**
* SSE transports keyed by session ID so that POST /messages can route
* incoming JSON-RPC messages to the correct transport instance.
*/
private readonly transports = new Map<string, SSEServerTransport>();
// -- Constructor ----------------------------------------------------------
constructor() {
// 1. Express app + body parsing
this.app = express();
this.app.use(express.json());
// 2. Security & availability middleware
this.app.use(dnsRebindingGuard);
this.app.use(shutdownGuard(() => this.shuttingDown));
// 3. MCP server
this.mcpServer = new McpServer(
{ name: 'social-mcp', version: PACKAGE_VERSION },
);
// 4. SSE transport endpoints
this.setupSseEndpoints();
// 5. Health endpoint
this.setupHealthEndpoint();
// 6. Bearer token auth for /api/* routes
initBearerToken();
this.app.use('/api', bearerAuth);
// 7. Error handler (must be registered last -- re-registered after plugins)
this.app.use(errorHandler);
}
// -- Plugin registration --------------------------------------------------
/**
* Register a platform plugin. Call this **before** `start()` so that all
* tools and routes are wired up before the server begins accepting
* connections.
*/
registerPlugin(plugin: PlatformPlugin): void {
logger.info({ plugin: plugin.name }, 'Registering platform plugin');
plugin.registerTools(this.mcpServer, browserManager);
if (plugin.registerRoutes) {
const router = express.Router();
plugin.registerRoutes(router, browserManager);
// Mount REST API routes under /api/xhs (for xiaohongshu)
this.app.use(`/api/xhs`, router);
}
this.plugins.push(plugin);
}
// -- Lifecycle ------------------------------------------------------------
/**
* Initialise all plugins and start listening for HTTP connections on
* `config.host:config.port`.
*
* Returns a promise that resolves once the server is ready.
*/
async start(): Promise<void> {
// Initialise plugins (sequentially so order is deterministic).
for (const plugin of this.plugins) {
if (plugin.init) {
logger.info({ plugin: plugin.name }, 'Initialising plugin');
await plugin.init();
}
}
// Re-register the error handler so it sits after any plugin routes.
this.app.use(errorHandler);
return new Promise<void>((resolve, reject) => {
this.httpServer = this.app
.listen(config.port, config.host, () => {
logger.info(
{ host: config.host, port: config.port },
'AppServer listening',
);
resolve();
})
.on('error', (err: Error) => {
reject(err);
});
});
}
/**
* Initiate graceful shutdown:
* 1. Set the shutting-down flag (new requests get 503).
* 2. Shut down every plugin.
* 3. Close all SSE transports and the MCP server.
* 4. Close the HTTP server.
*/
async close(): Promise<void> {
if (this.shuttingDown) return;
this.shuttingDown = true;
logger.info('AppServer shutting down');
// Shut down plugins
for (const plugin of this.plugins) {
if (plugin.shutdown) {
try {
await plugin.shutdown();
} catch (err: unknown) {
logger.warn({ err, plugin: plugin.name }, 'Error shutting down plugin');
}
}
}
// Close all SSE transports
for (const [sessionId, transport] of this.transports) {
try {
await transport.close();
} catch (err: unknown) {
logger.warn({ err, sessionId }, 'Error closing SSE transport');
}
}
this.transports.clear();
// Close the MCP server
try {
await this.mcpServer.close();
} catch (err: unknown) {
logger.warn({ err }, 'Error closing MCP server');
}
// Close the HTTP server
if (this.httpServer) {
await new Promise<void>((resolve) => {
this.httpServer!.close(() => {
resolve();
});
});
this.httpServer = null;
}
logger.info('AppServer shut down complete');
}
// -- Private: SSE endpoints -----------------------------------------------
private setupSseEndpoints(): void {
// GET /sse -- establish a new SSE connection
this.app.get('/sse', (req, res) => {
logger.debug({ ip: req.ip }, 'New SSE connection request');
const transport = new SSEServerTransport('/messages', res);
const sessionId = transport.sessionId;
this.transports.set(sessionId, transport);
logger.info({ sessionId }, 'SSE transport created');
// Clean up when the client disconnects.
res.on('close', () => {
logger.info({ sessionId }, 'SSE client disconnected');
this.transports.delete(sessionId);
});
// Connect the transport to the MCP server. This starts the SSE
// stream and sends the initial endpoint event to the client.
void this.mcpServer.connect(transport).catch((err: unknown) => {
logger.error({ err, sessionId }, 'Failed to connect SSE transport to MCP server');
this.transports.delete(sessionId);
});
});
// POST /messages -- receive JSON-RPC messages for an existing session
this.app.post('/messages', (req, res) => {
const sessionId = req.query['sessionId'] as string | undefined;
if (!sessionId) {
res.status(400).json({ error: 'Missing sessionId query parameter' });
return;
}
const transport = this.transports.get(sessionId);
if (!transport) {
res.status(404).json({ error: 'Unknown or expired session' });
return;
}
// Delegate to the transport; it will parse the body and route the
// JSON-RPC message to the MCP server.
void transport.handlePostMessage(req, res).catch((err: unknown) => {
logger.error({ err, sessionId }, 'Error handling POST /messages');
if (!res.headersSent) {
res.status(500).json({ error: 'Internal server error' });
}
});
});
}
// -- Private: Health endpoint ---------------------------------------------
private setupHealthEndpoint(): void {
this.app.get('/health', (_req, res) => {
void this.buildHealthResponse()
.then((body) => {
const status = body.healthy ? 200 : 503;
res.status(status).json(body);
})
.catch((err: unknown) => {
logger.error({ err }, 'Health check failed unexpectedly');
res.status(500).json({ healthy: false, error: 'Health check error' });
});
});
}
private async buildHealthResponse(): Promise<Record<string, unknown>> {
// Memory usage
const mem = process.memoryUsage();
const memoryMb = {
rss: Math.round(mem.rss / 1024 / 1024),
heapUsed: Math.round(mem.heapUsed / 1024 / 1024),
heapTotal: Math.round(mem.heapTotal / 1024 / 1024),
external: Math.round(mem.external / 1024 / 1024),
};
// Active SSE sessions
const activeSessions = this.transports.size;
// Plugin health checks
const pluginHealth: Record<string, { healthy: boolean; message?: string }> = {};
let allPluginsHealthy = true;
for (const plugin of this.plugins) {
if (plugin.healthCheck) {
try {
const result = await plugin.healthCheck();
pluginHealth[plugin.name] = result;
if (!result.healthy) {
allPluginsHealthy = false;
}
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
pluginHealth[plugin.name] = { healthy: false, message };
allPluginsHealthy = false;
}
} else {
pluginHealth[plugin.name] = { healthy: true };
}
}
const healthy = allPluginsHealthy && !this.shuttingDown;
return {
healthy,
version: PACKAGE_VERSION,
uptime: Math.round(process.uptime()),
shuttingDown: this.shuttingDown,
activeSessions,
plugins: pluginHealth,
memory: memoryMb,
};
}
}
+278
View File
@@ -0,0 +1,278 @@
import crypto from 'node:crypto';
import fs from 'node:fs';
import path from 'node:path';
import type { Request, Response, NextFunction } from 'express';
import { config } from '../config/index.js';
import { logger } from '../utils/logger.js';
import { sanitizeErrorMessage } from '../utils/errors.js';
// ---------------------------------------------------------------------------
// Allowed hosts for DNS rebinding protection
// ---------------------------------------------------------------------------
const allowedHosts = new Set<string>([
'127.0.0.1',
'localhost',
`127.0.0.1:${config.port}`,
`localhost:${config.port}`,
]);
// ---------------------------------------------------------------------------
// 1. DNS Rebinding Guard
// ---------------------------------------------------------------------------
/**
* Reject requests whose `Host` header does not match an expected localhost
* value. This prevents DNS rebinding attacks from reaching the service when
* it is bound to the loopback interface.
*/
export function dnsRebindingGuard(
req: Request,
res: Response,
next: NextFunction,
): void {
const host = req.headers.host;
if (!host || !allowedHosts.has(host)) {
logger.warn(
{ host, ip: req.ip, method: req.method, url: req.originalUrl },
'DNS rebinding guard: blocked request with disallowed Host header',
);
res.status(403).json({ error: 'Forbidden' });
return;
}
next();
}
// ---------------------------------------------------------------------------
// 2. Shutdown Guard (factory)
// ---------------------------------------------------------------------------
/**
* Factory that returns middleware rejecting new requests once the server has
* started its graceful shutdown sequence.
*
* @param getShuttingDown - Callback that returns `true` when shutdown is in progress.
*/
export function shutdownGuard(
getShuttingDown: () => boolean,
): (req: Request, res: Response, next: NextFunction) => void {
return (_req: Request, res: Response, next: NextFunction): void => {
if (getShuttingDown()) {
res.status(503).json({ error: 'Server is shutting down' });
return;
}
next();
};
}
// ---------------------------------------------------------------------------
// 3. Error Handler
// ---------------------------------------------------------------------------
/**
* Express error-handling middleware (four-argument signature).
*
* Logs the full error internally while returning a sanitized message to the
* client so that internal filesystem paths, tokens, and stack traces are
* never exposed.
*/
export function errorHandler(
err: Error,
req: Request,
res: Response,
_next: NextFunction,
): void {
logger.error(
{ err, method: req.method, url: req.originalUrl },
'Unhandled error in request pipeline',
);
const message = sanitizeErrorMessage(err.message || 'Internal server error');
res.status(500).json({ error: message });
}
// ---------------------------------------------------------------------------
// 4. Bearer Token Authentication
// ---------------------------------------------------------------------------
const TOKEN_FILENAME = '.api-token';
/** Cached token once loaded/generated. */
let cachedToken: string | null = null;
/**
* Load or generate the Bearer API token.
*
* - On first start, generates a random 32-byte hex token.
* - Stores it at `config.cookieDir/.api-token` with 0o600 permissions.
* - On subsequent starts, reads the existing token from disk.
* - Logs the token to console so the user can copy it.
*
* Must be called once during server startup.
*/
export function initBearerToken(): string {
if (cachedToken) return cachedToken;
const tokenPath = path.join(config.cookieDir, TOKEN_FILENAME);
// Ensure the directory exists.
try {
fs.mkdirSync(config.cookieDir, { recursive: true, mode: 0o700 });
} catch {
// Directory may already exist.
}
// Try to read an existing token.
try {
const existing = fs.readFileSync(tokenPath, 'utf-8').trim();
if (existing.length >= 32) {
cachedToken = existing;
logger.info('API Bearer token loaded from disk');
// eslint-disable-next-line no-console
console.log(`\n REST API Bearer Token: ${cachedToken}\n`);
return cachedToken;
}
} catch {
// File does not exist or is unreadable — generate a new token.
}
// Generate a new token.
cachedToken = crypto.randomBytes(32).toString('hex');
fs.writeFileSync(tokenPath, cachedToken + '\n', { mode: 0o600 });
logger.info('New API Bearer token generated and saved');
// eslint-disable-next-line no-console
console.log(`\n REST API Bearer Token: ${cachedToken}\n`);
return cachedToken;
}
/**
* Express middleware that validates a `Bearer <token>` header against the
* stored API token. Uses `crypto.timingSafeEqual` to prevent timing attacks.
*
* Apply to `/api/*` routes only.
*/
export function bearerAuth(
req: Request,
res: Response,
next: NextFunction,
): void {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
res.status(401).json({
success: false,
error: { code: 'UNAUTHORIZED', message: 'Missing or invalid Authorization header' },
});
return;
}
const provided = authHeader.slice(7); // Strip "Bearer "
if (!cachedToken) {
res.status(500).json({
success: false,
error: { code: 'INTERNAL', message: 'API token not initialized' },
});
return;
}
// Use timing-safe comparison to prevent timing attacks.
const providedBuf = Buffer.from(provided, 'utf-8');
const expectedBuf = Buffer.from(cachedToken, 'utf-8');
if (
providedBuf.length !== expectedBuf.length ||
!crypto.timingSafeEqual(providedBuf, expectedBuf)
) {
res.status(403).json({
success: false,
error: { code: 'FORBIDDEN', message: 'Invalid Bearer token' },
});
return;
}
next();
}
// ---------------------------------------------------------------------------
// 5. Rate Limiter (in-memory, per-IP)
// ---------------------------------------------------------------------------
interface RateLimiterOptions {
/** Time window in milliseconds. */
windowMs: number;
/** Maximum number of requests allowed in the window. */
maxRequests: number;
}
interface RateLimiterEntry {
/** Request timestamps within the current window. */
timestamps: number[];
}
/**
* Create an in-memory per-IP rate limiter middleware.
*
* Returns 429 when the rate limit is exceeded. Old entries are automatically
* cleaned up every 60 seconds to prevent memory leaks.
*/
export function rateLimiter(opts: RateLimiterOptions) {
const store = new Map<string, RateLimiterEntry>();
// Periodic cleanup of stale entries.
const cleanupInterval = setInterval(() => {
const now = Date.now();
for (const [ip, entry] of store) {
entry.timestamps = entry.timestamps.filter((t) => now - t < opts.windowMs);
if (entry.timestamps.length === 0) {
store.delete(ip);
}
}
}, 60_000);
// Do not let the cleanup timer keep the process alive during shutdown.
if (typeof cleanupInterval === 'object' && 'unref' in cleanupInterval) {
cleanupInterval.unref();
}
return (req: Request, res: Response, next: NextFunction): void => {
const ip = req.ip ?? req.socket.remoteAddress ?? 'unknown';
const now = Date.now();
let entry = store.get(ip);
if (!entry) {
entry = { timestamps: [] };
store.set(ip, entry);
}
// Remove timestamps outside the current window.
entry.timestamps = entry.timestamps.filter((t) => now - t < opts.windowMs);
if (entry.timestamps.length >= opts.maxRequests) {
const retryAfterMs = opts.windowMs - (now - (entry.timestamps[0] ?? now));
const retryAfterSec = Math.ceil(retryAfterMs / 1000);
res.set('Retry-After', String(retryAfterSec));
res.status(429).json({
success: false,
error: {
code: 'RATE_LIMITED',
message: `Too many requests. Try again in ${String(retryAfterSec)} seconds.`,
},
});
return;
}
entry.timestamps.push(now);
next();
};
}