import { prisma } from "../../utils/prisma";

/** Names of the cookies read from the WebSocket upgrade request. */
const COOKIE_USER = "cf_user";
const COOKIE_TEAM = "cf_team";
const COOKIE_CONV = "cf_conv";

/** Delay between two change-detection polls, in milliseconds. */
const TEAM_POLL_INTERVAL_MS = 2000;

/** Connected peers grouped by the team they were attached to. */
const peersByTeam = new Map<string, Set<any>>();
/** Reverse lookup: stringified peer id -> team id the peer is attached to. */
const peerTeamById = new Map<string, string>();
/** Last signature computed per team; a differing signature triggers a broadcast. */
const lastSignatureByTeam = new Map<string, string>();
/**
 * Ids of peers whose `open` hook is still awaiting session validation.
 * `close`/`error` remove the id, which lets `open` detect that the socket
 * went away while validation was in flight.
 */
const pendingPeerIds = new Set<string>();
/** Handle of the shared poll interval, or `null` while no poll is scheduled. */
let pollTimer: ReturnType<typeof setInterval> | null = null;

/**
 * Parses a raw `Cookie` header into a name -> value map.
 *
 * Values are URI-decoded; a value that fails to decode is stored verbatim.
 * When a name occurs more than once, the last occurrence wins.
 *
 * @param raw - The raw header value, or `null` when the header is absent.
 * @returns A map of cookie names to values (empty for `null` or empty input).
 */
function parseCookies(raw: string | null): Map<string, string> {
  const out = new Map<string, string>();
  for (const part of String(raw ?? "").split(";")) {
    const [key, ...rest] = part.trim().split("=");
    if (!key) continue;
    // Re-join so that "=" characters inside the value are preserved.
    const value = rest.join("=");
    try {
      out.set(key, decodeURIComponent(value));
    } catch {
      // Malformed percent-encoding: keep the raw value instead of dropping it.
      out.set(key, value);
    }
  }
  return out;
}

/**
 * Registers `peer` under `teamId` and records the reverse lookup.
 *
 * @param peer - The connected peer; only `peer.id` is read here.
 * @param teamId - The team the peer subscribes to.
 */
function attachPeerToTeam(peer: any, teamId: string): void {
  let peers = peersByTeam.get(teamId);
  if (!peers) {
    peers = new Set();
    peersByTeam.set(teamId, peers);
  }
  peers.add(peer);
  peerTeamById.set(String(peer.id), teamId);
}

/**
 * Removes `peer` from its team. When the team has no peers left, the team's
 * peer set and its cached signature are dropped as well. Does nothing for a
 * peer that was never attached.
 *
 * @param peer - The peer to remove; only `peer.id` is read here.
 */
function detachPeer(peer: any): void {
  const key = String(peer.id);
  const teamId = peerTeamById.get(key);
  if (!teamId) return;
  peerTeamById.delete(key);

  const peers = peersByTeam.get(teamId);
  if (!peers) return;
  peers.delete(peer);
  if (peers.size === 0) {
    peersByTeam.delete(teamId);
    lastSignatureByTeam.delete(teamId);
  }
}

/** Clears the poll interval once no team has any attached peer. */
function stopPollIfIdle(): void {
  if (peersByTeam.size > 0 || !pollTimer) return;
  clearInterval(pollTimer);
  pollTimer = null;
}

/**
 * Validates the session described by the peer's request cookies.
 *
 * All three cookies (user, team, conversation) must be non-empty after
 * trimming, the user and team rows must exist, and a conversation with the
 * given id must exist for that team and have been created by that user.
 *
 * @param peer - The connecting peer; the `cookie` request header is read from it.
 * @returns `{ teamId }` for a valid session, otherwise `null`.
 */
async function validateSessionFromPeer(
  peer: any,
): Promise<{ teamId: string } | null> {
  const cookieHeader = peer?.request?.headers?.get?.("cookie") ?? null;
  const cookies = parseCookies(cookieHeader);
  const userId = String(cookies.get(COOKIE_USER) ?? "").trim();
  const teamId = String(cookies.get(COOKIE_TEAM) ?? "").trim();
  const conversationId = String(cookies.get(COOKIE_CONV) ?? "").trim();
  if (!userId || !teamId || !conversationId) return null;

  const [user, team, conv] = await Promise.all([
    prisma.user.findUnique({ where: { id: userId }, select: { id: true } }),
    prisma.team.findUnique({ where: { id: teamId }, select: { id: true } }),
    prisma.aiConversation.findFirst({
      where: { id: conversationId, teamId, createdByUserId: userId },
      select: { id: true },
    }),
  ]);
  if (!user || !team || !conv) return null;

  return { teamId };
}

/**
 * Builds a change signature for a team: the newest timestamp of each watched
 * table, as ISO strings joined with "|". A table without rows contributes an
 * empty segment.
 *
 * @param teamId - The team whose data is inspected.
 * @returns The signature string.
 */
async function computeTeamSignature(teamId: string): Promise<string> {
  const [
    omniMessageMax,
    contactMax,
    contactMessageMax,
    telegramConnectionMax,
    contactInboxMax,
    inboxPrefMax,
    clientTimelineEntryMax,
  ] = await Promise.all([
    prisma.omniMessage.aggregate({
      where: { teamId },
      _max: { updatedAt: true },
    }),
    prisma.contact.aggregate({
      where: { teamId },
      _max: { updatedAt: true },
    }),
    // Contact messages are scoped through their contact and tracked by createdAt.
    prisma.contactMessage.aggregate({
      where: { contact: { teamId } },
      _max: { createdAt: true },
    }),
    prisma.telegramBusinessConnection.aggregate({
      where: { teamId },
      _max: { updatedAt: true },
    }),
    prisma.contactInbox.aggregate({
      where: { teamId },
      _max: { updatedAt: true },
    }),
    prisma.contactInboxPreference.aggregate({
      where: { teamId },
      _max: { updatedAt: true },
    }),
    prisma.clientTimelineEntry.aggregate({
      where: { teamId },
      _max: { updatedAt: true },
    }),
  ]);

  return [
    omniMessageMax._max.updatedAt?.toISOString() ?? "",
    contactMax._max.updatedAt?.toISOString() ?? "",
    contactMessageMax._max.createdAt?.toISOString() ?? "",
    telegramConnectionMax._max.updatedAt?.toISOString() ?? "",
    contactInboxMax._max.updatedAt?.toISOString() ?? "",
    inboxPrefMax._max.updatedAt?.toISOString() ?? "",
    clientTimelineEntryMax._max.updatedAt?.toISOString() ?? "",
  ].join("|");
}

/**
 * Serializes `payload` as JSON and sends it to `peer`. Delivery is best
 * effort: errors thrown while serializing or sending are swallowed.
 *
 * @param peer - The receiving peer.
 * @param payload - The message to serialize.
 */
function sendJson(peer: any, payload: Record<string, unknown>): void {
  try {
    peer.send(JSON.stringify(payload));
  } catch {
    // ignore socket write errors
  }
}

/**
 * Recomputes the signature of every team that has attached peers and sends a
 * `dashboard.changed` message to the team's peers when the signature differs
 * from the cached one.
 *
 * This function never rejects: a failure while computing one team's
 * signature is logged and that team is skipped for this round, so callers
 * may safely fire and forget it.
 */
async function pollAndBroadcast(): Promise<void> {
  for (const teamId of peersByTeam.keys()) {
    if (!peersByTeam.get(teamId)?.size) continue;

    let signature: string;
    try {
      signature = await computeTeamSignature(teamId);
    } catch (error: unknown) {
      // Keep polling the remaining teams; this one is retried on the next tick.
      console.error(
        `[realtime] failed to compute signature for team ${teamId}`,
        error,
      );
      continue;
    }

    // Peers may have detached during the await. Re-read the set so that no
    // signature is cached for a team that no longer has any peer.
    const peers = peersByTeam.get(teamId);
    if (!peers?.size) continue;

    if (signature === lastSignatureByTeam.get(teamId)) continue;
    lastSignatureByTeam.set(teamId, signature);

    const payload = {
      type: "dashboard.changed",
      teamId,
      at: new Date().toISOString(),
    };
    for (const peer of peers) {
      sendJson(peer, payload);
    }
  }
}

/** Starts the shared poll interval unless it is already running. */
function ensurePoll(): void {
  if (pollTimer) return;
  pollTimer = setInterval(() => {
    void pollAndBroadcast();
  }, TEAM_POLL_INTERVAL_MS);
}

/**
 * WebSocket handler that notifies connected peers when their team's data
 * changes.
 *
 * - `open`: validates the cookie session, attaches the peer to its team,
 *   sends `realtime.connected` and triggers an immediate poll. An invalid
 *   session is closed with code 4401; a failure while validating is closed
 *   with code 1011.
 * - `close` / `error`: detach the peer and stop polling when idle.
 */
export default defineWebSocketHandler({
  async open(peer) {
    const peerKey = String(peer.id);
    pendingPeerIds.add(peerKey);

    let session: { teamId: string } | null;
    try {
      session = await validateSessionFromPeer(peer);
    } catch (error: unknown) {
      console.error("[realtime] session validation failed", error);
      // Only close when the socket did not already go away during validation.
      if (pendingPeerIds.delete(peerKey)) {
        peer.close(1011, "Internal error");
      }
      return;
    }

    // `close`/`error` remove the id; if it is gone, the socket ended while
    // validation was in flight and the peer must not be attached.
    if (!pendingPeerIds.delete(peerKey)) return;

    if (!session) {
      peer.close(4401, "Unauthorized");
      return;
    }

    attachPeerToTeam(peer, session.teamId);
    ensurePoll();
    sendJson(peer, { type: "realtime.connected", at: new Date().toISOString() });
    void pollAndBroadcast();
  },
  close(peer) {
    pendingPeerIds.delete(String(peer.id));
    detachPeer(peer);
    stopPollIfIdle();
  },
  error(peer) {
    pendingPeerIds.delete(String(peer.id));
    detachPeer(peer);
    stopPollIfIdle();
  },
});