Agent trace logs are now stored in-memory (pilotRunStore) and broadcast through the existing /ws/crm-updates WebSocket channel. When a client reconnects, it receives a pilot.catchup with all accumulated logs so the user sees agent progress even after page reload. Three new WS event types: pilot.trace, pilot.finished, pilot.catchup. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
285 lines
8.1 KiB
TypeScript
285 lines
8.1 KiB
TypeScript
import { prisma } from "../../utils/prisma";
|
|
import { getActivePilotRun } from "../../utils/pilotRunStore";
|
|
|
|
function mapChannel(channel: string) {
|
|
if (channel === "TELEGRAM") return "Telegram";
|
|
if (channel === "WHATSAPP") return "WhatsApp";
|
|
if (channel === "INSTAGRAM") return "Instagram";
|
|
if (channel === "EMAIL") return "Email";
|
|
return "Phone";
|
|
}
|
|
|
|
const COOKIE_USER = "cf_user";
|
|
const COOKIE_TEAM = "cf_team";
|
|
const COOKIE_CONV = "cf_conv";
|
|
|
|
const TEAM_POLL_INTERVAL_MS = 2000;
|
|
|
|
const peersByTeam = new Map<string, Set<any>>();
|
|
const peerTeamById = new Map<string, string>();
|
|
const lastSignatureByTeam = new Map<string, string>();
|
|
const lastMsgCreatedAtByTeam = new Map<string, Date>();
|
|
|
|
// Conversation-level tracking for pilot events
|
|
const peersByConversation = new Map<string, Set<any>>();
|
|
const peerConversationById = new Map<string, string>();
|
|
|
|
let pollTimer: ReturnType<typeof setInterval> | null = null;
|
|
|
|
function parseCookies(raw: string | null) {
|
|
const out = new Map<string, string>();
|
|
for (const part of String(raw ?? "").split(";")) {
|
|
const [key, ...rest] = part.trim().split("=");
|
|
if (!key) continue;
|
|
const value = rest.join("=");
|
|
try {
|
|
out.set(key, decodeURIComponent(value));
|
|
} catch {
|
|
out.set(key, value);
|
|
}
|
|
}
|
|
return out;
|
|
}
|
|
|
|
function attachPeerToTeam(peer: any, teamId: string) {
|
|
if (!peersByTeam.has(teamId)) peersByTeam.set(teamId, new Set());
|
|
peersByTeam.get(teamId)?.add(peer);
|
|
peerTeamById.set(String(peer.id), teamId);
|
|
}
|
|
|
|
function attachPeerToConversation(peer: any, conversationId: string) {
|
|
if (!peersByConversation.has(conversationId)) peersByConversation.set(conversationId, new Set());
|
|
peersByConversation.get(conversationId)?.add(peer);
|
|
peerConversationById.set(String(peer.id), conversationId);
|
|
}
|
|
|
|
function detachPeer(peer: any) {
|
|
const key = String(peer.id);
|
|
|
|
// Detach from team
|
|
const teamId = peerTeamById.get(key);
|
|
if (teamId) {
|
|
peerTeamById.delete(key);
|
|
const peers = peersByTeam.get(teamId);
|
|
if (peers) {
|
|
peers.delete(peer);
|
|
if (peers.size === 0) {
|
|
peersByTeam.delete(teamId);
|
|
lastSignatureByTeam.delete(teamId);
|
|
lastMsgCreatedAtByTeam.delete(teamId);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Detach from conversation
|
|
const convId = peerConversationById.get(key);
|
|
if (convId) {
|
|
peerConversationById.delete(key);
|
|
const convPeers = peersByConversation.get(convId);
|
|
if (convPeers) {
|
|
convPeers.delete(peer);
|
|
if (convPeers.size === 0) {
|
|
peersByConversation.delete(convId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
function stopPollIfIdle() {
|
|
if (peersByTeam.size > 0 || !pollTimer) return;
|
|
clearInterval(pollTimer);
|
|
pollTimer = null;
|
|
}
|
|
|
|
async function validateSessionFromPeer(peer: any) {
|
|
const cookieHeader = peer?.request?.headers?.get?.("cookie") ?? null;
|
|
const cookies = parseCookies(cookieHeader);
|
|
|
|
const userId = String(cookies.get(COOKIE_USER) ?? "").trim();
|
|
const teamId = String(cookies.get(COOKIE_TEAM) ?? "").trim();
|
|
const conversationId = String(cookies.get(COOKIE_CONV) ?? "").trim();
|
|
if (!userId || !teamId || !conversationId) return null;
|
|
|
|
const [user, team, conv] = await Promise.all([
|
|
prisma.user.findUnique({ where: { id: userId }, select: { id: true } }),
|
|
prisma.team.findUnique({ where: { id: teamId }, select: { id: true } }),
|
|
prisma.aiConversation.findFirst({
|
|
where: { id: conversationId, teamId, createdByUserId: userId },
|
|
select: { id: true },
|
|
}),
|
|
]);
|
|
|
|
if (!user || !team || !conv) return null;
|
|
return { teamId, conversationId };
|
|
}
|
|
|
|
async function computeTeamSignature(teamId: string) {
|
|
const [
|
|
omniMessageMax,
|
|
contactMax,
|
|
contactMessageMax,
|
|
telegramConnectionMax,
|
|
contactInboxMax,
|
|
inboxPrefMax,
|
|
clientTimelineEntryMax,
|
|
] = await Promise.all([
|
|
prisma.omniMessage.aggregate({
|
|
where: { teamId },
|
|
_max: { updatedAt: true },
|
|
}),
|
|
prisma.contact.aggregate({
|
|
where: { teamId },
|
|
_max: { updatedAt: true },
|
|
}),
|
|
prisma.contactMessage.aggregate({
|
|
where: { contact: { teamId } },
|
|
_max: { createdAt: true },
|
|
}),
|
|
prisma.telegramBusinessConnection.aggregate({
|
|
where: { teamId },
|
|
_max: { updatedAt: true },
|
|
}),
|
|
prisma.contactInbox.aggregate({
|
|
where: { teamId },
|
|
_max: { updatedAt: true },
|
|
}),
|
|
prisma.contactInboxPreference.aggregate({
|
|
where: { teamId },
|
|
_max: { updatedAt: true },
|
|
}),
|
|
prisma.clientTimelineEntry.aggregate({
|
|
where: { teamId },
|
|
_max: { updatedAt: true },
|
|
}),
|
|
]);
|
|
|
|
return [
|
|
omniMessageMax._max.updatedAt?.toISOString() ?? "",
|
|
contactMax._max.updatedAt?.toISOString() ?? "",
|
|
contactMessageMax._max.createdAt?.toISOString() ?? "",
|
|
telegramConnectionMax._max.updatedAt?.toISOString() ?? "",
|
|
contactInboxMax._max.updatedAt?.toISOString() ?? "",
|
|
inboxPrefMax._max.updatedAt?.toISOString() ?? "",
|
|
clientTimelineEntryMax._max.updatedAt?.toISOString() ?? "",
|
|
].join("|");
|
|
}
|
|
|
|
function sendJson(peer: any, payload: Record<string, unknown>) {
|
|
try {
|
|
peer.send(JSON.stringify(payload));
|
|
} catch {
|
|
// ignore socket write errors
|
|
}
|
|
}
|
|
|
|
async function checkNewMessages(teamId: string) {
|
|
const lastTs = lastMsgCreatedAtByTeam.get(teamId) ?? new Date(0);
|
|
try {
|
|
const msgs = await prisma.contactMessage.findMany({
|
|
where: { contact: { teamId }, createdAt: { gt: lastTs } },
|
|
include: { contact: { select: { id: true, name: true } } },
|
|
orderBy: { createdAt: "asc" },
|
|
take: 50,
|
|
});
|
|
if (msgs.length) {
|
|
lastMsgCreatedAtByTeam.set(teamId, msgs[msgs.length - 1]!.createdAt);
|
|
}
|
|
return msgs;
|
|
} catch {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
async function pollAndBroadcast() {
|
|
for (const [teamId, peers] of peersByTeam.entries()) {
|
|
if (!peers.size) continue;
|
|
|
|
// Check for new messages → send granular events
|
|
const newMessages = await checkNewMessages(teamId);
|
|
for (const msg of newMessages) {
|
|
const payload = {
|
|
type: "message.new",
|
|
contactId: msg.contact.id,
|
|
contactName: msg.contact.name,
|
|
text: msg.content ?? "",
|
|
channel: mapChannel(msg.channel),
|
|
direction: msg.direction === "OUTBOUND" ? "out" : "in",
|
|
at: msg.occurredAt?.toISOString() ?? msg.createdAt.toISOString(),
|
|
};
|
|
for (const peer of peers) {
|
|
sendJson(peer, payload);
|
|
}
|
|
}
|
|
|
|
// Check for other dashboard changes (contacts, calendar, deals, etc.)
|
|
const signature = await computeTeamSignature(teamId);
|
|
const previous = lastSignatureByTeam.get(teamId);
|
|
if (signature === previous) continue;
|
|
|
|
lastSignatureByTeam.set(teamId, signature);
|
|
const payload = {
|
|
type: "dashboard.changed",
|
|
teamId,
|
|
at: new Date().toISOString(),
|
|
};
|
|
|
|
for (const peer of peers) {
|
|
sendJson(peer, payload);
|
|
}
|
|
}
|
|
}
|
|
|
|
function ensurePoll() {
|
|
if (pollTimer) return;
|
|
pollTimer = setInterval(() => {
|
|
void pollAndBroadcast();
|
|
}, TEAM_POLL_INTERVAL_MS);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Public: broadcast to all peers connected for a given conversation
|
|
// ---------------------------------------------------------------------------
|
|
export function broadcastToConversation(conversationId: string, payload: Record<string, unknown>) {
|
|
const peers = peersByConversation.get(conversationId);
|
|
if (!peers) return;
|
|
for (const peer of peers) {
|
|
sendJson(peer, payload);
|
|
}
|
|
}
|
|
|
|
export default defineWebSocketHandler({
|
|
async open(peer) {
|
|
const session = await validateSessionFromPeer(peer);
|
|
if (!session) {
|
|
peer.close(4401, "Unauthorized");
|
|
return;
|
|
}
|
|
|
|
attachPeerToTeam(peer, session.teamId);
|
|
attachPeerToConversation(peer, session.conversationId);
|
|
ensurePoll();
|
|
sendJson(peer, { type: "realtime.connected", at: new Date().toISOString() });
|
|
void pollAndBroadcast();
|
|
|
|
// Send catch-up for active pilot run
|
|
const activeRun = getActivePilotRun(session.conversationId);
|
|
if (activeRun) {
|
|
sendJson(peer, {
|
|
type: "pilot.catchup",
|
|
logs: activeRun.logs,
|
|
at: new Date().toISOString(),
|
|
});
|
|
}
|
|
},
|
|
|
|
close(peer) {
|
|
detachPeer(peer);
|
|
stopPollIfIdle();
|
|
},
|
|
|
|
error(peer) {
|
|
detachPeer(peer);
|
|
stopPollIfIdle();
|
|
},
|
|
});
|