diff --git a/frontend/app.vue b/frontend/app.vue index 9aea6a3..73c1f43 100644 --- a/frontend/app.vue +++ b/frontend/app.vue @@ -492,6 +492,12 @@ const loginBusy = ref(false); let pilotBackgroundPoll: ReturnType | null = null; const lifecycleNowMs = ref(Date.now()); let lifecycleClock: ReturnType | null = null; +const crmRealtimeState = ref<"idle" | "connecting" | "open" | "error">("idle"); +let crmRealtimeSocket: WebSocket | null = null; +let crmRealtimeReconnectTimer: ReturnType | null = null; +let crmRealtimeRefreshTimer: ReturnType | null = null; +let crmRealtimeRefreshInFlight = false; +let crmRealtimeReconnectAttempt = 0; watch( () => pilotLiveLogs.value.length, @@ -846,6 +852,7 @@ async function bootstrapSession() { try { await loadMe(); if (!authMe.value) { + stopCrmRealtime(); pilotMessages.value = []; chatConversations.value = []; telegramConnectStatus.value = "not_connected"; @@ -854,7 +861,11 @@ async function bootstrapSession() { return; } await Promise.all([loadPilotMessages(), loadChatConversations(), refreshCrmData(), loadTelegramConnectStatus()]); + if (process.client) { + startCrmRealtime(); + } } catch { + stopCrmRealtime(); authMe.value = null; pilotMessages.value = []; chatConversations.value = []; @@ -911,6 +922,7 @@ async function login() { }); await loadMe(); startPilotBackgroundPolling(); + startCrmRealtime(); await Promise.all([loadPilotMessages(), loadChatConversations(), refreshCrmData(), loadTelegramConnectStatus()]); } catch (e: any) { loginError.value = e?.data?.message || e?.message || "Login failed"; @@ -921,6 +933,7 @@ async function login() { async function logout() { await gqlFetch<{ logout: { ok: boolean } }>(logoutMutation); + stopCrmRealtime(); stopPilotBackgroundPolling(); authMe.value = null; pilotMessages.value = []; @@ -966,6 +979,121 @@ async function refreshCrmData() { })); } +function clearCrmRealtimeReconnectTimer() { + if (!crmRealtimeReconnectTimer) return; + clearTimeout(crmRealtimeReconnectTimer); + crmRealtimeReconnectTimer = null; +} + +function clearCrmRealtimeRefreshTimer() { + if (!crmRealtimeRefreshTimer) return; + clearTimeout(crmRealtimeRefreshTimer); + crmRealtimeRefreshTimer = null; +} + +async function runCrmRealtimeRefresh() { + if (!authMe.value || crmRealtimeRefreshInFlight) return; + crmRealtimeRefreshInFlight = true; + try { + await Promise.all([refreshCrmData(), loadTelegramConnectStatus()]); + } catch { + // ignore transient realtime refresh errors + } finally { + crmRealtimeRefreshInFlight = false; + } +} + +function scheduleCrmRealtimeRefresh(delayMs = 250) { + clearCrmRealtimeRefreshTimer(); + crmRealtimeRefreshTimer = setTimeout(() => { + crmRealtimeRefreshTimer = null; + void runCrmRealtimeRefresh(); + }, delayMs); +} + +function scheduleCrmRealtimeReconnect() { + clearCrmRealtimeReconnectTimer(); + const attempt = Math.min(crmRealtimeReconnectAttempt + 1, 8); + crmRealtimeReconnectAttempt = attempt; + const delayMs = Math.min(1000 * 2 ** (attempt - 1), 15000); + crmRealtimeReconnectTimer = setTimeout(() => { + crmRealtimeReconnectTimer = null; + startCrmRealtime(); + }, delayMs); +} + +function stopCrmRealtime() { + clearCrmRealtimeReconnectTimer(); + clearCrmRealtimeRefreshTimer(); + + if (crmRealtimeSocket) { + const socket = crmRealtimeSocket; + crmRealtimeSocket = null; + socket.onopen = null; + socket.onmessage = null; + socket.onerror = null; + socket.onclose = null; + try { + socket.close(1000, "client stop"); + } catch { + // ignore socket close errors + } + } + + crmRealtimeState.value = "idle"; +} + +function startCrmRealtime() { + if (process.server || !authMe.value) return; + if (crmRealtimeSocket) { + const state = crmRealtimeSocket.readyState; + if (state === WebSocket.OPEN || state === WebSocket.CONNECTING) return; + } + + clearCrmRealtimeReconnectTimer(); + const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; + const url = `${protocol}//${window.location.host}/ws/crm-updates`; + + const socket = new WebSocket(url); + crmRealtimeSocket = socket; + crmRealtimeState.value = "connecting"; + + socket.onopen = () => { + crmRealtimeState.value = "open"; + crmRealtimeReconnectAttempt = 0; + }; + + socket.onmessage = (event) => { + const raw = typeof event.data === "string" ? event.data : ""; + if (!raw) return; + try { + const payload = JSON.parse(raw) as { type?: string }; + if (payload.type === "dashboard.changed") { + scheduleCrmRealtimeRefresh(); + } + } catch { + // ignore malformed realtime payloads + } + }; + + socket.onerror = () => { + crmRealtimeState.value = "error"; + }; + + socket.onclose = () => { + const wasActive = crmRealtimeSocket === socket; + if (wasActive) { + crmRealtimeSocket = null; + } + if (!authMe.value) { + crmRealtimeState.value = "idle"; + return; + } + crmRealtimeState.value = "error"; + scheduleCrmRealtimeReconnect(); + }; +} + async function sendPilotText(rawText: string) { const text = rawText.trim(); if (!text || pilotSending.value) return; @@ -2059,15 +2187,22 @@ onMounted(() => { if (!authResolved.value) { void bootstrapSession().finally(() => { - if (authMe.value) startPilotBackgroundPolling(); + if (authMe.value) { + startPilotBackgroundPolling(); + startCrmRealtime(); + } }); return; } - if (authMe.value) startPilotBackgroundPolling(); + if (authMe.value) { + startPilotBackgroundPolling(); + startCrmRealtime(); + } }); onBeforeUnmount(() => { + stopCrmRealtime(); if (pilotRecording.value) { stopPilotRecording("fill"); } @@ -2866,6 +3001,7 @@ function openDocumentsTab(push = false) { const peopleListMode = ref<"contacts" | "deals">("contacts"); const peopleSearch = ref(""); const peopleSortMode = ref("lastContact"); +const brokenAvatarByContactId = ref>({}); const peopleSortOptions: Array<{ value: PeopleSortMode; label: string }> = [ { value: "lastContact", label: "Last contact" }, { value: "name", label: "Name" }, @@ -2875,6 +3011,31 @@ const peopleSortOptions: Array<{ value: PeopleSortMode; label: string }> = [ const selectedDealId = ref(deals.value[0]?.id ?? ""); const selectedDealStepsExpanded = ref(false); +function contactInitials(name: string) { + const words = String(name ?? "") + .trim() + .split(/\s+/) + .filter(Boolean); + if (!words.length) return "?"; + return words + .slice(0, 2) + .map((part) => part[0]?.toUpperCase() ?? "") + .join(""); +} + +function avatarSrcForThread(thread: { id: string; avatar: string }) { + if (brokenAvatarByContactId.value[thread.id]) return ""; + return String(thread.avatar ?? "").trim(); +} + +function markAvatarBroken(contactId: string) { + if (!contactId) return; + brokenAvatarByContactId.value = { + ...brokenAvatarByContactId.value, + [contactId]: true, + }; +} + const commThreads = computed(() => { const sorted = [...commItems.value].sort((a, b) => a.at.localeCompare(b.at)); const map = new Map(); @@ -4978,7 +5139,15 @@ async function decideFeedCard(card: FeedCard, decision: "accepted" | "rejected")
- + + + {{ contactInitials(thread.contact) }} +
diff --git a/frontend/nuxt.config.ts b/frontend/nuxt.config.ts index b7f3188..fe56aa7 100644 --- a/frontend/nuxt.config.ts +++ b/frontend/nuxt.config.ts @@ -4,6 +4,11 @@ export default defineNuxtConfig({ compatibilityDate: "2025-07-15", devtools: { enabled: true }, css: ["~/assets/css/main.css"], + nitro: { + experimental: { + websocket: true, + }, + }, vite: { plugins: [tailwindcss() as any], diff --git a/frontend/server/api/omni/telegram/avatar/[userId].get.ts b/frontend/server/api/omni/telegram/avatar/[userId].get.ts new file mode 100644 index 0000000..ef55606 --- /dev/null +++ b/frontend/server/api/omni/telegram/avatar/[userId].get.ts @@ -0,0 +1,63 @@ +import { getRouterParam, setHeader } from "h3"; +import { getAuthContext } from "../../../../utils/auth"; +import { requireTelegramBotToken, telegramApiBase, telegramBotApi } from "../../../../utils/telegram"; + +type TelegramProfilePhotoSize = { + file_id?: string; +}; + +type TelegramUserProfilePhotosResult = { + photos?: TelegramProfilePhotoSize[][]; +}; + +type TelegramGetFileResult = { + file_path?: string; +}; + +function parseUserId(input: string) { + if (!/^\d+$/.test(input)) return null; + const value = Number(input); + if (!Number.isSafeInteger(value) || value <= 0) return null; + return value; +} + +export default defineEventHandler(async (event) => { + await getAuthContext(event); + + const rawUserId = String(getRouterParam(event, "userId") ?? "").trim(); + const userId = parseUserId(rawUserId); + if (!userId) { + throw createError({ statusCode: 400, statusMessage: "invalid telegram user id" }); + } + + const profile = await telegramBotApi("getUserProfilePhotos", { + user_id: userId, + limit: 1, + }); + + const sizes = profile.photos?.[0] ?? []; + const best = sizes[sizes.length - 1]; + const fileId = String(best?.file_id ?? "").trim(); + if (!fileId) { + throw createError({ statusCode: 404, statusMessage: "avatar not found" }); + } + + const fileMeta = await telegramBotApi("getFile", { file_id: fileId }); + const filePath = String(fileMeta.file_path ?? "").trim(); + if (!filePath) { + throw createError({ statusCode: 404, statusMessage: "avatar path not found" }); + } + + const token = requireTelegramBotToken(); + const upstream = await fetch(`${telegramApiBase()}/file/bot${token}/${filePath}`); + if (!upstream.ok) { + throw createError({ statusCode: 502, statusMessage: "failed to load telegram avatar" }); + } + + const contentType = upstream.headers.get("content-type") || "image/jpeg"; + const buffer = Buffer.from(await upstream.arrayBuffer()); + + setHeader(event, "content-type", contentType); + setHeader(event, "cache-control", "private, max-age=300"); + return buffer; +}); diff --git a/frontend/server/routes/ws/crm-updates.ts b/frontend/server/routes/ws/crm-updates.ts new file mode 100644 index 0000000..f1d6df1 --- /dev/null +++ b/frontend/server/routes/ws/crm-updates.ts @@ -0,0 +1,165 @@ +import { prisma } from "../../utils/prisma"; + +const COOKIE_USER = "cf_user"; +const COOKIE_TEAM = "cf_team"; +const COOKIE_CONV = "cf_conv"; + +const TEAM_POLL_INTERVAL_MS = 2000; + +const peersByTeam = new Map>(); +const peerTeamById = new Map(); +const lastSignatureByTeam = new Map(); + +let pollTimer: ReturnType | null = null; + +function parseCookies(raw: string | null) { + const out = new Map(); + for (const part of String(raw ?? "").split(";")) { + const [key, ...rest] = part.trim().split("="); + if (!key) continue; + const value = rest.join("="); + try { + out.set(key, decodeURIComponent(value)); + } catch { + out.set(key, value); + } + } + return out; +} + +function attachPeerToTeam(peer: any, teamId: string) { + if (!peersByTeam.has(teamId)) peersByTeam.set(teamId, new Set()); + peersByTeam.get(teamId)?.add(peer); + peerTeamById.set(String(peer.id), teamId); +} + +function detachPeer(peer: any) { + const key = String(peer.id); + const teamId = peerTeamById.get(key); + if (!teamId) return; + peerTeamById.delete(key); + + const peers = peersByTeam.get(teamId); + if (!peers) return; + peers.delete(peer); + if (peers.size === 0) { + peersByTeam.delete(teamId); + lastSignatureByTeam.delete(teamId); + } +} + +function stopPollIfIdle() { + if (peersByTeam.size > 0 || !pollTimer) return; + clearInterval(pollTimer); + pollTimer = null; +} + +async function validateSessionFromPeer(peer: any) { + const cookieHeader = peer?.request?.headers?.get?.("cookie") ?? null; + const cookies = parseCookies(cookieHeader); + + const userId = String(cookies.get(COOKIE_USER) ?? "").trim(); + const teamId = String(cookies.get(COOKIE_TEAM) ?? "").trim(); + const conversationId = String(cookies.get(COOKIE_CONV) ?? "").trim(); + if (!userId || !teamId || !conversationId) return null; + + const [user, team, conv] = await Promise.all([ + prisma.user.findUnique({ where: { id: userId }, select: { id: true } }), + prisma.team.findUnique({ where: { id: teamId }, select: { id: true } }), + prisma.chatConversation.findFirst({ + where: { id: conversationId, teamId, createdByUserId: userId }, + select: { id: true }, + }), + ]); + + if (!user || !team || !conv) return null; + return { teamId }; +} + +async function computeTeamSignature(teamId: string) { + const [omniMessageMax, contactMax, contactMessageMax, telegramConnectionMax] = await Promise.all([ + prisma.omniMessage.aggregate({ + where: { teamId }, + _max: { updatedAt: true }, + }), + prisma.contact.aggregate({ + where: { teamId }, + _max: { updatedAt: true }, + }), + prisma.contactMessage.aggregate({ + where: { contact: { teamId } }, + _max: { createdAt: true }, + }), + prisma.telegramBusinessConnection.aggregate({ + where: { teamId }, + _max: { updatedAt: true }, + }), + ]); + + return [ + omniMessageMax._max.updatedAt?.toISOString() ?? "", + contactMax._max.updatedAt?.toISOString() ?? "", + contactMessageMax._max.createdAt?.toISOString() ?? "", + telegramConnectionMax._max.updatedAt?.toISOString() ?? "", + ].join("|"); +} + +function sendJson(peer: any, payload: Record) { + try { + peer.send(JSON.stringify(payload)); + } catch { + // ignore socket write errors + } +} + +async function pollAndBroadcast() { + for (const [teamId, peers] of peersByTeam.entries()) { + if (!peers.size) continue; + const signature = await computeTeamSignature(teamId); + const previous = lastSignatureByTeam.get(teamId); + if (signature === previous) continue; + + lastSignatureByTeam.set(teamId, signature); + const payload = { + type: "dashboard.changed", + teamId, + at: new Date().toISOString(), + }; + + for (const peer of peers) { + sendJson(peer, payload); + } + } +} + +function ensurePoll() { + if (pollTimer) return; + pollTimer = setInterval(() => { + void pollAndBroadcast(); + }, TEAM_POLL_INTERVAL_MS); +} + +export default defineWebSocketHandler({ + async open(peer) { + const session = await validateSessionFromPeer(peer); + if (!session) { + peer.close(4401, "Unauthorized"); + return; + } + + attachPeerToTeam(peer, session.teamId); + ensurePoll(); + sendJson(peer, { type: "realtime.connected", at: new Date().toISOString() }); + void pollAndBroadcast(); + }, + + close(peer) { + detachPeer(peer); + stopPollIfIdle(); + }, + + error(peer) { + detachPeer(peer); + stopPollIfIdle(); + }, +}); diff --git a/omni_chat/src/worker.ts b/omni_chat/src/worker.ts index a12eb39..67e96dc 100644 --- a/omni_chat/src/worker.ts +++ b/omni_chat/src/worker.ts @@ -72,6 +72,12 @@ type ContactProfile = { avatarUrl: string | null; }; +function toTelegramAvatarProxyUrl(externalContactId: string) { + const id = String(externalContactId ?? "").trim(); + if (!/^\d+$/.test(id)) return null; + return `/api/omni/telegram/avatar/${id}`; +} + function buildContactProfile( normalized: OmniInboundEnvelopeV1["payloadNormalized"], externalContactId: string, @@ -99,7 +105,7 @@ function buildContactProfile( return { displayName, - avatarUrl: asString(normalized.contactAvatarUrl), + avatarUrl: asString(normalized.contactAvatarUrl) ?? toTelegramAvatarProxyUrl(externalContactId), }; }