diff --git a/Frontend/server/agent/langgraphCrmAgent.ts b/Frontend/server/agent/langgraphCrmAgent.ts index c7f535b..f2f8c8f 100644 --- a/Frontend/server/agent/langgraphCrmAgent.ts +++ b/Frontend/server/agent/langgraphCrmAgent.ts @@ -7,19 +7,17 @@ import { ChatOpenAI } from "@langchain/openai"; import { tool } from "@langchain/core/tools"; import { z } from "zod"; import { getLangfuseClient } from "../utils/langfuse"; -import { Prisma } from "@prisma/client"; function iso(d: Date) { return d.toISOString(); } -function cyclePrompt(userText: string, cycle: number, cycleNotes: string[], pendingCount: number) { +function cyclePrompt(userText: string, cycle: number, cycleNotes: string[]) { if (cycle === 1) return userText; return [ "Continue solving the same user request.", `User request: ${userText}`, cycleNotes.length ? `Progress notes:\n- ${cycleNotes.join("\n- ")}` : "No progress notes yet.", - `Pending staged changes: ${pendingCount}.`, "Do the next useful step. If done, produce final concise answer.", ].join("\n"); } @@ -101,62 +99,10 @@ type SnapshotOptions = { contactsLimit?: number; }; -type PendingChange = - | { - id: string; - type: "update_contact_note"; - createdAt: string; - contactId: string; - contactName: string; - content: string; - } - | { - id: string; - type: "create_event"; - createdAt: string; - contactId: string | null; - contactName: string | null; - title: string; - start: string; - end: string | null; - note: string | null; - isArchived: boolean; - } - | { - id: string; - type: "create_message"; - createdAt: string; - contactId: string; - contactName: string; - text: string; - kind: "message" | "call"; - direction: "in" | "out"; - channel: "Telegram" | "WhatsApp" | "Instagram" | "Phone" | "Email"; - at: string; - durationSec: number | null; - transcript: string[] | null; - } - | { - id: string; - type: "update_deal_stage"; - createdAt: string; - dealId: string; - stage: string; - dealTitle: string; - }; - function makeId(prefix: string) { return `${prefix}_${Date.now()}_${Math.random().toString(16).slice(2, 10)}`; } -function toChannel(channel: "Telegram" | "WhatsApp" | "Instagram" | "Phone" | "Email") { - if (channel === "Telegram") return "TELEGRAM" as const; - if (channel === "WhatsApp") return "WHATSAPP" as const; - if (channel === "Instagram") return "INSTAGRAM" as const; - if (channel === "Email") return "EMAIL" as const; - return "PHONE" as const; -} - async function resolveContact(teamId: string, contactRef: string) { const contact = contactRef.trim(); if (!contact) return null; @@ -446,7 +392,6 @@ export async function runLangGraphCrmAgentFor(input: { const toolsUsed: string[] = []; const dbWrites: Array<{ kind: string; detail: string }> = []; const toolRuns: NonNullable = []; - const pendingChanges: PendingChange[] = []; async function emitTrace(event: AgentTraceEvent) { lfTrace?.event({ @@ -489,146 +434,38 @@ export async function runLangGraphCrmAgentFor(input: { const CrmToolSchema = z.object({ action: z.enum([ - "get_snapshot", - "list_contacts_digest", - "get_contact_snapshot", - "get_calendar_window", - "query_contacts", - "query_deals", - "query_events", - "pending_changes", - "discard_changes", - "commit_changes", - "update_contact_note", - "create_event", - "create_events_batch", - "create_message", - "update_deal_stage", + "getContactsList", + "getContactSnapshot", + "getUserCalendarWindow", + "updateContactSummary", + "createUserCalendarEvent", ]), - // queries + // read actions query: z.string().optional(), - stage: z.string().optional(), from: z.string().optional(), to: z.string().optional(), limit: z.number().int().optional(), offset: z.number().int().min(0).optional(), - mode: z.enum(["stage", "apply"]).optional(), includeArchived: z.boolean().optional(), messagesLimit: z.number().int().optional(), eventsLimit: z.number().int().optional(), dealsLimit: z.number().int().optional(), - // writes + // write actions contact: z.string().optional(), contactId: z.string().optional(), + summary: z.string().optional(), content: z.string().optional(), title: z.string().optional(), start: z.string().optional(), end: z.string().optional(), note: z.string().optional(), archived: z.boolean().optional(), - channel: z.enum(["Telegram", "WhatsApp", "Instagram", "Phone", "Email"]).optional(), - kind: z.enum(["message", "call"]).optional(), - direction: z.enum(["in", "out"]).optional(), - text: z.string().optional(), - at: z.string().optional(), - durationSec: z.number().int().optional(), - transcript: z.array(z.string()).optional(), - dealId: z.string().optional(), - events: z - .array( - z.object({ - contact: z.string().optional(), - contactId: z.string().optional(), - title: z.string(), - start: z.string(), - end: z.string().optional(), - note: z.string().optional(), - archived: z.boolean().optional(), - }), - ) - .optional(), - }); - - const applyPendingChanges = async () => { - const queue = pendingChanges.splice(0, pendingChanges.length); - if (!queue.length) { - return { ok: true, applied: 0, changes: [] as Array<{ id: string; type: string; detail: string }> }; - } - - const applied: Array<{ id: string; type: string; detail: string }> = []; - - await prisma.$transaction(async (tx) => { - for (const change of queue) { - if (change.type === "update_contact_note") { - await tx.contactNote.upsert({ - where: { contactId: change.contactId }, - update: { content: change.content }, - create: { contactId: change.contactId, content: change.content }, - }); - applied.push({ id: change.id, type: change.type, detail: `${change.contactName}: summary updated` }); - continue; - } - - if (change.type === "create_event") { - const created = await tx.calendarEvent.create({ - data: { - teamId: input.teamId, - contactId: change.contactId, - title: change.title, - startsAt: new Date(change.start), - endsAt: change.end ? new Date(change.end) : null, - note: change.note, - isArchived: change.isArchived, - }, - }); - applied.push({ id: change.id, type: change.type, detail: `created event ${created.id}` }); - continue; - } - - if (change.type === "create_message") { - const created = await tx.contactMessage.create({ - data: { - contactId: change.contactId, - kind: change.kind === "call" ? "CALL" : "MESSAGE", - direction: change.direction === "in" ? "IN" : "OUT", - channel: toChannel(change.channel), - content: change.text, - durationSec: change.durationSec, - transcriptJson: Array.isArray(change.transcript) ? change.transcript : Prisma.JsonNull, - occurredAt: new Date(change.at), - }, - }); - applied.push({ id: change.id, type: change.type, detail: `created message ${created.id}` }); - continue; - } - - if (change.type === "update_deal_stage") { - const updated = await tx.deal.updateMany({ - where: { id: change.dealId, teamId: input.teamId }, - data: { stage: change.stage }, - }); - if (updated.count === 0) throw new Error(`deal not found: ${change.dealId}`); - applied.push({ id: change.id, type: change.type, detail: `${change.dealTitle}: stage -> ${change.stage}` }); - continue; - } - } - }); - - for (const item of applied) { - dbWrites.push({ kind: item.type, detail: item.detail }); - } - - return { ok: true, applied: applied.length, changes: applied }; }; const readActionNames = new Set([ - "get_snapshot", - "list_contacts_digest", - "get_contact_snapshot", - "get_calendar_window", - "query_contacts", - "query_deals", - "query_events", + "getContactsList", + "getContactSnapshot", + "getUserCalendarWindow", ]); const readToolCache = new Map(); const invalidateReadCache = () => { @@ -667,16 +504,7 @@ export async function runLangGraphCrmAgentFor(input: { const fromValue = raw.from ?? raw.start; const toValue = raw.to ?? raw.end; - if (raw.action === "get_snapshot") { - const snapshot = await buildCrmSnapshot({ - teamId: input.teamId, - contact: raw.contact, - contactsLimit: raw.limit, - }); - return cacheReadResult(JSON.stringify(snapshot, null, 2)); - } - - if (raw.action === "list_contacts_digest") { + if (raw.action === "getContactsList") { const q = (raw.query ?? "").trim(); const limit = Math.max(1, Math.min(raw.limit ?? 50, 200)); const offset = Math.max(0, raw.offset ?? 0); @@ -773,7 +601,7 @@ export async function runLangGraphCrmAgentFor(input: { ); } - if (raw.action === "get_contact_snapshot") { + if (raw.action === "getContactSnapshot") { const contactRef = (raw.contact ?? "").trim(); const contactId = (raw.contactId ?? "").trim(); const messagesLimit = Math.max(1, Math.min(raw.messagesLimit ?? 20, 100)); @@ -907,7 +735,7 @@ export async function runLangGraphCrmAgentFor(input: { ); } - if (raw.action === "get_calendar_window") { + if (raw.action === "getUserCalendarWindow") { const from = fromValue ? new Date(fromValue) : new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); const to = toValue ? new Date(toValue) : new Date(Date.now() + 30 * 24 * 60 * 60 * 1000); if (Number.isNaN(from.getTime()) || Number.isNaN(to.getTime())) { @@ -966,223 +794,25 @@ export async function runLangGraphCrmAgentFor(input: { ); } - if (raw.action === "query_contacts") { - const q = (raw.query ?? "").trim(); - const offset = Math.max(0, raw.offset ?? 0); - const limit = Math.max(1, Math.min(raw.limit ?? 20, 100)); - const items = await prisma.contact.findMany({ - where: { - teamId: input.teamId, - ...(q - ? { - OR: [ - { name: { contains: q } }, - { company: { contains: q } }, - { email: { contains: q } }, - { phone: { contains: q } }, - ], - } - : {}), - }, - orderBy: { updatedAt: "desc" }, - skip: offset, - take: limit, - include: { note: { select: { content: true, updatedAt: true } } }, - }); - return cacheReadResult( - JSON.stringify( - { - items: items.map((c) => ({ - id: c.id, - name: c.name, - company: c.company, - country: c.country, - location: c.location, - email: c.email, - phone: c.phone, - note: c.note?.content ?? null, - })), - pagination: { - offset, - limit, - returned: items.length, - hasMore: items.length === limit, - nextOffset: offset + items.length, - }, - }, - null, - 2, - ), - ); - } - - if (raw.action === "query_deals") { - const offset = Math.max(0, raw.offset ?? 0); - const limit = Math.max(1, Math.min(raw.limit ?? 20, 100)); - const updatedAt: { gte?: Date; lte?: Date } = {}; - if (fromValue) { - const from = new Date(fromValue); - if (!Number.isNaN(from.getTime())) updatedAt.gte = from; - } - if (toValue) { - const to = new Date(toValue); - if (!Number.isNaN(to.getTime())) updatedAt.lte = to; - } - const items = await prisma.deal.findMany({ - where: { - teamId: input.teamId, - ...(raw.stage ? { stage: raw.stage } : {}), - ...(updatedAt.gte || updatedAt.lte ? { updatedAt } : {}), - }, - orderBy: { updatedAt: "desc" }, - skip: offset, - take: limit, - include: { - contact: { select: { name: true, company: true } }, - steps: { - select: { id: true, title: true, status: true, dueAt: true, order: true, completedAt: true }, - orderBy: [{ order: "asc" }, { createdAt: "asc" }], - }, - }, - }); - return cacheReadResult( - JSON.stringify( - { - items: items.map((c) => ({ - id: c.id, - title: c.title, - stage: c.stage, - amount: c.amount, - nextStep: c.nextStep, - summary: c.summary, - currentStepId: c.currentStepId, - steps: c.steps.map((s) => ({ - id: s.id, - title: s.title, - status: s.status, - dueAt: s.dueAt ? s.dueAt.toISOString() : null, - order: s.order, - completedAt: s.completedAt ? s.completedAt.toISOString() : null, - })), - contact: c.contact.name, - company: c.contact.company, - })), - pagination: { - offset, - limit, - returned: items.length, - hasMore: items.length === limit, - nextOffset: offset + items.length, - }, - }, - null, - 2, - ), - ); - } - - if (raw.action === "query_events") { - const from = fromValue ? new Date(fromValue) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); - const to = toValue ? new Date(toValue) : new Date(Date.now() + 60 * 24 * 60 * 60 * 1000); - if (Number.isNaN(from.getTime()) || Number.isNaN(to.getTime())) { - throw new Error("from/to range is invalid"); - } - const offset = Math.max(0, raw.offset ?? 0); - const limit = Math.max(1, Math.min(raw.limit ?? 100, 500)); - const items = await prisma.calendarEvent.findMany({ - where: { - teamId: input.teamId, - startsAt: { gte: from, lte: to }, - ...(raw.title ? { title: { contains: raw.title } } : {}), - ...(raw.includeArchived ? {} : { isArchived: false }), - }, - orderBy: { startsAt: "asc" }, - skip: offset, - take: limit, - include: { contact: { select: { name: true } } }, - }); - return cacheReadResult( - JSON.stringify( - { - items: items.map((e) => ({ - id: e.id, - title: e.title, - startsAt: e.startsAt.toISOString(), - endsAt: (e.endsAt ?? e.startsAt).toISOString(), - note: e.note, - isArchived: e.isArchived, - contact: e.contact?.name ?? null, - })), - pagination: { - offset, - limit, - returned: items.length, - hasMore: items.length === limit, - nextOffset: offset + items.length, - }, - }, - null, - 2, - ), - ); - } - - if (raw.action === "pending_changes") { - return JSON.stringify( - { - count: pendingChanges.length, - items: pendingChanges.map((item) => ({ - id: item.id, - type: item.type, - createdAt: item.createdAt, - })), - }, - null, - 2, - ); - } - - if (raw.action === "discard_changes") { - const discarded = pendingChanges.length; - pendingChanges.splice(0, pendingChanges.length); - invalidateReadCache(); - return JSON.stringify({ ok: true, discarded }, null, 2); - } - - if (raw.action === "commit_changes") { - const committed = await applyPendingChanges(); - invalidateReadCache(); - return JSON.stringify(committed, null, 2); - } - - if (raw.action === "update_contact_note") { + if (raw.action === "updateContactSummary") { const contactName = (raw.contact ?? "").trim(); - const content = (raw.content ?? "").trim(); + const content = (raw.summary ?? raw.content ?? "").trim(); if (!contactName) throw new Error("contact is required"); - if (!content) throw new Error("content is required"); + if (!content) throw new Error("summary is required"); const contact = await resolveContact(input.teamId, contactName); if (!contact) throw new Error("contact not found"); - - pendingChanges.push({ - id: makeId("chg"), - type: "update_contact_note", - createdAt: iso(new Date()), - contactId: contact.id, - contactName: contact.name, - content, + await prisma.contactNote.upsert({ + where: { contactId: contact.id }, + update: { content }, + create: { contactId: contact.id, content }, }); invalidateReadCache(); - - if (raw.mode === "apply") { - const committed = await applyPendingChanges(); - invalidateReadCache(); - return JSON.stringify(committed, null, 2); - } - return JSON.stringify({ ok: true, staged: true, pending: pendingChanges.length }, null, 2); + dbWrites.push({ kind: "updateContactSummary", detail: `${contact.name}: summary updated` }); + return JSON.stringify({ ok: true, applied: 1 }, null, 2); } - if (raw.action === "create_event") { + if (raw.action === "createUserCalendarEvent") { const title = (raw.title ?? "").trim(); const start = raw.start ? new Date(raw.start) : null; if (!title) throw new Error("title is required"); @@ -1199,140 +829,26 @@ export async function runLangGraphCrmAgentFor(input: { }) : null) || (contactName ? await resolveContact(input.teamId, contactName) : null); - - pendingChanges.push({ - id: makeId("chg"), - type: "create_event", - createdAt: iso(new Date()), - contactId: contact?.id ?? null, - contactName: contact?.name ?? null, - title, - start: iso(start), - end: end && !Number.isNaN(end.getTime()) ? iso(end) : null, - note: (raw.note ?? "").trim() || null, - isArchived: Boolean(raw.archived), - }); - invalidateReadCache(); - - if (raw.mode === "apply") { - const committed = await applyPendingChanges(); - invalidateReadCache(); - return JSON.stringify(committed, null, 2); - } - return JSON.stringify({ ok: true, staged: true, pending: pendingChanges.length }, null, 2); - } - - if (raw.action === "create_events_batch") { - const batch = Array.isArray(raw.events) ? raw.events : []; - if (!batch.length) throw new Error("events is required"); - const capped = batch.slice(0, 200); - - for (const item of capped) { - const title = (item.title ?? "").trim(); - const start = item.start ? new Date(item.start) : null; - if (!title) throw new Error("events[].title is required"); - if (!start || Number.isNaN(start.getTime())) throw new Error("events[].start is invalid"); - - const end = item.end ? new Date(item.end) : null; - const contactId = (item.contactId ?? "").trim(); - const contactName = (item.contact ?? "").trim(); - const contact = - (contactId - ? await prisma.contact.findFirst({ - where: { id: contactId, teamId: input.teamId }, - select: { id: true, name: true }, - }) - : null) || - (contactName ? await resolveContact(input.teamId, contactName) : null); - if (contactId && !contact) throw new Error(`contact not found: ${contactId}`); - - pendingChanges.push({ - id: makeId("chg"), - type: "create_event", - createdAt: iso(new Date()), - contactId: contact?.id ?? null, - contactName: contact?.name ?? null, + if (contactId && !contact) throw new Error("contact not found"); + const created = await prisma.calendarEvent.create({ + data: { + teamId: input.teamId, title, - start: iso(start), - end: end && !Number.isNaN(end.getTime()) ? iso(end) : null, - note: (item.note ?? "").trim() || null, - isArchived: Boolean(item.archived), - }); - } - invalidateReadCache(); - - if (raw.mode === "apply") { - const committed = await applyPendingChanges(); - invalidateReadCache(); - return JSON.stringify(committed, null, 2); - } - return JSON.stringify({ ok: true, staged: capped.length, pending: pendingChanges.length }, null, 2); - } - - if (raw.action === "create_message") { - const contactName = (raw.contact ?? "").trim(); - const text = (raw.text ?? "").trim(); - if (!contactName) throw new Error("contact is required"); - if (!text) throw new Error("text is required"); - - const contact = await resolveContact(input.teamId, contactName); - if (!contact) throw new Error("contact not found"); - - const occurredAt = raw.at ? new Date(raw.at) : new Date(); - if (Number.isNaN(occurredAt.getTime())) throw new Error("at is invalid"); - - pendingChanges.push({ - id: makeId("chg"), - type: "create_message", - createdAt: iso(new Date()), - contactId: contact.id, - contactName: contact.name, - kind: raw.kind === "call" ? "call" : "message", - direction: raw.direction === "in" ? "in" : "out", - channel: raw.channel ?? "Phone", - text, - at: iso(occurredAt), - durationSec: typeof raw.durationSec === "number" ? raw.durationSec : null, - transcript: Array.isArray(raw.transcript) ? raw.transcript : null, + startsAt: start, + endsAt: end && !Number.isNaN(end.getTime()) ? end : null, + note: (raw.note ?? "").trim() || null, + isArchived: Boolean(raw.archived), + contactId: contact?.id ?? null, + }, }); invalidateReadCache(); - - if (raw.mode === "apply") { - const committed = await applyPendingChanges(); - invalidateReadCache(); - return JSON.stringify(committed, null, 2); - } - return JSON.stringify({ ok: true, staged: true, pending: pendingChanges.length }, null, 2); - } - - if (raw.action === "update_deal_stage") { - const dealId = (raw.dealId ?? "").trim(); - const stage = (raw.stage ?? "").trim(); - if (!dealId) throw new Error("dealId is required"); - if (!stage) throw new Error("stage is required"); - - const deal = await prisma.deal.findFirst({ - where: { id: dealId, teamId: input.teamId }, - select: { id: true, title: true }, - }); - if (!deal) throw new Error("deal not found"); - - pendingChanges.push({ - id: makeId("chg"), - type: "update_deal_stage", - createdAt: iso(new Date()), - dealId: deal.id, - dealTitle: deal.title, - stage, - }); - invalidateReadCache(); - - if (raw.mode === "apply") { - const committed = await applyPendingChanges(); - invalidateReadCache(); - return JSON.stringify(committed, null, 2); - } - return JSON.stringify({ ok: true, staged: true, pending: pendingChanges.length }, null, 2); + dbWrites.push({ kind: "createUserCalendarEvent", detail: `created event ${created.id}` }); + return JSON.stringify({ + ok: true, + applied: 1, + eventId: created.id, + contactId: contact?.id ?? null, + }, null, 2); } return JSON.stringify({ ok: false, error: "unknown action" }); @@ -1372,7 +888,7 @@ export async function runLangGraphCrmAgentFor(input: { { name: "crm", description: - "Query and update CRM data (contacts, deals, events, communications). Use this tool for any data you need beyond the snapshot.", + "CRM tool with exactly five actions: getContactsList, getContactSnapshot, getUserCalendarWindow, updateContactSummary, createUserCalendarEvent.", schema: CrmToolSchema, }, ); @@ -1432,16 +948,10 @@ export async function runLangGraphCrmAgentFor(input: { "- Be concrete and complete. Do not cut important details in the final answer.", "- Work in short iterative cycles. Do not stop after the first thought if the task needs more than one action.", "- You are given a structured CRM JSON snapshot as baseline context.", - "- Prefer this data flow to keep context small:", - " 1) crm.list_contacts_digest for the roster and prioritization.", - " 2) crm.get_contact_snapshot for one focused contact (messages/events/deals/summary in one call).", - " 3) crm.get_calendar_window for calendar constraints.", - "- Use crm.query_* only for narrow follow-up filters.", + "- Only use these actions: crm.getContactsList, crm.getContactSnapshot, crm.getUserCalendarWindow, crm.updateContactSummary, crm.createUserCalendarEvent.", + "- Use crm.getContactsList first to choose contacts, then crm.getContactSnapshot for deep context, then crm.getUserCalendarWindow for schedule validation.", "- Avoid repeating identical read calls with the same arguments.", - "- If creating many events, prefer crm.create_events_batch instead of many crm.create_event calls.", - "- For changes, stage first with mode=stage. Commit only when user asks to execute.", - "- You can apply immediately with mode=apply only if user explicitly asked to do it now.", - "- Use pending_changes and commit_changes to control staged updates.", + "- Write actions are immediate DB changes. Do not mention staging or commit.", "- Do not claim you sent an external message; you can only create CRM records.", "", "CRM Snapshot JSON:", @@ -1503,7 +1013,7 @@ export async function runLangGraphCrmAgentFor(input: { }; for (let cycle = 1; ; cycle += 1) { - const userPrompt = cyclePrompt(input.userText, cycle, cycleNotes, pendingChanges.length); + const userPrompt = cyclePrompt(input.userText, cycle, cycleNotes); const cycleSpan = lfTrace?.span({ name: "agent.cycle", input: userPrompt, @@ -1515,7 +1025,6 @@ export async function runLangGraphCrmAgentFor(input: { await emitTrace({ text: "Анализирую задачу и текущий контекст CRM." }); const beforeRuns = toolRuns.length; const beforeWrites = dbWrites.length; - const beforePending = pendingChanges.length; let res: any; try { @@ -1559,15 +1068,13 @@ export async function runLangGraphCrmAgentFor(input: { finalText = parsed.text; } - const progressed = - toolRuns.length > beforeRuns || dbWrites.length > beforeWrites || pendingChanges.length !== beforePending; + const progressed = toolRuns.length > beforeRuns || dbWrites.length > beforeWrites; cycleSpan?.end({ output: parsed.text || "", metadata: { progressed, toolRunsDelta: toolRuns.length - beforeRuns, dbWritesDelta: dbWrites.length - beforeWrites, - pendingDelta: pendingChanges.length - beforePending, }, }); if (progressed) { @@ -1593,7 +1100,6 @@ export async function runLangGraphCrmAgentFor(input: { toolsUsedCount: toolsUsed.length, toolRunsCount: toolRuns.length, dbWritesCount: dbWrites.length, - pendingChangesCount: pendingChanges.length, }, }); void langfuse?.flushAsync().catch(() => {});