feat(threads): add real-time forum and chat system

Implement DB schema, API, and WebSocket for channels and messages
Add frontend views, AI moderation, and admin management
This commit is contained in:
2026-05-07 11:28:14 +08:00
parent 23a7301598
commit cbb101336b
16 changed files with 3567 additions and 10 deletions

View File

@@ -267,6 +267,17 @@ VALUES
('life.comments.like', 'Like Life comments', 'Like and unlike Life comments.', 'Life', true),
('life.reactions.set', 'Set Life reactions', 'Set and remove Life reactions.', 'Life', true),
('life.ratings.set', 'Set Life ratings', 'Set and remove Life star ratings.', 'Life', true),
('threads.create', 'Create Threads', 'Create forum threads.', 'Threads', true),
('threads.messages.create', 'Create Thread messages', 'Create chat messages inside Threads.', 'Threads', true),
('threads.follow', 'Follow Threads', 'Follow Threads and manage read state.', 'Threads', true),
('threads.reactions.set', 'Set Thread reactions', 'Set and remove Thread and Thread message reactions.', 'Threads', true),
('admin.threads.channels.read', 'View Thread channels', 'View Thread channel configuration.', 'Threads', true),
('admin.threads.channels.create', 'Create Thread channels', 'Create Thread channels.', 'Threads', true),
('admin.threads.channels.update', 'Update Thread channels', 'Edit Thread channel configuration.', 'Threads', true),
('admin.threads.channels.delete', 'Delete Thread channels', 'Delete Thread channels.', 'Threads', true),
('admin.threads.threads.delete', 'Delete any Thread', 'Delete any Thread.', 'Threads', true),
('admin.threads.threads.lock', 'Lock Threads', 'Lock and unlock Threads.', 'Threads', true),
('admin.threads.messages.delete', 'Delete any Thread message', 'Delete any Thread message.', 'Threads', true),
('users.follow', 'Follow users', 'Follow and unfollow public user profiles.', 'Users', true),
('discussions.comments.create', 'Create discussion comments', 'Create entity discussion comments and replies.', 'Discussions', true),
('discussions.comments.delete', 'Delete own discussion comments', 'Delete own entity discussion comments.', 'Discussions', true),
@@ -367,6 +378,17 @@ JOIN permissions p ON p.key = ANY (ARRAY[
'life.comments.like',
'life.reactions.set',
'life.ratings.set',
'threads.create',
'threads.messages.create',
'threads.follow',
'threads.reactions.set',
'admin.threads.channels.read',
'admin.threads.channels.create',
'admin.threads.channels.update',
'admin.threads.channels.delete',
'admin.threads.threads.delete',
'admin.threads.threads.lock',
'admin.threads.messages.delete',
'users.follow',
'discussions.comments.create',
'discussions.comments.delete',
@@ -440,6 +462,10 @@ JOIN permissions p ON p.key = ANY (ARRAY[
'life.comments.like',
'life.reactions.set',
'life.ratings.set',
'threads.create',
'threads.messages.create',
'threads.follow',
'threads.reactions.set',
'users.follow',
'discussions.comments.create',
'discussions.comments.delete',
@@ -513,6 +539,10 @@ JOIN permissions p ON p.key = ANY (ARRAY[
'life.comments.like',
'life.reactions.set',
'life.ratings.set',
'threads.create',
'threads.messages.create',
'threads.follow',
'threads.reactions.set',
'users.follow',
'discussions.comments.create',
'discussions.comments.delete',
@@ -554,6 +584,33 @@ JOIN permissions p ON p.key = 'users.follow'
WHERE r.key IN ('admin', 'editor', 'member')
ON CONFLICT DO NOTHING;
INSERT INTO role_permissions (role_id, permission_id)
SELECT r.id, p.id
FROM roles r
JOIN permissions p ON p.key = ANY (ARRAY[
'threads.create',
'threads.messages.create',
'threads.follow',
'threads.reactions.set'
])
WHERE r.key IN ('admin', 'editor', 'member')
ON CONFLICT DO NOTHING;
INSERT INTO role_permissions (role_id, permission_id)
SELECT r.id, p.id
FROM roles r
JOIN permissions p ON p.key = ANY (ARRAY[
'admin.threads.channels.read',
'admin.threads.channels.create',
'admin.threads.channels.update',
'admin.threads.channels.delete',
'admin.threads.threads.delete',
'admin.threads.threads.lock',
'admin.threads.messages.delete'
])
WHERE r.key = 'admin'
ON CONFLICT DO NOTHING;
WITH first_owner_user AS (
SELECT u.id
FROM users u
@@ -805,6 +862,184 @@ CREATE INDEX IF NOT EXISTS life_post_ratings_post_idx
CREATE INDEX IF NOT EXISTS life_post_ratings_user_idx
ON life_post_ratings(user_id, updated_at DESC, post_id DESC);
CREATE TABLE IF NOT EXISTS thread_channels (
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
name text NOT NULL UNIQUE,
allow_user_threads boolean NOT NULL DEFAULT true,
sort_order integer NOT NULL DEFAULT 0 CHECK (sort_order >= 0),
created_by_user_id integer REFERENCES users(id) ON DELETE SET NULL,
updated_by_user_id integer REFERENCES users(id) ON DELETE SET NULL,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
CHECK (length(name) BETWEEN 1 AND 80)
);
CREATE INDEX IF NOT EXISTS thread_channels_sort_order_idx
ON thread_channels(sort_order, id);
INSERT INTO thread_channels (name, allow_user_threads, sort_order)
VALUES
('General', true, 10),
('Questions', true, 20),
('Showcase', true, 30)
ON CONFLICT (name) DO NOTHING;
CREATE TABLE IF NOT EXISTS thread_channel_tags (
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
channel_id integer NOT NULL REFERENCES thread_channels(id) ON DELETE CASCADE,
name text NOT NULL,
sort_order integer NOT NULL DEFAULT 0 CHECK (sort_order >= 0),
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE (channel_id, name),
CHECK (length(name) BETWEEN 1 AND 40)
);
CREATE INDEX IF NOT EXISTS thread_channel_tags_channel_sort_idx
ON thread_channel_tags(channel_id, sort_order, id);
CREATE TABLE IF NOT EXISTS thread_channel_languages (
channel_id integer NOT NULL REFERENCES thread_channels(id) ON DELETE CASCADE,
language_code text NOT NULL REFERENCES languages(code) ON DELETE CASCADE,
sort_order integer NOT NULL DEFAULT 0 CHECK (sort_order >= 0),
PRIMARY KEY (channel_id, language_code)
);
CREATE INDEX IF NOT EXISTS thread_channel_languages_sort_idx
ON thread_channel_languages(channel_id, sort_order, language_code);
CREATE TABLE IF NOT EXISTS threads (
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
channel_id integer NOT NULL REFERENCES thread_channels(id) ON DELETE CASCADE,
title text NOT NULL,
language_code text NOT NULL REFERENCES languages(code) ON DELETE RESTRICT,
locked boolean NOT NULL DEFAULT false,
message_count integer NOT NULL DEFAULT 0 CHECK (message_count >= 0),
last_message_id integer,
last_active_at timestamptz NOT NULL DEFAULT now(),
created_by_user_id integer REFERENCES users(id) ON DELETE SET NULL,
updated_by_user_id integer REFERENCES users(id) ON DELETE SET NULL,
deleted_by_user_id integer REFERENCES users(id) ON DELETE SET NULL,
deleted_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
CHECK (length(title) BETWEEN 1 AND 140)
);
CREATE INDEX IF NOT EXISTS threads_channel_last_active_idx
ON threads(channel_id, last_active_at DESC, id DESC)
WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS threads_created_at_idx
ON threads(created_at DESC, id DESC)
WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS threads_language_idx
ON threads(language_code, last_active_at DESC, id DESC)
WHERE deleted_at IS NULL;
CREATE TABLE IF NOT EXISTS thread_tag_links (
thread_id integer NOT NULL REFERENCES threads(id) ON DELETE CASCADE,
tag_id integer NOT NULL REFERENCES thread_channel_tags(id) ON DELETE CASCADE,
PRIMARY KEY (thread_id, tag_id)
);
CREATE INDEX IF NOT EXISTS thread_tag_links_tag_idx
ON thread_tag_links(tag_id, thread_id);
CREATE TABLE IF NOT EXISTS thread_messages (
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
thread_id integer NOT NULL REFERENCES threads(id) ON DELETE CASCADE,
body text NOT NULL CHECK (length(body) BETWEEN 1 AND 2000),
ai_moderation_status text NOT NULL DEFAULT 'unreviewed' CHECK (ai_moderation_status IN ('unreviewed', 'reviewing', 'approved', 'rejected', 'failed')),
ai_moderation_language_code text REFERENCES languages(code) ON DELETE SET NULL,
ai_moderation_reason text,
ai_moderation_content_hash text,
ai_moderation_checked_at timestamptz,
ai_moderation_retry_count integer NOT NULL DEFAULT 0 CHECK (ai_moderation_retry_count >= 0),
ai_moderation_updated_at timestamptz NOT NULL DEFAULT now(),
created_by_user_id integer REFERENCES users(id) ON DELETE SET NULL,
deleted_by_user_id integer REFERENCES users(id) ON DELETE SET NULL,
deleted_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'threads_last_message_fk'
) THEN
ALTER TABLE threads
ADD CONSTRAINT threads_last_message_fk
FOREIGN KEY (last_message_id) REFERENCES thread_messages(id) ON DELETE SET NULL;
END IF;
END $$;
CREATE INDEX IF NOT EXISTS thread_messages_thread_created_idx
ON thread_messages(thread_id, created_at DESC, id DESC);
CREATE INDEX IF NOT EXISTS thread_messages_user_idx
ON thread_messages(created_by_user_id, created_at DESC, id DESC);
CREATE TABLE IF NOT EXISTS thread_reactions (
thread_id integer NOT NULL REFERENCES threads(id) ON DELETE CASCADE,
user_id integer NOT NULL REFERENCES users(id) ON DELETE CASCADE,
reaction_type text NOT NULL CHECK (reaction_type IN ('thumbs-up', 'heart', 'laugh', 'fire', 'eyes')),
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (thread_id, user_id, reaction_type)
);
CREATE INDEX IF NOT EXISTS thread_reactions_thread_idx
ON thread_reactions(thread_id, reaction_type);
CREATE TABLE IF NOT EXISTS thread_message_reactions (
message_id integer NOT NULL REFERENCES thread_messages(id) ON DELETE CASCADE,
user_id integer NOT NULL REFERENCES users(id) ON DELETE CASCADE,
reaction_type text NOT NULL CHECK (reaction_type IN ('thumbs-up', 'heart', 'laugh', 'fire', 'eyes')),
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (message_id, user_id, reaction_type)
);
CREATE INDEX IF NOT EXISTS thread_message_reactions_message_idx
ON thread_message_reactions(message_id, reaction_type);
CREATE TABLE IF NOT EXISTS thread_follows (
thread_id integer NOT NULL REFERENCES threads(id) ON DELETE CASCADE,
user_id integer NOT NULL REFERENCES users(id) ON DELETE CASCADE,
created_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (thread_id, user_id)
);
CREATE INDEX IF NOT EXISTS thread_follows_user_idx
ON thread_follows(user_id, created_at DESC, thread_id DESC);
CREATE TABLE IF NOT EXISTS thread_reads (
thread_id integer NOT NULL REFERENCES threads(id) ON DELETE CASCADE,
user_id integer NOT NULL REFERENCES users(id) ON DELETE CASCADE,
last_read_message_id integer REFERENCES thread_messages(id) ON DELETE SET NULL,
last_read_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (thread_id, user_id)
);
CREATE INDEX IF NOT EXISTS thread_reads_user_idx
ON thread_reads(user_id, thread_id);
CREATE TABLE IF NOT EXISTS thread_ws_tickets (
ticket_hash text PRIMARY KEY,
user_id integer NOT NULL REFERENCES users(id) ON DELETE CASCADE,
expires_at timestamptz NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
CHECK (length(ticket_hash) BETWEEN 32 AND 128)
);
CREATE INDEX IF NOT EXISTS thread_ws_tickets_expires_idx
ON thread_ws_tickets(expires_at);
CREATE TABLE IF NOT EXISTS skills (
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
name text NOT NULL UNIQUE,

View File

@@ -5,9 +5,10 @@ import {
createApprovedCommentNotification,
createModerationResultNotification
} from './notifications.ts';
import { applyApprovedThreadMessage, publishThreadMessageModeration } from './threadsRealtime.ts';
export type AiModerationStatus = 'unreviewed' | 'reviewing' | 'approved' | 'rejected' | 'failed';
export type AiModerationTargetType = 'life-post' | 'life-comment' | 'discussion-comment';
export type AiModerationTargetType = 'life-post' | 'life-comment' | 'discussion-comment' | 'thread-message';
export type AiModerationApiFormat = 'gemini-generate-content' | 'openai-chat-completions';
export type AiModerationAuthMode = 'query-key' | 'bearer-token';
@@ -254,6 +255,49 @@ const targetQueries: Record<
AND deleted_at IS NULL
RETURNING id
`
},
'thread-message': {
select: `
SELECT
tm.id,
tm.body,
tm.ai_moderation_status AS status,
tm.ai_moderation_language_code AS "languageCode",
tm.ai_moderation_reason AS reason,
tm.ai_moderation_content_hash AS "contentHash"
FROM thread_messages tm
JOIN threads t ON t.id = tm.thread_id
WHERE tm.id = $1
AND tm.deleted_at IS NULL
AND t.deleted_at IS NULL
`,
updateStatus: `
UPDATE thread_messages
SET ai_moderation_status = $2,
ai_moderation_language_code = $3,
ai_moderation_reason = CASE WHEN $2 IN ('rejected', 'failed') THEN $4 ELSE NULL END,
ai_moderation_checked_at = now(),
ai_moderation_updated_at = now()
WHERE id = $1
AND deleted_at IS NULL
`,
updateForReview: `
UPDATE thread_messages
SET ai_moderation_status = 'reviewing',
ai_moderation_language_code = $2,
ai_moderation_reason = NULL,
ai_moderation_content_hash = $3,
ai_moderation_checked_at = NULL,
ai_moderation_retry_count = CASE
WHEN $4::boolean THEN 0
WHEN $5::boolean THEN ai_moderation_retry_count + 1
ELSE ai_moderation_retry_count
END,
ai_moderation_updated_at = now()
WHERE id = $1
AND deleted_at IS NULL
RETURNING id
`
}
};
@@ -595,6 +639,15 @@ async function enqueuePendingAiModeration(): Promise<void> {
WHERE deleted_at IS NULL
AND ai_moderation_status IN ('unreviewed', 'reviewing')
UNION ALL
SELECT 'thread-message'::text AS type, tm.id
FROM thread_messages tm
JOIN threads t ON t.id = tm.thread_id
WHERE tm.deleted_at IS NULL
AND t.deleted_at IS NULL
AND tm.ai_moderation_status IN ('unreviewed', 'reviewing')
LIMIT $1
`,
[retryScanLimit]
@@ -715,9 +768,28 @@ async function updateTargetStatus(
}
try {
await createModerationResultNotification(target, status);
if (target.type === 'thread-message') {
if (status === 'approved') {
await applyApprovedThreadMessage(target.id);
} else {
const row = await queryOne<{ threadId: number }>(
'SELECT thread_id AS "threadId" FROM thread_messages WHERE id = $1',
[target.id]
);
if (row) {
await publishThreadMessageModeration(row.threadId, null);
}
}
return;
}
const notificationTarget = {
type: target.type as Exclude<AiModerationTargetType, 'thread-message'>,
id: target.id
};
await createModerationResultNotification(notificationTarget, status);
if (status === 'approved') {
await createApprovedCommentNotification(target);
await createApprovedCommentNotification(notificationTarget);
}
} catch (error) {
logger?.warn(

View File

@@ -945,7 +945,6 @@ export function setupNotificationWebSocketServer(server: Server, logger: Fastify
server.on('upgrade', async (request, socket) => {
const url = new URL(request.url ?? '/', 'http://localhost');
if (url.pathname !== '/api/notifications/ws') {
socket.destroy();
return;
}

View File

@@ -17,6 +17,13 @@ import {
type AiModerationStatus
} from './aiModeration.ts';
import { createLifePostReactionNotification, createUserFollowNotification } from './notifications.ts';
import {
createThreadWebSocketTicket,
publishThreadMessageCreated,
publishThreadMessageModeration,
publishThreadReactionUpdated,
publishThreadReadUpdated
} from './threadsRealtime.ts';
type QueryValue = string | string[] | undefined;
@@ -26,6 +33,53 @@ type ListPage<T> = {
nextCursor: string | null;
hasMore: boolean;
};
export type ThreadReactionType = 'thumbs-up' | 'heart' | 'laugh' | 'fire' | 'eyes';
export type ThreadReactionCounts = Record<ThreadReactionType, number>;
export type ThreadChannelTag = { id: number; name: string; sortOrder: number };
export type ThreadChannel = {
id: number;
name: string;
allowUserThreads: boolean;
sortOrder: number;
tags: ThreadChannelTag[];
languages: Array<{ code: string; name: string }>;
unreadCount: number;
};
export type ThreadSummary = {
id: number;
channelId: number;
title: string;
languageCode: string;
tags: ThreadChannelTag[];
locked: boolean;
messageCount: number;
lastActiveAt: Date;
createdAt: Date;
author: { id: number; displayName: string } | null;
reactionCounts: ThreadReactionCounts;
myReactions: ThreadReactionType[];
followed: boolean;
unread: boolean;
};
export type ThreadMessage = {
id: number;
threadId: number;
body: string;
moderationStatus: AiModerationStatus;
moderationLanguageCode: string | null;
moderationReason: string | null;
createdAt: Date;
updatedAt: Date;
author: { id: number; displayName: string } | null;
reactionCounts: ThreadReactionCounts;
myReactions: ThreadReactionType[];
};
export type ThreadMessagesPage = {
items: ThreadMessage[];
beforeCursor: string | null;
hasMoreBefore: boolean;
};
export type ThreadsPage = ListPage<ThreadSummary>;
type DbClient = PoolClient;
type DataToolScope = 'pokemon' | 'habitats' | 'items' | 'artifacts' | 'recipes' | 'checklist';
@@ -8837,6 +8891,873 @@ export async function importAdminHabitatsCsv(payload: Record<string, unknown>, u
return getAdminDataToolsSummary();
}
const threadReactionTypes: ThreadReactionType[] = ['thumbs-up', 'heart', 'laugh', 'fire', 'eyes'];
const defaultThreadLimit = 20;
const maxThreadLimit = 50;
const defaultThreadMessageLimit = 30;
const maxThreadMessageLimit = 80;
type ThreadCursor = { value: string; id: number };
type ThreadMessageCursor = { createdAt: string; id: number };
function emptyThreadReactionCounts(): ThreadReactionCounts {
return { 'thumbs-up': 0, heart: 0, laugh: 0, fire: 0, eyes: 0 };
}
function isThreadReactionType(value: unknown): value is ThreadReactionType {
return typeof value === 'string' && threadReactionTypes.includes(value as ThreadReactionType);
}
function cleanThreadReactionType(value: unknown): ThreadReactionType {
if (!isThreadReactionType(value)) {
throw validationError('server.validation.reactionInvalid');
}
return value;
}
function cleanThreadLimit(value: QueryValue, fallback = defaultThreadLimit, max = maxThreadLimit): number {
const raw = Number(asString(value));
return Number.isInteger(raw) && raw > 0 ? Math.min(raw, max) : fallback;
}
function encodeCursor(value: unknown): string {
return Buffer.from(JSON.stringify(value), 'utf8').toString('base64url');
}
function decodeThreadCursor(value: QueryValue): ThreadCursor | null {
const cursor = asString(value);
if (!cursor) return null;
try {
const payload = JSON.parse(Buffer.from(cursor, 'base64url').toString('utf8')) as unknown;
if (payload && typeof payload === 'object') {
const record = payload as Record<string, unknown>;
if (typeof record.value === 'string' && Number.isInteger(Number(record.id))) {
return { value: record.value, id: Number(record.id) };
}
}
} catch {
return null;
}
return null;
}
function decodeThreadMessageCursor(value: QueryValue): ThreadMessageCursor | null {
const cursor = asString(value);
if (!cursor) return null;
try {
const payload = JSON.parse(Buffer.from(cursor, 'base64url').toString('utf8')) as unknown;
if (payload && typeof payload === 'object') {
const record = payload as Record<string, unknown>;
if (typeof record.createdAt === 'string' && Number.isInteger(Number(record.id))) {
return { createdAt: record.createdAt, id: Number(record.id) };
}
}
} catch {
return null;
}
return null;
}
function cleanThreadTitle(value: unknown): string {
const title = cleanName(value, 'server.validation.titleRequired');
if (title.length > 140) {
throw validationError('server.validation.valueTooLong');
}
return title;
}
function cleanThreadMessageBody(value: unknown): string {
const body = cleanName(value, 'server.validation.commentRequired');
if (body.length > 2000) {
throw validationError('server.validation.commentTooLong');
}
return body;
}
function cleanThreadLanguageCode(value: unknown): string {
const languageCode = cleanModerationLanguageCode(value);
if (!languageCode) {
throw validationError('server.validation.languageInvalid');
}
return languageCode;
}
function cleanThreadTagIds(value: unknown): number[] {
if (!Array.isArray(value)) {
return [];
}
return cleanIds(value).slice(0, 8);
}
async function publicThreadChannels(userId: number | null): Promise<ThreadChannel[]> {
const rows = await query<{
id: number;
name: string;
allowUserThreads: boolean;
sortOrder: number;
tags: ThreadChannelTag[] | null;
languages: Array<{ code: string; name: string }> | null;
unreadCount: number;
}>(
`
SELECT
tc.id,
tc.name,
tc.allow_user_threads AS "allowUserThreads",
tc.sort_order AS "sortOrder",
COALESCE(tags.items, '[]'::json) AS tags,
COALESCE(channel_languages.items, fallback_languages.items, '[]'::json) AS languages,
CASE
WHEN $1::integer IS NULL THEN 0
ELSE COALESCE(unread.count, 0)
END AS "unreadCount"
FROM thread_channels tc
LEFT JOIN LATERAL (
SELECT json_agg(json_build_object('id', tct.id, 'name', tct.name, 'sortOrder', tct.sort_order) ORDER BY tct.sort_order, tct.id) AS items
FROM thread_channel_tags tct
WHERE tct.channel_id = tc.id
) tags ON true
LEFT JOIN LATERAL (
SELECT json_agg(json_build_object('code', l.code, 'name', l.name) ORDER BY tcl.sort_order, l.sort_order, l.code) AS items
FROM thread_channel_languages tcl
JOIN languages l ON l.code = tcl.language_code
WHERE tcl.channel_id = tc.id
AND l.enabled = true
) channel_languages ON true
LEFT JOIN LATERAL (
SELECT json_agg(json_build_object('code', l.code, 'name', l.name) ORDER BY l.sort_order, l.code) AS items
FROM languages l
WHERE l.enabled = true
) fallback_languages ON true
LEFT JOIN LATERAL (
SELECT COUNT(*)::integer AS count
FROM thread_follows tf
JOIN threads t ON t.id = tf.thread_id
LEFT JOIN thread_reads tr ON tr.thread_id = t.id AND tr.user_id = tf.user_id
WHERE tf.user_id = $1::integer
AND t.channel_id = tc.id
AND t.deleted_at IS NULL
AND t.last_message_id IS NOT NULL
AND (tr.last_read_message_id IS NULL OR t.last_message_id > tr.last_read_message_id)
) unread ON true
ORDER BY tc.sort_order, tc.id
`,
[userId]
);
return rows.map((row) => ({
id: row.id,
name: row.name,
allowUserThreads: row.allowUserThreads,
sortOrder: row.sortOrder,
tags: row.tags ?? [],
languages: row.languages ?? [],
unreadCount: row.unreadCount
}));
}
export async function listThreadChannels(userId: number | null): Promise<ThreadChannel[]> {
return publicThreadChannels(userId);
}
export async function listAdminThreadChannels(): Promise<ThreadChannel[]> {
return publicThreadChannels(null);
}
async function channelAllowsLanguage(channelId: number, languageCode: string): Promise<boolean> {
const row = await queryOne<{ allowed: boolean }>(
`
SELECT CASE
WHEN EXISTS (SELECT 1 FROM thread_channel_languages WHERE channel_id = $1) THEN EXISTS (
SELECT 1
FROM thread_channel_languages tcl
JOIN languages l ON l.code = tcl.language_code
WHERE tcl.channel_id = $1 AND tcl.language_code = $2 AND l.enabled = true
)
ELSE EXISTS (SELECT 1 FROM languages WHERE code = $2 AND enabled = true)
END AS allowed
`,
[channelId, languageCode]
);
return row?.allowed === true;
}
async function validateThreadTags(channelId: number, tagIds: number[]): Promise<void> {
if (!tagIds.length) return;
const rows = await query<{ id: number }>(
'SELECT id FROM thread_channel_tags WHERE channel_id = $1 AND id = ANY($2::integer[])',
[channelId, tagIds]
);
if (rows.length !== tagIds.length) {
throw validationError('server.validation.invalidField');
}
}
async function threadReactionCounts(threadIds: number[], userId: number | null): Promise<{
counts: Map<number, ThreadReactionCounts>;
mine: Map<number, ThreadReactionType[]>;
}> {
const counts = new Map<number, ThreadReactionCounts>();
const mine = new Map<number, ThreadReactionType[]>();
for (const id of threadIds) counts.set(id, emptyThreadReactionCounts());
if (!threadIds.length) return { counts, mine };
const countRows = await query<{ threadId: number; reactionType: ThreadReactionType; count: number }>(
`
SELECT thread_id AS "threadId", reaction_type AS "reactionType", COUNT(*)::integer AS count
FROM thread_reactions
WHERE thread_id = ANY($1::integer[])
GROUP BY thread_id, reaction_type
`,
[threadIds]
);
for (const row of countRows) {
const item = counts.get(row.threadId);
if (item && isThreadReactionType(row.reactionType)) {
item[row.reactionType] = row.count;
}
}
if (userId !== null) {
const myRows = await query<{ threadId: number; reactionType: ThreadReactionType }>(
`
SELECT thread_id AS "threadId", reaction_type AS "reactionType"
FROM thread_reactions
WHERE user_id = $1
AND thread_id = ANY($2::integer[])
`,
[userId, threadIds]
);
for (const row of myRows) {
if (!isThreadReactionType(row.reactionType)) continue;
mine.set(row.threadId, [...(mine.get(row.threadId) ?? []), row.reactionType]);
}
}
return { counts, mine };
}
async function threadMessageReactionCounts(messageIds: number[], userId: number | null): Promise<{
counts: Map<number, ThreadReactionCounts>;
mine: Map<number, ThreadReactionType[]>;
}> {
const counts = new Map<number, ThreadReactionCounts>();
const mine = new Map<number, ThreadReactionType[]>();
for (const id of messageIds) counts.set(id, emptyThreadReactionCounts());
if (!messageIds.length) return { counts, mine };
const countRows = await query<{ messageId: number; reactionType: ThreadReactionType; count: number }>(
`
SELECT message_id AS "messageId", reaction_type AS "reactionType", COUNT(*)::integer AS count
FROM thread_message_reactions
WHERE message_id = ANY($1::integer[])
GROUP BY message_id, reaction_type
`,
[messageIds]
);
for (const row of countRows) {
const item = counts.get(row.messageId);
if (item && isThreadReactionType(row.reactionType)) {
item[row.reactionType] = row.count;
}
}
if (userId !== null) {
const myRows = await query<{ messageId: number; reactionType: ThreadReactionType }>(
`
SELECT message_id AS "messageId", reaction_type AS "reactionType"
FROM thread_message_reactions
WHERE user_id = $1
AND message_id = ANY($2::integer[])
`,
[userId, messageIds]
);
for (const row of myRows) {
if (!isThreadReactionType(row.reactionType)) continue;
mine.set(row.messageId, [...(mine.get(row.messageId) ?? []), row.reactionType]);
}
}
return { counts, mine };
}
async function hydrateThreads(rows: Array<ThreadSummary & { lastActiveCursor: string }>, userId: number | null): Promise<ThreadSummary[]> {
const ids = rows.map((row) => row.id);
const tags = await query<{ threadId: number; id: number; name: string; sortOrder: number }>(
`
SELECT ttl.thread_id AS "threadId", tct.id, tct.name, tct.sort_order AS "sortOrder"
FROM thread_tag_links ttl
JOIN thread_channel_tags tct ON tct.id = ttl.tag_id
WHERE ttl.thread_id = ANY($1::integer[])
ORDER BY tct.sort_order, tct.id
`,
[ids]
);
const tagsByThread = new Map<number, ThreadChannelTag[]>();
for (const tag of tags) {
tagsByThread.set(tag.threadId, [...(tagsByThread.get(tag.threadId) ?? []), { id: tag.id, name: tag.name, sortOrder: tag.sortOrder }]);
}
const reactions = await threadReactionCounts(ids, userId);
return rows.map((row) => ({
id: row.id,
channelId: row.channelId,
title: row.title,
languageCode: row.languageCode,
tags: tagsByThread.get(row.id) ?? [],
locked: row.locked,
messageCount: row.messageCount,
lastActiveAt: row.lastActiveAt,
createdAt: row.createdAt,
author: row.author,
reactionCounts: reactions.counts.get(row.id) ?? emptyThreadReactionCounts(),
myReactions: reactions.mine.get(row.id) ?? [],
followed: row.followed,
unread: row.unread
}));
}
export async function listThreads(paramsQuery: QueryParams, userId: number | null): Promise<ThreadsPage> {
const limit = cleanThreadLimit(paramsQuery.limit);
const channelId = optionalPositiveInteger(asString(paramsQuery.channelId), 'server.validation.invalidField');
const tagId = optionalPositiveInteger(asString(paramsQuery.tagId), 'server.validation.invalidField');
const language = asString(paramsQuery.language);
const sort = asString(paramsQuery.sort) ?? 'last-active';
const cursor = decodeThreadCursor(paramsQuery.cursor);
const conditions = ['t.deleted_at IS NULL'];
const params: unknown[] = [];
if (channelId !== null) {
params.push(channelId);
conditions.push(`t.channel_id = $${params.length}`);
}
if (tagId !== null) {
params.push(tagId);
conditions.push(`EXISTS (SELECT 1 FROM thread_tag_links ttl WHERE ttl.thread_id = t.id AND ttl.tag_id = $${params.length})`);
}
if (language && language !== 'all') {
params.push(cleanThreadLanguageCode(language));
conditions.push(`t.language_code = $${params.length}`);
}
const orderField = sort === 'latest' ? 't.created_at' : sort === 'most-discussed' ? 't.message_count' : 't.last_active_at';
const orderCursorField = sort === 'latest' ? 'created_at' : sort === 'most-discussed' ? 'message_count' : 'last_active_at';
if (cursor) {
params.push(cursor.value, cursor.id);
conditions.push(`(${orderField}, t.id) < ($${params.length - 1}::${sort === 'most-discussed' ? 'integer' : 'timestamptz'}, $${params.length}::integer)`);
}
params.push(limit + 1, userId);
const limitParam = params.length - 1;
const userParam = params.length;
const rows = await query<ThreadSummary & { lastActiveCursor: string; createdAtCursor: string; messageCountCursor: number }>(
`
SELECT
t.id,
t.channel_id AS "channelId",
t.title,
t.language_code AS "languageCode",
t.locked,
t.message_count AS "messageCount",
t.last_active_at AS "lastActiveAt",
t.last_active_at::text AS "lastActiveCursor",
t.created_at AS "createdAt",
t.created_at::text AS "createdAtCursor",
t.message_count AS "messageCountCursor",
CASE WHEN u.id IS NULL THEN NULL ELSE json_build_object('id', u.id, 'displayName', u.display_name) END AS author,
($${userParam}::integer IS NOT NULL AND tf.user_id IS NOT NULL) AS followed,
($${userParam}::integer IS NOT NULL AND t.last_message_id IS NOT NULL AND (tr.last_read_message_id IS NULL OR t.last_message_id > tr.last_read_message_id)) AS unread
FROM threads t
LEFT JOIN users u ON u.id = t.created_by_user_id
LEFT JOIN thread_follows tf ON tf.thread_id = t.id AND tf.user_id = $${userParam}::integer
LEFT JOIN thread_reads tr ON tr.thread_id = t.id AND tr.user_id = $${userParam}::integer
WHERE ${conditions.join(' AND ')}
ORDER BY ${orderField} DESC, t.id DESC
LIMIT $${limitParam}
`,
params
);
const items = await hydrateThreads(rows.slice(0, limit), userId);
const last = rows.slice(0, limit).at(-1) as (typeof rows)[number] | undefined;
const nextValue = last ? (sort === 'latest' ? last.createdAtCursor : sort === 'most-discussed' ? String(last.messageCountCursor) : last.lastActiveCursor) : null;
return {
items,
nextCursor: rows.length > limit && last && nextValue ? encodeCursor({ value: nextValue, id: last.id }) : null,
hasMore: rows.length > limit
};
}
export async function getThread(threadIdValue: number, userId: number | null): Promise<ThreadSummary | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const rows = await query<ThreadSummary & { lastActiveCursor: string }>(
`
SELECT
t.id,
t.channel_id AS "channelId",
t.title,
t.language_code AS "languageCode",
t.locked,
t.message_count AS "messageCount",
t.last_active_at AS "lastActiveAt",
t.last_active_at::text AS "lastActiveCursor",
t.created_at AS "createdAt",
CASE WHEN u.id IS NULL THEN NULL ELSE json_build_object('id', u.id, 'displayName', u.display_name) END AS author,
($2::integer IS NOT NULL AND tf.user_id IS NOT NULL) AS followed,
($2::integer IS NOT NULL AND t.last_message_id IS NOT NULL AND (tr.last_read_message_id IS NULL OR t.last_message_id > tr.last_read_message_id)) AS unread
FROM threads t
LEFT JOIN users u ON u.id = t.created_by_user_id
LEFT JOIN thread_follows tf ON tf.thread_id = t.id AND tf.user_id = $2::integer
LEFT JOIN thread_reads tr ON tr.thread_id = t.id AND tr.user_id = $2::integer
WHERE t.id = $1
AND t.deleted_at IS NULL
`,
[threadId, userId]
);
return (await hydrateThreads(rows, userId))[0] ?? null;
}
async function getThreadMessageById(messageId: number, userId: number | null, canViewAll = false): Promise<ThreadMessage | null> {
const rows = await query<ThreadMessage>(
`
SELECT
tm.id,
tm.thread_id AS "threadId",
tm.body,
tm.ai_moderation_status AS "moderationStatus",
tm.ai_moderation_language_code AS "moderationLanguageCode",
tm.ai_moderation_reason AS "moderationReason",
tm.created_at AS "createdAt",
tm.updated_at AS "updatedAt",
CASE WHEN u.id IS NULL THEN NULL ELSE json_build_object('id', u.id, 'displayName', u.display_name) END AS author
FROM thread_messages tm
LEFT JOIN users u ON u.id = tm.created_by_user_id
WHERE tm.id = $1
AND tm.deleted_at IS NULL
AND ${moderationVisibilitySql('tm', 'tm.created_by_user_id', userId, canViewAll)}
`,
[messageId]
);
const row = rows[0];
if (!row) return null;
const reactions = await threadMessageReactionCounts([row.id], userId);
return {
...row,
reactionCounts: reactions.counts.get(row.id) ?? emptyThreadReactionCounts(),
myReactions: reactions.mine.get(row.id) ?? []
};
}
export async function listThreadMessages(
threadIdValue: number,
paramsQuery: QueryParams,
userId: number | null,
canViewAll = false
): Promise<ThreadMessagesPage | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const thread = await getThread(threadId, userId);
if (!thread) return null;
const limit = cleanThreadLimit(paramsQuery.limit, defaultThreadMessageLimit, maxThreadMessageLimit);
const before = decodeThreadMessageCursor(paramsQuery.before);
const conditions = ['tm.thread_id = $1', 'tm.deleted_at IS NULL'];
const params: unknown[] = [threadId];
if (before) {
params.push(before.createdAt, before.id);
conditions.push(`(tm.created_at, tm.id) < ($${params.length - 1}::timestamptz, $${params.length}::integer)`);
}
if (!canViewAll) {
if (userId !== null) {
params.push(userId);
conditions.push(`(tm.ai_moderation_status = 'approved' OR tm.created_by_user_id = $${params.length})`);
} else {
conditions.push("tm.ai_moderation_status = 'approved'");
}
}
params.push(limit + 1);
const rows = await query<ThreadMessage & { createdAtCursor: string }>(
`
SELECT
tm.id,
tm.thread_id AS "threadId",
tm.body,
tm.ai_moderation_status AS "moderationStatus",
tm.ai_moderation_language_code AS "moderationLanguageCode",
tm.ai_moderation_reason AS "moderationReason",
tm.created_at AS "createdAt",
tm.created_at::text AS "createdAtCursor",
tm.updated_at AS "updatedAt",
CASE WHEN u.id IS NULL THEN NULL ELSE json_build_object('id', u.id, 'displayName', u.display_name) END AS author
FROM thread_messages tm
LEFT JOIN users u ON u.id = tm.created_by_user_id
WHERE ${conditions.join(' AND ')}
ORDER BY tm.created_at DESC, tm.id DESC
LIMIT $${params.length}
`,
params
);
const pageRows = rows.slice(0, limit).reverse();
const reactions = await threadMessageReactionCounts(pageRows.map((row) => row.id), userId);
const items = pageRows.map((row) => ({
id: row.id,
threadId: row.threadId,
body: row.body,
moderationStatus: row.moderationStatus,
moderationLanguageCode: row.moderationLanguageCode,
moderationReason: row.moderationReason,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
author: row.author,
reactionCounts: reactions.counts.get(row.id) ?? emptyThreadReactionCounts(),
myReactions: reactions.mine.get(row.id) ?? []
}));
const oldest = rows.slice(0, limit).at(-1);
return {
items,
beforeCursor: rows.length > limit && oldest ? encodeCursor({ createdAt: oldest.createdAtCursor, id: oldest.id }) : null,
hasMoreBefore: rows.length > limit
};
}
export async function createThread(payload: Record<string, unknown>, userId: number): Promise<ThreadSummary> {
const channelId = requirePositiveInteger(payload.channelId, 'server.validation.invalidField');
const title = cleanThreadTitle(payload.title);
const languageCode = cleanThreadLanguageCode(payload.languageCode);
const tagIds = cleanThreadTagIds(payload.tagIds);
const messageBody = cleanThreadMessageBody(payload.body);
const channel = await queryOne<{ id: number; allowUserThreads: boolean }>(
'SELECT id, allow_user_threads AS "allowUserThreads" FROM thread_channels WHERE id = $1',
[channelId]
);
if (!channel || !channel.allowUserThreads) {
throw validationError('server.validation.invalidField');
}
if (!(await channelAllowsLanguage(channelId, languageCode))) {
throw validationError('server.validation.languageInvalid');
}
await validateThreadTags(channelId, tagIds);
const ids = await withTransaction(async (client) => {
const threadResult = await client.query<{ id: number }>(
`
INSERT INTO threads (channel_id, title, language_code, created_by_user_id, updated_by_user_id)
VALUES ($1, $2, $3, $4, $4)
RETURNING id
`,
[channelId, title, languageCode, userId]
);
const threadId = threadResult.rows[0].id;
for (const tagId of tagIds) {
await client.query('INSERT INTO thread_tag_links (thread_id, tag_id) VALUES ($1, $2) ON CONFLICT DO NOTHING', [threadId, tagId]);
}
const messageResult = await client.query<{ id: number }>(
`
INSERT INTO thread_messages (thread_id, body, ai_moderation_status, ai_moderation_language_code, created_by_user_id)
VALUES ($1, $2, 'unreviewed', $3, $4)
RETURNING id
`,
[threadId, messageBody, languageCode, userId]
);
await client.query('INSERT INTO thread_follows (thread_id, user_id) VALUES ($1, $2) ON CONFLICT DO NOTHING', [threadId, userId]);
return { threadId, messageId: messageResult.rows[0].id };
});
await requestAiModerationReview({ type: 'thread-message', id: ids.messageId }, { languageCode, resetRetries: true });
return (await getThread(ids.threadId, userId)) as ThreadSummary;
}
export async function createThreadMessage(threadIdValue: number, payload: Record<string, unknown>, userId: number): Promise<ThreadMessage | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const body = cleanThreadMessageBody(payload.body);
const thread = await queryOne<{ id: number; locked: boolean; languageCode: string }>(
'SELECT id, locked, language_code AS "languageCode" FROM threads WHERE id = $1 AND deleted_at IS NULL',
[threadId]
);
if (!thread) return null;
if (thread.locked) {
throw validationError('server.validation.invalidField');
}
const result = await queryOne<{ id: number }>(
`
INSERT INTO thread_messages (thread_id, body, ai_moderation_status, ai_moderation_language_code, created_by_user_id)
VALUES ($1, $2, 'unreviewed', $3, $4)
RETURNING id
`,
[threadId, body, thread.languageCode, userId]
);
if (!result) return null;
await requestAiModerationReview({ type: 'thread-message', id: result.id }, { languageCode: thread.languageCode, resetRetries: true });
return getThreadMessageById(result.id, userId, false);
}
export async function markThreadRead(threadIdValue: number, userId: number): Promise<ThreadSummary | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const row = await queryOne<{ lastMessageId: number | null }>(
'SELECT last_message_id AS "lastMessageId" FROM threads WHERE id = $1 AND deleted_at IS NULL',
[threadId]
);
if (!row) return null;
await pool.query(
`
INSERT INTO thread_reads (thread_id, user_id, last_read_message_id, last_read_at)
VALUES ($1, $2, $3, now())
ON CONFLICT (thread_id, user_id)
DO UPDATE SET last_read_message_id = EXCLUDED.last_read_message_id,
last_read_at = now()
`,
[threadId, userId, row.lastMessageId]
);
const thread = await getThread(threadId, userId);
await publishThreadReadUpdated(userId, threadId, thread?.unread ?? false, 0);
return thread;
}
export async function followThread(threadIdValue: number, userId: number): Promise<ThreadSummary | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const thread = await getThread(threadId, userId);
if (!thread) return null;
await pool.query('INSERT INTO thread_follows (thread_id, user_id) VALUES ($1, $2) ON CONFLICT DO NOTHING', [threadId, userId]);
return getThread(threadId, userId);
}
export async function unfollowThread(threadIdValue: number, userId: number): Promise<ThreadSummary | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const thread = await getThread(threadId, userId);
if (!thread) return null;
await pool.query('DELETE FROM thread_follows WHERE thread_id = $1 AND user_id = $2', [threadId, userId]);
return getThread(threadId, userId);
}
export async function setThreadReaction(threadIdValue: number, payload: Record<string, unknown>, userId: number): Promise<ThreadSummary | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const reactionType = cleanThreadReactionType(payload.reactionType);
const thread = await getThread(threadId, userId);
if (!thread) return null;
await pool.query(
`
INSERT INTO thread_reactions (thread_id, user_id, reaction_type)
VALUES ($1, $2, $3)
ON CONFLICT (thread_id, user_id, reaction_type)
DO UPDATE SET updated_at = now()
`,
[threadId, userId, reactionType]
);
const updated = await getThread(threadId, userId);
if (updated) {
await publishThreadReactionUpdated(userId, {
type: 'thread.reactions.updated',
target: 'thread',
threadId,
messageId: null,
reactionCounts: updated.reactionCounts,
myReactions: updated.myReactions
});
}
return updated;
}
export async function deleteThreadReaction(threadIdValue: number, payload: Record<string, unknown>, userId: number): Promise<ThreadSummary | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const reactionType = cleanThreadReactionType(payload.reactionType);
const thread = await getThread(threadId, userId);
if (!thread) return null;
await pool.query('DELETE FROM thread_reactions WHERE thread_id = $1 AND user_id = $2 AND reaction_type = $3', [threadId, userId, reactionType]);
return getThread(threadId, userId);
}
export async function setThreadMessageReaction(messageIdValue: number, payload: Record<string, unknown>, userId: number): Promise<ThreadMessage | null> {
const messageId = requirePositiveInteger(messageIdValue, 'server.validation.recordInvalid');
const reactionType = cleanThreadReactionType(payload.reactionType);
const message = await getThreadMessageById(messageId, userId);
if (!message || message.moderationStatus !== 'approved') return null;
await pool.query(
`
INSERT INTO thread_message_reactions (message_id, user_id, reaction_type)
VALUES ($1, $2, $3)
ON CONFLICT (message_id, user_id, reaction_type)
DO UPDATE SET updated_at = now()
`,
[messageId, userId, reactionType]
);
const updated = await getThreadMessageById(messageId, userId);
if (updated) {
await publishThreadReactionUpdated(userId, {
type: 'thread.reactions.updated',
target: 'message',
threadId: updated.threadId,
messageId,
reactionCounts: updated.reactionCounts,
myReactions: updated.myReactions
});
}
return updated;
}
export async function deleteThreadMessageReaction(messageIdValue: number, payload: Record<string, unknown>, userId: number): Promise<ThreadMessage | null> {
const messageId = requirePositiveInteger(messageIdValue, 'server.validation.recordInvalid');
const reactionType = cleanThreadReactionType(payload.reactionType);
const message = await getThreadMessageById(messageId, userId);
if (!message) return null;
await pool.query('DELETE FROM thread_message_reactions WHERE message_id = $1 AND user_id = $2 AND reaction_type = $3', [messageId, userId, reactionType]);
return getThreadMessageById(messageId, userId);
}
export async function applyApprovedThreadMessage(messageId: number): Promise<void> {
const row = await queryOne<{ threadId: number }>(
`
UPDATE threads t
SET last_message_id = tm.id,
message_count = (
SELECT COUNT(*)::integer
FROM thread_messages visible_message
WHERE visible_message.thread_id = t.id
AND visible_message.deleted_at IS NULL
AND visible_message.ai_moderation_status = 'approved'
),
last_active_at = GREATEST(t.last_active_at, tm.created_at),
updated_at = now()
FROM thread_messages tm
WHERE tm.id = $1
AND tm.thread_id = t.id
AND tm.deleted_at IS NULL
AND tm.ai_moderation_status = 'approved'
RETURNING t.id AS "threadId"
`,
[messageId]
);
if (!row) return;
const message = await getThreadMessageById(messageId, null, true);
const thread = await getThread(row.threadId, null);
if (message && thread) {
await publishThreadMessageCreated(thread, message);
} else {
await publishThreadMessageModeration(row.threadId, message);
}
}
export async function createAdminThreadChannel(payload: Record<string, unknown>, userId: number): Promise<ThreadChannel[]> {
const name = cleanName(payload.name, 'server.validation.nameRequired');
const allowUserThreads = payload.allowUserThreads !== false;
const tagNames = Array.isArray(payload.tags) ? payload.tags.map((tag) => cleanName(tag, 'server.validation.nameRequired')).slice(0, 20) : [];
const languageCodes = Array.isArray(payload.languages) ? payload.languages.map(cleanThreadLanguageCode).slice(0, 20) : [];
await withTransaction(async (client) => {
const sortOrder = await nextSortOrder(client, 'thread_channels');
const result = await client.query<{ id: number }>(
`
INSERT INTO thread_channels (name, allow_user_threads, sort_order, created_by_user_id, updated_by_user_id)
VALUES ($1, $2, $3, $4, $4)
RETURNING id
`,
[name, allowUserThreads, sortOrder, userId]
);
await replaceThreadChannelConfig(client, result.rows[0].id, tagNames, languageCodes);
});
return listAdminThreadChannels();
}
async function replaceThreadChannelConfig(client: DbClient, channelId: number, tagNames: string[], languageCodes: string[]): Promise<void> {
await client.query('DELETE FROM thread_channel_tags WHERE channel_id = $1', [channelId]);
await client.query('DELETE FROM thread_channel_languages WHERE channel_id = $1', [channelId]);
for (const [index, tagName] of [...new Set(tagNames)].entries()) {
await client.query('INSERT INTO thread_channel_tags (channel_id, name, sort_order) VALUES ($1, $2, $3)', [channelId, tagName, (index + 1) * 10]);
}
for (const [index, languageCode] of [...new Set(languageCodes)].entries()) {
await client.query('INSERT INTO thread_channel_languages (channel_id, language_code, sort_order) VALUES ($1, $2, $3)', [
channelId,
languageCode,
(index + 1) * 10
]);
}
}
export async function updateAdminThreadChannel(channelIdValue: number, payload: Record<string, unknown>, userId: number): Promise<ThreadChannel[] | null> {
const channelId = requirePositiveInteger(channelIdValue, 'server.validation.recordInvalid');
const name = cleanName(payload.name, 'server.validation.nameRequired');
const allowUserThreads = payload.allowUserThreads !== false;
const tagNames = Array.isArray(payload.tags) ? payload.tags.map((tag) => cleanName(tag, 'server.validation.nameRequired')).slice(0, 20) : [];
const languageCodes = Array.isArray(payload.languages) ? payload.languages.map(cleanThreadLanguageCode).slice(0, 20) : [];
const updated = await withTransaction(async (client) => {
const result = await client.query(
`
UPDATE thread_channels
SET name = $1, allow_user_threads = $2, updated_by_user_id = $3, updated_at = now()
WHERE id = $4
`,
[name, allowUserThreads, userId, channelId]
);
if (!result.rowCount) return false;
await replaceThreadChannelConfig(client, channelId, tagNames, languageCodes);
return true;
});
return updated ? listAdminThreadChannels() : null;
}
export async function deleteAdminThreadChannel(channelIdValue: number): Promise<boolean> {
const channelId = requirePositiveInteger(channelIdValue, 'server.validation.recordInvalid');
const result = await pool.query('DELETE FROM thread_channels WHERE id = $1', [channelId]);
return Boolean(result.rowCount);
}
export async function updateThreadLock(threadIdValue: number, locked: boolean, userId: number): Promise<ThreadSummary | null> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const result = await pool.query(
'UPDATE threads SET locked = $1, updated_by_user_id = $2, updated_at = now() WHERE id = $3 AND deleted_at IS NULL',
[locked, userId, threadId]
);
return result.rowCount ? getThread(threadId, userId) : null;
}
export async function deleteThread(threadIdValue: number, userId: number): Promise<boolean> {
const threadId = requirePositiveInteger(threadIdValue, 'server.validation.recordInvalid');
const result = await pool.query(
'UPDATE threads SET deleted_at = now(), deleted_by_user_id = $1, updated_at = now() WHERE id = $2 AND deleted_at IS NULL',
[userId, threadId]
);
return Boolean(result.rowCount);
}
export async function deleteThreadMessage(messageIdValue: number, userId: number): Promise<boolean> {
const messageId = requirePositiveInteger(messageIdValue, 'server.validation.recordInvalid');
const result = await pool.query<{ threadId: number }>(
`
UPDATE thread_messages
SET deleted_at = now(), deleted_by_user_id = $1, updated_at = now()
WHERE id = $2
AND deleted_at IS NULL
RETURNING thread_id AS "threadId"
`,
[userId, messageId]
);
if (!result.rowCount) return false;
await pool.query(
`
UPDATE threads t
SET message_count = (
SELECT COUNT(*)::integer
FROM thread_messages tm
WHERE tm.thread_id = t.id
AND tm.deleted_at IS NULL
AND tm.ai_moderation_status = 'approved'
),
last_message_id = (
SELECT tm.id
FROM thread_messages tm
WHERE tm.thread_id = t.id
AND tm.deleted_at IS NULL
AND tm.ai_moderation_status = 'approved'
ORDER BY tm.created_at DESC, tm.id DESC
LIMIT 1
),
updated_at = now()
WHERE t.id = $1
`,
[result.rows[0].threadId]
);
return true;
}
export async function createThreadsWsTicketForUser(userId: number): Promise<{ ticket: string; expiresAt: Date }> {
return createThreadWebSocketTicket(userId);
}
export async function wipeAdminData(payload: Record<string, unknown>): Promise<{ scopes: DataToolScopeSummary[] }> {
const scopes = cleanDataToolScopes(payload.scopes);
await withTransaction(async (client) => {

View File

@@ -50,8 +50,13 @@ import {
createLifePost,
createPokemon,
createRecipe,
createAdminThreadChannel,
createThread,
createThreadMessage,
createThreadsWsTicketForUser,
deleteConfig,
deleteAncientArtifact,
deleteAdminThreadChannel,
deleteDailyChecklistItem,
deleteDish,
deleteDishCategory,
@@ -67,10 +72,15 @@ import {
deleteLifePostReaction,
deletePokemon,
deleteRecipe,
deleteThread,
deleteThreadMessage,
deleteThreadMessageReaction,
deleteThreadReaction,
exportAdminData,
fetchPokemonData,
fetchPokemonImageOptions,
followUser,
followThread,
getAdminDataToolsSummary,
getAncientArtifact,
getHabitat,
@@ -81,6 +91,7 @@ import {
getPokemon,
getPublicUserProfile,
getRecipe,
getThread,
globalSearch,
importAdminData,
importAdminHabitatsCsv,
@@ -88,6 +99,7 @@ import {
isConfigType,
listAncientArtifacts,
listEntityDiscussionComments,
listAdminThreadChannels,
listConfig,
listDailyChecklistItems,
listHabitats,
@@ -100,6 +112,9 @@ import {
listPokemon,
listPokemonFetchOptions,
listRecipes,
listThreadChannels,
listThreadMessages,
listThreads,
listUserCommentActivities,
listUserLifePosts,
listUserReactionActivities,
@@ -112,6 +127,7 @@ import {
reorderItems,
reorderLanguages,
reorderRecipes,
markThreadRead,
retryEntityDiscussionCommentModeration,
retryLifeCommentModeration,
retryLifePostModeration,
@@ -120,6 +136,8 @@ import {
setLifePostReaction,
setEntityDiscussionCommentLike,
setLifeCommentLike,
setThreadMessageReaction,
setThreadReaction,
updateConfig,
updateAncientArtifact,
updateDailyChecklistItem,
@@ -131,7 +149,10 @@ import {
updateLifePost,
updatePokemon,
updateRecipe,
updateAdminThreadChannel,
updateThreadLock,
unfollowUser,
unfollowThread,
wipeAdminData
} from './queries.ts';
import {
@@ -160,6 +181,7 @@ import {
markNotificationRead,
setupNotificationWebSocketServer
} from './notifications.ts';
import { setupThreadWebSocketServer } from './threadsRealtime.ts';
const app = Fastify({
logger: true,
@@ -1689,6 +1711,129 @@ app.delete('/api/discussions/comments/:id/like', async (request, reply) => {
return comment ? comment : notFound(reply, request);
});
app.get('/api/thread-channels', async (request) => {
const user = await optionalUser(request);
return listThreadChannels(user?.id ?? null);
});
app.get('/api/threads', async (request) => {
const user = await optionalUser(request);
return listThreads(request.query as Record<string, string | string[] | undefined>, user?.id ?? null);
});
app.post('/api/threads/ws-ticket', async (request, reply) => {
const user = await requireVerifiedUser(request, reply);
if (!user) {
return;
}
return createThreadsWsTicketForUser(user.id);
});
app.post('/api/threads', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.create', 'communityWrite');
return user ? reply.code(201).send(await createThread(request.body as Record<string, unknown>, user.id)) : undefined;
});
app.get('/api/threads/:id', async (request, reply) => {
const { id } = request.params as { id: string };
const user = await optionalUser(request);
const thread = await getThread(Number(id), user?.id ?? null);
return thread ? thread : notFound(reply, request);
});
app.get('/api/threads/:id/messages', async (request, reply) => {
const { id } = request.params as { id: string };
const user = await optionalUser(request);
const canViewAll = user ? userHasPermission(user, 'admin.threads.messages.delete') : false;
const messages = await listThreadMessages(
Number(id),
request.query as Record<string, string | string[] | undefined>,
user?.id ?? null,
canViewAll
);
return messages ? messages : notFound(reply, request);
});
app.post('/api/threads/:id/messages', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.messages.create', 'communityWrite');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const message = await createThreadMessage(Number(id), request.body as Record<string, unknown>, user.id);
return message ? reply.code(201).send(message) : notFound(reply, request);
});
app.put('/api/threads/:id/follow', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.follow', 'communityReaction');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const thread = await followThread(Number(id), user.id);
return thread ? thread : notFound(reply, request);
});
app.delete('/api/threads/:id/follow', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.follow', 'communityReaction');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const thread = await unfollowThread(Number(id), user.id);
return thread ? thread : notFound(reply, request);
});
app.post('/api/threads/:id/read', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.follow', 'communityReaction');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const thread = await markThreadRead(Number(id), user.id);
return thread ? thread : notFound(reply, request);
});
app.put('/api/threads/:id/reaction', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.reactions.set', 'communityReaction');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const thread = await setThreadReaction(Number(id), request.body as Record<string, unknown>, user.id);
return thread ? thread : notFound(reply, request);
});
app.delete('/api/threads/:id/reaction', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.reactions.set', 'communityReaction');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const thread = await deleteThreadReaction(Number(id), request.body as Record<string, unknown>, user.id);
return thread ? thread : notFound(reply, request);
});
app.put('/api/thread-messages/:id/reaction', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.reactions.set', 'communityReaction');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const message = await setThreadMessageReaction(Number(id), request.body as Record<string, unknown>, user.id);
return message ? message : notFound(reply, request);
});
app.delete('/api/thread-messages/:id/reaction', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'threads.reactions.set', 'communityReaction');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const message = await deleteThreadMessageReaction(Number(id), request.body as Record<string, unknown>, user.id);
return message ? message : notFound(reply, request);
});
app.get('/api/pokemon', async (request) =>
listPokemon(request.query as Record<string, string | string[] | undefined>, requestLocale(request))
);
@@ -2215,6 +2360,69 @@ app.post('/api/admin/data-tools/wipe', async (request, reply) => {
return user ? wipeAdminData(request.body as Record<string, unknown>) : undefined;
});
app.get('/api/admin/thread-channels', async (request, reply) => {
const user = await requirePermission(request, reply, 'admin.threads.channels.read');
return user ? listAdminThreadChannels() : undefined;
});
app.post('/api/admin/thread-channels', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'admin.threads.channels.create', 'adminWrite');
return user
? reply.code(201).send(await createAdminThreadChannel(request.body as Record<string, unknown>, user.id))
: undefined;
});
app.put('/api/admin/thread-channels/:id', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'admin.threads.channels.update', 'adminWrite');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const channels = await updateAdminThreadChannel(Number(id), request.body as Record<string, unknown>, user.id);
return channels ? channels : notFound(reply, request);
});
app.delete('/api/admin/thread-channels/:id', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'admin.threads.channels.delete', 'adminWrite');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const deleted = await deleteAdminThreadChannel(Number(id));
return deleted ? reply.code(204).send() : notFound(reply, request);
});
app.put('/api/admin/threads/:id/lock', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'admin.threads.threads.lock', 'adminWrite');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const payload = request.body as Record<string, unknown>;
const thread = await updateThreadLock(Number(id), payload.locked === true, user.id);
return thread ? thread : notFound(reply, request);
});
app.delete('/api/admin/threads/:id', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'admin.threads.threads.delete', 'adminWrite');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const deleted = await deleteThread(Number(id), user.id);
return deleted ? reply.code(204).send() : notFound(reply, request);
});
app.delete('/api/admin/thread-messages/:id', async (request, reply) => {
const user = await requirePermissionWithRateLimits(request, reply, 'admin.threads.messages.delete', 'adminWrite');
if (!user) {
return;
}
const { id } = request.params as { id: string };
const deleted = await deleteThreadMessage(Number(id), user.id);
return deleted ? reply.code(204).send() : notFound(reply, request);
});
app.get('/api/admin/config/:type', async (request, reply) => {
const user = await requirePermission(request, reply, 'admin.config.read');
if (!user) {
@@ -2286,6 +2494,7 @@ try {
await syncSystemWordingCatalog();
await startAiModerationWorker(app.log);
setupNotificationWebSocketServer(app.server, app.log);
setupThreadWebSocketServer(app.server, app.log);
await app.listen({ host: '0.0.0.0', port });
} catch (error) {
app.log.error(error);

View File

@@ -0,0 +1,421 @@
import type { FastifyBaseLogger } from 'fastify';
import { Buffer } from 'node:buffer';
import { createHash, randomBytes } from 'node:crypto';
import type { Server } from 'node:http';
import type { Duplex } from 'node:stream';
import { pool, query, queryOne } from './db.ts';
import type { ThreadMessage, ThreadReactionCounts, ThreadReactionType, ThreadSummary } from './queries.ts';
export type ThreadWsMessage =
| { type: 'threads.connected'; followedUnreadCount: number }
| { type: 'thread.message.created'; threadId: number; message: ThreadMessage; thread: ThreadSummary }
| { type: 'thread.message.moderation'; threadId: number; message: ThreadMessage | null }
| {
type: 'thread.reactions.updated';
target: 'thread' | 'message';
threadId: number;
messageId: number | null;
reactionCounts: ThreadReactionCounts;
myReactions: ThreadReactionType[];
}
| { type: 'thread.read.updated'; threadId: number; unread: boolean; unreadCount: number };
const websocketGuid = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
const websocketTicketMinutes = 2;
const threadClients = new Map<number, Set<Duplex>>();
const clientUsers = new WeakMap<Duplex, number>();
function hashToken(token: string): string {
return createHash('sha256').update(token).digest('hex');
}
export async function createThreadWebSocketTicket(userId: number): Promise<{ ticket: string; expiresAt: Date }> {
const ticket = randomBytes(32).toString('base64url');
const expiresAt = new Date(Date.now() + websocketTicketMinutes * 60_000);
await pool.query(
`
INSERT INTO thread_ws_tickets (ticket_hash, user_id, expires_at)
VALUES ($1, $2, $3)
`,
[hashToken(ticket), userId, expiresAt]
);
await pool.query('DELETE FROM thread_ws_tickets WHERE expires_at < now()');
return { ticket, expiresAt };
}
async function consumeThreadWebSocketTicket(ticket: string): Promise<number | null> {
if (!ticket) {
return null;
}
const row = await queryOne<{ userId: number }>(
`
DELETE FROM thread_ws_tickets
WHERE ticket_hash = $1
AND expires_at > now()
RETURNING user_id AS "userId"
`,
[hashToken(ticket)]
);
return row?.userId ?? null;
}
async function followedUnreadCount(userId: number): Promise<number> {
const row = await queryOne<{ count: number }>(
`
SELECT COUNT(*)::integer AS count
FROM thread_follows tf
JOIN threads t ON t.id = tf.thread_id
LEFT JOIN thread_reads tr ON tr.thread_id = t.id AND tr.user_id = tf.user_id
WHERE tf.user_id = $1
AND t.deleted_at IS NULL
AND t.last_message_id IS NOT NULL
AND (
tr.last_read_message_id IS NULL
OR t.last_message_id > tr.last_read_message_id
)
`,
[userId]
);
return row?.count ?? 0;
}
function wsFrame(data: Buffer, opcode = 0x1): Buffer {
const length = data.byteLength;
if (length < 126) {
return Buffer.concat([Buffer.from([0x80 | opcode, length]), data]);
}
if (length < 65536) {
const header = Buffer.alloc(4);
header[0] = 0x80 | opcode;
header[1] = 126;
header.writeUInt16BE(length, 2);
return Buffer.concat([header, data]);
}
const header = Buffer.alloc(10);
header[0] = 0x80 | opcode;
header[1] = 127;
header.writeBigUInt64BE(BigInt(length), 2);
return Buffer.concat([header, data]);
}
function sendWsJson(socket: Duplex, message: ThreadWsMessage): void {
if (!socket.destroyed) {
socket.write(wsFrame(Buffer.from(JSON.stringify(message), 'utf8')));
}
}
function websocketPayload(buffer: Buffer): { opcode: number; payload: Buffer } | null {
if (buffer.byteLength < 2) {
return null;
}
const opcode = buffer[0] & 0x0f;
const masked = (buffer[1] & 0x80) !== 0;
let length = buffer[1] & 0x7f;
let offset = 2;
if (length === 126) {
if (buffer.byteLength < offset + 2) return null;
length = buffer.readUInt16BE(offset);
offset += 2;
} else if (length === 127) {
if (buffer.byteLength < offset + 8) return null;
const longLength = buffer.readBigUInt64BE(offset);
if (longLength > BigInt(Number.MAX_SAFE_INTEGER)) return null;
length = Number(longLength);
offset += 8;
}
let mask: Buffer | null = null;
if (masked) {
if (buffer.byteLength < offset + 4) return null;
mask = buffer.subarray(offset, offset + 4);
offset += 4;
}
if (buffer.byteLength < offset + length) {
return null;
}
const payload = Buffer.from(buffer.subarray(offset, offset + length));
if (mask) {
for (let index = 0; index < payload.byteLength; index += 1) {
payload[index] ^= mask[index % 4];
}
}
return { opcode, payload };
}
function closeSocket(socket: Duplex, statusCode = 1000): void {
if (socket.destroyed) {
return;
}
const payload = Buffer.alloc(2);
payload.writeUInt16BE(statusCode, 0);
socket.end(wsFrame(payload, 0x8));
}
function rejectUpgrade(socket: Duplex, statusCode: number, statusText: string): void {
socket.write(`HTTP/1.1 ${statusCode} ${statusText}\r\nConnection: close\r\n\r\n`);
socket.destroy();
}
function addThreadClient(userId: number, socket: Duplex): void {
clientUsers.set(socket, userId);
let clients = threadClients.get(userId);
if (!clients) {
clients = new Set();
threadClients.set(userId, clients);
}
clients.add(socket);
socket.on('close', () => {
clients?.delete(socket);
if (clients?.size === 0) {
threadClients.delete(userId);
}
});
}
async function recipientUserIds(threadId: number): Promise<number[]> {
const rows = await query<{ userId: number }>(
`
SELECT DISTINCT user_id AS "userId"
FROM thread_follows
WHERE thread_id = $1
`,
[threadId]
);
return rows.map((row) => row.userId);
}
function connectedUserIds(): number[] {
return [...threadClients.keys()];
}
async function publishToUsers(userIds: number[], message: ThreadWsMessage): Promise<void> {
for (const userId of userIds) {
const clients = threadClients.get(userId);
if (!clients) {
continue;
}
for (const socket of clients) {
sendWsJson(socket, message);
}
}
}
export async function publishThreadMessageCreated(thread: ThreadSummary, message: ThreadMessage): Promise<void> {
const users = [...new Set([...(await recipientUserIds(thread.id)), ...connectedUserIds()])];
if (message.author?.id && !users.includes(message.author.id)) {
users.push(message.author.id);
}
await publishToUsers(users, {
type: 'thread.message.created',
threadId: thread.id,
message,
thread
});
}
export async function applyApprovedThreadMessage(messageId: number): Promise<void> {
const row = await queryOne<{
threadId: number;
channelId: number;
title: string;
languageCode: string;
locked: boolean;
messageCount: number;
lastActiveAt: Date;
threadCreatedAt: Date;
threadAuthor: { id: number; displayName: string } | null;
messageBody: string;
moderationStatus: ThreadMessage['moderationStatus'];
moderationLanguageCode: string | null;
moderationReason: string | null;
messageCreatedAt: Date;
messageUpdatedAt: Date;
messageAuthor: { id: number; displayName: string } | null;
}>(
`
WITH updated_thread AS (
UPDATE threads t
SET last_message_id = tm.id,
message_count = (
SELECT COUNT(*)::integer
FROM thread_messages visible_message
WHERE visible_message.thread_id = t.id
AND visible_message.deleted_at IS NULL
AND visible_message.ai_moderation_status = 'approved'
),
last_active_at = GREATEST(t.last_active_at, tm.created_at),
updated_at = now()
FROM thread_messages tm
WHERE tm.id = $1
AND tm.thread_id = t.id
AND tm.deleted_at IS NULL
AND tm.ai_moderation_status = 'approved'
RETURNING
t.id,
t.channel_id,
t.title,
t.language_code,
t.locked,
t.message_count,
t.last_active_at,
t.created_at,
t.created_by_user_id
)
SELECT
ut.id AS "threadId",
ut.channel_id AS "channelId",
ut.title,
ut.language_code AS "languageCode",
ut.locked,
ut.message_count AS "messageCount",
ut.last_active_at AS "lastActiveAt",
ut.created_at AS "threadCreatedAt",
CASE WHEN thread_user.id IS NULL THEN NULL ELSE json_build_object('id', thread_user.id, 'displayName', thread_user.display_name) END AS "threadAuthor",
tm.body AS "messageBody",
tm.ai_moderation_status AS "moderationStatus",
tm.ai_moderation_language_code AS "moderationLanguageCode",
tm.ai_moderation_reason AS "moderationReason",
tm.created_at AS "messageCreatedAt",
tm.updated_at AS "messageUpdatedAt",
CASE WHEN message_user.id IS NULL THEN NULL ELSE json_build_object('id', message_user.id, 'displayName', message_user.display_name) END AS "messageAuthor"
FROM updated_thread ut
JOIN thread_messages tm ON tm.id = $1
LEFT JOIN users thread_user ON thread_user.id = ut.created_by_user_id
LEFT JOIN users message_user ON message_user.id = tm.created_by_user_id
`,
[messageId]
);
if (!row) {
return;
}
await publishThreadMessageCreated(
{
id: row.threadId,
channelId: row.channelId,
title: row.title,
languageCode: row.languageCode,
tags: [],
locked: row.locked,
messageCount: row.messageCount,
lastActiveAt: row.lastActiveAt,
createdAt: row.threadCreatedAt,
author: row.threadAuthor,
reactionCounts: { 'thumbs-up': 0, heart: 0, laugh: 0, fire: 0, eyes: 0 },
myReactions: [],
followed: true,
unread: true
},
{
id: messageId,
threadId: row.threadId,
body: row.messageBody,
moderationStatus: row.moderationStatus,
moderationLanguageCode: row.moderationLanguageCode,
moderationReason: row.moderationReason,
createdAt: row.messageCreatedAt,
updatedAt: row.messageUpdatedAt,
author: row.messageAuthor,
reactionCounts: { 'thumbs-up': 0, heart: 0, laugh: 0, fire: 0, eyes: 0 },
myReactions: []
}
);
}
export async function publishThreadMessageModeration(threadId: number, message: ThreadMessage | null): Promise<void> {
await publishToUsers([...new Set([...(await recipientUserIds(threadId)), ...connectedUserIds()])], {
type: 'thread.message.moderation',
threadId,
message
});
}
export async function publishThreadReactionUpdated(
userId: number,
message: Extract<ThreadWsMessage, { type: 'thread.reactions.updated' }>
): Promise<void> {
const users = await recipientUserIds(message.threadId);
for (const connectedUserId of connectedUserIds()) {
if (!users.includes(connectedUserId)) {
users.push(connectedUserId);
}
}
if (!users.includes(userId)) {
users.push(userId);
}
await publishToUsers(users, message);
}
export async function publishThreadReadUpdated(userId: number, threadId: number, unread: boolean, unreadCount: number): Promise<void> {
await publishToUsers([userId], { type: 'thread.read.updated', threadId, unread, unreadCount });
}
export function setupThreadWebSocketServer(server: Server, logger: FastifyBaseLogger): void {
server.on('upgrade', async (request, socket) => {
const url = new URL(request.url ?? '/', 'http://localhost');
if (url.pathname !== '/api/threads/ws') {
return;
}
const key = request.headers['sec-websocket-key'];
if (request.method !== 'GET' || typeof key !== 'string' || key.trim() === '') {
rejectUpgrade(socket, 400, 'Bad Request');
return;
}
try {
const ticket = url.searchParams.get('ticket') ?? '';
const userId = await consumeThreadWebSocketTicket(ticket);
if (!userId) {
rejectUpgrade(socket, 401, 'Unauthorized');
return;
}
const accept = createHash('sha1').update(`${key}${websocketGuid}`).digest('base64');
socket.write(
[
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
`Sec-WebSocket-Accept: ${accept}`,
'\r\n'
].join('\r\n')
);
addThreadClient(userId, socket);
sendWsJson(socket, {
type: 'threads.connected',
followedUnreadCount: await followedUnreadCount(userId)
});
socket.on('data', (buffer: Buffer) => {
const frame = websocketPayload(buffer);
if (!frame) {
return;
}
if (frame.opcode === 0x8) {
closeSocket(socket);
} else if (frame.opcode === 0x9) {
socket.write(wsFrame(frame.payload, 0x0a));
}
});
socket.on('error', () => {
socket.destroy();
});
} catch (error) {
logger.warn({ err: error }, 'Thread WebSocket upgrade failed');
rejectUpgrade(socket, 500, 'Internal Server Error');
}
});
}