Files
social-mcp/apps/xhs-mcp/src/platforms/xiaohongshu/notification-state.ts
T

447 lines
12 KiB
TypeScript

import crypto from 'node:crypto';
import fs from 'node:fs';
import path from 'node:path';
import { config } from '@social/core/config/index.js';
import { logger } from '@social/core/utils/logger.js';
import { DatabaseSync } from '@social/core/utils/sqlite.js';
import type { CommentNotification } from './types.js';
export type NotificationTaskStatus =
| 'new'
| 'pending'
| 'replied'
| 'failed'
| 'ignored';
interface NotificationRow {
fingerprint: string;
user_id: string;
nickname: string;
avatar: string;
content: string;
type: string;
time: string;
feed_id: string;
xsec_token: string;
note_image: string;
status: NotificationTaskStatus;
first_seen_at: number;
last_seen_at: number;
retry_count: number;
last_attempt_at: number | null;
replied_at: number | null;
reply_content: string | null;
error_message: string | null;
}
export interface NotificationTask {
fingerprint: string;
notification: CommentNotification;
status: NotificationTaskStatus;
firstSeenAt: string;
lastSeenAt: string;
retryCount: number;
lastAttemptAt?: string;
repliedAt?: string;
replyContent?: string;
errorMessage?: string;
}
export interface NotificationUpsertResult {
fetched: number;
inserted: number;
updated: number;
}
export interface NotificationKeysetCursor {
firstSeenAt: number;
fingerprint: string;
}
const PLATFORM = 'xiaohongshu';
const DB_FILENAME = 'automation.db';
const log = logger.child({ module: 'xhs-notification-state' });
export class NotificationStateStore {
private readonly db: InstanceType<typeof DatabaseSync>;
private readonly dbPath: string;
constructor(baseDir = config.cookieDir, dbFilename = DB_FILENAME) {
const dir = path.join(baseDir, PLATFORM);
fs.mkdirSync(dir, { recursive: true, mode: 0o700 });
this.dbPath = path.join(dir, dbFilename);
this.db = new DatabaseSync(this.dbPath);
this.db.exec('PRAGMA journal_mode = WAL;');
this.db.exec('PRAGMA synchronous = NORMAL;');
this.db.exec(`
CREATE TABLE IF NOT EXISTS notification_tasks (
fingerprint TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
nickname TEXT NOT NULL,
avatar TEXT NOT NULL,
content TEXT NOT NULL,
type TEXT NOT NULL,
time TEXT NOT NULL,
feed_id TEXT NOT NULL,
xsec_token TEXT NOT NULL,
note_image TEXT NOT NULL,
status TEXT NOT NULL,
first_seen_at INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER,
replied_at INTEGER,
reply_content TEXT,
error_message TEXT
);
CREATE INDEX IF NOT EXISTS idx_notification_tasks_status_first_seen
ON notification_tasks(status, first_seen_at);
CREATE INDEX IF NOT EXISTS idx_notification_tasks_user_content_status
ON notification_tasks(user_id, content, status);
`);
log.info({ dbPath: this.dbPath }, 'Notification state store initialized');
}
buildFingerprint(notification: CommentNotification): string {
const payload = [
notification.feedId,
notification.userId,
notification.content.trim(),
notification.time.trim(),
notification.type.trim(),
].join('|');
return crypto.createHash('sha256').update(payload).digest('hex');
}
upsertNotifications(notifications: CommentNotification[]): NotificationUpsertResult {
if (notifications.length === 0) {
return { fetched: 0, inserted: 0, updated: 0 };
}
const now = Date.now();
let inserted = 0;
let updated = 0;
const selectStmt = this.db.prepare(
'SELECT fingerprint FROM notification_tasks WHERE fingerprint = ?',
);
const insertStmt = this.db.prepare(`
INSERT INTO notification_tasks (
fingerprint, user_id, nickname, avatar, content, type, time,
feed_id, xsec_token, note_image, status, first_seen_at, last_seen_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'new', ?, ?)
`);
const updateStmt = this.db.prepare(`
UPDATE notification_tasks
SET
nickname = ?,
avatar = ?,
type = ?,
time = ?,
feed_id = ?,
xsec_token = ?,
note_image = ?,
last_seen_at = ?
WHERE fingerprint = ?
`);
this.db.exec('BEGIN');
try {
for (const n of notifications) {
const fp = this.buildFingerprint(n);
const exists = selectStmt.get(fp) as { fingerprint: string } | undefined;
if (!exists) {
insertStmt.run(
fp,
n.userId,
n.nickname,
n.avatar,
n.content,
n.type,
n.time,
n.feedId,
n.xsecToken,
n.noteImage,
now,
now,
);
inserted++;
} else {
updateStmt.run(
n.nickname,
n.avatar,
n.type,
n.time,
n.feedId,
n.xsecToken,
n.noteImage,
now,
fp,
);
updated++;
}
}
this.db.exec('COMMIT');
} catch (err) {
this.db.exec('ROLLBACK');
throw err;
}
return {
fetched: notifications.length,
inserted,
updated,
};
}
listByStatuses(
statuses: NotificationTaskStatus[],
maxCount: number,
offset = 0,
): NotificationTask[] {
if (statuses.length === 0) return [];
const placeholders = statuses.map(() => '?').join(', ');
const query = `
SELECT
fingerprint, user_id, nickname, avatar, content, type, time,
feed_id, xsec_token, note_image, status, first_seen_at, last_seen_at,
retry_count, last_attempt_at, replied_at, reply_content, error_message
FROM notification_tasks
WHERE status IN (${placeholders})
ORDER BY first_seen_at ASC
LIMIT ?
OFFSET ?
`;
const stmt = this.db.prepare(query);
const rows = stmt.all(...statuses, maxCount, offset) as unknown as NotificationRow[];
return rows.map((r) => this.rowToTask(r));
}
listByStatusesKeyset(
statuses: NotificationTaskStatus[],
maxCount: number,
cursor?: NotificationKeysetCursor,
): { tasks: NotificationTask[]; hasMore: boolean; nextCursor?: NotificationKeysetCursor } {
if (statuses.length === 0 || maxCount <= 0) {
return { tasks: [], hasMore: false };
}
const placeholders = statuses.map(() => '?').join(', ');
const condition = cursor
? `
AND (
first_seen_at > ?
OR (first_seen_at = ? AND fingerprint > ?)
)
`
: '';
const query = `
SELECT
fingerprint, user_id, nickname, avatar, content, type, time,
feed_id, xsec_token, note_image, status, first_seen_at, last_seen_at,
retry_count, last_attempt_at, replied_at, reply_content, error_message
FROM notification_tasks
WHERE status IN (${placeholders})
${condition}
ORDER BY first_seen_at ASC, fingerprint ASC
LIMIT ?
`;
const stmt = this.db.prepare(query);
const limitWithSentinel = maxCount + 1;
const rows = cursor
? stmt.all(
...statuses,
cursor.firstSeenAt,
cursor.firstSeenAt,
cursor.fingerprint,
limitWithSentinel,
) as unknown as NotificationRow[]
: stmt.all(...statuses, limitWithSentinel) as unknown as NotificationRow[];
const hasMore = rows.length > maxCount;
const pageRows = hasMore ? rows.slice(0, maxCount) : rows;
const tasks = pageRows.map((r) => this.rowToTask(r));
if (!hasMore || pageRows.length === 0) {
return { tasks, hasMore };
}
const last = pageRows[pageRows.length - 1]!;
return {
tasks,
hasMore,
nextCursor: {
firstSeenAt: last.first_seen_at,
fingerprint: last.fingerprint,
},
};
}
countByStatuses(statuses: NotificationTaskStatus[]): number {
if (statuses.length === 0) return 0;
const placeholders = statuses.map(() => '?').join(', ');
const query = `
SELECT COUNT(1) AS count
FROM notification_tasks
WHERE status IN (${placeholders})
`;
const stmt = this.db.prepare(query);
const row = stmt.get(...statuses) as { count?: number } | undefined;
return row?.count ?? 0;
}
getByFingerprint(fingerprint: string): NotificationTask | null {
const stmt = this.db.prepare(`
SELECT
fingerprint, user_id, nickname, avatar, content, type, time,
feed_id, xsec_token, note_image, status, first_seen_at, last_seen_at,
retry_count, last_attempt_at, replied_at, reply_content, error_message
FROM notification_tasks
WHERE fingerprint = ?
LIMIT 1
`);
const row = stmt.get(fingerprint) as NotificationRow | undefined;
return row ? this.rowToTask(row) : null;
}
findOpenFingerprint(userId: string, content: string): string | null {
const stmt = this.db.prepare(`
SELECT fingerprint
FROM notification_tasks
WHERE user_id = ?
AND content = ?
AND status IN ('new', 'failed', 'pending')
ORDER BY first_seen_at ASC
LIMIT 1
`);
const row = stmt.get(userId, content) as { fingerprint: string } | undefined;
return row?.fingerprint ?? null;
}
markPending(fingerprint: string): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET status = 'pending', last_attempt_at = ?, error_message = NULL
WHERE fingerprint = ?
`);
stmt.run(now, fingerprint);
}
markReplied(fingerprint: string, replyContent: string): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET
status = 'replied',
replied_at = ?,
last_attempt_at = ?,
reply_content = ?,
error_message = NULL
WHERE fingerprint = ?
`);
stmt.run(now, now, replyContent, fingerprint);
}
markFailed(fingerprint: string, errorMessage: string): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET
status = 'failed',
retry_count = retry_count + 1,
last_attempt_at = ?,
error_message = ?
WHERE fingerprint = ?
`);
stmt.run(now, errorMessage, fingerprint);
}
markIgnored(fingerprint: string, reason?: string): void {
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET status = 'ignored', error_message = ?
WHERE fingerprint = ?
`);
stmt.run(reason ?? 'Ignored by operator', fingerprint);
}
setStatus(
fingerprint: string,
status: NotificationTaskStatus,
note?: string,
): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE notification_tasks
SET
status = ?,
last_attempt_at = CASE WHEN ? IN ('pending', 'failed', 'replied') THEN ? ELSE last_attempt_at END,
replied_at = CASE WHEN ? = 'replied' THEN ? ELSE replied_at END,
error_message = CASE WHEN ? = 'failed' THEN COALESCE(?, 'Marked as failed') WHEN ? = 'ignored' THEN COALESCE(?, 'Ignored by operator') ELSE error_message END,
reply_content = CASE WHEN ? = 'replied' THEN COALESCE(?, reply_content) ELSE reply_content END
WHERE fingerprint = ?
`);
stmt.run(
status,
status,
now,
status,
now,
status,
note ?? null,
status,
note ?? null,
status,
note ?? null,
fingerprint,
);
}
private rowToTask(row: NotificationRow): NotificationTask {
return {
fingerprint: row.fingerprint,
notification: {
userId: row.user_id,
nickname: row.nickname,
avatar: row.avatar,
content: row.content,
type: row.type,
time: row.time,
feedId: row.feed_id,
xsecToken: row.xsec_token,
noteImage: row.note_image,
},
status: row.status,
firstSeenAt: new Date(row.first_seen_at).toISOString(),
lastSeenAt: new Date(row.last_seen_at).toISOString(),
retryCount: row.retry_count,
...(row.last_attempt_at ? { lastAttemptAt: new Date(row.last_attempt_at).toISOString() } : {}),
...(row.replied_at ? { repliedAt: new Date(row.replied_at).toISOString() } : {}),
...(row.reply_content ? { replyContent: row.reply_content } : {}),
...(row.error_message ? { errorMessage: row.error_message } : {}),
};
}
}
let storeSingleton: NotificationStateStore | null = null;
export function getNotificationStateStore(): NotificationStateStore {
if (!storeSingleton) {
storeSingleton = new NotificationStateStore();
}
return storeSingleton;
}