实现通知自动化闭环:SQLite任务状态、定时同步、未处理查询与回复状态回写;下线MCP对外未读通知工具

This commit is contained in:
2026-03-03 11:49:25 +08:00
parent d2d96de857
commit 237b528f08
10 changed files with 703 additions and 40 deletions
+5
View File
@@ -12,3 +12,8 @@ HEADLESS=true
# Logging # Logging
# NODE_ENV=production # NODE_ENV=production
# LOG_LEVEL=info # LOG_LEVEL=info
# Notification automation (Xiaohongshu)
# XHS_NOTIFICATION_POLL_ENABLED=true
# XHS_NOTIFICATION_POLL_INTERVAL_SEC=60
# XHS_NOTIFICATION_POLL_MAX_COUNT=20
+10 -3
View File
@@ -6,10 +6,11 @@ Multi-platform social media automation service that exposes browser-based action
## Features ## Features
- **17 MCP tools** for Xiaohongshu (login, browsing, publishing, interactions, notifications) - **17 MCP tools** for Xiaohongshu (login, browsing, publishing, interactions, notifications, automation)
- **REST API** with Bearer token authentication and per-route rate limiting - **REST API** with Bearer token authentication and per-route rate limiting
- **Browser automation** via `rebrowser-playwright` with per-platform serial queueing - **Browser automation** via `rebrowser-playwright` with per-platform serial queueing
- **Cookie persistence** with file-based storage (`0600`, atomic writes) - **Cookie persistence** with file-based storage (`0600`, atomic writes)
- **Notification task state** in SQLite (`$COOKIE_DIR/xiaohongshu/automation.db`) for idempotent auto-reply workflows
- **Web dashboard** (React + Vite) for login, feed exploration, publishing, and API testing - **Web dashboard** (React + Vite) for login, feed exploration, publishing, and API testing
- **Security controls**: timing-safe token comparison, bind-address safety gate, error sanitization, log redaction - **Security controls**: timing-safe token comparison, bind-address safety gate, error sanitization, log redaction
- **Docker deployment** support with hardened runtime defaults - **Docker deployment** support with hardened runtime defaults
@@ -102,7 +103,7 @@ Add this in `claude_desktop_config.json`:
| `xhs_reply_comment` | Reply to a comment | | `xhs_reply_comment` | Reply to a comment |
| `xhs_like` | Toggle like state on a note | | `xhs_like` | Toggle like state on a note |
| `xhs_favorite` | Toggle favorite state on a note | | `xhs_favorite` | Toggle favorite state on a note |
| `xhs_get_comment_notifications` | Get unread comment/@ notifications | | `xhs_get_unprocessed_notifications` | Get unprocessed notification tasks from local SQLite state |
| `xhs_reply_notification` | Reply to a specific notification | | `xhs_reply_notification` | Reply to a specific notification |
## REST API ## REST API
@@ -150,6 +151,7 @@ Read endpoints are limited to **60/min** per IP. Write endpoints are limited to
| `POST` | `/api/xhs/like` | Toggle like | 10/min | | `POST` | `/api/xhs/like` | Toggle like | 10/min |
| `POST` | `/api/xhs/favorite` | Toggle favorite | 10/min | | `POST` | `/api/xhs/favorite` | Toggle favorite | 10/min |
| `GET` | `/api/xhs/notifications/comments` | Get comment notifications | 60/min | | `GET` | `/api/xhs/notifications/comments` | Get comment notifications | 60/min |
| `GET` | `/api/xhs/notifications/unprocessed` | Get unprocessed notification tasks from SQLite | 60/min |
| `POST` | `/api/xhs/notifications/reply` | Reply to notification | 10/min | | `POST` | `/api/xhs/notifications/reply` | Reply to notification | 10/min |
### Response Format ### Response Format
@@ -228,6 +230,9 @@ Note: Docker defaults expose port `3000` because container env sets `PORT=3000`.
| `COOKIE_DIR` | `~/.social-mcp` | Cookie/token storage directory | | `COOKIE_DIR` | `~/.social-mcp` | Cookie/token storage directory |
| `MAX_QUEUE_DEPTH` | `10` | Max pending operations per platform queue | | `MAX_QUEUE_DEPTH` | `10` | Max pending operations per platform queue |
| `ALLOW_REMOTE` | (unset) | Must be `yes-i-understand-the-risk` to allow public bind | | `ALLOW_REMOTE` | (unset) | Must be `yes-i-understand-the-risk` to allow public bind |
| `XHS_NOTIFICATION_POLL_ENABLED` | `true` | Enable periodic notification sync into SQLite |
| `XHS_NOTIFICATION_POLL_INTERVAL_SEC` | `60` | Notification sync interval (seconds, min 15) |
| `XHS_NOTIFICATION_POLL_MAX_COUNT` | `20` | Max notifications fetched per sync |
## Project Structure ## Project Structure
@@ -264,7 +269,9 @@ social-mcp/
│ ├── publish-video.ts │ ├── publish-video.ts
│ ├── comment.ts │ ├── comment.ts
│ ├── interaction.ts │ ├── interaction.ts
── notification.ts ── notification.ts
│ ├── notification-state.ts
│ └── notification-sync.ts
└── web/ └── web/
└── src/ └── src/
``` ```
+10 -3
View File
@@ -6,10 +6,11 @@
## 功能特性 ## 功能特性
- 小红书 **17 个 MCP 工具**(登录、浏览、发布、互动、通知) - 小红书 **17 个 MCP 工具**(登录、浏览、发布、互动、通知、自动化
- 带 Bearer Token 鉴权与按路由限流的 REST API - 带 Bearer Token 鉴权与按路由限流的 REST API
- 基于 `rebrowser-playwright` 的浏览器自动化,按平台串行队列执行 - 基于 `rebrowser-playwright` 的浏览器自动化,按平台串行队列执行
- 文件型 Cookie 持久化(`0600` 权限、原子写入) - 文件型 Cookie 持久化(`0600` 权限、原子写入)
- 通知任务状态入库 SQLite`$COOKIE_DIR/xiaohongshu/automation.db`),保障自动回复幂等
- Web 控制台(React + Vite):登录、内容浏览、发布、接口测试 - Web 控制台(React + Vite):登录、内容浏览、发布、接口测试
- 安全控制:Token 常量时间比对、绑定地址安全门、错误信息脱敏、日志字段脱敏 - 安全控制:Token 常量时间比对、绑定地址安全门、错误信息脱敏、日志字段脱敏
- 支持 Docker 部署 - 支持 Docker 部署
@@ -102,7 +103,7 @@ pnpm test
| `xhs_reply_comment` | 回复评论 | | `xhs_reply_comment` | 回复评论 |
| `xhs_like` | 切换点赞状态 | | `xhs_like` | 切换点赞状态 |
| `xhs_favorite` | 切换收藏状态 | | `xhs_favorite` | 切换收藏状态 |
| `xhs_get_comment_notifications` | 获取未读评论/@通知 | | `xhs_get_unprocessed_notifications` | 从本地 SQLite 获取“未处理”通知任务 |
| `xhs_reply_notification` | 对通知进行回复 | | `xhs_reply_notification` | 对通知进行回复 |
## REST API ## REST API
@@ -151,6 +152,7 @@ curl -X POST \
| `POST` | `/api/xhs/like` | 点赞切换 | 10/min | | `POST` | `/api/xhs/like` | 点赞切换 | 10/min |
| `POST` | `/api/xhs/favorite` | 收藏切换 | 10/min | | `POST` | `/api/xhs/favorite` | 收藏切换 | 10/min |
| `GET` | `/api/xhs/notifications/comments` | 获取评论通知 | 60/min | | `GET` | `/api/xhs/notifications/comments` | 获取评论通知 | 60/min |
| `GET` | `/api/xhs/notifications/unprocessed` | 获取本地未处理通知任务 | 60/min |
| `POST` | `/api/xhs/notifications/reply` | 回复通知 | 10/min | | `POST` | `/api/xhs/notifications/reply` | 回复通知 | 10/min |
### 返回格式 ### 返回格式
@@ -229,6 +231,9 @@ docker run -d \
| `COOKIE_DIR` | `~/.social-mcp` | Cookie/Token 存储目录 | | `COOKIE_DIR` | `~/.social-mcp` | Cookie/Token 存储目录 |
| `MAX_QUEUE_DEPTH` | `10` | 单平台最大排队深度 | | `MAX_QUEUE_DEPTH` | `10` | 单平台最大排队深度 |
| `ALLOW_REMOTE` | 空 | 仅当值为 `yes-i-understand-the-risk` 时允许公网绑定 | | `ALLOW_REMOTE` | 空 | 仅当值为 `yes-i-understand-the-risk` 时允许公网绑定 |
| `XHS_NOTIFICATION_POLL_ENABLED` | `true` | 是否启用通知定时同步到 SQLite |
| `XHS_NOTIFICATION_POLL_INTERVAL_SEC` | `60` | 通知同步周期(秒,最小 15) |
| `XHS_NOTIFICATION_POLL_MAX_COUNT` | `20` | 每次同步最多拉取通知数 |
## 项目结构 ## 项目结构
@@ -265,7 +270,9 @@ social-mcp/
│ ├── publish-video.ts │ ├── publish-video.ts
│ ├── comment.ts │ ├── comment.ts
│ ├── interaction.ts │ ├── interaction.ts
── notification.ts ── notification.ts
│ ├── notification-state.ts
│ └── notification-sync.ts
└── web/ └── web/
└── src/ └── src/
``` ```
+9
View File
@@ -93,6 +93,12 @@ export interface AppConfig {
maxQueueDepth: number; maxQueueDepth: number;
/** Per-operation-type timeout in ms */ /** Per-operation-type timeout in ms */
operationTimeouts: Record<string, number>; operationTimeouts: Record<string, number>;
/** Whether the XHS notification poller is enabled */
notificationPollEnabled: boolean;
/** Poll interval for XHS notifications (ms) */
notificationPollIntervalMs: number;
/** Max notifications fetched per poll */
notificationPollMaxCount: number;
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -109,4 +115,7 @@ export const config: AppConfig = {
cookieDir: envString('COOKIE_DIR', path.join(os.homedir(), '.social-mcp')), cookieDir: envString('COOKIE_DIR', path.join(os.homedir(), '.social-mcp')),
maxQueueDepth: envInt('MAX_QUEUE_DEPTH', 10), maxQueueDepth: envInt('MAX_QUEUE_DEPTH', 10),
operationTimeouts, operationTimeouts,
notificationPollEnabled: envBool('XHS_NOTIFICATION_POLL_ENABLED', true),
notificationPollIntervalMs: Math.max(15, envInt('XHS_NOTIFICATION_POLL_INTERVAL_SEC', 60)) * 1000,
notificationPollMaxCount: Math.max(1, Math.min(50, envInt('XHS_NOTIFICATION_POLL_MAX_COUNT', 20))),
}; };
+73 -27
View File
@@ -15,7 +15,9 @@ import { publishVideoNote } from './publish-video.js';
import { listMyNotes } from './my-notes.js'; import { listMyNotes } from './my-notes.js';
import { postComment, replyComment } from './comment.js'; import { postComment, replyComment } from './comment.js';
import { toggleLike, toggleFavorite } from './interaction.js'; import { toggleLike, toggleFavorite } from './interaction.js';
import { getCommentNotifications, replyNotification } from './notification.js'; import { replyNotification } from './notification.js';
import { getNotificationStateStore, type NotificationTaskStatus } from './notification-state.js';
import { syncCommentNotifications, xhsNotificationPoller } from './notification-sync.js';
import { createXhsRoutes } from './routes.js'; import { createXhsRoutes } from './routes.js';
import { import {
CheckLoginSchema, CheckLoginSchema,
@@ -33,8 +35,8 @@ import {
ReplyCommentSchema, ReplyCommentSchema,
LikeSchema, LikeSchema,
FavoriteSchema, FavoriteSchema,
GetCommentNotificationsSchema,
ReplyNotificationSchema, ReplyNotificationSchema,
GetUnprocessedNotificationsSchema,
} from './schemas.js'; } from './schemas.js';
import type { SearchFilters } from './types.js'; import type { SearchFilters } from './types.js';
import type { PlatformPlugin } from '../../server/app.js'; import type { PlatformPlugin } from '../../server/app.js';
@@ -63,10 +65,16 @@ export const xiaohongshuPlugin: PlatformPlugin = {
// ========================================================================= // =========================================================================
registerRoutes(router: Router, browser: BrowserManager): void { registerRoutes(router: Router, browser: BrowserManager): void {
xhsNotificationPoller.start(browser);
const xhsRouter = createXhsRoutes(browser); const xhsRouter = createXhsRoutes(browser);
router.use('/', xhsRouter); router.use('/', xhsRouter);
}, },
shutdown(): Promise<void> {
xhsNotificationPoller.stop();
return Promise.resolve();
},
// ========================================================================= // =========================================================================
// MCP tools // MCP tools
// ========================================================================= // =========================================================================
@@ -597,29 +605,35 @@ export const xiaohongshuPlugin: PlatformPlugin = {
// ===================================================================== // =====================================================================
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
// xhs_get_comment_notifications // xhs_get_unprocessed_notifications
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
server.tool( server.tool(
'xhs_get_comment_notifications', 'xhs_get_unprocessed_notifications',
'Get unread comment and @ notifications from Xiaohongshu. Returns feedId + xsecToken per notification — use xhs_get_feed_detail to read the note content before replying so your response is contextually relevant.', 'Get unprocessed notification tasks from local state (SQLite). Optionally sync latest notifications first.',
GetCommentNotificationsSchema, GetUnprocessedNotificationsSchema,
async (args) => { async (args) => {
return withErrorHandling('xhs_get_comment_notifications', async () => { return withErrorHandling('xhs_get_unprocessed_notifications', async () => {
const timeoutMs = let syncResult: { fetched: number; inserted: number; updated: number } | null = null;
config.operationTimeouts['feed_detail'] ?? if (args.sync) {
config.operationTimeouts['default'] ?? syncResult = await syncCommentNotifications(browser, args.max_count);
60_000; }
const notifications = await browser.withPage( const statuses: NotificationTaskStatus[] =
PLATFORM, args.statuses && args.statuses.length > 0
async (page) => ? args.statuses
getCommentNotifications(page, args.max_count), : ['new', 'failed'];
timeoutMs,
); const tasks = getNotificationStateStore().listByStatuses(statuses, args.max_count);
return { return {
content: [{ type: 'text' as const, text: JSON.stringify(notifications) }], content: [{
type: 'text' as const,
text: JSON.stringify({
tasks,
...(syncResult ? { synced: syncResult } : {}),
}),
}],
}; };
}); });
}, },
@@ -635,21 +649,53 @@ export const xiaohongshuPlugin: PlatformPlugin = {
ReplyNotificationSchema, ReplyNotificationSchema,
async (args) => { async (args) => {
return withErrorHandling('xhs_reply_notification', async () => { return withErrorHandling('xhs_reply_notification', async () => {
const targetFingerprint =
args.fingerprint ??
getNotificationStateStore().findOpenFingerprint(args.user_id, args.comment_content);
if (targetFingerprint) {
getNotificationStateStore().markPending(targetFingerprint);
}
const timeoutMs = const timeoutMs =
config.operationTimeouts['reply'] ?? config.operationTimeouts['reply'] ??
config.operationTimeouts['default'] ?? config.operationTimeouts['default'] ??
20_000; 20_000;
const result = await browser.withPage( try {
PLATFORM, const result = await browser.withPage(
async (page) => PLATFORM,
replyNotification(page, args.user_id, args.comment_content, args.reply_content), async (page) =>
timeoutMs, replyNotification(page, args.user_id, args.comment_content, args.reply_content),
); timeoutMs,
);
return { if (targetFingerprint) {
content: [{ type: 'text' as const, text: JSON.stringify(result) }], if (result.success) {
}; getNotificationStateStore().markReplied(targetFingerprint, args.reply_content);
} else {
getNotificationStateStore().markFailed(
targetFingerprint,
'Reply action returned success=false',
);
}
}
return {
content: [{
type: 'text' as const,
text: JSON.stringify({
...result,
...(targetFingerprint ? { fingerprint: targetFingerprint } : {}),
}),
}],
};
} catch (err) {
if (targetFingerprint) {
const message = err instanceof Error ? err.message : String(err);
getNotificationStateStore().markFailed(targetFingerprint, message);
}
throw err;
}
}); });
}, },
); );
@@ -0,0 +1,317 @@
import crypto from 'node:crypto';
import fs from 'node:fs';
import path from 'node:path';
import { DatabaseSync } from 'node:sqlite';
import { config } from '../../config/index.js';
import { logger } from '../../utils/logger.js';
import type { CommentNotification } from './types.js';
export type NotificationTaskStatus =
| 'new'
| 'pending'
| 'replied'
| 'failed'
| 'ignored';
interface NotificationRow {
fingerprint: string;
user_id: string;
nickname: string;
avatar: string;
content: string;
type: string;
time: string;
feed_id: string;
xsec_token: string;
note_image: string;
status: NotificationTaskStatus;
first_seen_at: number;
last_seen_at: number;
retry_count: number;
last_attempt_at: number | null;
replied_at: number | null;
reply_content: string | null;
error_message: string | null;
}
export interface NotificationTask {
fingerprint: string;
notification: CommentNotification;
status: NotificationTaskStatus;
firstSeenAt: string;
lastSeenAt: string;
retryCount: number;
lastAttemptAt?: string;
repliedAt?: string;
replyContent?: string;
errorMessage?: string;
}
export interface NotificationUpsertResult {
fetched: number;
inserted: number;
updated: number;
}
const PLATFORM = 'xiaohongshu';
const DB_FILENAME = 'automation.db';
const log = logger.child({ module: 'xhs-notification-state' });
export class NotificationStateStore {
private readonly db: DatabaseSync;
private readonly dbPath: string;
constructor(baseDir = config.cookieDir, dbFilename = DB_FILENAME) {
const dir = path.join(baseDir, PLATFORM);
fs.mkdirSync(dir, { recursive: true, mode: 0o700 });
this.dbPath = path.join(dir, dbFilename);
this.db = new DatabaseSync(this.dbPath);
this.db.exec('PRAGMA journal_mode = WAL;');
this.db.exec('PRAGMA synchronous = NORMAL;');
this.db.exec(`
CREATE TABLE IF NOT EXISTS notification_tasks (
fingerprint TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
nickname TEXT NOT NULL,
avatar TEXT NOT NULL,
content TEXT NOT NULL,
type TEXT NOT NULL,
time TEXT NOT NULL,
feed_id TEXT NOT NULL,
xsec_token TEXT NOT NULL,
note_image TEXT NOT NULL,
status TEXT NOT NULL,
first_seen_at INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER,
replied_at INTEGER,
reply_content TEXT,
error_message TEXT
);
CREATE INDEX IF NOT EXISTS idx_notification_tasks_status_first_seen
ON notification_tasks(status, first_seen_at);
CREATE INDEX IF NOT EXISTS idx_notification_tasks_user_content_status
ON notification_tasks(user_id, content, status);
`);
log.info({ dbPath: this.dbPath }, 'Notification state store initialized');
}
buildFingerprint(notification: CommentNotification): string {
const payload = [
notification.feedId,
notification.userId,
notification.content.trim(),
notification.time.trim(),
notification.type.trim(),
].join('|');
return crypto.createHash('sha256').update(payload).digest('hex');
}
upsertNotifications(notifications: CommentNotification[]): NotificationUpsertResult {
if (notifications.length === 0) {
return { fetched: 0, inserted: 0, updated: 0 };
}
const now = Date.now();
let inserted = 0;
let updated = 0;
const selectStmt = this.db.prepare(
'SELECT fingerprint FROM notification_tasks WHERE fingerprint = ?',
);
const insertStmt = this.db.prepare(`
INSERT INTO notification_tasks (
fingerprint, user_id, nickname, avatar, content, type, time,
feed_id, xsec_token, note_image, status, first_seen_at, last_seen_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'new', ?, ?)
`);
const updateStmt = this.db.prepare(`
UPDATE notification_tasks
SET
nickname = ?,
avatar = ?,
type = ?,
time = ?,
feed_id = ?,
xsec_token = ?,
note_image = ?,
last_seen_at = ?
WHERE fingerprint = ?
`);
this.db.exec('BEGIN');
try {
for (const n of notifications) {
const fp = this.buildFingerprint(n);
const exists = selectStmt.get(fp) as { fingerprint: string } | undefined;
if (!exists) {
insertStmt.run(
fp,
n.userId,
n.nickname,
n.avatar,
n.content,
n.type,
n.time,
n.feedId,
n.xsecToken,
n.noteImage,
now,
now,
);
inserted++;
} else {
updateStmt.run(
n.nickname,
n.avatar,
n.type,
n.time,
n.feedId,
n.xsecToken,
n.noteImage,
now,
fp,
);
updated++;
}
}
this.db.exec('COMMIT');
} catch (err) {
this.db.exec('ROLLBACK');
throw err;
}
return {
fetched: notifications.length,
inserted,
updated,
};
}
listByStatuses(
statuses: NotificationTaskStatus[],
maxCount: number,
): NotificationTask[] {
if (statuses.length === 0) return [];
const placeholders = statuses.map(() => '?').join(', ');
const query = `
SELECT
fingerprint, user_id, nickname, avatar, content, type, time,
feed_id, xsec_token, note_image, status, first_seen_at, last_seen_at,
retry_count, last_attempt_at, replied_at, reply_content, error_message
FROM notification_tasks
WHERE status IN (${placeholders})
ORDER BY first_seen_at ASC
LIMIT ?
`;
const stmt = this.db.prepare(query);
const rows = stmt.all(...statuses, maxCount) as unknown as NotificationRow[];
return rows.map((r) => this.rowToTask(r));
}
findOpenFingerprint(userId: string, content: string): string | null {
const stmt = this.db.prepare(`
SELECT fingerprint
FROM notification_tasks
WHERE user_id = ?
AND content = ?
AND status IN ('new', 'failed', 'pending')
ORDER BY first_seen_at ASC
LIMIT 1
`);
const row = stmt.get(userId, content) as { fingerprint: string } | undefined;
return row?.fingerprint ?? null;
}
markPending(fingerprint: string): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET status = 'pending', last_attempt_at = ?, error_message = NULL
WHERE fingerprint = ?
`);
stmt.run(now, fingerprint);
}
markReplied(fingerprint: string, replyContent: string): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET
status = 'replied',
replied_at = ?,
last_attempt_at = ?,
reply_content = ?,
error_message = NULL
WHERE fingerprint = ?
`);
stmt.run(now, now, replyContent, fingerprint);
}
markFailed(fingerprint: string, errorMessage: string): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET
status = 'failed',
retry_count = retry_count + 1,
last_attempt_at = ?,
error_message = ?
WHERE fingerprint = ?
`);
stmt.run(now, errorMessage, fingerprint);
}
markIgnored(fingerprint: string, reason?: string): void {
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET status = 'ignored', error_message = ?
WHERE fingerprint = ?
`);
stmt.run(reason ?? 'Ignored by operator', fingerprint);
}
private rowToTask(row: NotificationRow): NotificationTask {
return {
fingerprint: row.fingerprint,
notification: {
userId: row.user_id,
nickname: row.nickname,
avatar: row.avatar,
content: row.content,
type: row.type,
time: row.time,
feedId: row.feed_id,
xsecToken: row.xsec_token,
noteImage: row.note_image,
},
status: row.status,
firstSeenAt: new Date(row.first_seen_at).toISOString(),
lastSeenAt: new Date(row.last_seen_at).toISOString(),
retryCount: row.retry_count,
...(row.last_attempt_at ? { lastAttemptAt: new Date(row.last_attempt_at).toISOString() } : {}),
...(row.replied_at ? { repliedAt: new Date(row.replied_at).toISOString() } : {}),
...(row.reply_content ? { replyContent: row.reply_content } : {}),
...(row.error_message ? { errorMessage: row.error_message } : {}),
};
}
}
let storeSingleton: NotificationStateStore | null = null;
export function getNotificationStateStore(): NotificationStateStore {
if (!storeSingleton) {
storeSingleton = new NotificationStateStore();
}
return storeSingleton;
}
@@ -0,0 +1,90 @@
import type { BrowserManager } from '../../browser/manager.js';
import { config } from '../../config/index.js';
import { logger } from '../../utils/logger.js';
import { getCommentNotifications } from './notification.js';
import { getNotificationStateStore, type NotificationUpsertResult } from './notification-state.js';
const PLATFORM = 'xiaohongshu';
const log = logger.child({ module: 'xhs-notification-sync' });
export async function syncCommentNotifications(
browser: BrowserManager,
maxCount = config.notificationPollMaxCount,
): Promise<NotificationUpsertResult> {
const timeoutMs =
config.operationTimeouts['feed_detail'] ??
config.operationTimeouts['default'] ??
60_000;
const notifications = await browser.withPage(
PLATFORM,
async (page) => getCommentNotifications(page, maxCount),
timeoutMs,
);
const result = getNotificationStateStore().upsertNotifications(notifications);
if (result.fetched > 0) {
log.info(result, 'Notifications synced to state store');
}
return result;
}
export class XhsNotificationPoller {
private timer: ReturnType<typeof setInterval> | null = null;
private running = false;
private started = false;
start(browser: BrowserManager): void {
if (this.started) return;
this.started = true;
if (!config.notificationPollEnabled) {
log.info('Notification poller disabled by config');
return;
}
const tick = async (): Promise<void> => {
if (this.running) return;
this.running = true;
try {
await syncCommentNotifications(browser, config.notificationPollMaxCount);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
// Login expiration and empty unread state are expected in daily usage.
log.debug({ message }, 'Notification poll tick skipped');
} finally {
this.running = false;
}
};
void tick();
this.timer = setInterval(() => {
void tick();
}, config.notificationPollIntervalMs);
if (this.timer && typeof this.timer === 'object' && 'unref' in this.timer) {
this.timer.unref();
}
log.info(
{
intervalMs: config.notificationPollIntervalMs,
maxCount: config.notificationPollMaxCount,
},
'Notification poller started',
);
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
this.started = false;
this.running = false;
log.info('Notification poller stopped');
}
}
export const xhsNotificationPoller = new XhsNotificationPoller();
+98 -7
View File
@@ -20,6 +20,12 @@ import { listMyNotes } from './my-notes.js';
import { postComment, replyComment } from './comment.js'; import { postComment, replyComment } from './comment.js';
import { toggleLike, toggleFavorite } from './interaction.js'; import { toggleLike, toggleFavorite } from './interaction.js';
import { getCommentNotifications, replyNotification } from './notification.js'; import { getCommentNotifications, replyNotification } from './notification.js';
import {
getNotificationStateStore,
type NotificationTask,
type NotificationTaskStatus,
} from './notification-state.js';
import { syncCommentNotifications } from './notification-sync.js';
import { import {
SearchSchema, SearchSchema,
@@ -34,6 +40,7 @@ import {
FavoriteSchema, FavoriteSchema,
GetCommentNotificationsSchema, GetCommentNotificationsSchema,
ReplyNotificationSchema, ReplyNotificationSchema,
GetUnprocessedNotificationsSchema,
} from './schemas.js'; } from './schemas.js';
import type { SearchFilters } from './types.js'; import type { SearchFilters } from './types.js';
@@ -119,11 +126,18 @@ const GetCommentNotificationsQuerySchema = z.object({
}); });
const ReplyNotificationBodySchema = z.object({ const ReplyNotificationBodySchema = z.object({
fingerprint: ReplyNotificationSchema.fingerprint,
user_id: ReplyNotificationSchema.user_id, user_id: ReplyNotificationSchema.user_id,
comment_content: ReplyNotificationSchema.comment_content, comment_content: ReplyNotificationSchema.comment_content,
reply_content: ReplyNotificationSchema.reply_content, reply_content: ReplyNotificationSchema.reply_content,
}); });
const GetUnprocessedNotificationsQuerySchema = z.object({
max_count: GetUnprocessedNotificationsSchema.max_count,
sync: GetUnprocessedNotificationsSchema.sync,
statuses: GetUnprocessedNotificationsSchema.statuses,
});
const LikeBodySchema = z.object({ const LikeBodySchema = z.object({
feed_id: LikeSchema.feed_id, feed_id: LikeSchema.feed_id,
xsec_token: LikeSchema.xsec_token, xsec_token: LikeSchema.xsec_token,
@@ -654,6 +668,7 @@ export function createXhsRoutes(browser: BrowserManager): Router {
timeoutMs, timeoutMs,
); );
getNotificationStateStore().upsertNotifications(notifications);
res.json(successResponse(notifications) as ApiResponse<typeof notifications>); res.json(successResponse(notifications) as ApiResponse<typeof notifications>);
} catch (err) { } catch (err) {
handleError(res, err); handleError(res, err);
@@ -668,20 +683,96 @@ export function createXhsRoutes(browser: BrowserManager): Router {
void (async () => { void (async () => {
try { try {
const body = ReplyNotificationBodySchema.parse(req.body); const body = ReplyNotificationBodySchema.parse(req.body);
const targetFingerprint =
body.fingerprint ??
getNotificationStateStore().findOpenFingerprint(body.user_id, body.comment_content);
if (targetFingerprint) {
getNotificationStateStore().markPending(targetFingerprint);
}
const timeoutMs = const timeoutMs =
config.operationTimeouts['reply'] ?? config.operationTimeouts['reply'] ??
config.operationTimeouts['default'] ?? config.operationTimeouts['default'] ??
20_000; 20_000;
const result = await browser.withPage( try {
PLATFORM, const result = await browser.withPage(
async (page) => PLATFORM,
replyNotification(page, body.user_id, body.comment_content, body.reply_content), async (page) =>
timeoutMs, replyNotification(page, body.user_id, body.comment_content, body.reply_content),
); timeoutMs,
);
res.json(successResponse(result) as ApiResponse<typeof result>); if (targetFingerprint) {
if (result.success) {
getNotificationStateStore().markReplied(targetFingerprint, body.reply_content);
} else {
getNotificationStateStore().markFailed(
targetFingerprint,
'Reply action returned success=false',
);
}
}
res.json(successResponse({
...result,
...(targetFingerprint ? { fingerprint: targetFingerprint } : {}),
}) as ApiResponse<typeof result & { fingerprint?: string }>);
} catch (err) {
if (targetFingerprint) {
const message = err instanceof Error ? err.message : String(err);
getNotificationStateStore().markFailed(targetFingerprint, message);
}
throw err;
}
} catch (err) {
handleError(res, err);
}
})();
});
// -----------------------------------------------------------------------
// GET /notifications/unprocessed
// -----------------------------------------------------------------------
router.get('/notifications/unprocessed', readRateLimiter, (req, res) => {
void (async () => {
try {
const statusesFromQuery = (() => {
const raw = req.query.statuses;
if (!raw) return undefined;
if (Array.isArray(raw)) return raw.flatMap((s) => String(s).split(','));
return String(raw).split(',');
})()
?.map((s) => s.trim())
.filter((s) => s.length > 0);
const parsed = GetUnprocessedNotificationsQuerySchema.parse({
max_count: req.query.max_count ? Number(req.query.max_count) : undefined,
sync: req.query.sync === undefined
? undefined
: ['1', 'true', 'yes'].includes(String(req.query.sync).toLowerCase()),
statuses: statusesFromQuery,
});
let syncResult: { fetched: number; inserted: number; updated: number } | null = null;
if (parsed.sync) {
syncResult = await syncCommentNotifications(browser, parsed.max_count);
}
const statuses: NotificationTaskStatus[] =
parsed.statuses && parsed.statuses.length > 0
? parsed.statuses
: ['new', 'failed'];
const tasks = getNotificationStateStore().listByStatuses(statuses, parsed.max_count);
res.json(successResponse({
tasks,
...(syncResult ? { synced: syncResult } : {}),
}) as ApiResponse<{
tasks: NotificationTask[];
synced?: { fetched: number; inserted: number; updated: number };
}>);
} catch (err) { } catch (err) {
handleError(res, err); handleError(res, err);
} }
+25
View File
@@ -161,11 +161,36 @@ export const GetCommentNotificationsSchema = {
/** xhs_reply_notification */ /** xhs_reply_notification */
export const ReplyNotificationSchema = { export const ReplyNotificationSchema = {
fingerprint: z
.string()
.optional()
.describe('Optional notification fingerprint from xhs_get_unprocessed_notifications'),
user_id: z.string().describe('User ID of the comment author (from notification)'), user_id: z.string().describe('User ID of the comment author (from notification)'),
comment_content: z.string().describe('Original comment content to match the notification'), comment_content: z.string().describe('Original comment content to match the notification'),
reply_content: z.string().min(1).describe('Reply text to send'), reply_content: z.string().min(1).describe('Reply text to send'),
}; };
/** xhs_get_unprocessed_notifications */
export const GetUnprocessedNotificationsSchema = {
max_count: z
.number()
.int()
.min(1)
.max(200)
.optional()
.default(20)
.describe('Maximum number of unprocessed notifications to return (1200, default 20)'),
statuses: z
.array(z.enum(['new', 'pending', 'failed']))
.optional()
.describe('Statuses to include. Defaults to ["new", "failed"]'),
sync: z
.boolean()
.optional()
.default(true)
.describe('Whether to sync latest notifications from Xiaohongshu before querying local state'),
};
/** xhs_favorite */ /** xhs_favorite */
export const FavoriteSchema = { export const FavoriteSchema = {
feed_id: z.string().describe('Feed ID to toggle favorite'), feed_id: z.string().describe('Feed ID to toggle favorite'),
+66
View File
@@ -0,0 +1,66 @@
import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import { describe, expect, it } from 'vitest';
import { NotificationStateStore } from '../src/platforms/xiaohongshu/notification-state.js';
import type { CommentNotification } from '../src/platforms/xiaohongshu/types.js';
function makeNotification(overrides?: Partial<CommentNotification>): CommentNotification {
return {
userId: 'u1',
nickname: 'tester',
avatar: 'https://example.com/a.png',
content: '你好',
type: '评论了你的笔记',
time: '1分钟前',
feedId: 'feed123',
xsecToken: 'token123',
noteImage: 'https://example.com/note.png',
...overrides,
};
}
describe('notification-state store', () => {
it('upserts notifications and tracks status transitions', async () => {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'social-mcp-notif-'));
try {
const store = new NotificationStateStore(tempDir, 'test.db');
const n1 = makeNotification();
const first = store.upsertNotifications([n1]);
expect(first).toEqual({ fetched: 1, inserted: 1, updated: 0 });
const second = store.upsertNotifications([n1]);
expect(second).toEqual({ fetched: 1, inserted: 0, updated: 1 });
const openTasks = store.listByStatuses(['new'], 10);
expect(openTasks).toHaveLength(1);
expect(openTasks[0]?.notification.userId).toBe('u1');
const fp = store.findOpenFingerprint('u1', '你好');
expect(fp).toBeTypeOf('string');
if (!fp) {
throw new Error('fingerprint should not be null');
}
store.markPending(fp);
expect(store.listByStatuses(['pending'], 10)).toHaveLength(1);
store.markFailed(fp, 'network error');
const failed = store.listByStatuses(['failed'], 10);
expect(failed).toHaveLength(1);
expect(failed[0]?.retryCount).toBe(1);
expect(failed[0]?.errorMessage).toBe('network error');
store.markReplied(fp, '收到,感谢反馈');
const replied = store.listByStatuses(['replied'], 10);
expect(replied).toHaveLength(1);
expect(replied[0]?.replyContent).toBe('收到,感谢反馈');
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}
});
});