import { randomUUID } from "node:crypto"; import type { AgentReply, AgentTraceEvent } from "./crmAgent"; import { prisma } from "../utils/prisma"; import { ensureDataset } from "../dataset/exporter"; import { createReactAgent } from "@langchain/langgraph/prebuilt"; import { ChatOpenAI } from "@langchain/openai"; import { tool } from "@langchain/core/tools"; import { z } from "zod"; import { getLangfuseClient } from "../utils/langfuse"; import { Prisma } from "@prisma/client"; function iso(d: Date) { return d.toISOString(); } function cyclePrompt(userText: string, cycle: number, cycleNotes: string[], pendingCount: number) { if (cycle === 1) return userText; return [ "Continue solving the same user request.", `User request: ${userText}`, cycleNotes.length ? `Progress notes:\n- ${cycleNotes.join("\n- ")}` : "No progress notes yet.", `Pending staged changes: ${pendingCount}.`, "Do the next useful step. If done, produce final concise answer.", ].join("\n"); } type GigachatTokenCache = { token: string; expiresAtSec: number; }; let gigachatTokenCache: GigachatTokenCache | null = null; function normalizeAuthHeader(authKey: string, scheme: "Basic" | "Bearer") { const key = authKey.trim(); if (!key) return ""; if (key.startsWith("Basic ") || key.startsWith("Bearer ")) return key; return `${scheme} ${key}`; } async function requestGigachatToken(input: { authKey: string; scope: string; oauthUrl: string; authScheme: "Basic" | "Bearer"; }) { const body = new URLSearchParams(); body.set("scope", input.scope); const res = await fetch(input.oauthUrl, { method: "POST", headers: { "Content-Type": "application/x-www-form-urlencoded", Accept: "application/json", RqUID: randomUUID(), Authorization: normalizeAuthHeader(input.authKey, input.authScheme), }, body: body.toString(), }); if (!res.ok) { const text = await res.text().catch(() => ""); throw new Error(`GigaChat oauth failed: ${res.status} ${text.slice(0, 240)}`); } const payload = (await res.json()) as { access_token?: string; 
expires_at?: number }; if (!payload?.access_token) { throw new Error("GigaChat oauth failed: access_token is missing"); } return { token: payload.access_token, expiresAtSec: typeof payload.expires_at === "number" ? payload.expires_at : Math.floor(Date.now() / 1000) + 25 * 60, }; } async function getGigachatAccessToken(input: { authKey: string; scope: string; oauthUrl: string; }) { const nowSec = Math.floor(Date.now() / 1000); if (gigachatTokenCache && gigachatTokenCache.expiresAtSec - nowSec > 60) { return gigachatTokenCache.token; } try { const token = await requestGigachatToken({ ...input, authScheme: "Basic" }); gigachatTokenCache = token; return token.token; } catch { const token = await requestGigachatToken({ ...input, authScheme: "Bearer" }); gigachatTokenCache = token; return token.token; } } type SnapshotOptions = { teamId: string; contact?: string; contactsLimit?: number; }; type PendingChange = | { id: string; type: "update_contact_note"; createdAt: string; contactId: string; contactName: string; content: string; } | { id: string; type: "create_event"; createdAt: string; contactId: string | null; contactName: string | null; title: string; start: string; end: string | null; note: string | null; status: string | null; } | { id: string; type: "create_message"; createdAt: string; contactId: string; contactName: string; text: string; kind: "message" | "call"; direction: "in" | "out"; channel: "Telegram" | "WhatsApp" | "Instagram" | "Phone" | "Email"; at: string; durationSec: number | null; transcript: string[] | null; } | { id: string; type: "update_deal_stage"; createdAt: string; dealId: string; stage: string; dealTitle: string; }; function makeId(prefix: string) { return `${prefix}_${Date.now()}_${Math.random().toString(16).slice(2, 10)}`; } function toChannel(channel: "Telegram" | "WhatsApp" | "Instagram" | "Phone" | "Email") { if (channel === "Telegram") return "TELEGRAM" as const; if (channel === "WhatsApp") return "WHATSAPP" as const; if (channel === 
"Instagram") return "INSTAGRAM" as const; if (channel === "Email") return "EMAIL" as const; return "PHONE" as const; } async function resolveContact(teamId: string, contactRef: string) { const contact = contactRef.trim(); if (!contact) return null; const exact = await prisma.contact.findFirst({ where: { teamId, OR: [{ id: contact }, { name: contact }] }, select: { id: true, name: true }, }); if (exact) return exact; return prisma.contact.findFirst({ where: { teamId, name: { contains: contact } }, orderBy: { updatedAt: "desc" }, select: { id: true, name: true }, }); } async function buildCrmSnapshot(input: SnapshotOptions) { const now = new Date(); const in7 = new Date(now.getTime() + 7 * 24 * 60 * 60 * 1000); const contactLimit = Math.max(1, Math.min(input.contactsLimit ?? 25, 80)); const selectedContact = input.contact ? await resolveContact(input.teamId, input.contact) : null; const contactWhere = selectedContact ? { teamId: input.teamId, id: selectedContact.id } : { teamId: input.teamId }; const [contacts, upcoming, deals, docs, totalContacts, totalDeals, totalEvents] = await Promise.all([ prisma.contact.findMany({ where: contactWhere, orderBy: { updatedAt: "desc" }, take: selectedContact ? 
1 : contactLimit, include: { note: { select: { content: true, updatedAt: true } }, messages: { select: { id: true, occurredAt: true, channel: true, direction: true, kind: true, content: true }, orderBy: { occurredAt: "desc" }, take: 4, }, events: { select: { id: true, title: true, startsAt: true, endsAt: true, status: true }, orderBy: { startsAt: "asc" }, where: { startsAt: { gte: new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000) } }, take: 4, }, deals: { select: { id: true, title: true, stage: true, amount: true, updatedAt: true, nextStep: true, summary: true, currentStepId: true, steps: { select: { id: true, title: true, status: true, dueAt: true, order: true, completedAt: true }, orderBy: [{ order: "asc" }, { createdAt: "asc" }], }, }, orderBy: { updatedAt: "desc" }, take: 3, }, pins: { select: { id: true, text: true, updatedAt: true }, orderBy: { updatedAt: "desc" }, take: 3, }, }, }), prisma.calendarEvent.findMany({ where: { teamId: input.teamId, startsAt: { gte: now, lte: in7 } }, orderBy: { startsAt: "asc" }, take: 20, include: { contact: { select: { name: true } } }, }), prisma.deal.findMany({ where: { teamId: input.teamId }, orderBy: { updatedAt: "desc" }, take: 20, include: { contact: { select: { name: true, company: true } }, steps: { select: { id: true, title: true, status: true, dueAt: true, order: true, completedAt: true }, orderBy: [{ order: "asc" }, { createdAt: "asc" }] }, }, }), prisma.workspaceDocument.findMany({ where: { teamId: input.teamId }, orderBy: { updatedAt: "desc" }, take: 12, select: { id: true, type: true, title: true, summary: true, updatedAt: true, owner: true, scope: true }, }), prisma.contact.count({ where: { teamId: input.teamId } }), prisma.deal.count({ where: { teamId: input.teamId } }), prisma.calendarEvent.count({ where: { teamId: input.teamId, startsAt: { gte: now } } }), ]); const byStage = new Map(); for (const d of deals) byStage.set(d.stage, (byStage.get(d.stage) ?? 
0) + 1); return { meta: { generatedAt: iso(now), focusContactId: selectedContact?.id ?? null, contactsIncluded: contacts.length, mode: selectedContact ? "focused" : "team", }, totals: { contacts: totalContacts, deals: totalDeals, upcomingEvents: totalEvents, }, stats: { dealsByStage: [...byStage.entries()] .sort((a, b) => b[1] - a[1]) .map(([stage, count]) => ({ stage, count })), }, upcomingEvents: upcoming.map((e) => ({ id: e.id, title: e.title, startsAt: iso(e.startsAt), endsAt: iso(e.endsAt ?? e.startsAt), status: e.status, note: e.note, contact: e.contact?.name ?? null, })), deals: deals.map((d) => ({ id: d.id, title: d.title, stage: d.stage, amount: d.amount, nextStep: d.nextStep, summary: d.summary, currentStepId: d.currentStepId, steps: d.steps.map((s) => ({ id: s.id, title: s.title, status: s.status, dueAt: s.dueAt ? iso(s.dueAt) : null, order: s.order, completedAt: s.completedAt ? iso(s.completedAt) : null, })), updatedAt: iso(d.updatedAt), contact: { name: d.contact.name, company: d.contact.company, }, })), contacts: contacts.map((c) => ({ id: c.id, name: c.name, company: c.company, country: c.country, location: c.location, email: c.email, phone: c.phone, avatarUrl: c.avatarUrl, updatedAt: iso(c.updatedAt), summary: c.note?.content ?? null, summaryUpdatedAt: c.note?.updatedAt ? iso(c.note.updatedAt) : null, latestMessages: c.messages.map((m) => ({ id: m.id, occurredAt: iso(m.occurredAt), channel: m.channel, direction: m.direction, kind: m.kind, content: m.content, })), latestEvents: c.events.map((e) => ({ id: e.id, title: e.title, startsAt: iso(e.startsAt), endsAt: iso(e.endsAt ?? e.startsAt), status: e.status, })), deals: c.deals.map((d) => ({ id: d.id, title: d.title, stage: d.stage, amount: d.amount, nextStep: d.nextStep, summary: d.summary, currentStepId: d.currentStepId, steps: d.steps.map((s) => ({ id: s.id, title: s.title, status: s.status, dueAt: s.dueAt ? iso(s.dueAt) : null, order: s.order, completedAt: s.completedAt ? 
iso(s.completedAt) : null,
        })),
        updatedAt: iso(d.updatedAt),
      })),
      pins: c.pins.map((p) => ({
        id: p.id,
        text: p.text,
        updatedAt: iso(p.updatedAt),
      })),
    })),
    documents: docs.map((d) => ({
      id: d.id,
      type: d.type,
      title: d.title,
      summary: d.summary,
      owner: d.owner,
      scope: d.scope,
      updatedAt: iso(d.updatedAt),
    })),
  };
}

export async function runLangGraphCrmAgentFor(input: {
  teamId: string;
  userId: string;
  userText: string;
  requestId?: string;
  conversationId?: string;
  onTrace?: (event: AgentTraceEvent) => Promise<void> | void;
}): Promise<AgentReply> {
  // Provider configuration: OpenRouter, then generic OpenAI-compatible keys, then GigaChat.
  const openrouterApiKey = (process.env.OPENROUTER_API_KEY ?? "").trim();
  const openrouterBaseURL = (process.env.OPENROUTER_BASE_URL ?? "https://openrouter.ai/api/v1").trim();
  const openrouterModel = (process.env.OPENROUTER_MODEL ?? "openai/gpt-4o-mini").trim();
  const openrouterReferer = (process.env.OPENROUTER_HTTP_REFERER ?? "").trim();
  const openrouterTitle = (process.env.OPENROUTER_X_TITLE ?? "").trim();
  const openrouterReasoningEnabled = (process.env.OPENROUTER_REASONING_ENABLED ?? "").trim() === "1";

  const genericApiKey =
    process.env.LLM_API_KEY ||
    process.env.OPENAI_API_KEY ||
    process.env.DASHSCOPE_API_KEY ||
    process.env.QWEN_API_KEY;
  const genericBaseURL =
    process.env.LLM_BASE_URL ||
    process.env.OPENAI_BASE_URL ||
    process.env.DASHSCOPE_BASE_URL ||
    process.env.QWEN_BASE_URL;
  const genericModel =
    process.env.LLM_MODEL ||
    process.env.OPENAI_MODEL ||
    process.env.DASHSCOPE_MODEL ||
    process.env.QWEN_MODEL ||
    "gpt-4o-mini";

  const gigachatAuthKey = (process.env.GIGACHAT_AUTH_KEY ?? "").trim();
  const gigachatModel = (process.env.GIGACHAT_MODEL ?? "").trim();
  const gigachatScope = (process.env.GIGACHAT_SCOPE ?? "").trim();
  const gigachatOauthUrl = (process.env.GIGACHAT_OAUTH_URL ?? "https://ngw.devices.sberbank.ru:9443/api/v2/oauth").trim();
  const gigachatBaseUrl = (process.env.GIGACHAT_BASE_URL ?? "https://gigachat.devices.sberbank.ru/api/v1").trim();
  const useGigachat = Boolean(gigachatAuthKey && gigachatScope);

  let llmApiKey = genericApiKey;
  let llmBaseURL = genericBaseURL;
  let llmModel = genericModel;
  let llmHeaders: Record<string, string> | undefined;
  let llmReasoningEnabled = false;

  if (openrouterApiKey) {
    llmApiKey = openrouterApiKey;
    llmBaseURL = openrouterBaseURL;
    llmModel = openrouterModel;
    llmReasoningEnabled = openrouterReasoningEnabled;
    llmHeaders = {
      ...(openrouterReferer ? { "HTTP-Referer": openrouterReferer } : {}),
      ...(openrouterTitle ? { "X-Title": openrouterTitle } : {}),
    };
  }

  if (useGigachat) {
    try {
      llmApiKey = await getGigachatAccessToken({
        authKey: gigachatAuthKey,
        scope: gigachatScope,
        oauthUrl: gigachatOauthUrl,
      });
      llmBaseURL = gigachatBaseUrl;
      llmModel = gigachatModel || "GigaChat-2-Max";
    } catch (e: any) {
      return {
        text: `Не удалось получить токен GigaChat: ${String(e?.message || e)}`,
        plan: ["Проверить GIGACHAT_AUTH_KEY", "Проверить GIGACHAT_SCOPE", "Проверить сетевой доступ до OAuth endpoint и перезапустить dev-сервер"],
        tools: [],
        thinking: ["Провайдер GigaChat настроен, но OAuth не прошел."],
        toolRuns: [],
      };
    }
  }

  if (!llmApiKey) {
    throw new Error(
      "LLM API key is not configured. Set OPENROUTER_API_KEY (or GIGACHAT_AUTH_KEY/GIGACHAT_SCOPE) and restart.",
    );
  }

  // Keep the dataset fresh so the "CRM filesystem" stays in sync with DB.
  await ensureDataset({ teamId: input.teamId, userId: input.userId });

  const toolsUsed: string[] = [];
  const dbWrites: Array<{ kind: string; detail: string }> = [];
  // Assumes AgentReply declares an optional `toolRuns` array of tool-run records.
  const toolRuns: NonNullable<AgentReply["toolRuns"]> = [];
  const pendingChanges: PendingChange[] = [];

  async function emitTrace(event: AgentTraceEvent) {
    lfTrace?.event({
      name: "agent.trace",
      input: {
        text: event.text,
        toolRun: event.toolRun ?? null,
      },
      metadata: {
        requestId: input.requestId ?? null,
      },
    });
    if (!input.onTrace) return;
    try {
      await input.onTrace(event);
    } catch {
      // Trace transport errors must not break the agent response.
    }
  }

  function compact(value: unknown, max = 240) {
    const text = typeof value === "string" ? value : JSON.stringify(value);
    if (!text) return "";
    return text.length > max ? `${text.slice(0, max)}...` : text;
  }

  const CrmToolSchema = z.object({
    action: z.enum([
      "get_snapshot",
      "query_contacts",
      "query_deals",
      "query_events",
      "pending_changes",
      "discard_changes",
      "commit_changes",
      "update_contact_note",
      "create_event",
      "create_message",
      "update_deal_stage",
    ]),
    // queries
    query: z.string().optional(),
    stage: z.string().optional(),
    from: z.string().optional(),
    to: z.string().optional(),
    limit: z.number().int().optional(),
    mode: z.enum(["stage", "apply"]).optional(),
    // writes
    contact: z.string().optional(),
    content: z.string().optional(),
    title: z.string().optional(),
    start: z.string().optional(),
    end: z.string().optional(),
    note: z.string().optional(),
    status: z.string().optional(),
    channel: z.enum(["Telegram", "WhatsApp", "Instagram", "Phone", "Email"]).optional(),
    kind: z.enum(["message", "call"]).optional(),
    direction: z.enum(["in", "out"]).optional(),
    text: z.string().optional(),
    at: z.string().optional(),
    durationSec: z.number().int().optional(),
    transcript: z.array(z.string()).optional(),
    dealId: z.string().optional(),
  });

  const applyPendingChanges = async () => {
    const queue = pendingChanges.splice(0, pendingChanges.length);
    if (!queue.length) {
      return { ok: true, applied: 0, changes: [] as Array<{ id: string; type: string; detail: string }> };
    }

    const applied: Array<{ id: string; type: string; detail: string }> = [];

    await prisma.$transaction(async (tx) => {
      for (const change of queue) {
        if (change.type === "update_contact_note") {
          await tx.contactNote.upsert({
            where: { contactId: change.contactId },
            update: { content: change.content },
            create: { contactId: change.contactId, content: change.content },
          });
          applied.push({ id: change.id, type: change.type, detail: `${change.contactName}: summary updated` });
          continue;
        }

        if (change.type === "create_event") {
          const
created = await tx.calendarEvent.create({ data: { teamId: input.teamId, contactId: change.contactId, title: change.title, startsAt: new Date(change.start), endsAt: change.end ? new Date(change.end) : null, note: change.note, status: change.status, }, }); applied.push({ id: change.id, type: change.type, detail: `created event ${created.id}` }); continue; } if (change.type === "create_message") { const created = await tx.contactMessage.create({ data: { contactId: change.contactId, kind: change.kind === "call" ? "CALL" : "MESSAGE", direction: change.direction === "in" ? "IN" : "OUT", channel: toChannel(change.channel), content: change.text, durationSec: change.durationSec, transcriptJson: Array.isArray(change.transcript) ? change.transcript : Prisma.JsonNull, occurredAt: new Date(change.at), }, }); applied.push({ id: change.id, type: change.type, detail: `created message ${created.id}` }); continue; } if (change.type === "update_deal_stage") { const updated = await tx.deal.updateMany({ where: { id: change.dealId, teamId: input.teamId }, data: { stage: change.stage }, }); if (updated.count === 0) throw new Error(`deal not found: ${change.dealId}`); applied.push({ id: change.id, type: change.type, detail: `${change.dealTitle}: stage -> ${change.stage}` }); continue; } } }); for (const item of applied) { dbWrites.push({ kind: item.type, detail: item.detail }); } return { ok: true, applied: applied.length, changes: applied }; }; const crmTool = tool( async (rawInput: unknown) => { const raw = CrmToolSchema.parse(rawInput); const toolName = `crm:${raw.action}`; const startedAt = new Date().toISOString(); toolsUsed.push(toolName); await emitTrace({ text: `Использую инструмент: ${toolName}` }); const executeAction = async () => { if (raw.action === "get_snapshot") { const snapshot = await buildCrmSnapshot({ teamId: input.teamId, contact: raw.contact, contactsLimit: raw.limit, }); return JSON.stringify(snapshot, null, 2); } if (raw.action === "query_contacts") { const q = 
(raw.query ?? "").trim(); const items = await prisma.contact.findMany({ where: { teamId: input.teamId, ...(q ? { OR: [ { name: { contains: q } }, { company: { contains: q } }, { email: { contains: q } }, { phone: { contains: q } }, ], } : {}), }, orderBy: { updatedAt: "desc" }, take: Math.max(1, Math.min(raw.limit ?? 20, 100)), include: { note: { select: { content: true, updatedAt: true } } }, }); return JSON.stringify( items.map((c) => ({ id: c.id, name: c.name, company: c.company, country: c.country, location: c.location, email: c.email, phone: c.phone, note: c.note?.content ?? null, })), null, 2, ); } if (raw.action === "query_deals") { const items = await prisma.deal.findMany({ where: { teamId: input.teamId, ...(raw.stage ? { stage: raw.stage } : {}) }, orderBy: { updatedAt: "desc" }, take: Math.max(1, Math.min(raw.limit ?? 20, 100)), include: { contact: { select: { name: true, company: true } }, steps: { select: { id: true, title: true, status: true, dueAt: true, order: true, completedAt: true }, orderBy: [{ order: "asc" }, { createdAt: "asc" }] }, }, }); return JSON.stringify( items.map((d) => ({ id: d.id, title: d.title, stage: d.stage, amount: d.amount, nextStep: d.nextStep, summary: d.summary, currentStepId: d.currentStepId, steps: d.steps.map((s) => ({ id: s.id, title: s.title, status: s.status, dueAt: s.dueAt ? s.dueAt.toISOString() : null, order: s.order, completedAt: s.completedAt ? s.completedAt.toISOString() : null, })), contact: d.contact.name, company: d.contact.company, })), null, 2, ); } if (raw.action === "query_events") { const from = raw.from ? new Date(raw.from) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); const to = raw.to ? new Date(raw.to) : new Date(Date.now() + 60 * 24 * 60 * 60 * 1000); const items = await prisma.calendarEvent.findMany({ where: { teamId: input.teamId, startsAt: { gte: from, lte: to } }, orderBy: { startsAt: "asc" }, take: Math.max(1, Math.min(raw.limit ?? 
100, 500)), include: { contact: { select: { name: true } } }, }); return JSON.stringify( items.map((e) => ({ id: e.id, title: e.title, startsAt: e.startsAt.toISOString(), endsAt: (e.endsAt ?? e.startsAt).toISOString(), note: e.note, contact: e.contact?.name ?? null, })), null, 2, ); } if (raw.action === "pending_changes") { return JSON.stringify( { count: pendingChanges.length, items: pendingChanges.map((item) => ({ id: item.id, type: item.type, createdAt: item.createdAt, })), }, null, 2, ); } if (raw.action === "discard_changes") { const discarded = pendingChanges.length; pendingChanges.splice(0, pendingChanges.length); return JSON.stringify({ ok: true, discarded }, null, 2); } if (raw.action === "commit_changes") { const committed = await applyPendingChanges(); return JSON.stringify(committed, null, 2); } if (raw.action === "update_contact_note") { const contactName = (raw.contact ?? "").trim(); const content = (raw.content ?? "").trim(); if (!contactName) throw new Error("contact is required"); if (!content) throw new Error("content is required"); const contact = await resolveContact(input.teamId, contactName); if (!contact) throw new Error("contact not found"); pendingChanges.push({ id: makeId("chg"), type: "update_contact_note", createdAt: iso(new Date()), contactId: contact.id, contactName: contact.name, content, }); if (raw.mode === "apply") { const committed = await applyPendingChanges(); return JSON.stringify(committed, null, 2); } return JSON.stringify({ ok: true, staged: true, pending: pendingChanges.length }, null, 2); } if (raw.action === "create_event") { const title = (raw.title ?? "").trim(); const start = raw.start ? new Date(raw.start) : null; if (!title) throw new Error("title is required"); if (!start || Number.isNaN(start.getTime())) throw new Error("start is invalid"); const end = raw.end ? new Date(raw.end) : null; const contactName = (raw.contact ?? "").trim(); const contact = contactName ? 
await resolveContact(input.teamId, contactName) : null; pendingChanges.push({ id: makeId("chg"), type: "create_event", createdAt: iso(new Date()), contactId: contact?.id ?? null, contactName: contact?.name ?? null, title, start: iso(start), end: end && !Number.isNaN(end.getTime()) ? iso(end) : null, note: (raw.note ?? "").trim() || null, status: (raw.status ?? "").trim() || null, }); if (raw.mode === "apply") { const committed = await applyPendingChanges(); return JSON.stringify(committed, null, 2); } return JSON.stringify({ ok: true, staged: true, pending: pendingChanges.length }, null, 2); } if (raw.action === "create_message") { const contactName = (raw.contact ?? "").trim(); const text = (raw.text ?? "").trim(); if (!contactName) throw new Error("contact is required"); if (!text) throw new Error("text is required"); const contact = await resolveContact(input.teamId, contactName); if (!contact) throw new Error("contact not found"); const occurredAt = raw.at ? new Date(raw.at) : new Date(); if (Number.isNaN(occurredAt.getTime())) throw new Error("at is invalid"); pendingChanges.push({ id: makeId("chg"), type: "create_message", createdAt: iso(new Date()), contactId: contact.id, contactName: contact.name, kind: raw.kind === "call" ? "call" : "message", direction: raw.direction === "in" ? "in" : "out", channel: raw.channel ?? "Phone", text, at: iso(occurredAt), durationSec: typeof raw.durationSec === "number" ? raw.durationSec : null, transcript: Array.isArray(raw.transcript) ? raw.transcript : null, }); if (raw.mode === "apply") { const committed = await applyPendingChanges(); return JSON.stringify(committed, null, 2); } return JSON.stringify({ ok: true, staged: true, pending: pendingChanges.length }, null, 2); } if (raw.action === "update_deal_stage") { const dealId = (raw.dealId ?? "").trim(); const stage = (raw.stage ?? 
"").trim(); if (!dealId) throw new Error("dealId is required"); if (!stage) throw new Error("stage is required"); const deal = await prisma.deal.findFirst({ where: { id: dealId, teamId: input.teamId }, select: { id: true, title: true }, }); if (!deal) throw new Error("deal not found"); pendingChanges.push({ id: makeId("chg"), type: "update_deal_stage", createdAt: iso(new Date()), dealId: deal.id, dealTitle: deal.title, stage, }); if (raw.mode === "apply") { const committed = await applyPendingChanges(); return JSON.stringify(committed, null, 2); } return JSON.stringify({ ok: true, staged: true, pending: pendingChanges.length }, null, 2); } return JSON.stringify({ ok: false, error: "unknown action" }); }; try { const result = await executeAction(); const run = { name: toolName, status: "ok", input: compact(raw), output: compact(result), at: startedAt, } as const; toolRuns.push(run); await emitTrace({ text: `Tool finished: ${toolName}`, toolRun: run, }); return result; } catch (error: any) { const run = { name: toolName, status: "error", input: compact(raw), output: compact(error?.message || String(error)), at: startedAt, } as const; toolRuns.push(run); await emitTrace({ text: `Tool failed: ${toolName}`, toolRun: run, }); throw error; } }, { name: "crm", description: "Query and update CRM data (contacts, deals, events, communications). Use this tool for any data you need beyond the snapshot.", schema: CrmToolSchema, }, ); const snapshot = await buildCrmSnapshot({ teamId: input.teamId }); const snapshotJson = JSON.stringify(snapshot, null, 2); const model = new ChatOpenAI({ apiKey: llmApiKey, model: llmModel, temperature: 0.2, ...(llmReasoningEnabled ? { modelKwargs: { reasoning: { enabled: true }, }, } : {}), ...(llmBaseURL || llmHeaders ? { configuration: { ...(llmBaseURL ? { baseURL: llmBaseURL } : {}), ...(llmHeaders ? 
{ defaultHeaders: llmHeaders } : {}),
          },
        }
      : {}),
  });

  const agent = createReactAgent({
    llm: model,
    tools: [crmTool],
  });

  const maxCycles = Math.max(1, Math.min(Number(process.env.CF_AGENT_MAX_CYCLES ?? "3"), 8));
  const cycleTimeoutMs = Math.max(5000, Math.min(Number(process.env.CF_AGENT_CYCLE_TIMEOUT_MS ?? "1200000"), 1800000));
  const tracingFlag = (process.env.LANGSMITH_TRACING ?? process.env.LANGCHAIN_TRACING_V2 ?? "").trim().toLowerCase();
  const tracingEnabled = tracingFlag === "1" || tracingFlag === "true" || tracingFlag === "yes";

  const langfuse = getLangfuseClient();
  const lfTrace = langfuse?.trace({
    id: input.requestId ?? makeId("trace"),
    name: "clientsflow.crm_agent_request",
    userId: input.userId,
    sessionId: input.conversationId ?? undefined,
    input: input.userText,
    metadata: {
      teamId: input.teamId,
      userId: input.userId,
      requestId: input.requestId ?? null,
      conversationId: input.conversationId ?? null,
    },
    tags: ["clientsflow", "crm-agent", "langgraph"],
  });

  let consecutiveNoProgress = 0;
  let finalText = "";
  const cycleNotes: string[] = [];

  const system = [
    "You are Pilot, a CRM assistant.",
    "Rules:",
    "- Be concrete and concise.",
    "- Work in short iterative cycles. Do not stop after the first thought if the task needs more than one action.",
    "- You are given a structured CRM JSON snapshot as baseline context.",
    "- If you need fresher or narrower data, call crm.get_snapshot/query_* tools.",
    "- For changes, stage first with mode=stage. Commit only when user asks to execute.",
    "- You can apply immediately with mode=apply only if user explicitly asked to do it now.",
    "- Use pending_changes and commit_changes to control staged updates.",
    "- Do not claim you sent an external message; you can only create CRM records.",
    "",
    "CRM Snapshot JSON:",
    snapshotJson,
  ].join("\n");

  // Recursively pull the first non-empty text out of a message-like value.
  const extractText = (value: unknown, depth = 0): string => {
    if (depth > 5 || value == null) return "";
    if (typeof value === "string") return value.trim();
    if (Array.isArray(value)) {
      const parts = value
        .map((item) => extractText(item, depth + 1))
        .filter(Boolean);
      return parts.join("\n").trim();
    }
    if (typeof value !== "object") return "";

    const obj = value as Record<string, unknown>;
    for (const key of ["text", "content", "answer", "output_text", "final_text"]) {
      const text = extractText(obj[key], depth + 1);
      if (text) return text;
    }
    if (Array.isArray(obj.parts)) {
      const text = extractText(obj.parts, depth + 1);
      if (text) return text;
    }
    return "";
  };

  const messageType = (msg: any): string => {
    if (typeof msg?._getType === "function") {
      try {
        return String(msg._getType() ?? "");
      } catch {
        // ignore
      }
    }
    return String(msg?.type ?? msg?.role ?? msg?.constructor?.name ?? "");
  };

  const extractResult = (res: any) => {
    const extractedText = (() => {
      const messages = Array.isArray(res?.messages) ?
res.messages : [];
      // Walk backwards to find the most recent assistant message with text.
      for (let i = messages.length - 1; i >= 0; i -= 1) {
        const msg = messages[i];
        const type = messageType(msg).toLowerCase();
        if (!type.includes("ai") && !type.includes("assistant")) continue;
        const text = extractText(msg?.content) || extractText(msg);
        if (text) return text;
      }
      return extractText(res?.output) || extractText(res?.response) || extractText(res?.finalResponse) || "";
    })();

    return {
      text: extractedText.trim(),
    };
  };

  for (let cycle = 1; cycle <= maxCycles; cycle += 1) {
    const userPrompt = cyclePrompt(input.userText, cycle, cycleNotes, pendingChanges.length);
    const cycleSpan = lfTrace?.span({
      name: "agent.cycle",
      input: userPrompt,
      metadata: {
        cycle,
        requestId: input.requestId ?? null,
      },
    });
    await emitTrace({ text: "Анализирую задачу и текущий контекст CRM." });

    const beforeRuns = toolRuns.length;
    const beforeWrites = dbWrites.length;
    const beforePending = pendingChanges.length;

    let res: any;
    try {
      const invokeConfig: Record<string, any> = { recursionLimit: 30 };
      if (tracingEnabled) {
        invokeConfig.runName = "clientsflow.crm_agent_cycle";
        invokeConfig.tags = ["clientsflow", "crm-agent", "langgraph"];
        invokeConfig.metadata = {
          teamId: input.teamId,
          userId: input.userId,
          requestId: input.requestId ?? null,
          conversationId: input.conversationId ?? null,
          cycle,
        };
      }

      let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
      res = await Promise.race([
        agent.invoke(
          {
            messages: [
              { role: "system", content: system },
              { role: "user", content: userPrompt },
            ],
          },
          invokeConfig,
        ),
        new Promise<never>((_resolve, reject) => {
          timeoutHandle = setTimeout(
            () => reject(new Error(`Cycle timeout after ${cycleTimeoutMs}ms`)),
            cycleTimeoutMs,
          );
        }),
      ]).finally(() => {
        // Clear the timer so a finished cycle does not leave a pending timeout behind.
        clearTimeout(timeoutHandle);
      });
    } catch (e: any) {
      await emitTrace({ text: "Один из шагов завершился ошибкой." });
      cycleSpan?.end({
        output: "error",
        level: "ERROR",
        statusMessage: String(e?.message ?? e ??
"unknown_error"), }); throw e; } const parsed = extractResult(res); if (parsed.text) { finalText = parsed.text; } const progressed = toolRuns.length > beforeRuns || dbWrites.length > beforeWrites || pendingChanges.length !== beforePending; cycleSpan?.end({ output: parsed.text || "", metadata: { progressed, toolRunsDelta: toolRuns.length - beforeRuns, dbWritesDelta: dbWrites.length - beforeWrites, pendingDelta: pendingChanges.length - beforePending, }, }); if (progressed) { cycleNotes.push(`Cycle ${cycle}: updated tools/data state.`); } await emitTrace({ text: progressed ? "Продвигаюсь по задаче и обновляю рабочий набор изменений." : "Промежуточный шаг не дал прогресса, проверяю следующий вариант.", }); if (!progressed) { consecutiveNoProgress += 1; } else { consecutiveNoProgress = 0; } const done = (!progressed && cycle > 1) || cycle === maxCycles; if (done) { await emitTrace({ text: "Формирую итоговый ответ." }); break; } if (consecutiveNoProgress >= 2) { await emitTrace({ text: "Останавливаюсь, чтобы не крутиться в пустом цикле." }); break; } } lfTrace?.update({ output: finalText || null, metadata: { toolsUsedCount: toolsUsed.length, toolRunsCount: toolRuns.length, dbWritesCount: dbWrites.length, pendingChangesCount: pendingChanges.length, maxCycles, }, }); void langfuse?.flushAsync().catch(() => {}); if (!finalText) { throw new Error("Model returned empty response"); } const plan: string[] = []; return { text: finalText, plan, thinking: [], tools: toolsUsed, toolRuns, dbWrites: dbWrites.length ? dbWrites : undefined, }; }