import { readBody } from "h3";
import {
  createUIMessageStream,
  createUIMessageStreamResponse,
  type UIMessage,
  type UIMessageStreamWriter,
  validateUIMessages,
} from "ai";
import { getAuthContext } from "../utils/auth";
import { prisma } from "../utils/prisma";
import { buildChangeSet, captureSnapshot } from "../utils/changeSet";
import { persistAiMessage, runCrmAgentFor, type AgentTraceEvent } from "../agent/crmAgent";
import type { PilotContextPayload } from "../agent/crmAgent";
import type { ChangeSet } from "../utils/changeSet";
import { startPilotRun, addPilotTrace, finishPilotRun } from "../utils/pilotRunStore";
import { broadcastToConversation } from "../routes/ws/crm-updates";

/** Custom data parts this endpoint streams to the client (`data-agent-log` chunks). */
type PilotDataTypes = {
  "agent-log": {
    requestId: string;
    at: string;
    text: string;
  };
};

/** UI message type of this chat: default metadata plus the custom data parts above. */
type PilotUiMessage = UIMessage<unknown, PilotDataTypes>;

/** Text parts of a {@link PilotUiMessage}. */
type PilotTextPart = Extract<PilotUiMessage["parts"][number], { type: "text" }>;

/** Stream writer specialised for {@link PilotUiMessage} chunks. */
type PilotStreamWriter = UIMessageStreamWriter<PilotUiMessage>;

/** Request body shape; both fields are untrusted and validated before use. */
type PilotChatRequestBody = {
  messages?: unknown;
  contextPayload?: unknown;
};

/** Narrows an unknown value to a plain (non-array, non-null) object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/**
 * Converts an untrusted value to a trimmed string.
 *
 * Only strings and finite numbers are accepted. Everything else (objects,
 * arrays, booleans, null, undefined) yields `""`, so that values such as
 * `{}` can never turn into the truthy string `"[object Object]"`.
 */
function toTrimmedString(value: unknown): string {
  if (typeof value === "string") return value.trim();
  if (typeof value === "number" && Number.isFinite(value)) return String(value);
  return "";
}

/**
 * Concatenates the text of all `text` parts of a message and trims the result.
 *
 * @param message - UI message whose parts are inspected.
 * @returns The joined text, or `""` when the message has no text parts.
 */
function extractMessageText(message: PilotUiMessage): string {
  return message.parts
    .filter((part): part is PilotTextPart => part.type === "text")
    .map((part) => part.text)
    .join("")
    .trim();
}

/**
 * Finds the text of the most recent user message that has non-empty text.
 *
 * @param messages - Conversation messages, oldest first.
 * @returns The trimmed text, or `""` when no user message carries text.
 */
function getLastUserText(messages: PilotUiMessage[]): string {
  for (let i = messages.length - 1; i >= 0; i -= 1) {
    const message = messages[i];
    if (message?.role !== "user") continue;
    const text = extractMessageText(message);
    if (text) return text;
  }
  return "";
}

/**
 * Validates the client-supplied context payload and rebuilds it from scratch,
 * copying only recognised fields.
 *
 * A section (`summary`, `deal`, `message`, `calendar`) is included only when
 * all of its required fields are present; otherwise it is silently dropped.
 *
 * @param raw - Untrusted `contextPayload` value from the request body.
 * @returns The sanitized payload, or `null` when `raw` is not an object or
 *   contains no recognised scope.
 */
function sanitizeContextPayload(raw: unknown): PilotContextPayload | null {
  if (!isRecord(raw)) return null;

  const scopesRaw: unknown[] = Array.isArray(raw.scopes) ? raw.scopes : [];
  // Strict equality against the known literals rejects every non-string entry.
  const scopes = scopesRaw.filter(
    (scope) => scope === "summary" || scope === "deal" || scope === "message" || scope === "calendar",
  ) as PilotContextPayload["scopes"];
  if (!scopes.length) return null;

  const payload: PilotContextPayload = { scopes };

  if (isRecord(raw.summary)) {
    const contactId = toTrimmedString(raw.summary.contactId);
    const name = toTrimmedString(raw.summary.name);
    if (contactId && name) payload.summary = { contactId, name };
  }

  if (isRecord(raw.deal)) {
    const dealId = toTrimmedString(raw.deal.dealId);
    const title = toTrimmedString(raw.deal.title);
    const contact = toTrimmedString(raw.deal.contact);
    if (dealId && title && contact) payload.deal = { dealId, title, contact };
  }

  if (isRecord(raw.message)) {
    const contactId = toTrimmedString(raw.message.contactId);
    const contact = toTrimmedString(raw.message.contact);
    const intent = toTrimmedString(raw.message.intent);
    // Only one intent is recognised; contactId/contact are optional extras.
    if (intent === "add_message_or_reminder") {
      payload.message = {
        ...(contactId ? { contactId } : {}),
        ...(contact ? { contact } : {}),
        intent: "add_message_or_reminder",
      };
    }
  }

  if (isRecord(raw.calendar)) {
    const view = toTrimmedString(raw.calendar.view);
    const period = toTrimmedString(raw.calendar.period);
    const selectedDateKey = toTrimmedString(raw.calendar.selectedDateKey);
    const focusedEventId = toTrimmedString(raw.calendar.focusedEventId);
    const eventIdsRaw: unknown[] = Array.isArray(raw.calendar.eventIds) ? raw.calendar.eventIds : [];
    const eventIds = eventIdsRaw.map(toTrimmedString).filter(Boolean);

    // The explicit comparisons narrow `view` to the literal union used by the payload.
    if (
      (view === "day" || view === "week" || view === "month" || view === "year" || view === "agenda") &&
      period &&
      selectedDateKey
    ) {
      payload.calendar = {
        view,
        period,
        selectedDateKey,
        ...(focusedEventId ? { focusedEventId } : {}),
        eventIds,
      };
    }
  }

  return payload;
}

/**
 * Turns an agent trace event into a short user-facing status line.
 *
 * Tool runs are reported by tool name. Otherwise the trace text is used,
 * with generic phrases substituted when it is empty or contains the
 * error/summary marker substrings checked below.
 *
 * @param trace - Trace event emitted by the agent.
 * @returns The status line to show and store.
 */
function humanizeTraceText(trace: AgentTraceEvent): string {
  if (trace.toolRun?.name) {
    return `Использую инструмент: ${trace.toolRun.name}`;
  }

  const text = (trace.text ?? "").trim();
  if (!text) return "Агент работает с данными CRM.";

  const lowered = text.toLowerCase();
  if (lowered.includes("ошиб")) return "Возникла ошибка шага, пробую другой путь.";
  if (lowered.includes("итог")) return "Готовлю финальный ответ.";
  return text;
}

/**
 * Renders a plain-text summary of a change set: overall totals per action
 * followed by one line per entity with its number of changed items.
 *
 * @param changeSet - Change set whose `items` are counted.
 * @returns Multi-line summary joined with `\n`.
 */
function renderChangeSetSummary(changeSet: ChangeSet): string {
  const totals = { created: 0, updated: 0, deleted: 0 };
  const byEntity = new Map<string, number>();

  for (const item of changeSet.items) {
    if (item.action === "created") totals.created += 1;
    else if (item.action === "updated") totals.updated += 1;
    else if (item.action === "deleted") totals.deleted += 1;

    byEntity.set(item.entity, (byEntity.get(item.entity) ?? 0) + 1);
  }

  const lines = [
    "Technical change summary",
    // Items with action "deleted" are reported under the "Archived" label.
    `Total: ${changeSet.items.length} · Created: ${totals.created} · Updated: ${totals.updated} · Archived: ${totals.deleted}`,
    ...[...byEntity.entries()].map(([entity, count]) => `- ${entity}: ${count}`),
  ];
  return lines.join("\n");
}

/** Writes one `data-agent-log` chunk carrying `payload` to the UI message stream. */
function writePilotLog(writer: PilotStreamWriter, payload: PilotDataTypes["agent-log"]) {
  writer.write({
    type: "data-agent-log",
    data: payload,
  });
}

/** Writes `text` as a single complete text part (start, one delta, end) under `textId`. */
function writeAssistantText(writer: PilotStreamWriter, textId: string, text: string) {
  writer.write({ type: "text-start", id: textId });
  writer.write({ type: "text-delta", id: textId, delta: text });
  writer.write({ type: "text-end", id: textId });
}

/**
 * Reports the end of a pilot run: passes `status` to `finishPilotRun` and
 * broadcasts a `pilot.finished` event to the conversation.
 */
function finalizePilotExecution(conversationId: string, status: "finished" | "error") {
  finishPilotRun(conversationId, status);
  broadcastToConversation(conversationId, {
    type: "pilot.finished",
    at: new Date().toISOString(),
  });
}

/**
 * Chat endpoint: validates the incoming UI messages, runs the CRM agent on
 * the last user message and streams trace logs plus the final reply back as
 * a UI message stream.
 *
 * Responds with HTTP 400 when the messages payload is invalid or when no
 * user message with text is present. Failures inside the agent run are not
 * thrown; they are persisted and streamed to the client as an assistant
 * message with `finishReason: "error"`.
 */
export default defineEventHandler(async (event) => {
  const auth = await getAuthContext(event);
  const body = await readBody<PilotChatRequestBody>(event);

  let messages: PilotUiMessage[] = [];
  try {
    messages = await validateUIMessages<PilotUiMessage>({ messages: body?.messages ?? [] });
  } catch {
    throw createError({ statusCode: 400, statusMessage: "Invalid chat messages payload" });
  }

  const userText = getLastUserText(messages);
  if (!userText) {
    throw createError({ statusCode: 400, statusMessage: "Last user message is required" });
  }
  const contextPayload = sanitizeContextPayload(body?.contextPayload);

  const requestId = `req_${Date.now()}_${Math.floor(Math.random() * 1_000_000)}`;

  const stream = createUIMessageStream<PilotUiMessage>({
    execute: async ({ writer }) => {
      const textId = `text-${Date.now()}`;
      writer.write({ type: "start" });
      startPilotRun(auth.conversationId);

      try {
        // Snapshots taken before and after the agent run are diffed into a change set.
        const snapshotBefore = await captureSnapshot(prisma, auth.teamId);

        await persistAiMessage({
          teamId: auth.teamId,
          conversationId: auth.conversationId,
          authorUserId: auth.userId,
          role: "USER",
          text: userText,
          requestId,
          eventType: "user",
          phase: "final",
          transient: false,
        });

        const reply = await runCrmAgentFor({
          teamId: auth.teamId,
          userId: auth.userId,
          userText,
          contextPayload,
          requestId,
          conversationId: auth.conversationId,
          // Every trace goes to three sinks: the HTTP stream, the run store
          // and the conversation broadcast channel.
          onTrace: async (trace: AgentTraceEvent) => {
            const traceText = humanizeTraceText(trace);
            const traceAt = new Date().toISOString();
            writePilotLog(writer, { requestId, at: traceAt, text: traceText });
            addPilotTrace(auth.conversationId, traceText);
            broadcastToConversation(auth.conversationId, {
              type: "pilot.trace",
              text: traceText,
              at: traceAt,
            });
          },
        });

        const snapshotAfter = await captureSnapshot(prisma, auth.teamId);
        const changeSet = buildChangeSet(snapshotBefore, snapshotAfter);

        await persistAiMessage({
          teamId: auth.teamId,
          conversationId: auth.conversationId,
          authorUserId: null,
          role: "ASSISTANT",
          text: reply.text,
          requestId,
          eventType: "assistant",
          phase: "final",
          transient: false,
        });

        // A summary note is stored only when buildChangeSet returned a change set.
        if (changeSet) {
          await persistAiMessage({
            teamId: auth.teamId,
            conversationId: auth.conversationId,
            authorUserId: null,
            role: "ASSISTANT",
            text: renderChangeSetSummary(changeSet),
            requestId,
            eventType: "note",
            phase: "final",
            transient: false,
            messageKind: "change_set_summary",
            changeSet,
          });
        }

        finalizePilotExecution(auth.conversationId, "finished");
        writeAssistantText(writer, textId, reply.text);
        writer.write({ type: "finish", finishReason: "stop" });
      } catch (error) {
        const errorText = error instanceof Error ? error.message : String(error);

        // Best effort: a failure to persist the error must not mask the original one.
        await persistAiMessage({
          teamId: auth.teamId,
          conversationId: auth.conversationId,
          authorUserId: null,
          role: "ASSISTANT",
          text: errorText,
          requestId,
          eventType: "assistant",
          phase: "error",
          transient: false,
        }).catch(() => undefined);

        finalizePilotExecution(auth.conversationId, "error");
        writePilotLog(writer, {
          requestId,
          at: new Date().toISOString(),
          text: "Ошибка выполнения агентского цикла.",
        });
        writeAssistantText(writer, textId, errorText);
        writer.write({ type: "finish", finishReason: "error" });
      }
    },
  });

  return createUIMessageStreamResponse({ stream });
});