import { WebSocketServer } from "ws"; import Redis from "ioredis"; const redisSub = new Redis({ host: process.env.REDIS_HOST || "main_redis", port: process.env.REDIS_PORT || 6379, password: process.env.REDIS_PASSWORD || "", }); const redisPub = new Redis({ host: process.env.REDIS_HOST || "main_redis", port: process.env.REDIS_PORT || 6379, password: process.env.REDIS_PASSWORD || "", }); const PORT = 8080; const wss = new WebSocketServer({ port: PORT }); const clients = new Map(); // ws => { app, channels: Set } function parseMessage(msg) { try { return JSON.parse(msg); } catch { return null; } } wss.on("connection", (ws) => { console.log("👤 新客户端连接"); clients.set(ws, { app: null, channels: new Set() }); ws.on("message", async (raw) => { const msg = parseMessage(raw); if (!msg) return; const { type, app, channel, event, data, token } = msg; if (type === "auth") { clients.get(ws).app = app; ws.send(JSON.stringify({ type: "auth_ok", app })); return; } if (type === "subscribe") { clients.get(ws).channels.add(channel); ws.send(JSON.stringify({ type: "subscribed", channel })); return; } if (type === "publish") { const payload = { app, channel, event, data }; await redisPub.publish(channel, JSON.stringify(payload)); return; } }); ws.on("close", () => { clients.delete(ws); console.log("❌ 客户端断开"); }); }); // Redis 订阅所有频道 redisSub.psubscribe("*", (err) => { if (err) console.error("Redis 订阅失败", err); }); // 分发消息 redisSub.on("pmessage", (pattern, channel, message) => { const payload = JSON.parse(message); for (const [ws, info] of clients) { if (info.channels.has(channel)) { ws.send(JSON.stringify(payload)); } } }); console.log(`✅ WebSocket 服务运行在 ws://0.0.0.0:${PORT}`);