diff --git a/.env.example b/.env.example index 290c8fb..cf3bb07 100644 --- a/.env.example +++ b/.env.example @@ -12,3 +12,8 @@ HEADLESS=true # Logging # NODE_ENV=production # LOG_LEVEL=info + +# Notification automation (Xiaohongshu) +# XHS_NOTIFICATION_POLL_ENABLED=true +# XHS_NOTIFICATION_POLL_INTERVAL_SEC=60 +# XHS_NOTIFICATION_POLL_MAX_COUNT=20 diff --git a/README.md b/README.md index 007c891..4130b22 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,11 @@ Multi-platform social media automation service that exposes browser-based action ## Features -- **17 MCP tools** for Xiaohongshu (login, browsing, publishing, interactions, notifications) +- **17 MCP tools** for Xiaohongshu (login, browsing, publishing, interactions, notifications, automation) - **REST API** with Bearer token authentication and per-route rate limiting - **Browser automation** via `rebrowser-playwright` with per-platform serial queueing - **Cookie persistence** with file-based storage (`0600`, atomic writes) +- **Notification task state** in SQLite (`$COOKIE_DIR/xiaohongshu/automation.db`) for idempotent auto-reply workflows - **Web dashboard** (React + Vite) for login, feed exploration, publishing, and API testing - **Security controls**: timing-safe token comparison, bind-address safety gate, error sanitization, log redaction - **Docker deployment** support with hardened runtime defaults @@ -102,7 +103,7 @@ Add this in `claude_desktop_config.json`: | `xhs_reply_comment` | Reply to a comment | | `xhs_like` | Toggle like state on a note | | `xhs_favorite` | Toggle favorite state on a note | -| `xhs_get_comment_notifications` | Get unread comment/@ notifications | +| `xhs_get_unprocessed_notifications` | Get unprocessed notification tasks from local SQLite state | | `xhs_reply_notification` | Reply to a specific notification | ## REST API @@ -150,6 +151,7 @@ Read endpoints are limited to **60/min** per IP. Write endpoints are limited to | `POST` | `/api/xhs/like` | Toggle like | 10/min | | `POST` | `/api/xhs/favorite` | Toggle favorite | 10/min | | `GET` | `/api/xhs/notifications/comments` | Get comment notifications | 60/min | +| `GET` | `/api/xhs/notifications/unprocessed` | Get unprocessed notification tasks from SQLite | 60/min | | `POST` | `/api/xhs/notifications/reply` | Reply to notification | 10/min | ### Response Format @@ -228,6 +230,9 @@ Note: Docker defaults expose port `3000` because container env sets `PORT=3000`. | `COOKIE_DIR` | `~/.social-mcp` | Cookie/token storage directory | | `MAX_QUEUE_DEPTH` | `10` | Max pending operations per platform queue | | `ALLOW_REMOTE` | (unset) | Must be `yes-i-understand-the-risk` to allow public bind | +| `XHS_NOTIFICATION_POLL_ENABLED` | `true` | Enable periodic notification sync into SQLite | +| `XHS_NOTIFICATION_POLL_INTERVAL_SEC` | `60` | Notification sync interval (seconds, min 15) | +| `XHS_NOTIFICATION_POLL_MAX_COUNT` | `20` | Max notifications fetched per sync | ## Project Structure @@ -264,7 +269,9 @@ social-mcp/ │ ├── publish-video.ts │ ├── comment.ts │ ├── interaction.ts -│ └── notification.ts +│ ├── notification.ts +│ ├── notification-state.ts +│ └── notification-sync.ts └── web/ └── src/ ``` diff --git a/README.zh-CN.md b/README.zh-CN.md index 9f65e82..6761df5 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -6,10 +6,11 @@ ## 功能特性 -- 小红书 **17 个 MCP 工具**(登录、浏览、发布、互动、通知) +- 小红书 **17 个 MCP 工具**(登录、浏览、发布、互动、通知、自动化) - 带 Bearer Token 鉴权与按路由限流的 REST API - 基于 `rebrowser-playwright` 的浏览器自动化,按平台串行队列执行 - 文件型 Cookie 持久化(`0600` 权限、原子写入) +- 通知任务状态入库 SQLite(`$COOKIE_DIR/xiaohongshu/automation.db`),保障自动回复幂等 - Web 控制台(React + Vite):登录、内容浏览、发布、接口测试 - 安全控制:Token 常量时间比对、绑定地址安全门、错误信息脱敏、日志字段脱敏 - 支持 Docker 部署 @@ -102,7 +103,7 @@ pnpm test | `xhs_reply_comment` | 回复评论 | | `xhs_like` | 切换点赞状态 | | `xhs_favorite` | 切换收藏状态 | -| `xhs_get_comment_notifications` | 获取未读评论/@通知 | +| `xhs_get_unprocessed_notifications` | 从本地 SQLite 获取“未处理”通知任务 | | `xhs_reply_notification` | 对通知进行回复 | ## REST API @@ -151,6 +152,7 @@ curl -X POST \ | `POST` | `/api/xhs/like` | 点赞切换 | 10/min | | `POST` | `/api/xhs/favorite` | 收藏切换 | 10/min | | `GET` | `/api/xhs/notifications/comments` | 获取评论通知 | 60/min | +| `GET` | `/api/xhs/notifications/unprocessed` | 获取本地未处理通知任务 | 60/min | | `POST` | `/api/xhs/notifications/reply` | 回复通知 | 10/min | ### 返回格式 @@ -229,6 +231,9 @@ docker run -d \ | `COOKIE_DIR` | `~/.social-mcp` | Cookie/Token 存储目录 | | `MAX_QUEUE_DEPTH` | `10` | 单平台最大排队深度 | | `ALLOW_REMOTE` | 空 | 仅当值为 `yes-i-understand-the-risk` 时允许公网绑定 | +| `XHS_NOTIFICATION_POLL_ENABLED` | `true` | 是否启用通知定时同步到 SQLite | +| `XHS_NOTIFICATION_POLL_INTERVAL_SEC` | `60` | 通知同步周期(秒,最小 15) | +| `XHS_NOTIFICATION_POLL_MAX_COUNT` | `20` | 每次同步最多拉取通知数 | ## 项目结构 @@ -265,7 +270,9 @@ social-mcp/ │ ├── publish-video.ts │ ├── comment.ts │ ├── interaction.ts -│ └── notification.ts +│ ├── notification.ts +│ ├── notification-state.ts +│ └── notification-sync.ts └── web/ └── src/ ``` diff --git a/src/config/index.ts b/src/config/index.ts index d2072cb..ea35472 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -93,6 +93,12 @@ export interface AppConfig { maxQueueDepth: number; /** Per-operation-type timeout in ms */ operationTimeouts: Record; + /** Whether the XHS notification poller is enabled */ + notificationPollEnabled: boolean; + /** Poll interval for XHS notifications (ms) */ + notificationPollIntervalMs: number; + /** Max notifications fetched per poll */ + notificationPollMaxCount: number; } // --------------------------------------------------------------------------- @@ -109,4 +115,7 @@ export const config: AppConfig = { cookieDir: envString('COOKIE_DIR', path.join(os.homedir(), '.social-mcp')), maxQueueDepth: envInt('MAX_QUEUE_DEPTH', 10), operationTimeouts, + notificationPollEnabled: envBool('XHS_NOTIFICATION_POLL_ENABLED', true), + notificationPollIntervalMs: Math.max(15, envInt('XHS_NOTIFICATION_POLL_INTERVAL_SEC', 60)) * 1000, + notificationPollMaxCount: Math.max(1, Math.min(50, envInt('XHS_NOTIFICATION_POLL_MAX_COUNT', 20))), }; diff --git a/src/platforms/xiaohongshu/index.ts b/src/platforms/xiaohongshu/index.ts index b14683d..8767c13 100644 --- a/src/platforms/xiaohongshu/index.ts +++ b/src/platforms/xiaohongshu/index.ts @@ -15,7 +15,9 @@ import { publishVideoNote } from './publish-video.js'; import { listMyNotes } from './my-notes.js'; import { postComment, replyComment } from './comment.js'; import { toggleLike, toggleFavorite } from './interaction.js'; -import { getCommentNotifications, replyNotification } from './notification.js'; +import { replyNotification } from './notification.js'; +import { getNotificationStateStore, type NotificationTaskStatus } from './notification-state.js'; +import { syncCommentNotifications, xhsNotificationPoller } from './notification-sync.js'; import { createXhsRoutes } from './routes.js'; import { CheckLoginSchema, @@ -33,8 +35,8 @@ import { ReplyCommentSchema, LikeSchema, FavoriteSchema, - GetCommentNotificationsSchema, ReplyNotificationSchema, + GetUnprocessedNotificationsSchema, } from './schemas.js'; import type { SearchFilters } from './types.js'; import type { PlatformPlugin } from '../../server/app.js'; @@ -63,10 +65,16 @@ export const xiaohongshuPlugin: PlatformPlugin = { // ========================================================================= registerRoutes(router: Router, browser: BrowserManager): void { + xhsNotificationPoller.start(browser); const xhsRouter = createXhsRoutes(browser); router.use('/', xhsRouter); }, + shutdown(): Promise { + xhsNotificationPoller.stop(); + return Promise.resolve(); + }, + // ========================================================================= // MCP tools // ========================================================================= @@ -597,29 +605,35 @@ export const xiaohongshuPlugin: PlatformPlugin = { // ===================================================================== // ----------------------------------------------------------------------- - // xhs_get_comment_notifications + // xhs_get_unprocessed_notifications // ----------------------------------------------------------------------- server.tool( - 'xhs_get_comment_notifications', - 'Get unread comment and @ notifications from Xiaohongshu. Returns feedId + xsecToken per notification — use xhs_get_feed_detail to read the note content before replying so your response is contextually relevant.', - GetCommentNotificationsSchema, + 'xhs_get_unprocessed_notifications', + 'Get unprocessed notification tasks from local state (SQLite). Optionally sync latest notifications first.', + GetUnprocessedNotificationsSchema, async (args) => { - return withErrorHandling('xhs_get_comment_notifications', async () => { - const timeoutMs = - config.operationTimeouts['feed_detail'] ?? - config.operationTimeouts['default'] ?? - 60_000; + return withErrorHandling('xhs_get_unprocessed_notifications', async () => { + let syncResult: { fetched: number; inserted: number; updated: number } | null = null; + if (args.sync) { + syncResult = await syncCommentNotifications(browser, args.max_count); + } - const notifications = await browser.withPage( - PLATFORM, - async (page) => - getCommentNotifications(page, args.max_count), - timeoutMs, - ); + const statuses: NotificationTaskStatus[] = + args.statuses && args.statuses.length > 0 + ? args.statuses + : ['new', 'failed']; + + const tasks = getNotificationStateStore().listByStatuses(statuses, args.max_count); return { - content: [{ type: 'text' as const, text: JSON.stringify(notifications) }], + content: [{ + type: 'text' as const, + text: JSON.stringify({ + tasks, + ...(syncResult ? { synced: syncResult } : {}), + }), + }], }; }); }, @@ -635,21 +649,53 @@ export const xiaohongshuPlugin: PlatformPlugin = { ReplyNotificationSchema, async (args) => { return withErrorHandling('xhs_reply_notification', async () => { + const targetFingerprint = + args.fingerprint ?? + getNotificationStateStore().findOpenFingerprint(args.user_id, args.comment_content); + if (targetFingerprint) { + getNotificationStateStore().markPending(targetFingerprint); + } + const timeoutMs = config.operationTimeouts['reply'] ?? config.operationTimeouts['default'] ?? 20_000; - const result = await browser.withPage( - PLATFORM, - async (page) => - replyNotification(page, args.user_id, args.comment_content, args.reply_content), - timeoutMs, - ); + try { + const result = await browser.withPage( + PLATFORM, + async (page) => + replyNotification(page, args.user_id, args.comment_content, args.reply_content), + timeoutMs, + ); - return { - content: [{ type: 'text' as const, text: JSON.stringify(result) }], - }; + if (targetFingerprint) { + if (result.success) { + getNotificationStateStore().markReplied(targetFingerprint, args.reply_content); + } else { + getNotificationStateStore().markFailed( + targetFingerprint, + 'Reply action returned success=false', + ); + } + } + + return { + content: [{ + type: 'text' as const, + text: JSON.stringify({ + ...result, + ...(targetFingerprint ? { fingerprint: targetFingerprint } : {}), + }), + }], + }; + } catch (err) { + if (targetFingerprint) { + const message = err instanceof Error ? err.message : String(err); + getNotificationStateStore().markFailed(targetFingerprint, message); + } + throw err; + } }); }, ); diff --git a/src/platforms/xiaohongshu/notification-state.ts b/src/platforms/xiaohongshu/notification-state.ts new file mode 100644 index 0000000..7c6b842 --- /dev/null +++ b/src/platforms/xiaohongshu/notification-state.ts @@ -0,0 +1,317 @@ +import crypto from 'node:crypto'; +import fs from 'node:fs'; +import path from 'node:path'; +import { DatabaseSync } from 'node:sqlite'; + +import { config } from '../../config/index.js'; +import { logger } from '../../utils/logger.js'; +import type { CommentNotification } from './types.js'; + +export type NotificationTaskStatus = + | 'new' + | 'pending' + | 'replied' + | 'failed' + | 'ignored'; + +interface NotificationRow { + fingerprint: string; + user_id: string; + nickname: string; + avatar: string; + content: string; + type: string; + time: string; + feed_id: string; + xsec_token: string; + note_image: string; + status: NotificationTaskStatus; + first_seen_at: number; + last_seen_at: number; + retry_count: number; + last_attempt_at: number | null; + replied_at: number | null; + reply_content: string | null; + error_message: string | null; +} + +export interface NotificationTask { + fingerprint: string; + notification: CommentNotification; + status: NotificationTaskStatus; + firstSeenAt: string; + lastSeenAt: string; + retryCount: number; + lastAttemptAt?: string; + repliedAt?: string; + replyContent?: string; + errorMessage?: string; +} + +export interface NotificationUpsertResult { + fetched: number; + inserted: number; + updated: number; +} + +const PLATFORM = 'xiaohongshu'; +const DB_FILENAME = 'automation.db'; +const log = logger.child({ module: 'xhs-notification-state' }); + +export class NotificationStateStore { + private readonly db: DatabaseSync; + private readonly dbPath: string; + + constructor(baseDir = config.cookieDir, dbFilename = DB_FILENAME) { + const dir = path.join(baseDir, PLATFORM); + fs.mkdirSync(dir, { recursive: true, mode: 0o700 }); + this.dbPath = path.join(dir, dbFilename); + + this.db = new DatabaseSync(this.dbPath); + this.db.exec('PRAGMA journal_mode = WAL;'); + this.db.exec('PRAGMA synchronous = NORMAL;'); + + this.db.exec(` + CREATE TABLE IF NOT EXISTS notification_tasks ( + fingerprint TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + nickname TEXT NOT NULL, + avatar TEXT NOT NULL, + content TEXT NOT NULL, + type TEXT NOT NULL, + time TEXT NOT NULL, + feed_id TEXT NOT NULL, + xsec_token TEXT NOT NULL, + note_image TEXT NOT NULL, + status TEXT NOT NULL, + first_seen_at INTEGER NOT NULL, + last_seen_at INTEGER NOT NULL, + retry_count INTEGER NOT NULL DEFAULT 0, + last_attempt_at INTEGER, + replied_at INTEGER, + reply_content TEXT, + error_message TEXT + ); + CREATE INDEX IF NOT EXISTS idx_notification_tasks_status_first_seen + ON notification_tasks(status, first_seen_at); + CREATE INDEX IF NOT EXISTS idx_notification_tasks_user_content_status + ON notification_tasks(user_id, content, status); + `); + + log.info({ dbPath: this.dbPath }, 'Notification state store initialized'); + } + + buildFingerprint(notification: CommentNotification): string { + const payload = [ + notification.feedId, + notification.userId, + notification.content.trim(), + notification.time.trim(), + notification.type.trim(), + ].join('|'); + + return crypto.createHash('sha256').update(payload).digest('hex'); + } + + upsertNotifications(notifications: CommentNotification[]): NotificationUpsertResult { + if (notifications.length === 0) { + return { fetched: 0, inserted: 0, updated: 0 }; + } + + const now = Date.now(); + let inserted = 0; + let updated = 0; + + const selectStmt = this.db.prepare( + 'SELECT fingerprint FROM notification_tasks WHERE fingerprint = ?', + ); + const insertStmt = this.db.prepare(` + INSERT INTO notification_tasks ( + fingerprint, user_id, nickname, avatar, content, type, time, + feed_id, xsec_token, note_image, status, first_seen_at, last_seen_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'new', ?, ?) + `); + const updateStmt = this.db.prepare(` + UPDATE notification_tasks + SET + nickname = ?, + avatar = ?, + type = ?, + time = ?, + feed_id = ?, + xsec_token = ?, + note_image = ?, + last_seen_at = ? + WHERE fingerprint = ? + `); + + this.db.exec('BEGIN'); + try { + for (const n of notifications) { + const fp = this.buildFingerprint(n); + const exists = selectStmt.get(fp) as { fingerprint: string } | undefined; + + if (!exists) { + insertStmt.run( + fp, + n.userId, + n.nickname, + n.avatar, + n.content, + n.type, + n.time, + n.feedId, + n.xsecToken, + n.noteImage, + now, + now, + ); + inserted++; + } else { + updateStmt.run( + n.nickname, + n.avatar, + n.type, + n.time, + n.feedId, + n.xsecToken, + n.noteImage, + now, + fp, + ); + updated++; + } + } + + this.db.exec('COMMIT'); + } catch (err) { + this.db.exec('ROLLBACK'); + throw err; + } + + return { + fetched: notifications.length, + inserted, + updated, + }; + } + + listByStatuses( + statuses: NotificationTaskStatus[], + maxCount: number, + ): NotificationTask[] { + if (statuses.length === 0) return []; + + const placeholders = statuses.map(() => '?').join(', '); + const query = ` + SELECT + fingerprint, user_id, nickname, avatar, content, type, time, + feed_id, xsec_token, note_image, status, first_seen_at, last_seen_at, + retry_count, last_attempt_at, replied_at, reply_content, error_message + FROM notification_tasks + WHERE status IN (${placeholders}) + ORDER BY first_seen_at ASC + LIMIT ? + `; + + const stmt = this.db.prepare(query); + const rows = stmt.all(...statuses, maxCount) as unknown as NotificationRow[]; + return rows.map((r) => this.rowToTask(r)); + } + + findOpenFingerprint(userId: string, content: string): string | null { + const stmt = this.db.prepare(` + SELECT fingerprint + FROM notification_tasks + WHERE user_id = ? + AND content = ? + AND status IN ('new', 'failed', 'pending') + ORDER BY first_seen_at ASC + LIMIT 1 + `); + const row = stmt.get(userId, content) as { fingerprint: string } | undefined; + return row?.fingerprint ?? null; + } + + markPending(fingerprint: string): void { + const now = Date.now(); + const stmt = this.db.prepare(` + UPDATE notification_tasks + SET status = 'pending', last_attempt_at = ?, error_message = NULL + WHERE fingerprint = ? + `); + stmt.run(now, fingerprint); + } + + markReplied(fingerprint: string, replyContent: string): void { + const now = Date.now(); + const stmt = this.db.prepare(` + UPDATE notification_tasks + SET + status = 'replied', + replied_at = ?, + last_attempt_at = ?, + reply_content = ?, + error_message = NULL + WHERE fingerprint = ? + `); + stmt.run(now, now, replyContent, fingerprint); + } + + markFailed(fingerprint: string, errorMessage: string): void { + const now = Date.now(); + const stmt = this.db.prepare(` + UPDATE notification_tasks + SET + status = 'failed', + retry_count = retry_count + 1, + last_attempt_at = ?, + error_message = ? + WHERE fingerprint = ? + `); + stmt.run(now, errorMessage, fingerprint); + } + + markIgnored(fingerprint: string, reason?: string): void { + const stmt = this.db.prepare(` + UPDATE notification_tasks + SET status = 'ignored', error_message = ? + WHERE fingerprint = ? + `); + stmt.run(reason ?? 'Ignored by operator', fingerprint); + } + + private rowToTask(row: NotificationRow): NotificationTask { + return { + fingerprint: row.fingerprint, + notification: { + userId: row.user_id, + nickname: row.nickname, + avatar: row.avatar, + content: row.content, + type: row.type, + time: row.time, + feedId: row.feed_id, + xsecToken: row.xsec_token, + noteImage: row.note_image, + }, + status: row.status, + firstSeenAt: new Date(row.first_seen_at).toISOString(), + lastSeenAt: new Date(row.last_seen_at).toISOString(), + retryCount: row.retry_count, + ...(row.last_attempt_at ? { lastAttemptAt: new Date(row.last_attempt_at).toISOString() } : {}), + ...(row.replied_at ? { repliedAt: new Date(row.replied_at).toISOString() } : {}), + ...(row.reply_content ? { replyContent: row.reply_content } : {}), + ...(row.error_message ? { errorMessage: row.error_message } : {}), + }; + } +} + +let storeSingleton: NotificationStateStore | null = null; + +export function getNotificationStateStore(): NotificationStateStore { + if (!storeSingleton) { + storeSingleton = new NotificationStateStore(); + } + return storeSingleton; +} diff --git a/src/platforms/xiaohongshu/notification-sync.ts b/src/platforms/xiaohongshu/notification-sync.ts new file mode 100644 index 0000000..5187a84 --- /dev/null +++ b/src/platforms/xiaohongshu/notification-sync.ts @@ -0,0 +1,90 @@ +import type { BrowserManager } from '../../browser/manager.js'; +import { config } from '../../config/index.js'; +import { logger } from '../../utils/logger.js'; +import { getCommentNotifications } from './notification.js'; +import { getNotificationStateStore, type NotificationUpsertResult } from './notification-state.js'; + +const PLATFORM = 'xiaohongshu'; +const log = logger.child({ module: 'xhs-notification-sync' }); + +export async function syncCommentNotifications( + browser: BrowserManager, + maxCount = config.notificationPollMaxCount, +): Promise { + const timeoutMs = + config.operationTimeouts['feed_detail'] ?? + config.operationTimeouts['default'] ?? + 60_000; + + const notifications = await browser.withPage( + PLATFORM, + async (page) => getCommentNotifications(page, maxCount), + timeoutMs, + ); + + const result = getNotificationStateStore().upsertNotifications(notifications); + if (result.fetched > 0) { + log.info(result, 'Notifications synced to state store'); + } + return result; +} + +export class XhsNotificationPoller { + private timer: ReturnType | null = null; + private running = false; + private started = false; + + start(browser: BrowserManager): void { + if (this.started) return; + this.started = true; + + if (!config.notificationPollEnabled) { + log.info('Notification poller disabled by config'); + return; + } + + const tick = async (): Promise => { + if (this.running) return; + this.running = true; + + try { + await syncCommentNotifications(browser, config.notificationPollMaxCount); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + // Login expiration and empty unread state are expected in daily usage. + log.debug({ message }, 'Notification poll tick skipped'); + } finally { + this.running = false; + } + }; + + void tick(); + this.timer = setInterval(() => { + void tick(); + }, config.notificationPollIntervalMs); + + if (this.timer && typeof this.timer === 'object' && 'unref' in this.timer) { + this.timer.unref(); + } + + log.info( + { + intervalMs: config.notificationPollIntervalMs, + maxCount: config.notificationPollMaxCount, + }, + 'Notification poller started', + ); + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + } + this.started = false; + this.running = false; + log.info('Notification poller stopped'); + } +} + +export const xhsNotificationPoller = new XhsNotificationPoller(); diff --git a/src/platforms/xiaohongshu/routes.ts b/src/platforms/xiaohongshu/routes.ts index c03f1ab..652855e 100644 --- a/src/platforms/xiaohongshu/routes.ts +++ b/src/platforms/xiaohongshu/routes.ts @@ -20,6 +20,12 @@ import { listMyNotes } from './my-notes.js'; import { postComment, replyComment } from './comment.js'; import { toggleLike, toggleFavorite } from './interaction.js'; import { getCommentNotifications, replyNotification } from './notification.js'; +import { + getNotificationStateStore, + type NotificationTask, + type NotificationTaskStatus, +} from './notification-state.js'; +import { syncCommentNotifications } from './notification-sync.js'; import { SearchSchema, @@ -34,6 +40,7 @@ import { FavoriteSchema, GetCommentNotificationsSchema, ReplyNotificationSchema, + GetUnprocessedNotificationsSchema, } from './schemas.js'; import type { SearchFilters } from './types.js'; @@ -119,11 +126,18 @@ const GetCommentNotificationsQuerySchema = z.object({ }); const ReplyNotificationBodySchema = z.object({ + fingerprint: ReplyNotificationSchema.fingerprint, user_id: ReplyNotificationSchema.user_id, comment_content: ReplyNotificationSchema.comment_content, reply_content: ReplyNotificationSchema.reply_content, }); +const GetUnprocessedNotificationsQuerySchema = z.object({ + max_count: GetUnprocessedNotificationsSchema.max_count, + sync: GetUnprocessedNotificationsSchema.sync, + statuses: GetUnprocessedNotificationsSchema.statuses, +}); + const LikeBodySchema = z.object({ feed_id: LikeSchema.feed_id, xsec_token: LikeSchema.xsec_token, @@ -654,6 +668,7 @@ export function createXhsRoutes(browser: BrowserManager): Router { timeoutMs, ); + getNotificationStateStore().upsertNotifications(notifications); res.json(successResponse(notifications) as ApiResponse); } catch (err) { handleError(res, err); @@ -668,20 +683,96 @@ export function createXhsRoutes(browser: BrowserManager): Router { void (async () => { try { const body = ReplyNotificationBodySchema.parse(req.body); + const targetFingerprint = + body.fingerprint ?? + getNotificationStateStore().findOpenFingerprint(body.user_id, body.comment_content); + if (targetFingerprint) { + getNotificationStateStore().markPending(targetFingerprint); + } const timeoutMs = config.operationTimeouts['reply'] ?? config.operationTimeouts['default'] ?? 20_000; - const result = await browser.withPage( - PLATFORM, - async (page) => - replyNotification(page, body.user_id, body.comment_content, body.reply_content), - timeoutMs, - ); + try { + const result = await browser.withPage( + PLATFORM, + async (page) => + replyNotification(page, body.user_id, body.comment_content, body.reply_content), + timeoutMs, + ); - res.json(successResponse(result) as ApiResponse); + if (targetFingerprint) { + if (result.success) { + getNotificationStateStore().markReplied(targetFingerprint, body.reply_content); + } else { + getNotificationStateStore().markFailed( + targetFingerprint, + 'Reply action returned success=false', + ); + } + } + + res.json(successResponse({ + ...result, + ...(targetFingerprint ? { fingerprint: targetFingerprint } : {}), + }) as ApiResponse); + } catch (err) { + if (targetFingerprint) { + const message = err instanceof Error ? err.message : String(err); + getNotificationStateStore().markFailed(targetFingerprint, message); + } + throw err; + } + } catch (err) { + handleError(res, err); + } + })(); + }); + + // ----------------------------------------------------------------------- + // GET /notifications/unprocessed + // ----------------------------------------------------------------------- + router.get('/notifications/unprocessed', readRateLimiter, (req, res) => { + void (async () => { + try { + const statusesFromQuery = (() => { + const raw = req.query.statuses; + if (!raw) return undefined; + if (Array.isArray(raw)) return raw.flatMap((s) => String(s).split(',')); + return String(raw).split(','); + })() + ?.map((s) => s.trim()) + .filter((s) => s.length > 0); + + const parsed = GetUnprocessedNotificationsQuerySchema.parse({ + max_count: req.query.max_count ? Number(req.query.max_count) : undefined, + sync: req.query.sync === undefined + ? undefined + : ['1', 'true', 'yes'].includes(String(req.query.sync).toLowerCase()), + statuses: statusesFromQuery, + }); + + let syncResult: { fetched: number; inserted: number; updated: number } | null = null; + if (parsed.sync) { + syncResult = await syncCommentNotifications(browser, parsed.max_count); + } + + const statuses: NotificationTaskStatus[] = + parsed.statuses && parsed.statuses.length > 0 + ? parsed.statuses + : ['new', 'failed']; + + const tasks = getNotificationStateStore().listByStatuses(statuses, parsed.max_count); + + res.json(successResponse({ + tasks, + ...(syncResult ? { synced: syncResult } : {}), + }) as ApiResponse<{ + tasks: NotificationTask[]; + synced?: { fetched: number; inserted: number; updated: number }; + }>); } catch (err) { handleError(res, err); } diff --git a/src/platforms/xiaohongshu/schemas.ts b/src/platforms/xiaohongshu/schemas.ts index f040bef..6e346c6 100644 --- a/src/platforms/xiaohongshu/schemas.ts +++ b/src/platforms/xiaohongshu/schemas.ts @@ -161,11 +161,36 @@ export const GetCommentNotificationsSchema = { /** xhs_reply_notification */ export const ReplyNotificationSchema = { + fingerprint: z + .string() + .optional() + .describe('Optional notification fingerprint from xhs_get_unprocessed_notifications'), user_id: z.string().describe('User ID of the comment author (from notification)'), comment_content: z.string().describe('Original comment content to match the notification'), reply_content: z.string().min(1).describe('Reply text to send'), }; +/** xhs_get_unprocessed_notifications */ +export const GetUnprocessedNotificationsSchema = { + max_count: z + .number() + .int() + .min(1) + .max(200) + .optional() + .default(20) + .describe('Maximum number of unprocessed notifications to return (1–200, default 20)'), + statuses: z + .array(z.enum(['new', 'pending', 'failed'])) + .optional() + .describe('Statuses to include. Defaults to ["new", "failed"]'), + sync: z + .boolean() + .optional() + .default(true) + .describe('Whether to sync latest notifications from Xiaohongshu before querying local state'), +}; + /** xhs_favorite */ export const FavoriteSchema = { feed_id: z.string().describe('Feed ID to toggle favorite'), diff --git a/test/notification-state.test.ts b/test/notification-state.test.ts new file mode 100644 index 0000000..2b40f66 --- /dev/null +++ b/test/notification-state.test.ts @@ -0,0 +1,66 @@ +import fs from 'node:fs/promises'; +import os from 'node:os'; +import path from 'node:path'; + +import { describe, expect, it } from 'vitest'; + +import { NotificationStateStore } from '../src/platforms/xiaohongshu/notification-state.js'; +import type { CommentNotification } from '../src/platforms/xiaohongshu/types.js'; + +function makeNotification(overrides?: Partial): CommentNotification { + return { + userId: 'u1', + nickname: 'tester', + avatar: 'https://example.com/a.png', + content: '你好', + type: '评论了你的笔记', + time: '1分钟前', + feedId: 'feed123', + xsecToken: 'token123', + noteImage: 'https://example.com/note.png', + ...overrides, + }; +} + +describe('notification-state store', () => { + it('upserts notifications and tracks status transitions', async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'social-mcp-notif-')); + try { + const store = new NotificationStateStore(tempDir, 'test.db'); + const n1 = makeNotification(); + + const first = store.upsertNotifications([n1]); + expect(first).toEqual({ fetched: 1, inserted: 1, updated: 0 }); + + const second = store.upsertNotifications([n1]); + expect(second).toEqual({ fetched: 1, inserted: 0, updated: 1 }); + + const openTasks = store.listByStatuses(['new'], 10); + expect(openTasks).toHaveLength(1); + expect(openTasks[0]?.notification.userId).toBe('u1'); + + const fp = store.findOpenFingerprint('u1', '你好'); + expect(fp).toBeTypeOf('string'); + + if (!fp) { + throw new Error('fingerprint should not be null'); + } + + store.markPending(fp); + expect(store.listByStatuses(['pending'], 10)).toHaveLength(1); + + store.markFailed(fp, 'network error'); + const failed = store.listByStatuses(['failed'], 10); + expect(failed).toHaveLength(1); + expect(failed[0]?.retryCount).toBe(1); + expect(failed[0]?.errorMessage).toBe('network error'); + + store.markReplied(fp, '收到,感谢反馈'); + const replied = store.listByStatuses(['replied'], 10); + expect(replied).toHaveLength(1); + expect(replied[0]?.replyContent).toBe('收到,感谢反馈'); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); +});