feat: granular WebSocket message.new events
- WebSocket now detects new ContactMessages and broadcasts message.new events with contactId, text, channel, direction - Frontend handles message.new: refreshes timeline for open chat, refreshes contacts for sidebar preview update - dashboard.changed still fires for non-message changes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -659,6 +659,14 @@ const { crmRealtimeState, startCrmRealtime, stopCrmRealtime } = useCrmRealtime({
|
||||
onDashboardChanged: async () => {
|
||||
await Promise.all([refetchAllCrmQueries(), loadTelegramConnectStatus()]);
|
||||
},
|
||||
onNewMessage: (msg) => {
|
||||
// If the message is for the currently open thread → refresh its timeline
|
||||
if (msg.contactId === selectedCommThreadId.value) {
|
||||
void refreshSelectedClientTimeline(selectedCommThreadId.value);
|
||||
}
|
||||
// Refresh contacts to update sidebar preview (lastMessageText, lastAt)
|
||||
void refetchContacts();
|
||||
},
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -1,8 +1,18 @@
|
||||
import { ref } from "vue";
|
||||
|
||||
export type RealtimeNewMessage = {
|
||||
contactId: string;
|
||||
contactName: string;
|
||||
text: string;
|
||||
channel: string;
|
||||
direction: string;
|
||||
at: string;
|
||||
};
|
||||
|
||||
export function useCrmRealtime(opts: {
|
||||
isAuthenticated: () => boolean;
|
||||
onDashboardChanged: () => Promise<void>;
|
||||
onNewMessage?: (msg: RealtimeNewMessage) => void;
|
||||
}) {
|
||||
const crmRealtimeState = ref<"idle" | "connecting" | "open" | "error">("idle");
|
||||
let crmRealtimeSocket: WebSocket | null = null;
|
||||
@@ -99,10 +109,13 @@ export function useCrmRealtime(opts: {
|
||||
const raw = typeof event.data === "string" ? event.data : "";
|
||||
if (!raw) return;
|
||||
try {
|
||||
const payload = JSON.parse(raw) as { type?: string };
|
||||
const payload = JSON.parse(raw) as { type?: string; [key: string]: any };
|
||||
if (payload.type === "dashboard.changed") {
|
||||
scheduleCrmRealtimeRefresh();
|
||||
}
|
||||
if (payload.type === "message.new" && opts.onNewMessage) {
|
||||
opts.onNewMessage(payload as unknown as RealtimeNewMessage);
|
||||
}
|
||||
} catch {
|
||||
// ignore malformed realtime payloads
|
||||
}
|
||||
|
||||
@@ -1,5 +1,13 @@
|
||||
import { prisma } from "../../utils/prisma";
|
||||
|
||||
function mapChannel(channel: string) {
|
||||
if (channel === "TELEGRAM") return "Telegram";
|
||||
if (channel === "WHATSAPP") return "WhatsApp";
|
||||
if (channel === "INSTAGRAM") return "Instagram";
|
||||
if (channel === "EMAIL") return "Email";
|
||||
return "Phone";
|
||||
}
|
||||
|
||||
const COOKIE_USER = "cf_user";
|
||||
const COOKIE_TEAM = "cf_team";
|
||||
const COOKIE_CONV = "cf_conv";
|
||||
@@ -9,6 +17,7 @@ const TEAM_POLL_INTERVAL_MS = 2000;
|
||||
const peersByTeam = new Map<string, Set<any>>();
|
||||
const peerTeamById = new Map<string, string>();
|
||||
const lastSignatureByTeam = new Map<string, string>();
|
||||
const lastMsgCreatedAtByTeam = new Map<string, Date>();
|
||||
|
||||
let pollTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
@@ -45,6 +54,7 @@ function detachPeer(peer: any) {
|
||||
if (peers.size === 0) {
|
||||
peersByTeam.delete(teamId);
|
||||
lastSignatureByTeam.delete(teamId);
|
||||
lastMsgCreatedAtByTeam.delete(teamId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,9 +145,46 @@ function sendJson(peer: any, payload: Record<string, unknown>) {
|
||||
}
|
||||
}
|
||||
|
||||
async function checkNewMessages(teamId: string) {
|
||||
const lastTs = lastMsgCreatedAtByTeam.get(teamId) ?? new Date(0);
|
||||
try {
|
||||
const msgs = await prisma.contactMessage.findMany({
|
||||
where: { contact: { teamId }, createdAt: { gt: lastTs } },
|
||||
include: { contact: { select: { id: true, name: true } } },
|
||||
orderBy: { createdAt: "asc" },
|
||||
take: 50,
|
||||
});
|
||||
if (msgs.length) {
|
||||
lastMsgCreatedAtByTeam.set(teamId, msgs[msgs.length - 1]!.createdAt);
|
||||
}
|
||||
return msgs;
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async function pollAndBroadcast() {
|
||||
for (const [teamId, peers] of peersByTeam.entries()) {
|
||||
if (!peers.size) continue;
|
||||
|
||||
// Check for new messages → send granular events
|
||||
const newMessages = await checkNewMessages(teamId);
|
||||
for (const msg of newMessages) {
|
||||
const payload = {
|
||||
type: "message.new",
|
||||
contactId: msg.contact.id,
|
||||
contactName: msg.contact.name,
|
||||
text: msg.content ?? "",
|
||||
channel: mapChannel(msg.channel),
|
||||
direction: msg.direction === "OUTBOUND" ? "out" : "in",
|
||||
at: msg.occurredAt?.toISOString() ?? msg.createdAt.toISOString(),
|
||||
};
|
||||
for (const peer of peers) {
|
||||
sendJson(peer, payload);
|
||||
}
|
||||
}
|
||||
|
||||
// Check for other dashboard changes (contacts, calendar, deals, etc.)
|
||||
const signature = await computeTeamSignature(teamId);
|
||||
const previous = lastSignatureByTeam.get(teamId);
|
||||
if (signature === previous) continue;
|
||||
|
||||
Reference in New Issue
Block a user