From 892e76f7ed87b2c0054dc9d61dfd7b844ec45249 Mon Sep 17 00:00:00 2001 From: kurihada Date: Tue, 3 Mar 2026 12:14:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=B0=8F=E7=BA=A2=E4=B9=A6MC?= =?UTF-8?q?P=EF=BC=9A=E7=BB=9F=E4=B8=80=E8=BF=94=E5=9B=9E=E7=BB=93?= =?UTF-8?q?=E6=9E=84=E5=B9=B6=E5=A2=9E=E5=BC=BA=E5=88=86=E9=A1=B5=E3=80=81?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E4=B8=8E=E5=B9=82=E7=AD=89=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 +- README.zh-CN.md | 4 +- src/platforms/xiaohongshu/index.ts | 816 +++++++++++++++++---------- src/platforms/xiaohongshu/routes.ts | 41 +- src/platforms/xiaohongshu/schemas.ts | 116 +++- src/utils/errors.ts | 9 +- src/utils/idempotency.ts | 115 ++++ test/errors.test.ts | 14 +- 8 files changed, 790 insertions(+), 329 deletions(-) create mode 100644 src/utils/idempotency.ts diff --git a/README.md b/README.md index bcc88f1..736bbee 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Multi-platform social media automation service that exposes browser-based action ## Features -- **20 MCP tools** for Xiaohongshu (login, browsing, publishing, interactions, notifications, automation) +- **22 MCP tools** for Xiaohongshu (login, browsing, publishing, interactions, notifications, automation) - **REST API** with Bearer token authentication and per-route rate limiting - **Browser automation** via `rebrowser-playwright` with per-platform serial queueing - **Cookie persistence** with file-based storage (`0600`, atomic writes) @@ -105,8 +105,10 @@ Add this in `claude_desktop_config.json`: | `xhs_favorite` | Toggle favorite state on a note | | `xhs_get_unprocessed_notifications` | Get unprocessed notification tasks from local SQLite state | | `xhs_mark_notification_task` | Manually mark notification task status (new/pending/ignored/replied/failed) | +| `xhs_mark_notification_tasks` | Batch mark notification task statuses | | `xhs_list_failed_notification_tasks` | List failed notification tasks for triage/retry | | `xhs_retry_notification_task` | Retry a failed notification task by fingerprint | +| `xhs_retry_notification_tasks` | Batch retry failed notification tasks | | `xhs_reply_notification` | Reply to a specific notification | ## REST API diff --git a/README.zh-CN.md b/README.zh-CN.md index 6ea163c..6345735 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -6,7 +6,7 @@ ## 功能特性 -- 小红书 **20 个 MCP 工具**(登录、浏览、发布、互动、通知、自动化) +- 小红书 **22 个 MCP 工具**(登录、浏览、发布、互动、通知、自动化) - 带 Bearer Token 鉴权与按路由限流的 REST API - 基于 `rebrowser-playwright` 的浏览器自动化,按平台串行队列执行 - 文件型 Cookie 持久化(`0600` 权限、原子写入) @@ -105,8 +105,10 @@ pnpm test | `xhs_favorite` | 切换收藏状态 | | `xhs_get_unprocessed_notifications` | 从本地 SQLite 获取“未处理”通知任务 | | `xhs_mark_notification_task` | 手动标记通知任务状态(new/pending/ignored/replied/failed) | +| `xhs_mark_notification_tasks` | 批量标记通知任务状态 | | `xhs_list_failed_notification_tasks` | 获取失败通知任务列表(用于排障/重试) | | `xhs_retry_notification_task` | 按 fingerprint 重试失败通知任务 | +| `xhs_retry_notification_tasks` | 批量重试失败通知任务 | | `xhs_reply_notification` | 对通知进行回复 | ## REST API diff --git a/src/platforms/xiaohongshu/index.ts b/src/platforms/xiaohongshu/index.ts index fefa59d..f427e9d 100644 --- a/src/platforms/xiaohongshu/index.ts +++ b/src/platforms/xiaohongshu/index.ts @@ -3,8 +3,12 @@ import type { Router } from 'express'; import type { BrowserManager } from '../../browser/manager.js'; import { config } from '../../config/index.js'; -import { withErrorHandling } from '../../utils/errors.js'; +import { withErrorHandling, type McpToolResult } from '../../utils/errors.js'; import { resolveMediaInput, cleanupFile } from '../../utils/downloader.js'; +import { + getIdempotencyStore, + computeIdempotencyHash, +} from '../../utils/idempotency.js'; import { checkLoginStatus, getLoginQRCode, deleteCookies } from './login.js'; import { listFeeds } from './feeds.js'; import { searchFeeds } from './search.js'; @@ -16,7 +20,11 @@ import { listMyNotes } from './my-notes.js'; import { postComment, replyComment } from './comment.js'; import { toggleLike, toggleFavorite } from './interaction.js'; import { replyNotification } from './notification.js'; -import { getNotificationStateStore, type NotificationTaskStatus } from './notification-state.js'; +import { + getNotificationStateStore, + type NotificationTask, + type NotificationTaskStatus, +} from './notification-state.js'; import { syncCommentNotifications, xhsNotificationPoller } from './notification-sync.js'; import { createXhsRoutes } from './routes.js'; import { @@ -38,8 +46,10 @@ import { ReplyNotificationSchema, GetUnprocessedNotificationsSchema, MarkNotificationTaskSchema, + MarkNotificationTasksSchema, ListFailedNotificationTasksSchema, RetryNotificationTaskSchema, + RetryNotificationTasksSchema, } from './schemas.js'; import type { SearchFilters } from './types.js'; import type { PlatformPlugin } from '../../server/app.js'; @@ -56,6 +66,190 @@ const VIDEO_MAX_SIZE_MB = 500; /** Maximum file size for image uploads (20 MB — default in validateMediaPath). */ const IMAGE_MAX_SIZE_MB = 20; +/** Default page size for list-like MCP tools. */ +const DEFAULT_PAGE_SIZE = 20; + +/** Maximum page size for list-like MCP tools. */ +const MAX_PAGE_SIZE = 200; + +type McpMeta = Record; + +function ok(data: unknown, meta?: McpMeta): McpToolResult { + return { + content: [{ + type: 'text', + text: JSON.stringify({ + success: true, + data, + meta: meta ?? {}, + }), + }], + }; +} + +function parseCursor(cursor?: string): number { + if (!cursor) return 0; + if (!/^\d+$/.test(cursor)) { + throw new Error('Invalid cursor: must be a non-negative integer string'); + } + return Number.parseInt(cursor, 10); +} + +function clampPageSize(maxCount?: number): number { + return Math.min(MAX_PAGE_SIZE, Math.max(1, maxCount ?? DEFAULT_PAGE_SIZE)); +} + +function paginateArray( + items: T[], + opts?: { max_count?: number; cursor?: string }, +): { items: T[]; meta: McpMeta } { + const limit = clampPageSize(opts?.max_count); + const offset = parseCursor(opts?.cursor); + const sliced = items.slice(offset, offset + limit); + const nextOffset = offset + sliced.length; + const nextCursor = nextOffset < items.length ? String(nextOffset) : undefined; + + return { + items: sliced, + meta: { + pagination: { + cursor: opts?.cursor ?? '0', + max_count: limit, + returned: sliced.length, + total: items.length, + ...(nextCursor ? { next_cursor: nextCursor } : {}), + }, + }, + }; +} + +async function runWithIdempotency( + toolName: string, + requestId: string | undefined, + inputForHash: unknown, + execute: () => Promise, +): Promise<{ data: T; meta?: McpMeta }> { + if (!requestId) { + return { data: await execute() }; + } + + const store = getIdempotencyStore(); + const inputHash = computeIdempotencyHash(inputForHash); + const existing = store.get(toolName, requestId); + + if (existing) { + if (existing.inputHash !== inputHash) { + throw new Error('request_id already used with different parameters'); + } + + return { + data: existing.responseData as T, + meta: { + request_id: requestId, + idempotent_replay: true, + first_processed_at: existing.createdAt, + }, + }; + } + + const data = await execute(); + store.put(toolName, requestId, inputHash, data); + return { + data, + meta: { + request_id: requestId, + idempotent_replay: false, + }, + }; +} + +async function retryFailedNotificationTaskByFingerprint( + browser: BrowserManager, + fingerprint: string, + replyContentOverride?: string, +): Promise<{ success: boolean; fingerprint: string }> { + const store = getNotificationStateStore(); + const task = store.getByFingerprint(fingerprint); + if (!task) { + throw new Error(`Notification task not found: ${fingerprint}`); + } + if (task.status !== 'failed') { + throw new Error(`Task status must be failed to retry, got: ${task.status}`); + } + + const replyContent = replyContentOverride ?? task.replyContent; + if (!replyContent) { + throw new Error('Retry requires reply_content when task has no stored replyContent'); + } + + store.markPending(fingerprint); + + const timeoutMs = + config.operationTimeouts['reply'] ?? + config.operationTimeouts['default'] ?? + 20_000; + + try { + const result = await browser.withPage( + PLATFORM, + async (page) => + replyNotification( + page, + task.notification.userId, + task.notification.content, + replyContent, + ), + timeoutMs, + ); + + if (result.success) { + store.markReplied(fingerprint, replyContent); + } else { + store.markFailed(fingerprint, 'Retry reply returned success=false'); + } + + return { + success: result.success, + fingerprint, + }; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + store.markFailed(fingerprint, message); + throw err; + } +} + +function resolveReplyTarget(args: { + fingerprint?: string; + user_id?: string; + comment_content?: string; +}): { fingerprint?: string; userId: string; commentContent: string } { + const store = getNotificationStateStore(); + + if (args.fingerprint) { + const task = store.getByFingerprint(args.fingerprint); + if (!task) { + throw new Error(`Notification task not found: ${args.fingerprint}`); + } + return { + fingerprint: args.fingerprint, + userId: task.notification.userId, + commentContent: task.notification.content, + }; + } + + if (!args.user_id || !args.comment_content) { + throw new Error('Either fingerprint or both user_id and comment_content are required'); + } + + const fp = store.findOpenFingerprint(args.user_id, args.comment_content) ?? undefined; + return { + ...(fp ? { fingerprint: fp } : {}), + userId: args.user_id, + commentContent: args.comment_content, + }; +} + // --------------------------------------------------------------------------- // PlatformPlugin implementation // --------------------------------------------------------------------------- @@ -104,15 +298,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { async (page) => checkLoginStatus(page), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(status), - }, - ], - }; + return ok(status); }); }, ); @@ -128,15 +314,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { async () => { return withErrorHandling('xhs_get_login_qrcode', async () => { const result = await getLoginQRCode(browser); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(result), - }, - ], - }; + return ok(result); }); }, ); @@ -152,15 +330,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { async () => { return withErrorHandling('xhs_delete_cookies', async () => { await deleteCookies(browser); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify({ success: true, message: 'Cookies deleted' }), - }, - ], - }; + return ok({ message: 'Cookies deleted' }); }); }, ); @@ -177,7 +347,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { 'xhs_list_feeds', 'Get Xiaohongshu explore page recommended feed list', ListFeedsSchema, - async () => { + async (args) => { return withErrorHandling('xhs_list_feeds', async () => { const timeoutMs = config.operationTimeouts['feed_list'] ?? config.operationTimeouts['default'] ?? 60_000; @@ -186,15 +356,11 @@ export const xiaohongshuPlugin: PlatformPlugin = { async (page) => listFeeds(page), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(feeds), - }, - ], - }; + const pageResult = paginateArray(feeds, { + max_count: args.max_count, + cursor: args.cursor, + }); + return ok(pageResult.items, pageResult.meta); }); }, ); @@ -224,15 +390,11 @@ export const xiaohongshuPlugin: PlatformPlugin = { async (page) => searchFeeds(page, args.keyword, filters), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(feeds), - }, - ], - }; + const pageResult = paginateArray(feeds, { + max_count: args.max_count, + cursor: args.cursor, + }); + return ok(pageResult.items, pageResult.meta); }); }, ); @@ -255,15 +417,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { getFeedDetail(page, args.feed_id, args.xsec_token), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(detail), - }, - ], - }; + return ok(detail); }); }, ); @@ -289,15 +443,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { getSubComments(page, args.feed_id, args.xsec_token, args.comment_id, args.max_count), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(result), - }, - ], - }; + return ok(result); }); }, ); @@ -320,15 +466,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { getUserProfile(page, args.user_id, args.xsec_token), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(profile), - }, - ], - }; + return ok(profile); }); }, ); @@ -345,7 +483,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { 'xhs_list_my_notes', 'List your published notes on Xiaohongshu from the creator center', ListMyNotesSchema, - async () => { + async (args) => { return withErrorHandling('xhs_list_my_notes', async () => { const timeoutMs = config.operationTimeouts['feed_list'] ?? @@ -357,10 +495,11 @@ export const xiaohongshuPlugin: PlatformPlugin = { async (page) => listMyNotes(page), timeoutMs, ); - - return { - content: [{ type: 'text' as const, text: JSON.stringify(notes) }], - }; + const pageResult = paginateArray(notes, { + max_count: args.max_count, + cursor: args.cursor, + }); + return ok(pageResult.items, pageResult.meta); }); }, ); @@ -379,39 +518,51 @@ export const xiaohongshuPlugin: PlatformPlugin = { PublishImageSchema, async (args) => { return withErrorHandling('xhs_publish_image', async () => { - // Resolve all images (local path or URL download) before acquiring browser. - const resolved: Array<{ path: string; temporary: boolean }> = []; - for (const img of args.images) { - resolved.push(await resolveMediaInput(img, { maxSizeMB: IMAGE_MAX_SIZE_MB })); - } - const validatedPaths = resolved.map((r) => r.path); + const { data, meta } = await runWithIdempotency( + 'xhs_publish_image', + args.request_id, + { + title: args.title, + content: args.content, + images: args.images, + tags: args.tags, + schedule_at: args.schedule_at, + is_original: args.is_original, + visibility: args.visibility, + }, + async () => { + // Resolve all images (local path or URL download) before acquiring browser. + const resolved: Array<{ path: string; temporary: boolean }> = []; + for (const img of args.images) { + resolved.push(await resolveMediaInput(img, { maxSizeMB: IMAGE_MAX_SIZE_MB })); + } + const validatedPaths = resolved.map((r) => r.path); - const timeoutMs = - config.operationTimeouts['publish'] ?? - config.operationTimeouts['default'] ?? - 300_000; + const timeoutMs = + config.operationTimeouts['publish'] ?? + config.operationTimeouts['default'] ?? + 300_000; - try { - const result = await browser.withPage( - PLATFORM, - async (page) => - publishImageNote(page, args.title, args.content, validatedPaths, { - tags: args.tags, - scheduleAt: args.schedule_at, - isOriginal: args.is_original, - visibility: args.visibility, - }), - timeoutMs, - ); - - return { - content: [{ type: 'text' as const, text: JSON.stringify(result) }], - }; - } finally { - for (const r of resolved) { - if (r.temporary) await cleanupFile(r.path).catch(() => undefined); - } - } + try { + return await browser.withPage( + PLATFORM, + async (page) => + publishImageNote(page, args.title, args.content, validatedPaths, { + tags: args.tags, + scheduleAt: args.schedule_at, + isOriginal: args.is_original, + visibility: args.visibility, + }), + timeoutMs, + ); + } finally { + for (const r of resolved) { + if (r.temporary) await cleanupFile(r.path).catch(() => undefined); + } + } + }, + ); + return ok(data, meta); }); }, ); @@ -426,32 +577,43 @@ export const xiaohongshuPlugin: PlatformPlugin = { PublishVideoSchema, async (args) => { return withErrorHandling('xhs_publish_video', async () => { - // Resolve video (local path or URL download) before acquiring browser. - const resolvedVideo = await resolveMediaInput(args.video, { maxSizeMB: VIDEO_MAX_SIZE_MB }); + const { data, meta } = await runWithIdempotency( + 'xhs_publish_video', + args.request_id, + { + title: args.title, + content: args.content, + video: args.video, + tags: args.tags, + schedule_at: args.schedule_at, + visibility: args.visibility, + }, + async () => { + // Resolve video (local path or URL download) before acquiring browser. + const resolvedVideo = await resolveMediaInput(args.video, { maxSizeMB: VIDEO_MAX_SIZE_MB }); - const timeoutMs = - config.operationTimeouts['publish'] ?? - config.operationTimeouts['default'] ?? - 300_000; + const timeoutMs = + config.operationTimeouts['publish'] ?? + config.operationTimeouts['default'] ?? + 300_000; - try { - const result = await browser.withPage( - PLATFORM, - async (page) => - publishVideoNote(page, args.title, args.content, resolvedVideo.path, { - tags: args.tags, - scheduleAt: args.schedule_at, - visibility: args.visibility, - }), - timeoutMs, - ); - - return { - content: [{ type: 'text' as const, text: JSON.stringify(result) }], - }; - } finally { - if (resolvedVideo.temporary) await cleanupFile(resolvedVideo.path).catch(() => undefined); - } + try { + return await browser.withPage( + PLATFORM, + async (page) => + publishVideoNote(page, args.title, args.content, resolvedVideo.path, { + tags: args.tags, + scheduleAt: args.schedule_at, + visibility: args.visibility, + }), + timeoutMs, + ); + } finally { + if (resolvedVideo.temporary) await cleanupFile(resolvedVideo.path).catch(() => undefined); + } + }, + ); + return ok(data, meta); }); }, ); @@ -481,15 +643,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { postComment(page, args.feed_id, args.xsec_token, args.content), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(result), - }, - ], - }; + return ok(result); }); }, ); @@ -504,33 +658,38 @@ export const xiaohongshuPlugin: PlatformPlugin = { ReplyCommentSchema, async (args) => { return withErrorHandling('xhs_reply_comment', async () => { - const timeoutMs = - config.operationTimeouts['reply'] ?? - config.operationTimeouts['default'] ?? - 20_000; + const { data, meta } = await runWithIdempotency( + 'xhs_reply_comment', + args.request_id, + { + feed_id: args.feed_id, + xsec_token: args.xsec_token, + content: args.content, + comment_id: args.comment_id, + user_id: args.user_id, + }, + async () => { + const timeoutMs = + config.operationTimeouts['reply'] ?? + config.operationTimeouts['default'] ?? + 20_000; - const result = await browser.withPage( - PLATFORM, - async (page) => - replyComment( - page, - args.feed_id, - args.xsec_token, - args.content, - args.comment_id, - args.user_id, - ), - timeoutMs, + return await browser.withPage( + PLATFORM, + async (page) => + replyComment( + page, + args.feed_id, + args.xsec_token, + args.content, + args.comment_id, + args.user_id, + ), + timeoutMs, + ); + }, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(result), - }, - ], - }; + return ok(data, meta); }); }, ); @@ -556,15 +715,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { toggleLike(page, args.feed_id, args.xsec_token), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(result), - }, - ], - }; + return ok(result); }); }, ); @@ -590,21 +741,13 @@ export const xiaohongshuPlugin: PlatformPlugin = { toggleFavorite(page, args.feed_id, args.xsec_token), timeoutMs, ); - - return { - content: [ - { - type: 'text' as const, - text: JSON.stringify(result), - }, - ], - }; + return ok(result); }); }, ); // ===================================================================== - // Notifications (5 tools) + // Notifications (7 tools) // ===================================================================== // ----------------------------------------------------------------------- @@ -628,16 +771,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { : ['new', 'failed']; const tasks = getNotificationStateStore().listByStatuses(statuses, args.max_count); - - return { - content: [{ - type: 'text' as const, - text: JSON.stringify({ - tasks, - ...(syncResult ? { synced: syncResult } : {}), - }), - }], - }; + return ok(tasks, syncResult ? { synced: syncResult } : undefined); }); }, ); @@ -660,16 +794,56 @@ export const xiaohongshuPlugin: PlatformPlugin = { store.setStatus(args.fingerprint, args.status, args.note); const updated = store.getByFingerprint(args.fingerprint); + return ok(updated); + }); + }, + ); - return { - content: [{ - type: 'text' as const, - text: JSON.stringify({ - success: true, - task: updated, - }), - }], - }; + // ----------------------------------------------------------------------- + // xhs_mark_notification_tasks + // ----------------------------------------------------------------------- + + server.tool( + 'xhs_mark_notification_tasks', + 'Batch mark notification task statuses', + MarkNotificationTasksSchema, + async (args) => { + return withErrorHandling('xhs_mark_notification_tasks', async () => { + const store = getNotificationStateStore(); + const results: Array<{ + fingerprint: string; + success: boolean; + task?: NotificationTask | null; + error?: string; + }> = []; + + for (const item of args.tasks) { + const existing = store.getByFingerprint(item.fingerprint); + if (!existing) { + results.push({ + fingerprint: item.fingerprint, + success: false, + error: 'Notification task not found', + }); + continue; + } + + store.setStatus(item.fingerprint, item.status, item.note); + results.push({ + fingerprint: item.fingerprint, + success: true, + task: store.getByFingerprint(item.fingerprint), + }); + } + + const successCount = results.filter((r) => r.success).length; + return ok(results, { + batch: { + total: results.length, + success: successCount, + failed: results.length - successCount, + }, + }); }); }, ); @@ -685,12 +859,7 @@ export const xiaohongshuPlugin: PlatformPlugin = { async (args) => { return withErrorHandling('xhs_list_failed_notification_tasks', async () => { const tasks = getNotificationStateStore().listByStatuses(['failed'], args.max_count); - return { - content: [{ - type: 'text' as const, - text: JSON.stringify({ tasks }), - }], - }; + return ok(tasks); }); }, ); @@ -705,62 +874,83 @@ export const xiaohongshuPlugin: PlatformPlugin = { RetryNotificationTaskSchema, async (args) => { return withErrorHandling('xhs_retry_notification_task', async () => { - const store = getNotificationStateStore(); - const task = store.getByFingerprint(args.fingerprint); - if (!task) { - throw new Error(`Notification task not found: ${args.fingerprint}`); - } - if (task.status !== 'failed') { - throw new Error(`Task status must be failed to retry, got: ${task.status}`); - } + const { data, meta } = await runWithIdempotency( + 'xhs_retry_notification_task', + args.request_id, + { + fingerprint: args.fingerprint, + reply_content: args.reply_content, + }, + async () => + retryFailedNotificationTaskByFingerprint( + browser, + args.fingerprint, + args.reply_content, + ), + ); + return ok(data, meta); + }); + }, + ); - const replyContent = args.reply_content ?? task.replyContent; - if (!replyContent) { - throw new Error( - 'Retry requires reply_content when task has no stored replyContent', - ); - } + // ----------------------------------------------------------------------- + // xhs_retry_notification_tasks + // ----------------------------------------------------------------------- - store.markPending(args.fingerprint); + server.tool( + 'xhs_retry_notification_tasks', + 'Batch retry failed notification tasks', + RetryNotificationTasksSchema, + async (args) => { + return withErrorHandling('xhs_retry_notification_tasks', async () => { + const { data, meta } = await runWithIdempotency( + 'xhs_retry_notification_tasks', + args.request_id, + { + tasks: args.tasks, + continue_on_error: args.continue_on_error, + }, + async () => { + const results: Array<{ + fingerprint: string; + success: boolean; + error?: string; + }> = []; - const timeoutMs = - config.operationTimeouts['reply'] ?? - config.operationTimeouts['default'] ?? - 20_000; + for (const item of args.tasks) { + try { + const one = await retryFailedNotificationTaskByFingerprint( + browser, + item.fingerprint, + item.reply_content, + ); + results.push(one); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + results.push({ + fingerprint: item.fingerprint, + success: false, + error: message, + }); + if (!args.continue_on_error) { + throw err; + } + } + } - try { - const result = await browser.withPage( - PLATFORM, - async (page) => - replyNotification( - page, - task.notification.userId, - task.notification.content, - replyContent, - ), - timeoutMs, - ); + const successCount = results.filter((r) => r.success).length; + return { + results, + summary: { + total: results.length, + success: successCount, + failed: results.length - successCount, + }, + }; + }, + ); - if (result.success) { - store.markReplied(args.fingerprint, replyContent); - } else { - store.markFailed(args.fingerprint, 'Retry reply returned success=false'); - } - - return { - content: [{ - type: 'text' as const, - text: JSON.stringify({ - ...result, - fingerprint: args.fingerprint, - }), - }], - }; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - store.markFailed(args.fingerprint, message); - throw err; - } + return ok(data, meta); }); }, ); @@ -775,53 +965,71 @@ export const xiaohongshuPlugin: PlatformPlugin = { ReplyNotificationSchema, async (args) => { return withErrorHandling('xhs_reply_notification', async () => { - const targetFingerprint = - args.fingerprint ?? - getNotificationStateStore().findOpenFingerprint(args.user_id, args.comment_content); - if (targetFingerprint) { - getNotificationStateStore().markPending(targetFingerprint); - } + const target = resolveReplyTarget({ + fingerprint: args.fingerprint, + user_id: args.user_id, + comment_content: args.comment_content, + }); + const targetFingerprint = target.fingerprint; - const timeoutMs = - config.operationTimeouts['reply'] ?? - config.operationTimeouts['default'] ?? - 20_000; + const { data, meta } = await runWithIdempotency( + 'xhs_reply_notification', + args.request_id, + { + fingerprint: args.fingerprint, + user_id: target.userId, + comment_content: target.commentContent, + reply_content: args.reply_content, + }, + async () => { + const timeoutMs = + config.operationTimeouts['reply'] ?? + config.operationTimeouts['default'] ?? + 20_000; - try { - const result = await browser.withPage( - PLATFORM, - async (page) => - replyNotification(page, args.user_id, args.comment_content, args.reply_content), - timeoutMs, - ); + try { + if (targetFingerprint) { + getNotificationStateStore().markPending(targetFingerprint); + } - if (targetFingerprint) { - if (result.success) { - getNotificationStateStore().markReplied(targetFingerprint, args.reply_content); - } else { - getNotificationStateStore().markFailed( - targetFingerprint, - 'Reply action returned success=false', + const result = await browser.withPage( + PLATFORM, + async (page) => + replyNotification( + page, + target.userId, + target.commentContent, + args.reply_content, + ), + timeoutMs, ); - } - } - return { - content: [{ - type: 'text' as const, - text: JSON.stringify({ + if (targetFingerprint) { + if (result.success) { + getNotificationStateStore().markReplied(targetFingerprint, args.reply_content); + } else { + getNotificationStateStore().markFailed( + targetFingerprint, + 'Reply action returned success=false', + ); + } + } + + return { ...result, ...(targetFingerprint ? { fingerprint: targetFingerprint } : {}), - }), - }], - }; - } catch (err) { - if (targetFingerprint) { - const message = err instanceof Error ? err.message : String(err); - getNotificationStateStore().markFailed(targetFingerprint, message); - } - throw err; - } + }; + } catch (err) { + if (targetFingerprint) { + const message = err instanceof Error ? err.message : String(err); + getNotificationStateStore().markFailed(targetFingerprint, message); + } + throw err; + } + }, + ); + + return ok(data, meta); }); }, ); diff --git a/src/platforms/xiaohongshu/routes.ts b/src/platforms/xiaohongshu/routes.ts index 652855e..2482063 100644 --- a/src/platforms/xiaohongshu/routes.ts +++ b/src/platforms/xiaohongshu/routes.ts @@ -683,11 +683,36 @@ export function createXhsRoutes(browser: BrowserManager): Router { void (async () => { try { const body = ReplyNotificationBodySchema.parse(req.body); - const targetFingerprint = - body.fingerprint ?? - getNotificationStateStore().findOpenFingerprint(body.user_id, body.comment_content); + const store = getNotificationStateStore(); + + const target = (() => { + if (body.fingerprint) { + const task = store.getByFingerprint(body.fingerprint); + if (!task) { + throw new Error(`Notification task not found: ${body.fingerprint}`); + } + return { + fingerprint: body.fingerprint, + userId: task.notification.userId, + commentContent: task.notification.content, + }; + } + + if (!body.user_id || !body.comment_content) { + throw new Error('Either fingerprint or both user_id and comment_content are required'); + } + + const fp = store.findOpenFingerprint(body.user_id, body.comment_content) ?? undefined; + return { + ...(fp ? { fingerprint: fp } : {}), + userId: body.user_id, + commentContent: body.comment_content, + }; + })(); + + const targetFingerprint = target.fingerprint; if (targetFingerprint) { - getNotificationStateStore().markPending(targetFingerprint); + store.markPending(targetFingerprint); } const timeoutMs = @@ -699,15 +724,15 @@ export function createXhsRoutes(browser: BrowserManager): Router { const result = await browser.withPage( PLATFORM, async (page) => - replyNotification(page, body.user_id, body.comment_content, body.reply_content), + replyNotification(page, target.userId, target.commentContent, body.reply_content), timeoutMs, ); if (targetFingerprint) { if (result.success) { - getNotificationStateStore().markReplied(targetFingerprint, body.reply_content); + store.markReplied(targetFingerprint, body.reply_content); } else { - getNotificationStateStore().markFailed( + store.markFailed( targetFingerprint, 'Reply action returned success=false', ); @@ -721,7 +746,7 @@ export function createXhsRoutes(browser: BrowserManager): Router { } catch (err) { if (targetFingerprint) { const message = err instanceof Error ? err.message : String(err); - getNotificationStateStore().markFailed(targetFingerprint, message); + store.markFailed(targetFingerprint, message); } throw err; } diff --git a/src/platforms/xiaohongshu/schemas.ts b/src/platforms/xiaohongshu/schemas.ts index ad04630..69dc475 100644 --- a/src/platforms/xiaohongshu/schemas.ts +++ b/src/platforms/xiaohongshu/schemas.ts @@ -1,7 +1,7 @@ import { z } from 'zod'; // --------------------------------------------------------------------------- -// MCP tool parameter schemas for all 13 Xiaohongshu tools. +// MCP tool parameter schemas for Xiaohongshu tools. // // Phase 2 tools (login) have no parameters — their schemas are empty objects. // Phase 3/4 schemas are defined here so that the full tool surface is @@ -22,7 +22,20 @@ export const DeleteCookiesSchema = {}; // -- Phase 3: Content browsing (4 tools) ----------------------------------- /** xhs_list_feeds — no parameters. */ -export const ListFeedsSchema = {}; +export const ListFeedsSchema = { + max_count: z + .number() + .int() + .min(1) + .max(200) + .optional() + .default(20) + .describe('Maximum number of feeds to return per page (1–200, default 20)'), + cursor: z + .string() + .optional() + .describe('Pagination cursor returned by previous call'), +}; /** xhs_search */ export const SearchSchema = { @@ -44,6 +57,18 @@ export const SearchSchema = { }) .optional() .describe('Optional search filters'), + max_count: z + .number() + .int() + .min(1) + .max(200) + .optional() + .default(20) + .describe('Maximum number of search results to return per page (1–200, default 20)'), + cursor: z + .string() + .optional() + .describe('Pagination cursor returned by previous call'), }; /** xhs_get_feed_detail */ @@ -77,6 +102,12 @@ export const GetUserProfileSchema = { /** xhs_publish_image */ export const PublishImageSchema = { + request_id: z + .string() + .min(1) + .max(128) + .optional() + .describe('Optional idempotency key for publish request'), title: z.string().min(1).max(20, 'Title must be ≤ 20 characters').describe('Note title (max 20 chars)'), content: z.string().max(1000, 'Content must be ≤ 1000 characters').describe('Note body text (max 1000 chars)'), images: z @@ -103,6 +134,12 @@ export const PublishImageSchema = { /** xhs_publish_video */ export const PublishVideoSchema = { + request_id: z + .string() + .min(1) + .max(128) + .optional() + .describe('Optional idempotency key for publish request'), title: z.string().min(1).max(20, 'Title must be ≤ 20 characters').describe('Note title (max 20 chars)'), content: z.string().max(1000, 'Content must be ≤ 1000 characters').describe('Note body text (max 1000 chars)'), video: z.string().describe('Local file path or HTTP/HTTPS URL for the video'), @@ -129,6 +166,12 @@ export const PostCommentSchema = { /** xhs_reply_comment */ export const ReplyCommentSchema = { + request_id: z + .string() + .min(1) + .max(128) + .optional() + .describe('Optional idempotency key for reply request'), feed_id: z.string().describe('Feed ID'), xsec_token: z.string().describe('Security token for the feed'), comment_id: z.string().optional().describe('Comment ID to reply to'), @@ -142,8 +185,21 @@ export const LikeSchema = { xsec_token: z.string().describe('Security token for the feed'), }; -/** xhs_list_my_notes — no parameters. */ -export const ListMyNotesSchema = {}; +/** xhs_list_my_notes */ +export const ListMyNotesSchema = { + max_count: z + .number() + .int() + .min(1) + .max(200) + .optional() + .default(20) + .describe('Maximum number of notes to return per page (1–200, default 20)'), + cursor: z + .string() + .optional() + .describe('Pagination cursor returned by previous call'), +}; // -- Phase 5: Notifications & automation ----------------------------------- @@ -161,12 +217,18 @@ export const GetCommentNotificationsSchema = { /** xhs_reply_notification */ export const ReplyNotificationSchema = { + request_id: z + .string() + .min(1) + .max(128) + .optional() + .describe('Optional idempotency key for notification reply'), fingerprint: z .string() .optional() .describe('Optional notification fingerprint from xhs_get_unprocessed_notifications'), - user_id: z.string().describe('User ID of the comment author (from notification)'), - comment_content: z.string().describe('Original comment content to match the notification'), + user_id: z.string().optional().describe('User ID of the comment author (fallback when fingerprint is absent)'), + comment_content: z.string().optional().describe('Original comment content to match the notification (fallback when fingerprint is absent)'), reply_content: z.string().min(1).describe('Reply text to send'), }; @@ -217,6 +279,12 @@ export const ListFailedNotificationTasksSchema = { /** xhs_retry_notification_task */ export const RetryNotificationTaskSchema = { + request_id: z + .string() + .min(1) + .max(128) + .optional() + .describe('Optional idempotency key for retry request'), fingerprint: z.string().describe('Notification task fingerprint to retry'), reply_content: z .string() @@ -225,6 +293,42 @@ export const RetryNotificationTaskSchema = { .describe('Optional override reply text. If omitted, uses stored reply_content from previous attempt.'), }; +/** xhs_mark_notification_tasks */ +export const MarkNotificationTasksSchema = { + tasks: z + .array(z.object({ + fingerprint: z.string(), + status: z.enum(['new', 'pending', 'ignored', 'replied', 'failed']), + note: z.string().optional(), + })) + .min(1) + .max(100) + .describe('Batch of task status updates (1–100 items)'), +}; + +/** xhs_retry_notification_tasks */ +export const RetryNotificationTasksSchema = { + request_id: z + .string() + .min(1) + .max(128) + .optional() + .describe('Optional idempotency key for batch retry request'), + tasks: z + .array(z.object({ + fingerprint: z.string(), + reply_content: z.string().min(1).optional(), + })) + .min(1) + .max(100) + .describe('Batch of failed tasks to retry (1–100 items)'), + continue_on_error: z + .boolean() + .optional() + .default(true) + .describe('Continue processing remaining tasks after one task fails'), +}; + /** xhs_favorite */ export const FavoriteSchema = { feed_id: z.string().describe('Feed ID to toggle favorite'), diff --git a/src/utils/errors.ts b/src/utils/errors.ts index c5995f2..aa00656 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -123,9 +123,12 @@ export async function withErrorHandling( ); const payload = JSON.stringify({ - tool: toolName, - error: category, - message: sanitized, + success: false, + error: { + tool: toolName, + code: category, + message: sanitized, + }, }); return { diff --git a/src/utils/idempotency.ts b/src/utils/idempotency.ts new file mode 100644 index 0000000..9feb65c --- /dev/null +++ b/src/utils/idempotency.ts @@ -0,0 +1,115 @@ +import crypto from 'node:crypto'; +import fs from 'node:fs'; +import path from 'node:path'; +import { DatabaseSync } from 'node:sqlite'; + +import { config } from '../config/index.js'; + +interface IdempotencyRow { + tool_name: string; + request_id: string; + input_hash: string; + response_json: string; + created_at: number; +} + +export interface IdempotencyRecord { + toolName: string; + requestId: string; + inputHash: string; + responseData: unknown; + createdAt: string; +} + +const DB_FILENAME = 'idempotency.db'; + +function canonicalize(value: unknown): unknown { + if (Array.isArray(value)) { + return value.map(canonicalize); + } + if (value && typeof value === 'object') { + const input = value as Record; + const out: Record = {}; + for (const key of Object.keys(input).sort()) { + out[key] = canonicalize(input[key]); + } + return out; + } + return value; +} + +export function computeIdempotencyHash(input: unknown): string { + const canonical = canonicalize(input); + return crypto + .createHash('sha256') + .update(JSON.stringify(canonical)) + .digest('hex'); +} + +export class IdempotencyStore { + private readonly db: DatabaseSync; + + constructor(baseDir = config.cookieDir, dbFilename = DB_FILENAME) { + fs.mkdirSync(baseDir, { recursive: true, mode: 0o700 }); + const dbPath = path.join(baseDir, dbFilename); + this.db = new DatabaseSync(dbPath); + this.db.exec(` + CREATE TABLE IF NOT EXISTS idempotency_requests ( + tool_name TEXT NOT NULL, + request_id TEXT NOT NULL, + input_hash TEXT NOT NULL, + response_json TEXT NOT NULL, + created_at INTEGER NOT NULL, + PRIMARY KEY (tool_name, request_id) + ); + `); + } + + get(toolName: string, requestId: string): IdempotencyRecord | null { + const stmt = this.db.prepare(` + SELECT tool_name, request_id, input_hash, response_json, created_at + FROM idempotency_requests + WHERE tool_name = ? AND request_id = ? + LIMIT 1 + `); + const row = stmt.get(toolName, requestId) as IdempotencyRow | undefined; + if (!row) return null; + + return { + toolName: row.tool_name, + requestId: row.request_id, + inputHash: row.input_hash, + responseData: JSON.parse(row.response_json), + createdAt: new Date(row.created_at).toISOString(), + }; + } + + put( + toolName: string, + requestId: string, + inputHash: string, + responseData: unknown, + ): void { + const stmt = this.db.prepare(` + INSERT OR REPLACE INTO idempotency_requests ( + tool_name, request_id, input_hash, response_json, created_at + ) VALUES (?, ?, ?, ?, ?) + `); + stmt.run( + toolName, + requestId, + inputHash, + JSON.stringify(responseData), + Date.now(), + ); + } +} + +let singleton: IdempotencyStore | null = null; + +export function getIdempotencyStore(): IdempotencyStore { + if (!singleton) { + singleton = new IdempotencyStore(); + } + return singleton; +} diff --git a/test/errors.test.ts b/test/errors.test.ts index ed8083b..f0b79aa 100644 --- a/test/errors.test.ts +++ b/test/errors.test.ts @@ -129,9 +129,10 @@ describe('withErrorHandling', () => { expect(result.content).toHaveLength(1); const payload = JSON.parse(result.content[0]!.text); - expect(payload.tool).toBe('publish_post'); - expect(payload.error).toBe(ErrorCategory.TIMEOUT); - expect(typeof payload.message).toBe('string'); + expect(payload.success).toBe(false); + expect(payload.error.tool).toBe('publish_post'); + expect(payload.error.code).toBe(ErrorCategory.TIMEOUT); + expect(typeof payload.error.message).toBe('string'); }); it('wraps non-Error throws into an Error', async () => { @@ -142,8 +143,9 @@ describe('withErrorHandling', () => { expect(result.isError).toBe(true); const payload = JSON.parse(result.content[0]!.text); - expect(payload.tool).toBe('my_tool'); - expect(payload.error).toBe(ErrorCategory.INTERNAL); - expect(payload.message).toContain('raw string error'); + expect(payload.success).toBe(false); + expect(payload.error.tool).toBe('my_tool'); + expect(payload.error.code).toBe(ErrorCategory.INTERNAL); + expect(payload.error.message).toContain('raw string error'); }); });