From 643d8d02ba7ab8c2902d377574953b368c611bc7 Mon Sep 17 00:00:00 2001 From: Ruslan Bakiev <572431+veikab@users.noreply.github.com> Date: Tue, 24 Feb 2026 20:04:55 +0700 Subject: [PATCH] feat: granular WebSocket message.new events - WebSocket now detects new ContactMessages and broadcasts message.new events with contactId, text, channel, direction - Frontend handles message.new: refreshes timeline for open chat, refreshes contacts for sidebar preview update - dashboard.changed still fires for non-message changes Co-Authored-By: Claude Opus 4.6 --- .../components/workspace/CrmWorkspaceApp.vue | 8 ++++ frontend/app/composables/useCrmRealtime.ts | 15 +++++- frontend/server/routes/ws/crm-updates.ts | 47 +++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/frontend/app/components/workspace/CrmWorkspaceApp.vue b/frontend/app/components/workspace/CrmWorkspaceApp.vue index e515cab..4e6ea18 100644 --- a/frontend/app/components/workspace/CrmWorkspaceApp.vue +++ b/frontend/app/components/workspace/CrmWorkspaceApp.vue @@ -659,6 +659,14 @@ const { crmRealtimeState, startCrmRealtime, stopCrmRealtime } = useCrmRealtime({ onDashboardChanged: async () => { await Promise.all([refetchAllCrmQueries(), loadTelegramConnectStatus()]); }, + onNewMessage: (msg) => { + // If the message is for the currently open thread → refresh its timeline + if (msg.contactId === selectedCommThreadId.value) { + void refreshSelectedClientTimeline(selectedCommThreadId.value); + } + // Refresh contacts to update sidebar preview (lastMessageText, lastAt) + void refetchContacts(); + }, }); // --------------------------------------------------------------------------- diff --git a/frontend/app/composables/useCrmRealtime.ts b/frontend/app/composables/useCrmRealtime.ts index cedbe97..835f272 100644 --- a/frontend/app/composables/useCrmRealtime.ts +++ b/frontend/app/composables/useCrmRealtime.ts @@ -1,8 +1,18 @@ import { ref } from "vue"; +export type RealtimeNewMessage = { + contactId: string; + contactName: string; + text: string; + channel: string; + direction: string; + at: string; +}; + export function useCrmRealtime(opts: { isAuthenticated: () => boolean; onDashboardChanged: () => Promise; + onNewMessage?: (msg: RealtimeNewMessage) => void; }) { const crmRealtimeState = ref<"idle" | "connecting" | "open" | "error">("idle"); let crmRealtimeSocket: WebSocket | null = null; @@ -99,10 +109,13 @@ export function useCrmRealtime(opts: { const raw = typeof event.data === "string" ? event.data : ""; if (!raw) return; try { - const payload = JSON.parse(raw) as { type?: string }; + const payload = JSON.parse(raw) as { type?: string; [key: string]: any }; if (payload.type === "dashboard.changed") { scheduleCrmRealtimeRefresh(); } + if (payload.type === "message.new" && opts.onNewMessage) { + opts.onNewMessage(payload as unknown as RealtimeNewMessage); + } } catch { // ignore malformed realtime payloads } diff --git a/frontend/server/routes/ws/crm-updates.ts b/frontend/server/routes/ws/crm-updates.ts index d844236..6ca4922 100644 --- a/frontend/server/routes/ws/crm-updates.ts +++ b/frontend/server/routes/ws/crm-updates.ts @@ -1,5 +1,13 @@ import { prisma } from "../../utils/prisma"; +function mapChannel(channel: string) { + if (channel === "TELEGRAM") return "Telegram"; + if (channel === "WHATSAPP") return "WhatsApp"; + if (channel === "INSTAGRAM") return "Instagram"; + if (channel === "EMAIL") return "Email"; + return "Phone"; +} + const COOKIE_USER = "cf_user"; const COOKIE_TEAM = "cf_team"; const COOKIE_CONV = "cf_conv"; @@ -9,6 +17,7 @@ const TEAM_POLL_INTERVAL_MS = 2000; const peersByTeam = new Map>(); const peerTeamById = new Map(); const lastSignatureByTeam = new Map(); +const lastMsgCreatedAtByTeam = new Map(); let pollTimer: ReturnType | null = null; @@ -45,6 +54,7 @@ function detachPeer(peer: any) { if (peers.size === 0) { peersByTeam.delete(teamId); lastSignatureByTeam.delete(teamId); + lastMsgCreatedAtByTeam.delete(teamId); } } @@ -135,9 +145,46 @@ function sendJson(peer: any, payload: Record) { } } +async function checkNewMessages(teamId: string) { + const lastTs = lastMsgCreatedAtByTeam.get(teamId) ?? new Date(0); + try { + const msgs = await prisma.contactMessage.findMany({ + where: { contact: { teamId }, createdAt: { gt: lastTs } }, + include: { contact: { select: { id: true, name: true } } }, + orderBy: { createdAt: "asc" }, + take: 50, + }); + if (msgs.length) { + lastMsgCreatedAtByTeam.set(teamId, msgs[msgs.length - 1]!.createdAt); + } + return msgs; + } catch { + return []; + } +} + async function pollAndBroadcast() { for (const [teamId, peers] of peersByTeam.entries()) { if (!peers.size) continue; + + // Check for new messages → send granular events + const newMessages = await checkNewMessages(teamId); + for (const msg of newMessages) { + const payload = { + type: "message.new", + contactId: msg.contact.id, + contactName: msg.contact.name, + text: msg.content ?? "", + channel: mapChannel(msg.channel), + direction: msg.direction === "OUTBOUND" ? "out" : "in", + at: msg.occurredAt?.toISOString() ?? msg.createdAt.toISOString(), + }; + for (const peer of peers) { + sendJson(peer, payload); + } + } + + // Check for other dashboard changes (contacts, calendar, deals, etc.) const signature = await computeTeamSignature(teamId); const previous = lastSignatureByTeam.get(teamId); if (signature === previous) continue;