Files
clientsflow/frontend/server/agent/langgraphCrmAgent.ts
2026-02-20 12:10:25 +07:00

1121 lines
38 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { randomUUID } from "node:crypto";
import type { AgentReply, AgentTraceEvent } from "./crmAgent";
import { prisma } from "../utils/prisma";
import { ensureDataset } from "../dataset/exporter";
import { createReactAgent } from "@langchain/langgraph/prebuilt";
import { ChatOpenAI } from "@langchain/openai";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { getLangfuseClient } from "../utils/langfuse";
function iso(d: Date) {
return d.toISOString();
}
function cyclePrompt(userText: string, cycle: number, cycleNotes: string[]) {
if (cycle === 1) return userText;
return [
"Continue solving the same user request.",
`User request: ${userText}`,
cycleNotes.length ? `Progress notes:\n- ${cycleNotes.join("\n- ")}` : "No progress notes yet.",
"Do the next useful step. If done, produce final concise answer.",
].join("\n");
}
type GigachatTokenCache = {
token: string;
expiresAtSec: number;
};
let gigachatTokenCache: GigachatTokenCache | null = null;
function normalizeAuthHeader(authKey: string, scheme: "Basic" | "Bearer") {
const key = authKey.trim();
if (!key) return "";
if (key.startsWith("Basic ") || key.startsWith("Bearer ")) return key;
return `${scheme} ${key}`;
}
async function requestGigachatToken(input: {
authKey: string;
scope: string;
oauthUrl: string;
authScheme: "Basic" | "Bearer";
}) {
const body = new URLSearchParams();
body.set("scope", input.scope);
const res = await fetch(input.oauthUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
Accept: "application/json",
RqUID: randomUUID(),
Authorization: normalizeAuthHeader(input.authKey, input.authScheme),
},
body: body.toString(),
});
if (!res.ok) {
const text = await res.text().catch(() => "");
throw new Error(`GigaChat oauth failed: ${res.status} ${text.slice(0, 240)}`);
}
const payload = (await res.json()) as { access_token?: string; expires_at?: number };
if (!payload?.access_token) {
throw new Error("GigaChat oauth failed: access_token is missing");
}
return {
token: payload.access_token,
expiresAtSec: typeof payload.expires_at === "number" ? payload.expires_at : Math.floor(Date.now() / 1000) + 25 * 60,
};
}
async function getGigachatAccessToken(input: {
authKey: string;
scope: string;
oauthUrl: string;
}) {
const nowSec = Math.floor(Date.now() / 1000);
if (gigachatTokenCache && gigachatTokenCache.expiresAtSec - nowSec > 60) {
return gigachatTokenCache.token;
}
try {
const token = await requestGigachatToken({ ...input, authScheme: "Basic" });
gigachatTokenCache = token;
return token.token;
} catch {
const token = await requestGigachatToken({ ...input, authScheme: "Bearer" });
gigachatTokenCache = token;
return token.token;
}
}
type SnapshotOptions = {
teamId: string;
contact?: string;
contactsLimit?: number;
};
function makeId(prefix: string) {
return `${prefix}_${Date.now()}_${Math.random().toString(16).slice(2, 10)}`;
}
async function resolveContact(teamId: string, contactRef: string) {
const contact = contactRef.trim();
if (!contact) return null;
const exact = await prisma.contact.findFirst({
where: { teamId, OR: [{ id: contact }, { name: contact }] },
select: { id: true, name: true },
});
if (exact) return exact;
return prisma.contact.findFirst({
where: { teamId, name: { contains: contact } },
orderBy: { updatedAt: "desc" },
select: { id: true, name: true },
});
}
async function buildCrmSnapshot(input: SnapshotOptions) {
const now = new Date();
const in7 = new Date(now.getTime() + 7 * 24 * 60 * 60 * 1000);
const contactLimit = Math.max(1, Math.min(input.contactsLimit ?? 25, 80));
const selectedContact = input.contact ? await resolveContact(input.teamId, input.contact) : null;
const contactWhere = selectedContact ? { teamId: input.teamId, id: selectedContact.id } : { teamId: input.teamId };
const [contacts, upcoming, deals, docs, totalContacts, totalDeals, totalEvents] = await Promise.all([
prisma.contact.findMany({
where: contactWhere,
orderBy: { updatedAt: "desc" },
take: selectedContact ? 1 : contactLimit,
include: {
note: { select: { content: true, updatedAt: true } },
messages: {
select: { id: true, occurredAt: true, channel: true, direction: true, kind: true, content: true },
orderBy: { occurredAt: "desc" },
take: 4,
},
events: {
select: { id: true, title: true, startsAt: true, endsAt: true, isArchived: true },
orderBy: { startsAt: "asc" },
where: { startsAt: { gte: new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000) } },
take: 4,
},
deals: {
select: {
id: true,
title: true,
stage: true,
amount: true,
updatedAt: true,
nextStep: true,
summary: true,
currentStepId: true,
steps: {
select: { id: true, title: true, status: true, dueAt: true, order: true, completedAt: true },
orderBy: [{ order: "asc" }, { createdAt: "asc" }],
},
},
orderBy: { updatedAt: "desc" },
take: 3,
},
pins: {
select: { id: true, text: true, updatedAt: true },
orderBy: { updatedAt: "desc" },
take: 3,
},
},
}),
prisma.calendarEvent.findMany({
where: { teamId: input.teamId, startsAt: { gte: now, lte: in7 } },
orderBy: { startsAt: "asc" },
take: 20,
include: { contact: { select: { name: true } } },
}),
prisma.deal.findMany({
where: { teamId: input.teamId },
orderBy: { updatedAt: "desc" },
take: 20,
include: {
contact: { select: { name: true, company: true } },
steps: { select: { id: true, title: true, status: true, dueAt: true, order: true, completedAt: true }, orderBy: [{ order: "asc" }, { createdAt: "asc" }] },
},
}),
prisma.workspaceDocument.findMany({
where: { teamId: input.teamId },
orderBy: { updatedAt: "desc" },
take: 12,
select: { id: true, type: true, title: true, summary: true, updatedAt: true, owner: true, scope: true },
}),
prisma.contact.count({ where: { teamId: input.teamId } }),
prisma.deal.count({ where: { teamId: input.teamId } }),
prisma.calendarEvent.count({ where: { teamId: input.teamId, startsAt: { gte: now } } }),
]);
const byStage = new Map<string, number>();
for (const d of deals) byStage.set(d.stage, (byStage.get(d.stage) ?? 0) + 1);
return {
meta: {
generatedAt: iso(now),
focusContactId: selectedContact?.id ?? null,
contactsIncluded: contacts.length,
mode: selectedContact ? "focused" : "team",
},
totals: {
contacts: totalContacts,
deals: totalDeals,
upcomingEvents: totalEvents,
},
stats: {
dealsByStage: [...byStage.entries()]
.sort((a, b) => b[1] - a[1])
.map(([stage, count]) => ({ stage, count })),
},
upcomingEvents: upcoming.map((e) => ({
id: e.id,
title: e.title,
startsAt: iso(e.startsAt),
endsAt: iso(e.endsAt ?? e.startsAt),
isArchived: e.isArchived,
note: e.note,
contact: e.contact?.name ?? null,
})),
deals: deals.map((d) => ({
id: d.id,
title: d.title,
stage: d.stage,
amount: d.amount,
nextStep: d.nextStep,
summary: d.summary,
currentStepId: d.currentStepId,
steps: d.steps.map((s) => ({
id: s.id,
title: s.title,
status: s.status,
dueAt: s.dueAt ? iso(s.dueAt) : null,
order: s.order,
completedAt: s.completedAt ? iso(s.completedAt) : null,
})),
updatedAt: iso(d.updatedAt),
contact: {
name: d.contact.name,
company: d.contact.company,
},
})),
contacts: contacts.map((c) => ({
id: c.id,
name: c.name,
company: c.company,
country: c.country,
location: c.location,
email: c.email,
phone: c.phone,
avatarUrl: c.avatarUrl,
updatedAt: iso(c.updatedAt),
summary: c.note?.content ?? null,
summaryUpdatedAt: c.note?.updatedAt ? iso(c.note.updatedAt) : null,
latestMessages: c.messages.map((m) => ({
id: m.id,
occurredAt: iso(m.occurredAt),
channel: m.channel,
direction: m.direction,
kind: m.kind,
content: m.content,
})),
latestEvents: c.events.map((e) => ({
id: e.id,
title: e.title,
startsAt: iso(e.startsAt),
endsAt: iso(e.endsAt ?? e.startsAt),
isArchived: e.isArchived,
})),
deals: c.deals.map((d) => ({
id: d.id,
title: d.title,
stage: d.stage,
amount: d.amount,
nextStep: d.nextStep,
summary: d.summary,
currentStepId: d.currentStepId,
steps: d.steps.map((s) => ({
id: s.id,
title: s.title,
status: s.status,
dueAt: s.dueAt ? iso(s.dueAt) : null,
order: s.order,
completedAt: s.completedAt ? iso(s.completedAt) : null,
})),
updatedAt: iso(d.updatedAt),
})),
pins: c.pins.map((p) => ({
id: p.id,
text: p.text,
updatedAt: iso(p.updatedAt),
})),
})),
documents: docs.map((d) => ({
id: d.id,
type: d.type,
title: d.title,
summary: d.summary,
owner: d.owner,
scope: d.scope,
updatedAt: iso(d.updatedAt),
})),
};
}
export async function runLangGraphCrmAgentFor(input: {
teamId: string;
userId: string;
userText: string;
requestId?: string;
conversationId?: string;
onTrace?: (event: AgentTraceEvent) => Promise<void> | void;
}): Promise<AgentReply> {
const openrouterApiKey = (process.env.OPENROUTER_API_KEY ?? "").trim();
const openrouterBaseURL = (process.env.OPENROUTER_BASE_URL ?? "https://openrouter.ai/api/v1").trim();
const openrouterModel = (process.env.OPENROUTER_MODEL ?? "openai/gpt-4o-mini").trim();
const openrouterReferer = (process.env.OPENROUTER_HTTP_REFERER ?? "").trim();
const openrouterTitle = (process.env.OPENROUTER_X_TITLE ?? "").trim();
const openrouterReasoningEnabled = (process.env.OPENROUTER_REASONING_ENABLED ?? "").trim() === "1";
const genericApiKey =
process.env.LLM_API_KEY ||
process.env.OPENAI_API_KEY ||
process.env.DASHSCOPE_API_KEY ||
process.env.QWEN_API_KEY;
const genericBaseURL =
process.env.LLM_BASE_URL ||
process.env.OPENAI_BASE_URL ||
process.env.DASHSCOPE_BASE_URL ||
process.env.QWEN_BASE_URL;
const genericModel =
process.env.LLM_MODEL ||
process.env.OPENAI_MODEL ||
process.env.DASHSCOPE_MODEL ||
process.env.QWEN_MODEL ||
"gpt-4o-mini";
const gigachatAuthKey = (process.env.GIGACHAT_AUTH_KEY ?? "").trim();
const gigachatModel = (process.env.GIGACHAT_MODEL ?? "").trim();
const gigachatScope = (process.env.GIGACHAT_SCOPE ?? "").trim();
const gigachatOauthUrl = (process.env.GIGACHAT_OAUTH_URL ?? "https://ngw.devices.sberbank.ru:9443/api/v2/oauth").trim();
const gigachatBaseUrl = (process.env.GIGACHAT_BASE_URL ?? "https://gigachat.devices.sberbank.ru/api/v1").trim();
const useGigachat = Boolean(gigachatAuthKey && gigachatScope);
let llmApiKey = genericApiKey;
let llmBaseURL = genericBaseURL;
let llmModel = genericModel;
let llmHeaders: Record<string, string> | undefined;
let llmReasoningEnabled = false;
if (openrouterApiKey) {
llmApiKey = openrouterApiKey;
llmBaseURL = openrouterBaseURL;
llmModel = openrouterModel;
llmReasoningEnabled = openrouterReasoningEnabled;
llmHeaders = {
...(openrouterReferer ? { "HTTP-Referer": openrouterReferer } : {}),
...(openrouterTitle ? { "X-Title": openrouterTitle } : {}),
};
}
if (useGigachat) {
try {
llmApiKey = await getGigachatAccessToken({
authKey: gigachatAuthKey,
scope: gigachatScope,
oauthUrl: gigachatOauthUrl,
});
llmBaseURL = gigachatBaseUrl;
llmModel = gigachatModel || "GigaChat-2-Max";
} catch (e: any) {
throw new Error(`Не удалось получить токен GigaChat: ${String(e?.message || e)}`);
}
}
if (!llmApiKey) {
throw new Error(
"LLM API key is not configured. Set OPENROUTER_API_KEY (or GIGACHAT_AUTH_KEY/GIGACHAT_SCOPE) and restart.",
);
}
// Keep the dataset fresh so the "CRM filesystem" stays in sync with DB.
await ensureDataset({ teamId: input.teamId, userId: input.userId });
const toolsUsed: string[] = [];
const dbWrites: Array<{ kind: string; detail: string }> = [];
const toolRuns: NonNullable<AgentReply["toolRuns"]> = [];
async function emitTrace(event: AgentTraceEvent) {
lfTrace?.event({
name: "agent.trace",
input: {
text: event.text,
toolRun: event.toolRun ?? null,
},
metadata: {
requestId: input.requestId ?? null,
},
});
if (!input.onTrace) return;
try {
await input.onTrace(event);
} catch {
// Trace transport errors must not break the agent response.
}
}
function compact(value: unknown, max = 240) {
const text = typeof value === "string" ? value : JSON.stringify(value);
if (!text) return "";
return text.length > max ? `${text.slice(0, max)}...` : text;
}
function stableStringify(value: unknown): string {
const walk = (input: unknown): unknown => {
if (Array.isArray(input)) return input.map(walk);
if (!input || typeof input !== "object") return input;
const obj = input as Record<string, unknown>;
return Object.fromEntries(
Object.keys(obj)
.sort()
.map((key) => [key, walk(obj[key])]),
);
};
return JSON.stringify(walk(value));
}
const CrmToolSchema = z.object({
action: z.enum([
"getContactsList",
"getContactSnapshot",
"getUserCalendarWindow",
"updateContactSummary",
"createUserCalendarEvent",
]),
// read actions
query: z.string().optional(),
from: z.string().optional(),
to: z.string().optional(),
limit: z.number().int().optional(),
offset: z.number().int().min(0).optional(),
includeArchived: z.boolean().optional(),
messagesLimit: z.number().int().optional(),
eventsLimit: z.number().int().optional(),
dealsLimit: z.number().int().optional(),
// write actions
contact: z.string().optional(),
contactId: z.string().optional(),
summary: z.string().optional(),
content: z.string().optional(),
title: z.string().optional(),
start: z.string().optional(),
end: z.string().optional(),
note: z.string().optional(),
archived: z.boolean().optional(),
};
const readActionNames = new Set([
"getContactsList",
"getContactSnapshot",
"getUserCalendarWindow",
]);
const readToolCache = new Map<string, string>();
const invalidateReadCache = () => {
readToolCache.clear();
};
const crmTool = tool(
async (rawInput: unknown) => {
const raw = CrmToolSchema.parse(rawInput);
const toolName = `crm:${raw.action}`;
const startedAt = new Date().toISOString();
toolsUsed.push(toolName);
await emitTrace({ text: `Использую инструмент: ${toolName}` });
const executeAction = async () => {
const readCacheKey = readActionNames.has(raw.action) ? `${raw.action}:${stableStringify(raw)}` : "";
const cacheReadResult = (result: string) => {
if (readCacheKey) {
readToolCache.set(readCacheKey, result);
}
return result;
};
if (readCacheKey && readToolCache.has(readCacheKey)) {
return JSON.stringify(
{
cached: true,
action: raw.action,
note: "Identical read query was already returned earlier in this request. Reuse previous tool output.",
},
null,
2,
);
}
const fromValue = raw.from ?? raw.start;
const toValue = raw.to ?? raw.end;
if (raw.action === "getContactsList") {
const q = (raw.query ?? "").trim();
const limit = Math.max(1, Math.min(raw.limit ?? 50, 200));
const offset = Math.max(0, raw.offset ?? 0);
const now = new Date();
const items = await prisma.contact.findMany({
where: {
teamId: input.teamId,
...(q
? {
OR: [
{ name: { contains: q } },
{ company: { contains: q } },
{ email: { contains: q } },
{ phone: { contains: q } },
],
}
: {}),
},
orderBy: [{ updatedAt: "desc" }, { id: "asc" }],
skip: offset,
take: limit,
include: {
note: { select: { content: true, updatedAt: true } },
messages: {
select: { occurredAt: true, channel: true, direction: true, kind: true, content: true },
orderBy: { occurredAt: "desc" },
take: 1,
},
events: {
select: { id: true, title: true, startsAt: true, endsAt: true, isArchived: true },
where: {
startsAt: { gte: now },
...(raw.includeArchived ? {} : { isArchived: false }),
},
orderBy: { startsAt: "asc" },
take: 1,
},
deals: {
select: { id: true, stage: true, title: true, amount: true, updatedAt: true, nextStep: true, summary: true },
orderBy: { updatedAt: "desc" },
take: 1,
},
_count: {
select: { messages: true, events: true, deals: true },
},
},
});
return cacheReadResult(
JSON.stringify(
{
items: items.map((c) => ({
id: c.id,
name: c.name,
company: c.company,
country: c.country,
location: c.location,
email: c.email,
phone: c.phone,
summary: c.note?.content ?? null,
lastMessage: c.messages[0]
? {
occurredAt: c.messages[0].occurredAt.toISOString(),
channel: c.messages[0].channel,
direction: c.messages[0].direction,
kind: c.messages[0].kind,
content: c.messages[0].content,
}
: null,
nextEvent: c.events[0]
? {
id: c.events[0].id,
title: c.events[0].title,
startsAt: c.events[0].startsAt.toISOString(),
endsAt: (c.events[0].endsAt ?? c.events[0].startsAt).toISOString(),
isArchived: c.events[0].isArchived,
}
: null,
latestDeal: c.deals[0] ?? null,
counts: c._count,
})),
pagination: {
offset,
limit,
returned: items.length,
hasMore: items.length === limit,
nextOffset: offset + items.length,
},
},
null,
2,
),
);
}
if (raw.action === "getContactSnapshot") {
const contactRef = (raw.contact ?? "").trim();
const contactId = (raw.contactId ?? "").trim();
const messagesLimit = Math.max(1, Math.min(raw.messagesLimit ?? 20, 100));
const eventsLimit = Math.max(1, Math.min(raw.eventsLimit ?? 20, 100));
const dealsLimit = Math.max(1, Math.min(raw.dealsLimit ?? 5, 20));
let target: { id: string; name: string } | null = null;
if (contactId) {
target = await prisma.contact.findFirst({
where: { id: contactId, teamId: input.teamId },
select: { id: true, name: true },
});
}
if (!target && contactRef) {
target = await resolveContact(input.teamId, contactRef);
}
if (!target) {
throw new Error("contact/contactId is required");
}
const from = fromValue ? new Date(fromValue) : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const to = toValue ? new Date(toValue) : new Date(Date.now() + 90 * 24 * 60 * 60 * 1000);
if (Number.isNaN(from.getTime()) || Number.isNaN(to.getTime())) {
throw new Error("from/to range is invalid");
}
const contact = await prisma.contact.findFirst({
where: { id: target.id, teamId: input.teamId },
include: {
note: { select: { content: true, updatedAt: true } },
messages: {
select: { id: true, occurredAt: true, channel: true, direction: true, kind: true, content: true, durationSec: true, transcriptJson: true },
orderBy: { occurredAt: "desc" },
take: messagesLimit,
},
events: {
select: { id: true, title: true, startsAt: true, endsAt: true, note: true, isArchived: true },
where: {
startsAt: { gte: from, lte: to },
...(raw.includeArchived ? {} : { isArchived: false }),
},
orderBy: { startsAt: "asc" },
take: eventsLimit,
},
deals: {
select: {
id: true,
title: true,
stage: true,
amount: true,
nextStep: true,
summary: true,
currentStepId: true,
updatedAt: true,
steps: {
select: { id: true, title: true, status: true, dueAt: true, order: true, completedAt: true },
orderBy: [{ order: "asc" }, { createdAt: "asc" }],
},
},
orderBy: { updatedAt: "desc" },
take: dealsLimit,
},
_count: {
select: { messages: true, events: true, deals: true },
},
},
});
if (!contact) throw new Error("contact not found");
return cacheReadResult(
JSON.stringify(
{
contact: {
id: contact.id,
name: contact.name,
company: contact.company,
country: contact.country,
location: contact.location,
email: contact.email,
phone: contact.phone,
updatedAt: contact.updatedAt.toISOString(),
},
summary: contact.note?.content ?? null,
note: contact.note
? {
content: contact.note.content,
updatedAt: contact.note.updatedAt.toISOString(),
}
: null,
messages: contact.messages.map((m) => ({
id: m.id,
occurredAt: m.occurredAt.toISOString(),
channel: m.channel,
direction: m.direction,
kind: m.kind,
content: m.content,
durationSec: m.durationSec,
transcript: m.transcriptJson,
})),
events: contact.events.map((e) => ({
id: e.id,
title: e.title,
startsAt: e.startsAt.toISOString(),
endsAt: (e.endsAt ?? e.startsAt).toISOString(),
note: e.note,
isArchived: e.isArchived,
})),
deals: contact.deals.map((d) => ({
id: d.id,
title: d.title,
stage: d.stage,
amount: d.amount,
nextStep: d.nextStep,
summary: d.summary,
currentStepId: d.currentStepId,
updatedAt: d.updatedAt.toISOString(),
steps: d.steps.map((s) => ({
id: s.id,
title: s.title,
status: s.status,
dueAt: s.dueAt ? s.dueAt.toISOString() : null,
order: s.order,
completedAt: s.completedAt ? s.completedAt.toISOString() : null,
})),
})),
counts: contact._count,
},
null,
2,
),
);
}
if (raw.action === "getUserCalendarWindow") {
const from = fromValue ? new Date(fromValue) : new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const to = toValue ? new Date(toValue) : new Date(Date.now() + 30 * 24 * 60 * 60 * 1000);
if (Number.isNaN(from.getTime()) || Number.isNaN(to.getTime())) {
throw new Error("from/to range is invalid");
}
const limit = Math.max(1, Math.min(raw.limit ?? 100, 500));
const offset = Math.max(0, raw.offset ?? 0);
const where = {
teamId: input.teamId,
startsAt: { gte: from, lte: to },
...(raw.includeArchived ? {} : { isArchived: false }),
};
const [total, items] = await Promise.all([
prisma.calendarEvent.count({ where }),
prisma.calendarEvent.findMany({
where,
orderBy: { startsAt: "asc" },
skip: offset,
take: limit,
include: { contact: { select: { id: true, name: true, company: true } } },
}),
]);
return cacheReadResult(
JSON.stringify(
{
window: { from: from.toISOString(), to: to.toISOString() },
pagination: {
offset,
limit,
returned: items.length,
total,
hasMore: offset + items.length < total,
nextOffset: offset + items.length,
},
items: items.map((e) => ({
id: e.id,
title: e.title,
startsAt: e.startsAt.toISOString(),
endsAt: (e.endsAt ?? e.startsAt).toISOString(),
note: e.note,
isArchived: e.isArchived,
contact: e.contact
? {
id: e.contact.id,
name: e.contact.name,
company: e.contact.company,
}
: null,
})),
},
null,
2,
),
);
}
if (raw.action === "updateContactSummary") {
const contactName = (raw.contact ?? "").trim();
const content = (raw.summary ?? raw.content ?? "").trim();
if (!contactName) throw new Error("contact is required");
if (!content) throw new Error("summary is required");
const contact = await resolveContact(input.teamId, contactName);
if (!contact) throw new Error("contact not found");
await prisma.contactNote.upsert({
where: { contactId: contact.id },
update: { content },
create: { contactId: contact.id, content },
});
invalidateReadCache();
dbWrites.push({ kind: "updateContactSummary", detail: `${contact.name}: summary updated` });
return JSON.stringify({ ok: true, applied: 1 }, null, 2);
}
if (raw.action === "createUserCalendarEvent") {
const title = (raw.title ?? "").trim();
const start = raw.start ? new Date(raw.start) : null;
if (!title) throw new Error("title is required");
if (!start || Number.isNaN(start.getTime())) throw new Error("start is invalid");
const end = raw.end ? new Date(raw.end) : null;
const contactName = (raw.contact ?? "").trim();
const contactId = (raw.contactId ?? "").trim();
const contact =
(contactId
? await prisma.contact.findFirst({
where: { id: contactId, teamId: input.teamId },
select: { id: true, name: true },
})
: null) ||
(contactName ? await resolveContact(input.teamId, contactName) : null);
if (contactId && !contact) throw new Error("contact not found");
const created = await prisma.calendarEvent.create({
data: {
teamId: input.teamId,
title,
startsAt: start,
endsAt: end && !Number.isNaN(end.getTime()) ? end : null,
note: (raw.note ?? "").trim() || null,
isArchived: Boolean(raw.archived),
contactId: contact?.id ?? null,
},
});
invalidateReadCache();
dbWrites.push({ kind: "createUserCalendarEvent", detail: `created event ${created.id}` });
return JSON.stringify({
ok: true,
applied: 1,
eventId: created.id,
contactId: contact?.id ?? null,
}, null, 2);
}
return JSON.stringify({ ok: false, error: "unknown action" });
};
try {
const result = await executeAction();
const run = {
name: toolName,
status: "ok",
input: compact(raw),
output: compact(result),
at: startedAt,
} as const;
toolRuns.push(run);
await emitTrace({
text: `Tool finished: ${toolName}`,
toolRun: run,
});
return result;
} catch (error: any) {
const run = {
name: toolName,
status: "error",
input: compact(raw),
output: compact(error?.message || String(error)),
at: startedAt,
} as const;
toolRuns.push(run);
await emitTrace({
text: `Tool failed: ${toolName}`,
toolRun: run,
});
throw error;
}
},
{
name: "crm",
description:
"CRM tool with exactly five actions: getContactsList, getContactSnapshot, getUserCalendarWindow, updateContactSummary, createUserCalendarEvent.",
schema: CrmToolSchema,
},
);
const snapshot = await buildCrmSnapshot({ teamId: input.teamId });
const snapshotJson = JSON.stringify(snapshot, null, 2);
const model = new ChatOpenAI({
apiKey: llmApiKey,
model: llmModel,
temperature: 0.2,
...(llmReasoningEnabled
? {
modelKwargs: {
reasoning: { enabled: true },
},
}
: {}),
...(llmBaseURL || llmHeaders
? {
configuration: {
...(llmBaseURL ? { baseURL: llmBaseURL } : {}),
...(llmHeaders ? { defaultHeaders: llmHeaders } : {}),
},
}
: {}),
});
const agent = createReactAgent({
llm: model,
tools: [crmTool],
});
const tracingFlag = (process.env.LANGSMITH_TRACING ?? process.env.LANGCHAIN_TRACING_V2 ?? "").trim().toLowerCase();
const tracingEnabled = tracingFlag === "1" || tracingFlag === "true" || tracingFlag === "yes";
const langfuse = getLangfuseClient();
const lfTrace = langfuse?.trace({
id: input.requestId ?? makeId("trace"),
name: "clientsflow.crm_agent_request",
userId: input.userId,
sessionId: input.conversationId ?? undefined,
input: input.userText,
metadata: {
teamId: input.teamId,
userId: input.userId,
requestId: input.requestId ?? null,
conversationId: input.conversationId ?? null,
},
tags: ["clientsflow", "crm-agent", "langgraph"],
});
let finalText = "";
const cycleNotes: string[] = [];
const system = [
"You are Pilot, a CRM assistant.",
"Rules:",
"- Be concrete and complete. Do not cut important details in the final answer.",
"- Work in short iterative cycles. Do not stop after the first thought if the task needs more than one action.",
"- You are given a structured CRM JSON snapshot as baseline context.",
"- Only use these actions: crm.getContactsList, crm.getContactSnapshot, crm.getUserCalendarWindow, crm.updateContactSummary, crm.createUserCalendarEvent.",
"- Use crm.getContactsList first to choose contacts, then crm.getContactSnapshot for deep context, then crm.getUserCalendarWindow for schedule validation.",
"- Avoid repeating identical read calls with the same arguments.",
"- Write actions are immediate DB changes. Do not mention staging or commit.",
"- Do not claim you sent an external message; you can only create CRM records.",
"",
"CRM Snapshot JSON:",
snapshotJson,
].join("\n");
const extractText = (value: unknown, depth = 0): string => {
if (depth > 5 || value == null) return "";
if (typeof value === "string") return value.trim();
if (Array.isArray(value)) {
const parts = value
.map((item) => extractText(item, depth + 1))
.filter(Boolean);
return parts.join("\n").trim();
}
if (typeof value !== "object") return "";
const obj = value as Record<string, unknown>;
for (const key of ["text", "content", "answer", "output_text", "final_text"]) {
const text = extractText(obj[key], depth + 1);
if (text) return text;
}
if (Array.isArray(obj.parts)) {
const text = extractText(obj.parts, depth + 1);
if (text) return text;
}
return "";
};
const messageType = (msg: any): string => {
if (typeof msg?._getType === "function") {
try {
return String(msg._getType() ?? "");
} catch {
// ignore
}
}
return String(msg?.type ?? msg?.role ?? msg?.constructor?.name ?? "");
};
const extractResult = (res: any) => {
const extractedText = (() => {
const messages = Array.isArray(res?.messages) ? res.messages : [];
for (let i = messages.length - 1; i >= 0; i -= 1) {
const msg = messages[i];
const type = messageType(msg).toLowerCase();
if (!type.includes("ai") && !type.includes("assistant")) continue;
const text = extractText(msg?.content) || extractText(msg);
if (text) return text;
}
return extractText(res?.output) || extractText(res?.response) || extractText(res?.finalResponse) || "";
})();
return {
text: extractedText.trim(),
};
};
for (let cycle = 1; ; cycle += 1) {
const userPrompt = cyclePrompt(input.userText, cycle, cycleNotes);
const cycleSpan = lfTrace?.span({
name: "agent.cycle",
input: userPrompt,
metadata: {
cycle,
requestId: input.requestId ?? null,
},
});
await emitTrace({ text: "Анализирую задачу и текущий контекст CRM." });
const beforeRuns = toolRuns.length;
const beforeWrites = dbWrites.length;
let res: any;
try {
const invokeConfig: Record<string, any> = {};
const recursionLimit = Number(process.env.CF_AGENT_RECURSION_LIMIT ?? "1000000");
if (Number.isFinite(recursionLimit) && recursionLimit > 0) {
invokeConfig.recursionLimit = recursionLimit;
}
if (tracingEnabled) {
invokeConfig.runName = "clientsflow.crm_agent_cycle";
invokeConfig.tags = ["clientsflow", "crm-agent", "langgraph"];
invokeConfig.metadata = {
teamId: input.teamId,
userId: input.userId,
requestId: input.requestId ?? null,
conversationId: input.conversationId ?? null,
cycle,
};
}
res = await agent.invoke(
{
messages: [
{ role: "system", content: system },
{ role: "user", content: userPrompt },
],
},
invokeConfig,
);
} catch (e: any) {
await emitTrace({ text: "Один из шагов завершился ошибкой." });
cycleSpan?.end({
output: "error",
level: "ERROR",
statusMessage: String(e?.message ?? e ?? "unknown_error"),
});
throw e;
}
const parsed = extractResult(res);
if (parsed.text) {
finalText = parsed.text;
}
const progressed = toolRuns.length > beforeRuns || dbWrites.length > beforeWrites;
cycleSpan?.end({
output: parsed.text || "",
metadata: {
progressed,
toolRunsDelta: toolRuns.length - beforeRuns,
dbWritesDelta: dbWrites.length - beforeWrites,
},
});
if (progressed) {
cycleNotes.push(`Cycle ${cycle}: updated tools/data state.`);
}
await emitTrace({
text: progressed
? "Продвигаюсь по задаче и обновляю рабочий набор изменений."
: "Промежуточный шаг не дал прогресса, проверяю следующий вариант.",
});
const done = !progressed && cycle > 1;
if (done) {
await emitTrace({ text: "Формирую итоговый ответ." });
break;
}
}
lfTrace?.update({
output: finalText || null,
metadata: {
toolsUsedCount: toolsUsed.length,
toolRunsCount: toolRuns.length,
dbWritesCount: dbWrites.length,
},
});
void langfuse?.flushAsync().catch(() => {});
if (!finalText) {
throw new Error("Model returned empty response");
}
const plan: string[] = [];
return {
text: finalText,
plan,
thinking: [],
tools: toolsUsed,
toolRuns,
dbWrites: dbWrites.length ? dbWrites : undefined,
};
}