From bf7f4ae93315870e691deb38fd3491f369a99c8e Mon Sep 17 00:00:00 2001 From: Ruslan Bakiev <572431+veikab@users.noreply.github.com> Date: Wed, 25 Feb 2026 08:45:32 +0700 Subject: [PATCH] feat: broadcast pilot agent traces via WebSocket for live status on reconnect Agent trace logs are now stored in-memory (pilotRunStore) and broadcast through the existing /ws/crm-updates WebSocket channel. When a client reconnects, it receives a pilot.catchup with all accumulated logs so the user sees agent progress even after page reload. Three new WS event types: pilot.trace, pilot.finished, pilot.catchup. Co-Authored-By: Claude Opus 4.6 --- .../components/workspace/CrmWorkspaceApp.vue | 4 ++ frontend/app/composables/useCrmRealtime.ts | 15 ++++ frontend/app/composables/usePilotChat.ts | 18 +++++ frontend/server/api/pilot-chat.post.ts | 23 ++++-- frontend/server/routes/ws/crm-updates.ts | 71 ++++++++++++++++--- frontend/server/utils/pilotRunStore.ts | 59 +++++++++++++++ 6 files changed, 174 insertions(+), 16 deletions(-) create mode 100644 frontend/server/utils/pilotRunStore.ts diff --git a/frontend/app/components/workspace/CrmWorkspaceApp.vue b/frontend/app/components/workspace/CrmWorkspaceApp.vue index 258aeb4..2a8d88f 100644 --- a/frontend/app/components/workspace/CrmWorkspaceApp.vue +++ b/frontend/app/components/workspace/CrmWorkspaceApp.vue @@ -373,6 +373,8 @@ const { pushPilotNote, refetchChatMessages, refetchChatConversations, + handleRealtimePilotTrace, + handleRealtimePilotFinished, destroyPilotWaveSurfer, togglePilotLiveLogsExpanded, } = pilotChat; @@ -678,6 +680,8 @@ const { crmRealtimeState, startCrmRealtime, stopCrmRealtime } = useCrmRealtime({ // Refresh contacts to update sidebar preview (lastMessageText, lastAt, hasUnread) void refetchContacts(); }, + onPilotTrace: (log) => handleRealtimePilotTrace(log), + onPilotFinished: () => void handleRealtimePilotFinished(), }); // --------------------------------------------------------------------------- diff --git a/frontend/app/composables/useCrmRealtime.ts b/frontend/app/composables/useCrmRealtime.ts index 835f272..593b05a 100644 --- a/frontend/app/composables/useCrmRealtime.ts +++ b/frontend/app/composables/useCrmRealtime.ts @@ -9,10 +9,14 @@ export type RealtimeNewMessage = { at: string; }; +export type RealtimePilotTrace = { text: string; at: string }; + export function useCrmRealtime(opts: { isAuthenticated: () => boolean; onDashboardChanged: () => Promise; onNewMessage?: (msg: RealtimeNewMessage) => void; + onPilotTrace?: (log: RealtimePilotTrace) => void; + onPilotFinished?: () => void; }) { const crmRealtimeState = ref<"idle" | "connecting" | "open" | "error">("idle"); let crmRealtimeSocket: WebSocket | null = null; @@ -116,6 +120,17 @@ export function useCrmRealtime(opts: { if (payload.type === "message.new" && opts.onNewMessage) { opts.onNewMessage(payload as unknown as RealtimeNewMessage); } + if (payload.type === "pilot.trace" && opts.onPilotTrace) { + opts.onPilotTrace({ text: String(payload.text ?? ""), at: String(payload.at ?? "") }); + } + if (payload.type === "pilot.catchup" && opts.onPilotTrace && Array.isArray(payload.logs)) { + for (const log of payload.logs) { + opts.onPilotTrace({ text: String((log as any).text ?? ""), at: String((log as any).at ?? "") }); + } + } + if (payload.type === "pilot.finished" && opts.onPilotFinished) { + opts.onPilotFinished(); + } } catch { // ignore malformed realtime payloads } diff --git a/frontend/app/composables/usePilotChat.ts b/frontend/app/composables/usePilotChat.ts index 5cd999f..a9f85e5 100644 --- a/frontend/app/composables/usePilotChat.ts +++ b/frontend/app/composables/usePilotChat.ts @@ -689,6 +689,24 @@ export function usePilotChat(opts: { pushPilotNote, refetchChatMessages, refetchChatConversations, + // realtime pilot trace handlers (called from useCrmRealtime) + handleRealtimePilotTrace(log: { text: string; at: string }) { + const text = String(log.text ?? "").trim(); + if (!text) return; + // Mark as sending so the UI shows live-log panel + if (!pilotSending.value) pilotSending.value = true; + pilotLiveLogs.value = [ + ...pilotLiveLogs.value, + { id: `ws-${Date.now()}-${Math.random()}`, text, at: log.at || new Date().toISOString() }, + ]; + }, + async handleRealtimePilotFinished() { + pilotSending.value = false; + livePilotUserText.value = ""; + livePilotAssistantText.value = ""; + pilotLiveLogs.value = []; + await Promise.all([refetchChatMessages(), refetchChatConversations(), opts.refetchAllCrmQueries()]); + }, // cleanup destroyPilotWaveSurfer, }; diff --git a/frontend/server/api/pilot-chat.post.ts b/frontend/server/api/pilot-chat.post.ts index 165b65d..d7a5ad5 100644 --- a/frontend/server/api/pilot-chat.post.ts +++ b/frontend/server/api/pilot-chat.post.ts @@ -6,6 +6,8 @@ import { buildChangeSet, captureSnapshot } from "../utils/changeSet"; import { persistAiMessage, runCrmAgentFor, type AgentTraceEvent } from "../agent/crmAgent"; import type { PilotContextPayload } from "../agent/crmAgent"; import type { ChangeSet } from "../utils/changeSet"; +import { startPilotRun, addPilotTrace, finishPilotRun } from "../utils/pilotRunStore"; +import { broadcastToConversation } from "../routes/ws/crm-updates"; function extractMessageText(message: any): string { if (!message || !Array.isArray(message.parts)) return ""; @@ -140,6 +142,7 @@ export default defineEventHandler(async (event) => { execute: async ({ writer }) => { const textId = `text-${Date.now()}`; writer.write({ type: "start" }); + startPilotRun(auth.conversationId); try { const snapshotBefore = await captureSnapshot(prisma, auth.teamId); @@ -163,13 +166,17 @@ export default defineEventHandler(async (event) => { requestId, conversationId: auth.conversationId, onTrace: async (trace: AgentTraceEvent) => { + const traceText = humanizeTraceText(trace); + const traceAt = new Date().toISOString(); writer.write({ type: "data-agent-log", - data: { - requestId, - at: new Date().toISOString(), - text: humanizeTraceText(trace), - }, + data: { requestId, at: traceAt, text: traceText }, + }); + addPilotTrace(auth.conversationId, traceText); + broadcastToConversation(auth.conversationId, { + type: "pilot.trace", + text: traceText, + at: traceAt, }); }, }); @@ -205,6 +212,9 @@ export default defineEventHandler(async (event) => { }); } + finishPilotRun(auth.conversationId, "finished"); + broadcastToConversation(auth.conversationId, { type: "pilot.finished", at: new Date().toISOString() }); + writer.write({ type: "text-start", id: textId }); writer.write({ type: "text-delta", id: textId, delta: reply.text }); writer.write({ type: "text-end", id: textId }); @@ -224,6 +234,9 @@ export default defineEventHandler(async (event) => { transient: false, }); + finishPilotRun(auth.conversationId, "error"); + broadcastToConversation(auth.conversationId, { type: "pilot.finished", at: new Date().toISOString() }); + writer.write({ type: "data-agent-log", data: { diff --git a/frontend/server/routes/ws/crm-updates.ts b/frontend/server/routes/ws/crm-updates.ts index 6ca4922..a76eaf6 100644 --- a/frontend/server/routes/ws/crm-updates.ts +++ b/frontend/server/routes/ws/crm-updates.ts @@ -1,4 +1,5 @@ import { prisma } from "../../utils/prisma"; +import { getActivePilotRun } from "../../utils/pilotRunStore"; function mapChannel(channel: string) { if (channel === "TELEGRAM") return "Telegram"; @@ -19,6 +20,10 @@ const peerTeamById = new Map(); const lastSignatureByTeam = new Map(); const lastMsgCreatedAtByTeam = new Map(); +// Conversation-level tracking for pilot events +const peersByConversation = new Map>(); +const peerConversationById = new Map(); + let pollTimer: ReturnType | null = null; function parseCookies(raw: string | null) { @@ -42,19 +47,41 @@ function attachPeerToTeam(peer: any, teamId: string) { peerTeamById.set(String(peer.id), teamId); } +function attachPeerToConversation(peer: any, conversationId: string) { + if (!peersByConversation.has(conversationId)) peersByConversation.set(conversationId, new Set()); + peersByConversation.get(conversationId)?.add(peer); + peerConversationById.set(String(peer.id), conversationId); +} + function detachPeer(peer: any) { const key = String(peer.id); - const teamId = peerTeamById.get(key); - if (!teamId) return; - peerTeamById.delete(key); - const peers = peersByTeam.get(teamId); - if (!peers) return; - peers.delete(peer); - if (peers.size === 0) { - peersByTeam.delete(teamId); - lastSignatureByTeam.delete(teamId); - lastMsgCreatedAtByTeam.delete(teamId); + // Detach from team + const teamId = peerTeamById.get(key); + if (teamId) { + peerTeamById.delete(key); + const peers = peersByTeam.get(teamId); + if (peers) { + peers.delete(peer); + if (peers.size === 0) { + peersByTeam.delete(teamId); + lastSignatureByTeam.delete(teamId); + lastMsgCreatedAtByTeam.delete(teamId); + } + } + } + + // Detach from conversation + const convId = peerConversationById.get(key); + if (convId) { + peerConversationById.delete(key); + const convPeers = peersByConversation.get(convId); + if (convPeers) { + convPeers.delete(peer); + if (convPeers.size === 0) { + peersByConversation.delete(convId); + } + } } } @@ -83,7 +110,7 @@ async function validateSessionFromPeer(peer: any) { ]); if (!user || !team || !conv) return null; - return { teamId }; + return { teamId, conversationId }; } async function computeTeamSignature(teamId: string) { @@ -209,6 +236,17 @@ function ensurePoll() { }, TEAM_POLL_INTERVAL_MS); } +// --------------------------------------------------------------------------- +// Public: broadcast to all peers connected for a given conversation +// --------------------------------------------------------------------------- +export function broadcastToConversation(conversationId: string, payload: Record) { + const peers = peersByConversation.get(conversationId); + if (!peers) return; + for (const peer of peers) { + sendJson(peer, payload); + } +} + export default defineWebSocketHandler({ async open(peer) { const session = await validateSessionFromPeer(peer); @@ -218,9 +256,20 @@ export default defineWebSocketHandler({ } attachPeerToTeam(peer, session.teamId); + attachPeerToConversation(peer, session.conversationId); ensurePoll(); sendJson(peer, { type: "realtime.connected", at: new Date().toISOString() }); void pollAndBroadcast(); + + // Send catch-up for active pilot run + const activeRun = getActivePilotRun(session.conversationId); + if (activeRun) { + sendJson(peer, { + type: "pilot.catchup", + logs: activeRun.logs, + at: new Date().toISOString(), + }); + } }, close(peer) { diff --git a/frontend/server/utils/pilotRunStore.ts b/frontend/server/utils/pilotRunStore.ts new file mode 100644 index 0000000..7974681 --- /dev/null +++ b/frontend/server/utils/pilotRunStore.ts @@ -0,0 +1,59 @@ +export type PilotRunLog = { id: string; text: string; at: string }; + +export type PilotRun = { + status: "running" | "finished" | "error"; + logs: PilotRunLog[]; + startedAt: string; + finishedAt?: string; +}; + +const activeRuns = new Map(); + +const MAX_AGE_MS = 10 * 60 * 1000; // 10 minutes + +function cleanup() { + const now = Date.now(); + for (const [key, run] of activeRuns) { + if (now - new Date(run.startedAt).getTime() > MAX_AGE_MS) { + activeRuns.delete(key); + } + } +} + +export function startPilotRun(conversationId: string) { + cleanup(); + activeRuns.set(conversationId, { + status: "running", + logs: [], + startedAt: new Date().toISOString(), + }); +} + +export function addPilotTrace(conversationId: string, text: string) { + const run = activeRuns.get(conversationId); + if (!run || run.status !== "running") return; + run.logs.push({ + id: `${Date.now()}-${Math.floor(Math.random() * 1_000_000)}`, + text, + at: new Date().toISOString(), + }); +} + +export function finishPilotRun(conversationId: string, status: "finished" | "error" = "finished") { + const run = activeRuns.get(conversationId); + if (!run) return; + run.status = status; + run.finishedAt = new Date().toISOString(); + // Keep for a short time so late-connecting clients can see it finished + setTimeout(() => { + if (activeRuns.get(conversationId) === run) { + activeRuns.delete(conversationId); + } + }, 5000); +} + +export function getActivePilotRun(conversationId: string): PilotRun | null { + const run = activeRuns.get(conversationId); + if (!run || run.status !== "running") return null; + return run; +}