import { prisma } from "../../utils/prisma"; function mapChannel(channel: string) { if (channel === "TELEGRAM") return "Telegram"; if (channel === "WHATSAPP") return "WhatsApp"; if (channel === "INSTAGRAM") return "Instagram"; if (channel === "EMAIL") return "Email"; return "Phone"; } const COOKIE_USER = "cf_user"; const COOKIE_TEAM = "cf_team"; const COOKIE_CONV = "cf_conv"; const TEAM_POLL_INTERVAL_MS = 2000; const peersByTeam = new Map>(); const peerTeamById = new Map(); const lastSignatureByTeam = new Map(); const lastMsgCreatedAtByTeam = new Map(); let pollTimer: ReturnType | null = null; function parseCookies(raw: string | null) { const out = new Map(); for (const part of String(raw ?? "").split(";")) { const [key, ...rest] = part.trim().split("="); if (!key) continue; const value = rest.join("="); try { out.set(key, decodeURIComponent(value)); } catch { out.set(key, value); } } return out; } function attachPeerToTeam(peer: any, teamId: string) { if (!peersByTeam.has(teamId)) peersByTeam.set(teamId, new Set()); peersByTeam.get(teamId)?.add(peer); peerTeamById.set(String(peer.id), teamId); } function detachPeer(peer: any) { const key = String(peer.id); const teamId = peerTeamById.get(key); if (!teamId) return; peerTeamById.delete(key); const peers = peersByTeam.get(teamId); if (!peers) return; peers.delete(peer); if (peers.size === 0) { peersByTeam.delete(teamId); lastSignatureByTeam.delete(teamId); lastMsgCreatedAtByTeam.delete(teamId); } } function stopPollIfIdle() { if (peersByTeam.size > 0 || !pollTimer) return; clearInterval(pollTimer); pollTimer = null; } async function validateSessionFromPeer(peer: any) { const cookieHeader = peer?.request?.headers?.get?.("cookie") ?? null; const cookies = parseCookies(cookieHeader); const userId = String(cookies.get(COOKIE_USER) ?? "").trim(); const teamId = String(cookies.get(COOKIE_TEAM) ?? "").trim(); const conversationId = String(cookies.get(COOKIE_CONV) ?? "").trim(); if (!userId || !teamId || !conversationId) return null; const [user, team, conv] = await Promise.all([ prisma.user.findUnique({ where: { id: userId }, select: { id: true } }), prisma.team.findUnique({ where: { id: teamId }, select: { id: true } }), prisma.aiConversation.findFirst({ where: { id: conversationId, teamId, createdByUserId: userId }, select: { id: true }, }), ]); if (!user || !team || !conv) return null; return { teamId }; } async function computeTeamSignature(teamId: string) { const [ omniMessageMax, contactMax, contactMessageMax, telegramConnectionMax, contactInboxMax, inboxPrefMax, clientTimelineEntryMax, ] = await Promise.all([ prisma.omniMessage.aggregate({ where: { teamId }, _max: { updatedAt: true }, }), prisma.contact.aggregate({ where: { teamId }, _max: { updatedAt: true }, }), prisma.contactMessage.aggregate({ where: { contact: { teamId } }, _max: { createdAt: true }, }), prisma.telegramBusinessConnection.aggregate({ where: { teamId }, _max: { updatedAt: true }, }), prisma.contactInbox.aggregate({ where: { teamId }, _max: { updatedAt: true }, }), prisma.contactInboxPreference.aggregate({ where: { teamId }, _max: { updatedAt: true }, }), prisma.clientTimelineEntry.aggregate({ where: { teamId }, _max: { updatedAt: true }, }), ]); return [ omniMessageMax._max.updatedAt?.toISOString() ?? "", contactMax._max.updatedAt?.toISOString() ?? "", contactMessageMax._max.createdAt?.toISOString() ?? "", telegramConnectionMax._max.updatedAt?.toISOString() ?? "", contactInboxMax._max.updatedAt?.toISOString() ?? "", inboxPrefMax._max.updatedAt?.toISOString() ?? "", clientTimelineEntryMax._max.updatedAt?.toISOString() ?? "", ].join("|"); } function sendJson(peer: any, payload: Record) { try { peer.send(JSON.stringify(payload)); } catch { // ignore socket write errors } } async function checkNewMessages(teamId: string) { const lastTs = lastMsgCreatedAtByTeam.get(teamId) ?? new Date(0); try { const msgs = await prisma.contactMessage.findMany({ where: { contact: { teamId }, createdAt: { gt: lastTs } }, include: { contact: { select: { id: true, name: true } } }, orderBy: { createdAt: "asc" }, take: 50, }); if (msgs.length) { lastMsgCreatedAtByTeam.set(teamId, msgs[msgs.length - 1]!.createdAt); } return msgs; } catch { return []; } } async function pollAndBroadcast() { for (const [teamId, peers] of peersByTeam.entries()) { if (!peers.size) continue; // Check for new messages → send granular events const newMessages = await checkNewMessages(teamId); for (const msg of newMessages) { const payload = { type: "message.new", contactId: msg.contact.id, contactName: msg.contact.name, text: msg.content ?? "", channel: mapChannel(msg.channel), direction: msg.direction === "OUTBOUND" ? "out" : "in", at: msg.occurredAt?.toISOString() ?? msg.createdAt.toISOString(), }; for (const peer of peers) { sendJson(peer, payload); } } // Check for other dashboard changes (contacts, calendar, deals, etc.) const signature = await computeTeamSignature(teamId); const previous = lastSignatureByTeam.get(teamId); if (signature === previous) continue; lastSignatureByTeam.set(teamId, signature); const payload = { type: "dashboard.changed", teamId, at: new Date().toISOString(), }; for (const peer of peers) { sendJson(peer, payload); } } } function ensurePoll() { if (pollTimer) return; pollTimer = setInterval(() => { void pollAndBroadcast(); }, TEAM_POLL_INTERVAL_MS); } export default defineWebSocketHandler({ async open(peer) { const session = await validateSessionFromPeer(peer); if (!session) { peer.close(4401, "Unauthorized"); return; } attachPeerToTeam(peer, session.teamId); ensurePoll(); sendJson(peer, { type: "realtime.connected", at: new Date().toISOString() }); void pollAndBroadcast(); }, close(peer) { detachPeer(peer); stopPollIfIdle(); }, error(peer) { detachPeer(peer); stopPollIfIdle(); }, });